微信小程序通过http通信控制庐山派
- 游戏开发
- 2025-09-02 12:57:01

首先要形成同一个局域网才能够实现
这是微信小程序的代码
Page({
data: {
isConnected: false, // WiFi连接状态
serverUrl: 'http://192.168.1.225:8081/command' // 服务器地址
},
onLoad() {
this.authorizeLocation();
},
// 授权地理位置
authorizeLocation() {
wx.authorize({
scope: 'scope.userLocation',
success: () => {
this.startWifi();
},
fail: () => {
wx.showToast({
title: '授权失败',
icon: 'none'
});
}
});
},
// 开启WiFi模块
startWifi() {
wx.startWifi({
success: () => {
wx.connectWifi({
SSID: 'DS',
password: 'desen8888.',
maunal: true, // 跳转到系统设置页进行连接
success: () => {
this.setData({ isConnected: true });
wx.showToast({ title: 'WiFi连接成功' });
},
fail: (err) => {
console.error('WiFi连接失败', err);
wx.showToast({ title: 'WiFi连接失败', icon: 'none' });
}
});
},
fail: (err) => {
console.error('WiFi模块启动失败', err);
wx.showToast({ title: 'WiFi模块启动失败', icon: 'none' });
}
});
},
// 发送HTTP请求
sendHttpRequest(data) {
const { serverUrl } = this.data;
wx.request({
url: serverUrl,
method: 'POST',
data: data,
header: {
'Content-Type': 'application/json'
},
success: (res) => {
console.log('请求成功:', res.data);
wx.showToast({ title: '指令发送成功' });
},
fail: (err) => {
console.error('请求失败:', err);
wx.showToast({ title: '指令发送失败', icon: 'none' });
}
});
},
// 控制逻辑
onForward() {
this.sendHttpRequest({ command: 'forward' });
},
onBackward() {
this.sendHttpRequest({ command: 'backward' });
},
onTurnLeft() {
this.sendHttpRequest({ command: 'turn_left' });
},
onTurnRight() {
this.sendHttpRequest({ command: 'turn_right' });
},
onStop() {
this.sendHttpRequest({ command: 'stop' });
},
onButton1Click() {
this.sendHttpRequest({ command: 'action_1' });
},
onButton2Click() {
this.sendHttpRequest({ command: 'action_2' });
}
});
这是庐山派的代码
import time import os import sys import socket import network import ujson import aicube from media.sensor import * from media.display import * from media.media import * import nncase_runtime as nn import ulab.numpy as np import image import gc from machine import UART, FPIOA, Pin from machine import Timer
# 创建FPIOA对象,用于初始化引脚功能配置 fpioa = FPIOA()
# 配置UART1引脚 fpioa.set_function(3, FPIOA.UART1_TXD) fpioa.set_function(4, FPIOA.UART1_RXD)
# 配置UART2引脚 fpioa.set_function(5, FPIOA.UART2_TXD) fpioa.set_function(6, FPIOA.UART2_RXD)
# 将62、20、63号引脚映射为GPIO,用于控制RGB灯的三个颜色通道 fpioa.set_function(62, FPIOA.GPIO62) fpioa.set_function(20, FPIOA.GPIO20) fpioa.set_function(63, FPIOA.GPIO63)
LED_R = Pin(62, Pin.OUT, pull=Pin.PULL_NONE, drive=7) # 红色LED LED_G = Pin(20, Pin.OUT, pull=Pin.PULL_NONE, drive=7) # 绿色LED LED_B = Pin(63, Pin.OUT, pull=Pin.PULL_NONE, drive=7) # 蓝色LED
# 初始化时先关闭所有LED灯 LED_R.high() LED_G.high() LED_B.high()
# 初始化UART1,波特率115200,8位数据位,无校验,1位停止位 uart1_DriveMotor = UART(UART.UART1, baudrate=115200, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)
# 初始化UART2,波特率115200,8位数据位,无校验,1位停止位 uart1_ToPC = UART(UART.UART2, baudrate=115200, bits=UART.EIGHTBITS, parity=UART.PARITY_NONE, stop=UART.STOPBITS_ONE)
# 指令映射表 INSTRUCTION_MAP = { "Straight lane": '{"T":1,"L":0.5,"R":0.5}\n', "Straight ahead arrow": '{"T":1,"L":0.5,"R":0.5}\n' }
# 连接网络 def network_use_wlan(is_wlan=True): if is_wlan: sta = network.WLAN(0) sta.connect("DS", "desen8888.") print(sta.status()) while sta.ifconfig()[0] == '0.0.0.0': os.exitpoint() print(sta.ifconfig()) ip = sta.ifconfig()[0] return ip else: a = network.LAN() if not a.active(): raise RuntimeError("LAN interface is not active.") a.ifconfig("dhcp") print(a.ifconfig()) ip = a.ifconfig()[0] return ip
# 初始化摄像头 def init_camera(): global sensor try: print("camera_test") # 根据默认配置构建 Sensor 对象 sensor = Sensor() # 复位 sensor sensor.reset()
# 设置通道 0 分辨率为 1920x1080 sensor.set_framesize(Sensor.FHD) # 设置通道 0 格式为 YUV420SP sensor.set_pixformat(Sensor.YUV420SP) # 绑定通道 0 到显示 VIDEO1 层 bind_info = sensor.bind_info() Display.bind_layer(**bind_info, layer=Display.LAYER_VIDEO1)
# 设置通道 1 分辨率和格式 sensor.set_framesize(width=640, height=480, chn=CAM_CHN_ID_1) sensor.set_pixformat(Sensor.RGB888, chn=CAM_CHN_ID_1)
# 设置通道 2 分辨率和格式 sensor.set_framesize(width=640, height=480, chn=CAM_CHN_ID_2) sensor.set_pixformat(Sensor.RGB565, chn=CAM_CHN_ID_2)
# 初始化 HDMI 和 IDE 输出显示,若屏幕无法点亮,请参考 API 文档中的 K230_CanMV_Display 模块 API 手册进行配置 Display.init(Display.LT9611, to_ide=True, osd_num=2) # 初始化媒体管理器 MediaManager.init() # 启动 sensor sensor.run() print("Camera initialized successfully.") return True except Exception as e: print(f"Camera initialization error: {e}") return False
# 启动 Web 服务器 def start_web_server(): ip = network_use_wlan(True)
s = socket.socket() ai = socket.getaddrinfo("0.0.0.0", 8081) print("Bind address info:", ai) addr = ai[0][-1] s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(addr) s.listen(5) print("Listening, connect your browser to http://%s:8081/" % (ip))
while True: res = s.accept() client_sock = res[0] client_addr = res[1] print("Client address:", client_addr) print("Client socket:", client_sock) client_sock.setblocking(False) client_stream = client_sock # 非阻塞读取请求数据 request = b'' while True: try: chunk = client_stream.recv(1024) if chunk: # 有数据 request += chunk if b'\r\n\r\n' in request: # 检测到请求头结束 break else: # 无数据,继续视频流 break except OSError: # 无数据可读 break
# 如果有完整的请求头 if request and b'\r\n\r\n' in request: headers = request.split(b'\r\n\r\n')[0].decode() print('Received request headers:', headers)
# 处理POST请求 if 'POST /command' in headers: try: # 获取Content-Length content_length = 0 for line in headers.split('\r\n'): if line.lower().startswith('content-length:'): content_length = int(line.split(': ')[1]) break
# 读取剩余请求体 body = request.split(b'\r\n\r\n')[1] while len(body) < content_length: chunk = client_stream.recv(content_length - len(body)) if not chunk: break body += chunk
# 解析JSON data = ujson.loads(body.decode()) command = data.get('command') if command: print('Executing command:', command) send_instruction(command) response = ('HTTP/1.1 200 OK\r\n' 'Content-Type: application/json\r\n' '\r\n' '{"status": "success"}') else: response = ('HTTP/1.1 400 Bad Request\r\n' 'Content-Type: application/json\r\n' '\r\n' '{"error": "Missing command"}') except Exception as e: print('Error processing command:', e) response = ('HTTP/1.1 400 Bad Request\r\n' 'Content-Type: application/json\r\n' '\r\n' '{"error": "Invalid request"}')
# 发送响应 client_stream.write(response.encode()) client_stream.close() continue
# 发送指令时亮灯,间隔一秒关灯 def send_instruction(instruction): if instruction in INSTRUCTION_MAP: cmd = INSTRUCTION_MAP[instruction] uart1_DriveMotor.write(bytes(cmd, 'utf-8')) # 发送到UART1 uart1_ToPC.write(bytes(cmd, 'utf-8')) # 同时发送到UART2 print(f"已发送指令: {cmd}") LED_G.low() # 点亮绿灯 time.sleep(1) LED_G.high() # 关闭绿灯
# 主函数 def main(): if not init_camera(): print("Camera initialization failed. Exiting...") return start_web_server()
if __name__ == "__main__": try: main() except KeyboardInterrupt as e: print("用户停止: ", e) except BaseException as e: print(f"异常: {e}") finally: # 停止 sensor if isinstance(sensor, Sensor): sensor.stop() # 销毁显示 Display.deinit() os.exitpoint(os.EXITPOINT_ENABLE_SLEEP) time.sleep_ms(100) # 释放媒体缓冲区 MediaManager.deinit()
庐山派打印窗口这样是成功的
微信小程序通过http通信控制庐山派由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“微信小程序通过http通信控制庐山派”