主页 > 游戏开发  > 

微信小程序通过http通信控制庐山派

微信小程序通过http通信控制庐山派

首先要形成同一个局域网才能够实现

这是微信小程序的代码

Page({

  data: {

    isConnected: false, // WiFi连接状态

    serverUrl: 'http://192.168.1.225:8081/command' // 服务器地址

  },

  onLoad() {

    this.authorizeLocation();

  },

  // 授权地理位置

  authorizeLocation() {

    wx.authorize({

      scope: 'scope.userLocation',

      success: () => {

        this.startWifi();

      },

      fail: () => {

        wx.showToast({

          title: '授权失败',

          icon: 'none'

        });

      }

    });

  },

  // 开启WiFi模块

  startWifi() {

    wx.startWifi({

      success: () => {

        wx.connectWifi({

          SSID: 'DS',

          password: 'desen8888.',

          maunal: true, // 跳转到系统设置页进行连接

          success: () => {

            this.setData({ isConnected: true });

            wx.showToast({ title: 'WiFi连接成功' });

          },

          fail: (err) => {

            console.error('WiFi连接失败', err);

            wx.showToast({ title: 'WiFi连接失败', icon: 'none' });

          }

        });

      },

      fail: (err) => {

        console.error('WiFi模块启动失败', err);

        wx.showToast({ title: 'WiFi模块启动失败', icon: 'none' });

      }

    });

  },

  // 发送HTTP请求

  sendHttpRequest(data) {

    const { serverUrl } = this.data;

    wx.request({

      url: serverUrl,

      method: 'POST',

      data: data,

      header: {

        'Content-Type': 'application/json'

      },

      success: (res) => {

        console.log('请求成功:', res.data);

        wx.showToast({ title: '指令发送成功' });

      },

      fail: (err) => {

        console.error('请求失败:', err);

        wx.showToast({ title: '指令发送失败', icon: 'none' });

      }

    });

  },

  // 控制逻辑

  onForward() {

    this.sendHttpRequest({ command: 'forward' });

  },

  onBackward() {

    this.sendHttpRequest({ command: 'backward' });

  },

  onTurnLeft() {

    this.sendHttpRequest({ command: 'turn_left' });

  },

  onTurnRight() {

    this.sendHttpRequest({ command: 'turn_right' });

  },

  onStop() {

    this.sendHttpRequest({ command: 'stop' });

  },

  onButton1Click() {

    this.sendHttpRequest({ command: 'action_1' });

  },

  onButton2Click() {

    this.sendHttpRequest({ command: 'action_2' });

  }

});

这是庐山派的代码

import time import os import sys import socket import network import ujson import aicube from media.sensor import * from media.display import * from media.media import * import nncase_runtime as nn import ulab.numpy as np import image import gc from machine import UART, FPIOA, Pin from machine import Timer

# 创建FPIOA对象,用于初始化引脚功能配置 fpioa = FPIOA()

# 配置UART1引脚 fpioa.set_function(3, FPIOA.UART1_TXD) fpioa.set_function(4, FPIOA.UART1_RXD)

# 配置UART2引脚 fpioa.set_function(5, FPIOA.UART2_TXD) fpioa.set_function(6, FPIOA.UART2_RXD)

# 将62、20、63号引脚映射为GPIO,用于控制RGB灯的三个颜色通道 fpioa.set_function(62, FPIOA.GPIO62) fpioa.set_function(20, FPIOA.GPIO20) fpioa.set_function(63, FPIOA.GPIO63)

LED_R = Pin(62, Pin.OUT, pull=Pin.PULL_NONE, drive=7)  # 红色LED LED_G = Pin(20, Pin.OUT, pull=Pin.PULL_NONE, drive=7)  # 绿色LED LED_B = Pin(63, Pin.OUT, pull=Pin.PULL_NONE, drive=7)  # 蓝色LED

# 初始化时先关闭所有LED灯 LED_R.high() LED_G.high() LED_B.high()

# 初始化UART1,波特率115200,8位数据位,无校验,1位停止位 uart1_DriveMotor = UART(UART.UART1, baudrate=115200, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)

# 初始化UART2,波特率115200,8位数据位,无校验,1位停止位 uart1_ToPC = UART(UART.UART2, baudrate=115200, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)

# 指令映射表 INSTRUCTION_MAP = {     "Straight lane": '{"T":1,"L":0.5,"R":0.5}\n',     "Straight ahead arrow": '{"T":1,"L":0.5,"R":0.5}\n' }

# 连接网络 def network_use_wlan(is_wlan=True):     if is_wlan:         sta = network.WLAN(0)         sta.connect("DS", "desen8888.")         print(sta.status())         while sta.ifconfig()[0] == '0.0.0.0':             os.exitpoint()         print(sta.ifconfig())         ip = sta.ifconfig()[0]         return ip     else:         a = network.LAN()         if not a.active():             raise RuntimeError("LAN interface is not active.")         a.ifconfig("dhcp")         print(a.ifconfig())         ip = a.ifconfig()[0]         return ip

# 初始化摄像头 def init_camera():     global sensor     try:         print("camera_test")         # 根据默认配置构建 Sensor 对象         sensor = Sensor()         # 复位 sensor         sensor.reset()

        # 设置通道 0 分辨率为 1920x1080         sensor.set_framesize(Sensor.FHD)         # 设置通道 0 格式为 YUV420SP         sensor.set_pixformat(Sensor.YUV420SP)         # 绑定通道 0 到显示 VIDEO1 层         bind_info = sensor.bind_info()         Display.bind_layer(**bind_info, layer=Display.LAYER_VIDEO1)

        # 设置通道 1 分辨率和格式         sensor.set_framesize(width=640, height=480, chn=CAM_CHN_ID_1)         sensor.set_pixformat(Sensor.RGB888, chn=CAM_CHN_ID_1)

        # 设置通道 2 分辨率和格式         sensor.set_framesize(width=640, height=480, chn=CAM_CHN_ID_2)         sensor.set_pixformat(Sensor.RGB565, chn=CAM_CHN_ID_2)

        # 初始化 HDMI 和 IDE 输出显示,若屏幕无法点亮,请参考 API 文档中的 K230_CanMV_Display 模块 API 手册进行配置         Display.init(Display.LT9611, to_ide=True, osd_num=2)         # 初始化媒体管理器         MediaManager.init()         # 启动 sensor         sensor.run()         print("Camera initialized successfully.")         return True     except Exception as e:         print(f"Camera initialization error: {e}")         return False

# 启动 Web 服务器 def start_web_server():     ip = network_use_wlan(True)

    s = socket.socket()     ai = socket.getaddrinfo("0.0.0.0", 8081)     print("Bind address info:", ai)     addr = ai[0][-1]     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)     s.bind(addr)     s.listen(5)     print("Listening, connect your browser to http://%s:8081/" % (ip))

    while True:         res = s.accept()         client_sock = res[0]         client_addr = res[1]         print("Client address:", client_addr)         print("Client socket:", client_sock)         client_sock.setblocking(False)         client_stream = client_sock         # 非阻塞读取请求数据         request = b''         while True:             try:                 chunk = client_stream.recv(1024)                 if chunk:  # 有数据                     request += chunk                     if b'\r\n\r\n' in request:  # 检测到请求头结束                         break                 else:  # 无数据,继续视频流                     break             except OSError:  # 无数据可读                 break

        # 如果有完整的请求头         if request and b'\r\n\r\n' in request:             headers = request.split(b'\r\n\r\n')[0].decode()             print('Received request headers:', headers)

            # 处理POST请求             if 'POST /command' in headers:                 try:                     # 获取Content-Length                     content_length = 0                     for line in headers.split('\r\n'):                         if line.lower().startswith('content-length:'):                             content_length = int(line.split(': ')[1])                             break

                    # 读取剩余请求体                     body = request.split(b'\r\n\r\n')[1]                     while len(body) < content_length:                         chunk = client_stream.recv(content_length - len(body))                         if not chunk:                             break                         body += chunk

                    # 解析JSON                     data = ujson.loads(body.decode())                     command = data.get('command')                     if command:                         print('Executing command:', command)                         send_instruction(command)                         response = ('HTTP/1.1 200 OK\r\n'                                    'Content-Type: application/json\r\n'                                    '\r\n'                                    '{"status": "success"}')                     else:                         response = ('HTTP/1.1 400 Bad Request\r\n'                                    'Content-Type: application/json\r\n'                                    '\r\n'                                    '{"error": "Missing command"}')                 except Exception as e:                     print('Error processing command:', e)                     response = ('HTTP/1.1 400 Bad Request\r\n'                                'Content-Type: application/json\r\n'                                '\r\n'                                '{"error": "Invalid request"}')

                # 发送响应                 client_stream.write(response.encode())                 client_stream.close()                 continue

# 发送指令时亮灯,间隔一秒关灯 def send_instruction(instruction):     if instruction in INSTRUCTION_MAP:         cmd = INSTRUCTION_MAP[instruction]         uart1_DriveMotor.write(bytes(cmd, 'utf-8'))  # 发送到UART1         uart1_ToPC.write(bytes(cmd, 'utf-8'))  # 同时发送到UART2         print(f"已发送指令: {cmd}")         LED_G.low()  # 点亮绿灯         time.sleep(1)         LED_G.high()  # 关闭绿灯

# 主函数 def main():     if not init_camera():         print("Camera initialization failed. Exiting...")         return     start_web_server()

if __name__ == "__main__":     try:         main()     except KeyboardInterrupt as e:         print("用户停止: ", e)     except BaseException as e:         print(f"异常: {e}")     finally:         # 停止 sensor         if isinstance(sensor, Sensor):             sensor.stop()         # 销毁显示         Display.deinit()         os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)         time.sleep_ms(100)         # 释放媒体缓冲区         MediaManager.deinit()

庐山派打印窗口这样是成功的

标签:

微信小程序通过http通信控制庐山派由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“微信小程序通过http通信控制庐山派