Skip to main content

4 posts tagged with "smarthome"

View All Tags

微信小程序蓝牙通信示例

· 17 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

作为开发者,最讨厌的事情莫过于多平台适配,在手机端由于大家型号不同,编个APP通过蓝牙控制显然是不方便的,于是做了一个蓝牙小程序来与ESP32通信。

界面设计

这里我想设计成方块按键的格式,所以创建一个矩阵,然后在矩阵对应的位置添加上按钮。非常的简单,只是有一些差异需要注意。

wxml标签

蓝牙小程序标签与html略有不同,以下是小程序标签(即wxml标签)与 HTML 略有不同的标签的对比表:

wxml标签HTML 标签描述
<view><div>用于容器和布局,类似于 HTML 中的 <div>
<text><span>用于文本显示,类似于 HTML 中的 <span>
<button><button>用于创建按钮,与 HTML 中的 <button> 功能相同。
<image><img>用于显示图片,类似于 HTML 中的 <img>,但属性有所不同。
<navigator><a>用于页面导航,类似于 HTML 中的 <a> 标签。
<picker>N/A用于多种选择器,HTML 中无直接对应的标签。
<scroll-view>N/A用于可滚动的视图区域,HTML 中无直接对应的标签。
<swiper>N/A用于滑动视图容器,HTML 中无直接对应的标签。
<map><iframe>用于展示地图,类似于 HTML 中嵌入地图的方式。
<swiper-item>N/A<swiper> 配合使用,HTML 中无直接对应的标签。
<rich-text>N/A用于展示富文本,HTML 中无直接对应的标签。
<block>N/A无实际渲染效果,类似于 HTML 中的 <template>

JS

以下是微信小程序的 JavaScript(JS)与网页的 JavaScript 的对比表格:

特性/功能微信小程序 JS网页 JS描述
全局对象wxwindow微信小程序中使用 wx 对象来调用特定的 API,而在网页 JS 中,所有的全局对象都挂载在 window 对象下。
API 调用基于 wx 对象提供的 API,如 wx.request()wx.navigateTo()使用浏览器提供的 API,如 fetch()window.location微信小程序有一套独立的 API,专门用于微信环境下的开发,无法直接使用标准的浏览器 API。
页面与组件管理通过小程序的 PageComponent 函数定义页面和组件通过 HTML 文件和 JavaScript 结合使用前端框架或直接操作 DOM微信小程序使用特殊的 PageComponent 函数来定义页面和组件,网页 JS 则通过 DOM 结合 JavaScript 实现页面和组件管理。
数据绑定使用 this.setData() 进行数据绑定和更新通常使用 innerHTMLtextContent 或前端框架(如 React 的 setState微信小程序使用 this.setData() 来绑定和更新数据,而在网页 JS 中,常通过直接操作 DOM 或使用前端框架来更新数据。
生命周期函数提供页面与组件的生命周期函数,如 onLoadonShow通过事件绑定或框架提供的生命周期函数(如 React 的 componentDidMount微信小程序有特定的生命周期函数供开发者使用,而网页 JS 通常需要结合框架或事件来处理生命周期管理。
路由与导航使用 wx.navigateTo()wx.redirectTo() 等方法进行页面跳转通过改变 window.location 或使用 history.pushState() 进行路由小程序的路由机制是由微信管理的,开发者需要使用专门的 API 进行导航,而网页 JS 可以直接操作 URL。
模块化使用 require() 和模块化文件系统使用 ES6 import/export 或 CommonJS 模块系统小程序内置的模块化系统与 Node.js 类似,使用 require() 导入模块,而网页 JS 中可以使用 ES6 模块或 CommonJS 模块系统。
网络请求使用 wx.request() 发起 HTTP 请求使用 fetch()XMLHttpRequest 发起 HTTP 请求微信小程序提供了 wx.request() 方法用于网络请求,而网页 JS 通常使用 fetch()XMLHttpRequest
文件系统访问通过 wx.getFileSystemManager() 访问文件系统通过 File API、Blob、FileReader 等访问文件小程序提供了 wx.getFileSystemManager() 接口来管理文件系统,而网页 JS 可以使用浏览器提供的 File API。
样式与布局使用 WXML 和 WXSS 定义页面结构和样式使用 HTML 和 CSS 定义页面结构和样式微信小程序使用 WXML 和 WXSS 分别来代替 HTML 和 CSS,专门为小程序定制。
事件处理事件绑定使用 bindtapcatchtap 等绑定事件事件绑定使用 addEventListener 或内联 onclick微信小程序的事件处理是通过特定的属性绑定事件,而网页 JS 可以直接使用标准的事件绑定方法。
调试与工具使用微信开发者工具进行调试使用浏览器的开发者工具进行调试小程序开发和调试通常在微信开发者工具中进行,而网页开发则依赖于浏览器提供的开发者工具。
存储提供 wx.setStorage()wx.getStorage() 进行数据持久化存储使用 localStoragesessionStorage 进行数据存储微信小程序的存储 API 类似于浏览器的 localStorage,但使用 wx 提供的 API 进行调用。
原生 API 调用不支持直接调用浏览器或操作系统的原生 API可以使用浏览器 API 或通过插件访问系统 API微信小程序无法直接调用浏览器或操作系统的原生 API,而网页 JS 则可以直接使用这些 API。
平台限制运行于微信环境,仅支持在微信客户端中运行运行于浏览器环境,可以在任何支持的浏览器中运行小程序只能在微信客户端中运行,而网页 JS 则可以在任何现代浏览器中运行。

页面代码

界面部分因为手机的尺寸实在太多,所以我创建一个矩阵,然后把有按键的地方加上边框实现规则布局。中间输入设备名称。

<scroll-view class="scrollarea" scroll-y type="list">
<view class="container">
<text>蓝牙连接状态:{{status}}</text>
<div class="button-lines">
<input type="text" placeholder="请输入设备名称" bindinput="onDeviceNameInput" />
</div>
<!-- 3x7 矩阵布局 -->
<view class="button-grid">
<view class="row">
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendUp" bindtouchend="handleTouchEnd"></button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="connectDevice">连接</button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendA">A</button></view>
<view class="cell"></view>
</view>
<view class="row">
<view class="cell"><button bindtouchstart="sendLeft" bindtouchend="handleTouchEnd"></button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendRight" bindtouchend="handleTouchEnd"></button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendB">B</button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendD">D</button></view>
</view>
<view class="row">
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendDown" bindtouchend="handleTouchEnd"></button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendData">文件</button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendC" >C</button></view>
<view class="cell"></view>
</view>
</view>
</view>
</scroll-view>

样式代码

/**index.wxss**/
page {
height: 100vh;
display: flex;
flex-direction: column;
}
.scrollarea {
flex: 1;
overflow-y: hidden;
}
.container {
padding: 20px;
}
.button-lines {
display: flex;
flex-wrap: wrap;
justify-content: center;
align-items: center;
width: 100%;
height: 100%;
}
input{
border: 1px solid #ccc;
padding: 8px;
margin-right: 5px;
width: 50%;
height: 30%;
}
.button-grid {
display: flex;
flex-wrap: wrap;
justify-content: center;
align-items: center;
width: 100%;
height: 100%;
}

.row {
width: 100%;
display: flex;
justify-content: space-between;
}

.cell{
width: 12%;
height: 50px;
margin: 5px;

display: flex;
align-items: center;
justify-content: center;
box-sizing: border-box;
}

.cell button{
border: 1px solid #ccc;
}
button {
width: 100%;
height: 100%;
box-sizing: border-box;
}

逻辑设计

考虑到不同设备的蓝牙名称不同,因此我在页面的中间设计一个输入框,输入对应的设备名称(大小写敏感)后,点击连接按钮,即可触发搜索接口。为了让自己知道是否已经连接上,我在输入框的上面添加了一个状态显示,考虑到部分用户不能理解红色绿色的默认含义,我使用了中文来描述连接状态。

底部做了一些按键发送数据的功能,包括:中文上下左右、英文ABCD、还有大文件一键传输(我设置了范围为txt和py)

tip

网页开发中,浏览器的渲染主线程会在解析DOM树的时候给所有HTML节点根据权重添加上属性,而小程序中,一旦缺省关键的属性,在开发界面会正常显示,上真机就会异常,这点尤其需要注意。

蓝牙设备的搜索、连接等功能由微信的API接口提供,其中蓝牙的权限上,如果使用的是:仅在使用中允许,在部分安卓手机上,会出现切后台再返回时蓝牙权限丢失的情况。因此改为:每次使用时询问权限。目前在官方论坛上留言了,我更倾向于是安卓设备的问题。

另外安卓中蓝牙权限与位置权限关联,因此仅开启蓝牙权限依然无法使用。

微信的蓝牙接口搜索到的设备便不再出现,假设我周边存在设备A、设备B、设备C

如果我首先输入了设备B,蓝牙搜索API根据信号强弱依次返回:设备A、设备B(判定成功,建立连接)

此时我再输入设备A,点击连接,就会出现搜不到设备的情况,当然这里是可以优化的,设置一个点击按钮:刷新。不过我右上角点击重新进入小程序也是可以的,所以这里就不是很有必要加这个逻辑判断。

蓝牙设备的连接非常简单,根据参考文档一步一步来即可,需要注意的是,发送中文时可能会乱码,JS原生的解码又不能用,所以我导入了一个包import TextEncoder from './miniprogram-text-encoder'来自动判断文本是中文还是英文执行对应的转化。

既然蓝牙可以通信,传输中文和英文,那么是不是可以传本书过去?首先尝试直接传输,发现接收方只收到了前20字节,后续数据丢失。那么修改程序,将文件分片、每次发20个字节,发送完成之后在发送一个END标记。

和之前发送数据的代码写在一起,就变成了这样:


import TextEncoder from './miniprogram-text-encoder'

Page({
data: {
status: '未连接',
deviceId: null,
serviceId: null,
characteristicId: null,
deviceName: 'None' // 默认设备名称
},
onLoad() {
this.initBluetooth();
},
onDeviceNameInput(e) {
console.log(e.detail.value);
this.setData({
deviceName: e.detail.value,
})
;
},
initBluetooth() {
const that = this;
wx.openBluetoothAdapter({
success(res) {
console.log('初始化蓝牙适配器成功');
that.startBluetoothDevicesDiscovery();
wx.showToast({
title: '蓝牙权限成功',
icon: 'success',
duration: 2000
});
},
fail(res) {
console.log('初始化蓝牙适配器失败', res);
wx.showToast({
title: '蓝牙权限失败',
icon: 'error',
duration: 2000
});
}
});
},
startBluetoothDevicesDiscovery() {
const that = this;
console.log(that.data.deviceName, '57');

// 如果 deviceName 是 "None",不进行蓝牙设备搜索
if (that.data.deviceName === "None") {
console.log('设备名称为 "None",不进行蓝牙设备搜索');
return;
}

wx.startBluetoothDevicesDiscovery({
success(res) {
console.log('开始搜索蓝牙设备');
that.onBluetoothDeviceFound();
},
fail(res) {
console.log('搜索蓝牙设备失败', res);
}
});
},
onBluetoothDeviceFound() {
const that = this;
wx.onBluetoothDeviceFound((devices) => {
devices.devices.forEach(device => {
console.log('发现设备名称:', device.name); // 打印所有发现的设备名称
if (device.name === that.data.deviceName) {
wx.showToast({
title: '发现蓝牙设备',
icon: 'success',
duration: 2000
});
that.createBLEConnection(device.deviceId);
}
});
});
},
createBLEConnection(deviceId) {
const that = this;
wx.createBLEConnection({
deviceId: deviceId,
success(res) {
console.log('连接蓝牙设备成功');
that.setData({
status: '已连接',
deviceId: deviceId
});
that.getBLEDeviceServices(deviceId);
},
fail(res) {
console.log('连接蓝牙设备失败', res);
}
});
},
getBLEDeviceServices(deviceId) {
const that = this;
wx.getBLEDeviceServices({
deviceId: deviceId,
success(res) {
console.log('获取服务成功:', res.services);
for (let i = 0; i < res.services.length; i++) {
if (res.services[i].isPrimary) {
that.getBLEDeviceCharacteristics(deviceId, res.services[i].uuid);
return;
}
}
}
});
},
getBLEDeviceCharacteristics(deviceId, serviceId) {
const that = this;
wx.getBLEDeviceCharacteristics({
deviceId: deviceId,
serviceId: serviceId,
success(res) {
console.log('获取特征值成功:', res.characteristics);
for (let i = 0; i < res.characteristics.length; i++) {
if (res.characteristics[i].properties.write) {
that.setData({
serviceId: serviceId,
characteristicId: res.characteristics[i].uuid
});
return;
}
}
}
});
},
connectDevice() {
this.startBluetoothDevicesDiscovery();
},
sendData() {
const that = this;
// 选择本地 TXT 或 PY 文件
wx.chooseMessageFile({
count: 1,
type: 'file',
extension: ['txt', 'py'],
success(res) {
const filePath = res.tempFiles[0].path;
const fileName = res.tempFiles[0].name;

// 读取文件内容为 ArrayBuffer
wx.getFileSystemManager().readFile({
filePath: filePath,
success(readRes) {
const fileBuffer = readRes.data;
console.log(readRes.data)
const chunkSize = 20; // 每次发送20字节
const totalChunks = Math.ceil(fileBuffer.byteLength / chunkSize);

// 发送文件名称和分片数
const fileInfo = `${fileName}|${totalChunks}`;
const fileInfoBuffer = that.stringToArrayBuffer(fileInfo);
wx.writeBLECharacteristicValue({
deviceId: that.data.deviceId,
serviceId: that.data.serviceId,
characteristicId: that.data.characteristicId,
value: fileInfoBuffer,
success(res) {
console.log('文件信息发送成功');
},
fail(res) {
console.error('文件信息发送失败', res);
}
});

// 逐块发送文件数据
for (let i = 0; i < totalChunks; i++) {
const start = i * chunkSize;
const end = Math.min(start + chunkSize, fileBuffer.byteLength);
const chunk = fileBuffer.slice(start, end);
const progress = ((i + 1) / totalChunks) * 100;

// 发送当前块数据
wx.writeBLECharacteristicValue({
deviceId: that.data.deviceId,
serviceId: that.data.serviceId,
characteristicId: that.data.characteristicId,
value: chunk,
success(res) {
console.log(`数据发送成功: ${i + 1}/${totalChunks} (${progress}%)`);
if (i === totalChunks - 1) {
// 发送结束标志
const endBuffer = that.stringToArrayBuffer('END');
wx.writeBLECharacteristicValue({
deviceId: that.data.deviceId,
serviceId: that.data.serviceId,
characteristicId: that.data.characteristicId,
value: endBuffer,
success(res) {
console.log('所有数据发送完成');
}
});
}
},
fail(res) {
console.error(`数据发送失败: ${i + 1}/${totalChunks}`, res);
}
});
}
},
fail(err) {
console.error('文件读取失败', err);
}
});
},
fail(err) {
console.error('文件选择失败', err);
}
});
},

// 将字符串转换为 ArrayBuffer
stringToArrayBuffer(str) {
const base64 = wx.arrayBufferToBase64(new TextEncoder().encode(str).buffer);
return wx.base64ToArrayBuffer(base64);
},

// 发送控制消息
sendMessage(message) {
const that = this;
const buffer = that.stringToArrayBuffer(message);
wx.writeBLECharacteristicValue({
deviceId: that.data.deviceId,
serviceId: that.data.serviceId,
characteristicId: that.data.characteristicId,
value: buffer,
success(res) {
console.log(`消息发送成功: ${message}`);
},
fail(res) {
console.error(`消息发送失败: ${message}`, res);
}
});
},
// 松开按钮时发送消息
handleTouchEnd() {
this.sendMessage('释放');
},
sendUp() {
this.sendMessage('上');
},
sendDown() {
this.sendMessage('下');
},
sendLeft() {
this.sendMessage('左');
},
sendRight() {
this.sendMessage('右');
},
sendA() {
this.sendMessage('A');
},
sendB() {
this.sendMessage('B');
},
sendC() {
this.sendMessage('C');
},
sendD() {
this.sendMessage('D');
}

后话

这里是完整代码

程序的优化是无穷无尽的,所以这里我只实现了最少的功能,如果项目对你有帮助,不用问我,直接拿去用。

摄像头云台控制指令解析

· 12 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

购买了一款云台摄像头,你可以在淘宝搜这个关键词知道它长什么样子。

它默认能通过 RS232 协议控制云台转动,但是现在新的主板已经没有这种圆形的接口了,基本都是 USB。

他也可以通过遥控器控制,但是我想尝试自己编写逻辑代码通过键盘控制。

效果是通过监听键盘上下左右等事件,调用对应云台运动的指令,运动到合适角度之后可以按下空格停止云台运动,按下 ESC 退出控制程序。

也可以按下某个按键如数字1,直接运动到预设角度。

咨询商家后,商家提供了 RS232 协议的指令集,所以这里通过 python 的 serial 库尝试通过 USB 口对其云台调用。

RS232 协议说明书

CommandCommand PacketComments
Stop8x 01 06 01 VV WW 03 03 FFVV: Pan Speed
Left8x 01 06 01 VV WW 01 03 FFWW: Tilt Speed
Right8x 01 06 01 VV WW 02 03 FFYYYY: Pan Position
Up8x 01 06 01 VV WW 03 01 FFZZZZ: Tilt Position
Down8x 01 06 01 VV WW 03 02 FF
UpLeft8x 01 06 01 VV WW 01 01 FF
UpRight8x 01 06 01 VV WW 02 01 FF
DownLeft8x 01 06 01 VV WW 01 02 FF
DownRight8x 01 06 01 VV WW 02 02 FF
Absolute Position8x 01 06 02 VV WW 0Y 0Y 0Y 0Y 0Z 0Z 0Z 0Z FF
Relative Position8x 01 06 03 VV WW 0Y 0Y 0Y 0Y 0Z 0Z 0Z 0Z FF
Home8x 01 06 04 FF
Reset8x 01 06 05 FF

这里有一些复合指令,譬如 UpRight:向上的同时向右,如果是手柄控制比较好,键盘控制比较鸡肋,所以这里我们实现:上下左右、暂停、复位、绝对定位这几个能用到与可能会用到的。

基数转换

这里的绝对定位和相对定位部分,出现了0Y 0Y 0Y 0Y0Z 0Z 0Z 0Z,我希望传入一个 10 进制的角度,譬如0、90、180,怎么映射到其中呢?这就体现我们学完二进制之后的敏感度了,把 20 转成 2 进制的过程是:

20 ÷ 2 = 10 余数: 0
10 ÷ 2 = 5 余数: 0
5 ÷ 2= 2 余数: 1
2 ÷ 2=1 余数: 0
1 ÷ 2= 0 余数: 1
按余数倒序排列: 10100

这里我们观察范例,对数据做了拆分,即如果需要把10进制映射到16进制上,譬如17转成16进制是11,那么应该变成0101。每个位置之间插入0

所以可以写出如下代码:

def calculate_pan_position_bytes(pan_pos_value):
HEX_VALUES = [4096, 256, 16, 1] # 定义常量
pan_pos_ints = []
for i, value in enumerate(HEX_VALUES):
pan_pos_ints.append(pan_pos_value // value)
pan_pos_value %= value
# 转换为2位16进制字符串
pan_pos_strs = [f"{i:02X}" for i in pan_pos_ints]
return "".join(pan_pos_strs)
# 将17转化为16进制,应该是11,拆分加0,应该是0101
# 前方补0到总长度为8位,结果与我们预期一致
print(calculate_pan_position_bytes(17))
# 00000101

接下来通过代入0到6000这样的数值传输给串口后发现,只能向左转。

0对应居中,4500对应向左转180,数字再大也是转到底。

4500比180 = 25比1,所以我们输入角度,乘以25就得到了对应的信号值。

根据手册说明水平转动范围为355度,一半则是177.5度,与肉眼观察基本一致,Z轴的范围是上下各21度。

刚刚只能向左转,那么向右转的答案就呼之欲出了,要么是补码(异或运算后加1),要么是首位为符号位。我们添加上限位和映射,先用补码试试完成这个函数(结果直接成了)。

def calculate_pan_position_bytes(pan_pos_value, axis_type):
"""
计算轴(旋转)的位置字节。

参数:
pan_pos_value (int): 位置值,
axis_type (str): 轴的类型 ('y' or 'Y' for Y-axis, others for Z-axis)

返回:
pan_step_str (str): 计算得到的平移位置字节,格式为十六进制字符串。
"""
if axis_type.lower() == "y":
pan_pos_value = max(-177.5, min(pan_pos_value, 177.5)) # 限制取值范围
else:
pan_pos_value = max(-21, min(pan_pos_value, 21)) # 限制取值范围

pan_pos_value = int(pan_pos_value * 25) # 将角度转换为步长
pan_direction = "-" if pan_pos_value < 0 else "+" # 设定旋转方向
pan_pos_value = abs(pan_pos_value) # 取绝对值

HEX_VALUES = [4096, 256, 16, 1] # 定义常量

pan_pos_ints = []
for i, value in enumerate(HEX_VALUES):
if pan_direction == "+":
pan_pos_ints.append(pan_pos_value // value)
else: # 异或操作
pan_pos_ints.append((pan_pos_value // value)^ 0xF)
if i == 3 : # 最后一个数字,取反后加1
pan_pos_ints[-1] = pan_pos_ints[-1]+1
pan_pos_value %= value

# 转换为2位16进制字符串
pan_pos_strs = [f"{i:02X}" for i in pan_pos_ints]
return "".join(pan_pos_strs)

代码目标效果

希望具体的指令都可以通过 Python 函数来实现,同时暴露出所有可能需要修改的参数。最后关联键盘事件。例如:

camera_control.py
import keyboard
from usbcamera import *
from usbcamera import move_to_absolute_position
"""
设备 "/dev/ttyUSB0" 的云台旋转至绝对定位:
Y轴转到180度,速度为9.
Z轴转到30度,速度为10
"""
move_to_absolute_position(vv=9, ww=10, Y=180, Z=30, device="COM16")

# 关联键盘事件和控制函数
keyboard.on_press_key("up", lambda _: turn_up(device="COM16"))
keyboard.on_press_key("down", lambda _: turn_down(device="COM16"))
keyboard.on_press_key("left", lambda _: turn_left(device="COM16"))
keyboard.on_press_key("right", lambda _: turn_right(device="COM16"))
keyboard.on_press_key("enter", lambda _: move_home(device="COM16"))
keyboard.on_press_key("space", lambda _: turn_stop(device="COM16"))
# 按下数字1则转动到水平最左,垂直最下,可以根据自己需要多预设几个目标角度。
keyboard.on_press_key("1", lambda _: move_to_absolute_position(vv=10, ww=10, Y=180, Z=-30, device="COM16"))

# 让脚本保持运行状态以捕获事件
keyboard.wait("esc") # 按 'esc' 键退出

信号机制

  • 当收到左转信号时,摄像头会持续左转,直到到达限位位置或接收到新指令。

  • 如果想要提前结束左转,可以在发送左转信号一定时间后发送停止指令,摄像头收到停止指令时会停止。

  • 每个云台旋转操作会持续一定时间,如果在旋转期间收到其他指令,会终止旧指令,执行当前指令。

逻辑代码

通常在 Windows 系统上,串口名称通常是 COMx(如 COM1、COM2),而在 Linux 系统上通常是/dev/ttyUSBx(如/dev/ttyUSB0)。

usbcamera.py
#!/usr/bin/env python3
# coding:utf-8

import serial
import serial.tools.list_ports
import time

# VISCA命令集
commands = {
"stop": "81010601{vv}{ww}0303FF",
"left": "81010601{vv}{ww}0103FF",
"right": "81010601{vv}{ww}0203FF",
"up": "81010601{vv}{ww}0301FF",
"down": "81010601{vv}{ww}0302FF",
"upleft": "81010601{vv}{ww}0101FF",
"upright": "81010601{vv}{ww}0201FF",
"downleft": "81010601{vv}{ww}0102FF",
"downright": "81010601{vv}{ww}0202FF",
"absolute_position": "81010602{vv}{ww}{Y}{Z}FF",
"relative_position": "81010603{vv}{ww}{Y}{Z}FF",
"home": "81010604FF",
"reset": "81010605FF",
}


def send_visca_command(command, device):
"""
通过串口向摄像机发送VISCA命令。

参数:
command (str): 要发送的VISCA命令,格式为十六进制字符串。

返回:
response (bytes): 从摄像机接收到的响应。
"""
try:
ser = serial.Serial(device, 9600, timeout=1) # 初始化串口
command_bytes = bytearray.fromhex(command) # 将命令转换为字节
ser.write(command_bytes) # 发送命令
response = ser.read_all() # 读取响应
ser.close() # 关闭串口
return response
except:
ports_list = list(serial.tools.list_ports.comports())
if len(ports_list) <= 0:
print("未发现端口")
else:
for comport in ports_list:
if "USB" in str(comport):
print("发现USB端口:", comport.device, comport.description)


def calculate_pan_speed_bytes(pan_speed_value):
"""
计算轴(旋转)的位置字节。

参数:
pan_speed_value (int): 速度值,0-16

返回:
pan_step_str (str): 计算得到的平移位置字节,格式为十六进制字符串。
"""

pan_speed_value = max(0, min(pan_speed_value, 16)) # 限制取值范围

# 转为2位16进制
return f"{pan_speed_value:02X}"


def calculate_pan_position_bytes(pan_pos_value, axis_type):
"""
计算轴(旋转)的位置字节。

参数:
pan_pos_value (int): 位置值,
axis_type (str): 轴的类型 ('y' or 'Y' for Y-axis, others for Z-axis)

返回:
pan_step_str (str): 计算得到的平移位置字节,格式为十六进制字符串。
"""
if axis_type.lower() == "y":
pan_pos_value = max(-177.5, min(pan_pos_value, 177.5)) # 限制取值范围
else:
pan_pos_value = max(-21, min(pan_pos_value, 21)) # 限制取值范围

pan_pos_value = int(pan_pos_value * 25) # 将角度转换为步长
pan_direction = "-" if pan_pos_value < 0 else "+" # 设定旋转方向
pan_pos_value = abs(pan_pos_value) # 取绝对值

HEX_VALUES = [4096, 256, 16, 1] # 定义常量

pan_pos_ints = []
for i, value in enumerate(HEX_VALUES):
if pan_direction == "+":
pan_pos_ints.append(pan_pos_value // value)
else: # 异或操作
pan_pos_ints.append((pan_pos_value // value)^ 0xF)
if i == 3 : # 最后一个数字,取反后加1
pan_pos_ints[-1] = pan_pos_ints[-1]+1
pan_pos_value %= value

# 转换为2位16进制字符串
pan_pos_strs = [f"{i:02X}" for i in pan_pos_ints]
return "".join(pan_pos_strs)


def create_command(command_key, vv=10, ww=10, Y=None, Z=None):
"""
创建VISCA命令。

参数:
command_key (str): 命令键名。
vv (str): 水平方向速度,取值范围为0-16
ww (str): 垂直方向速度,取值范围为0-16
Y (str): 控制水平旋转的位置。
Z (str): 控制垂直旋转的位置。

返回:
command (str): 格式化后的VISCA命令字符串。

异常:
ValueError: 当命令需要Y和Z参数时,若未提供,则抛出异常。
"""
if command_key in ["home", "reset"]:
return commands[command_key]
if command_key in ["absolute_position", "relative_position"]:
if Y is None or Z is None:
raise ValueError("Y和Z为位置命令,必须提供")
return commands[command_key].format(
vv=calculate_pan_speed_bytes(vv),
ww=calculate_pan_speed_bytes(ww),
Y=calculate_pan_position_bytes(Y, "y"),
Z=calculate_pan_position_bytes(Z, "z"),
)

return commands[command_key].format(
vv=calculate_pan_speed_bytes(vv),
ww=calculate_pan_speed_bytes(ww),
)


# 控制函数示例
def turn_stop(vv=0, ww=0, device="/dev/ttyUSB0"):
return send_visca_command(create_command("stop", vv, ww), device)


def turn_left(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("left", vv, ww), device)


def turn_right(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("right", vv, ww), device)


def turn_up(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("up", vv, ww), device)


def turn_down(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("down", vv, ww), device)


def move_home(device="/dev/ttyUSB0"):
return send_visca_command(create_command("home"), device)


def move_to_absolute_position(vv=10, ww=10, Y=0, Z=0, device="/dev/ttyUSB0"):
return send_visca_command(create_command("absolute_position", vv, ww, Y, Z), device)

后话

硬件相比软件来说,资料比较少,所以编写过程主要靠经验。

猜测轴旋转的角度和 4 个参数对应关系是最有意思的过程,有趣的功能背后全是数学。

用Flask搭建屏幕共享工具

· 5 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结
  1. 当你的电脑无法通过视频线连接到电视机,可能是由于线缆长度不足或者接口不兼容,而你的电视机恰好支持浏览器功能。

  2. 当你在外面参加培训或交流活动,现场只提供了一个WiFi网络。大家刚刚熟悉,马上就要开始屏幕分享,你需要一个快速的方式让大家都能看到你的屏幕。

这个时候,你就需要一个工具来捕获和分享你的屏幕和音频(包括设备音频和麦克风输入),并通过网页形式与他人共享。这样,观众无需下载任何会议软件,仅需打开浏览器即可观看。

安装依赖

pip install opencv-python Pillow greenlet pyaudio Flask

代码

share.py
from flask import Flask, Response, render_template_string, stream_with_context
import time
import cv2
from PIL import ImageGrab, Image
import threading
from io import BytesIO
from greenlet import getcurrent as get_ident
import pyaudio


class Audio:
def __init__(self):
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 2
self.RATE = 44100
self.CHUNK = 1024
self.bitsPerSample = 16
self.audio = pyaudio.PyAudio()
self.first_run = True
self.wav_header = self.genHeader(self.RATE, self.bitsPerSample, self.CHANNELS)
self.stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
input_device_index=1,
frames_per_buffer=self.CHUNK,
)
self.get_audio = self._get_audio_first_time

def genHeader(self, sampleRate, bitsPerSample, channels):
datasize = 2000 * 10**6
o = bytes("RIFF", "ascii")
o += (datasize + 36).to_bytes(4, "little")
o += bytes("WAVE", "ascii")
o += bytes("fmt ", "ascii")
o += (16).to_bytes(4, "little")
o += (1).to_bytes(2, "little")
o += (channels).to_bytes(2, "little")
o += (sampleRate).to_bytes(4, "little")
o += (sampleRate * channels * bitsPerSample // 8).to_bytes(4, "little")
o += (channels * bitsPerSample // 8).to_bytes(2, "little")
o += (bitsPerSample).to_bytes(2, "little")
o += bytes("data", "ascii")
o += (datasize).to_bytes(4, "little")
return o

def _get_audio_first_time(self):
data = self.stream.read(self.CHUNK)
self.get_audio = self._get_audio_subsequent_times # switch the method
return self.wav_header + data

def _get_audio_subsequent_times(self):
return self.stream.read(self.CHUNK)

class CameraEvent(object):
def __init__(self):
self.events = {}

def wait(self):
ident = get_ident()
if ident not in self.events:
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait()

def set(self):
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].is_set():
event[0].set()
event[1] = now
else:
if now - event[1] > 5:
remove = ident
if remove:
del self.events[remove]

def clear(self):
self.events[get_ident()][0].clear()


class BaseCamera(object):
thread = None
frame = None
last_access = 0
event = CameraEvent()

def __init__(self):
if BaseCamera.thread is None:
BaseCamera.last_access = time.time()
BaseCamera.thread = threading.Thread(target=self._thread)
BaseCamera.thread.start()
while self.get_frame() is None:
time.sleep(0)

def get_frame(self):
BaseCamera.last_access = time.time()
BaseCamera.event.wait()
BaseCamera.event.clear()
return BaseCamera.frame

@staticmethod
def frames():
raise RuntimeError("Must be implemented by subclasses.")

@classmethod
def _thread(cls):
print("Starting camera thread.")
frames_iterator = cls.frames()
for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set()
time.sleep(0)
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
print("Stopping camera thread due to inactivity.")
break
BaseCamera.thread = None


class Camera(BaseCamera):
video_source = 0

@staticmethod
def set_video_source(source):
Camera.video_source = source

@staticmethod
def frames():
camera = cv2.VideoCapture(Camera.video_source)
if not camera.isOpened():
raise RuntimeError("Error")
while True:
image = ImageGrab.grab()
image = image.resize((1366, 750), Image.LANCZOS)
output_buffer = BytesIO()
image.save(output_buffer, format="JPEG", quality=100)
frame = output_buffer.getvalue()
yield frame
app = Flask(__name__)

def gen(camera):
while True:
frame = camera.get_frame()
yield (b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + frame + b"\r\n")

def gen_audio(audio):
while True:
data = audio.get_audio()
yield data

@app.route("/video_feed")
def video_feed():
return Response(gen(Camera()), mimetype="multipart/x-mixed-replace; boundary=frame")

@app.route("/audio_feed")
def audio_feed():
return Response(stream_with_context(gen_audio(Audio())))

@app.route("/")
def index():
global mode
video_tag = """<img src="{{ url_for('video_feed') }}">"""
audio_tag = """<audio autoplay style="display:none;"><source src="{{ url_for('audio_feed') }}" type="audio/x-wav; codec=pcm">Your browser does not support the audio element.</audio>"""

tags = {0: video_tag + audio_tag, 1: audio_tag, 2: video_tag}

content = tags[mode]

return render_template_string(
"""<html>
<head>
<title>{title}</title>
<link rel="icon" href="">
</head>
<body>{content}</body>
</html>""".format(
title=["Intranet Broadcast", "Audio Sharing", "Screen Sharing"][mode],
content=content,
)
)

if __name__ == "__main__":
local_host = "127.0.0.1"
ip_host = "0.0.0.0"
port = 8001
mode = int(input("Please select the mode: 0 for Intranet Broadcast, 1 for Audio Sharing, 2 for Screen Sharing: "))
app.run(threaded=True, host=ip_host, port=port)

运行程序后,程序会提示你输入一个数字:

0表示同时分享屏幕和音频
1表示仅分享音频
2表示仅分享屏幕

输入相应数字后按回车键即可。程序运行后,会在控制台输出一个URL。你只需在浏览器中输入这个URL,就可以看到你的屏幕和音频了。

后话

你可以在此项目的基础上进行扩展,增加更多功能,如:

  • 识别当前音频并将其转化为文本,与屏幕共享一起传输,这样观众就可以在屏幕上看到你的讲话内容。
  • 结合翻译API,实现实时翻译功能。
  • 压缩屏幕画面质量,获得更流畅的传输效果等等。

自制智能家居流程

· 17 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

如果你想要实现这样一个功能:当窗外开始下雨,窗户自动关闭

如果你想知道解决方案,可以直接跳到最后一部分。

我们仅看前半部分,那么你需要:检测窗外是否有雨水,并把这个信息传递给窗户控制器

这个过程中,你需要:

  • 传感器:检测窗外是否有雨水/或者获取网络上的天气信息
  • 单片机主控板:可以接收传感器的数据,并收发网络请求
  • 执行器:窗户控制器,可以接收单片机的指令,控制窗户的开关
  • 供电:保证传感器和主控板的正常工作

这个过程你需要知道以下内容:单片机如何烧录程序、传感器如何连接、如何获取传感器数据、如何发送网络数据、如何制作外壳(例如 3D 打印)。

传感器基础知识

  • 负极表示符号: - / G / Gnd / 黑色
  • 正极表示符号:+ / V / Vcc / 红色
  • 信号管脚: S 可以表示信号,根据传感器的不同,参数范围是 0-1023 或 0 1
  • 模拟量信号管脚表示符号: A 参数范围在 0-1023
  • 数字量信号管脚表示符号:D 参数为 0 或 1
tip

如果标识与颜色发生冲突,一般以标识为准:例如接口上写着 V,但连接线颜色为黑,一般当作正极处理。

当单片机通电时,所有的引脚都带电,但是只有信号管脚的电压会随着传感器的变化而变化。

因此传感器的正负极理论上可以任意连接,只需保证信号管脚连接指定的即可。

A 口的功能比 D 口更加强大,因为 A 口可以接收模拟信号,而 D 口只能接收数字信号。因此部分传感器 D 接 A 也可以正常读数。

两管脚

常见的有扬声器、电机(俗称马达)。

这类设备因为较为特殊,一般有专门的接口,或者占用 2 个信号端口,通过信号的变化来工作。

马达往往需要更大的工作电压,如果没有专门的连接口,有可能需要在板上使用跳针切换工作电压。

三管脚

这类传感器数量最多,往往由 GVA 或者 GVD 组成。

使用时,正负极与单片机正负极连接,信号线与板上标注的 A 或 D 进行连接。

四管脚

四管脚传感器分为很多类

特殊接口的,譬如:人体温度传感器

正负极与信号口一般都专门对应的位置供连接。

双信号接口的,譬如:超声波

一般有四个接口:GVTE,其中 GV 正常连接,T 和 E 都接在信号管脚上。

同时接收 AD 的,譬如:烟雾传感器

一般有四个接口:GVAD,其中 GV 正常连接,D 表示有没有烟雾,A 表示烟雾浓度。分别接在对应的信号管脚即可。

五管脚

譬如:摇杆传感器

一般五个接口为:GVXYB,其中 GV 正常连接,X 表示 X 轴(是模拟量接 A)、Y 表示 Y 轴(是模拟量接 A)、B 表示按钮(是数字量接 D)

其他特殊类

其他特殊的传感器一般有特殊接口,譬如:摄像头、屏幕等。

根据说明接入即可。

与单片机通信

单片机(Microcontroller Unit,简称 MCU)是指一个微型计算机集成在一个单独的微型芯片中,它包括处理器(CPU)、内存(通常包括 RAM 和 ROM)、以及各种输入/输出(I/O)接口等在内的完整计算设备。

单片机设计用于嵌入式应用,通常在硬件设备中执行特定任务。例如,你的电视遥控器可能就是由一个单片机控制的,它可以接收你的输入,然后发送相应的信号到电视上。其他常见的单片机应用包括玩具、家用电器、医疗设备、汽车等。

有的单片机可以使用完整的 Python,譬如华硕的 thinker edge R、部分树莓派,有的 Arduino 板子、ESP32 等只能使用简化的 MicroPython。

tip

当我们希望通过 windows 计算机的 USB 接口和单片机设备进行串口通信时,需要将 USB 接口转换为标准的串行接口,这个过程需要一个介于 USB 和串口之间的翻译,我下面的驱动就是这个翻译。

在搜索引擎中搜 CH341SER 驱动

过程中所有弹窗有下一步点下一步,有确认点确认

在编程软件中识别单片机

常用的编程软件有:Scratch、Mixly、Mixly2、MaixPy 等。

有的支持图形化编程与代码编程,有的需要仅支持代码编程。

下载对应的编程软件后,打开软件。

选择主控这个环节,不同软件的选择方式不同。

tip
  • Vegeta 这样基于 Scratch 的编程软件,需要从左下角选择添加对应的主控型号。

  • Mixly 从右下角,串口旁的下拉菜单选择对应的主控型号。

  • Mixly2 从登录菜单中主控型号后,进入代码编辑页,右上角选择串口旁可以选择更加详细的主控型号。

  • MaixPy 从上方的工具页面中选择开发板型号。

通过连接线连接电脑与单片机。此时可能会有多种情况:

  1. 会提示:有串口连接,并弹出且仅弹出 1 个串口。
  2. 识别计算机上的所有串口,需要自己选择(可以通过反复插拔确认新增的端口号)。
  3. 不弹出任何串口,需要主控通电启动后才识别串口。
  4. 也有的串口时有时无,此时可以考虑:连接线接触不良(更换连接线),或者是主控/USB 电压不稳定——常见于学校机房(主控或电脑独立供电)
  5. 还有的默认的波特率需要调整,否则无法识别传输信号。

连接成功后记得初始化固件,使其恢复到软件对应的固件版本。类似 Android 手机的刷机/恢复出厂设置。

单片机编程

这里的传感器特指狭义的通过半导体检测物理量的传感器,如温度传感器、湿度传感器、光敏传感器等。这些传感器的特点是:输出信号是数字/模拟信号。

数字量传感器的输出信号是数字信号,他的特点是只 返回/发出 两种状态:高电平和低电平。对应在代码中是 1 和 0 。

  • 如声音传感器如果是数字量传感器,当检测到声音时输出高电平,否则输出低电平。

  • 如小灯,输出高电平表示亮,输出低电平表示灭。

模拟量传感器的输出信号是模拟信号,他的特点是输出的电压值是连续变化的。对应在代码中是 0-1023(通常如此,并非绝对) 。

  • 还是以声音传感器为例,如果是模拟量传感器,当检测到声音时输出的电压值会随着声音的大小而变化。

  • 还是以小灯为例,输出最大值表示最亮,输出最小值表示最暗,亮度会随输出的电压值变化。

有的传感器同时支持数字量和模拟量输出,有的不是。

因此,对于不确认的传感器,我们一般先假设传感器是模拟量传感器,如果不是,再当作数据量处理。

模拟量传感器读取

下面以 32 接口为例

import machine
adc32 = machine.ADC(machine.Pin(32))
while True:
print(adc32.read_u16())

模拟量传感器输出

下面以 0 接口为例

import machine
pwm0 = machine.PWM(machine.Pin(0))
pwm0.duty_u16(0)
pwm0.duty_u16(255)

然而,有些动力类传感器需要设置占空比:占空比主要与脉冲宽度调制(Pulse Width Modulation,PWM)相关,它是一种模拟信号的数字化表示方法。在 PWM 中,一个周期内的高电平时间占总周期时间的比例就是占空比。

传感器的输出类型可以有多种,包括模拟电压、模拟电流、数字信号(如 I2C、SPI、UART 等)、频率、PWM 等。只有在使用 PWM 输出的传感器时,才需要设置占空比。例如,一些伺服电机会使用 PWM 信号来控制其位置,这时就需要设置占空比。

对于其他类型的传感器,如模拟电压输出的传感器、数字信号输出的传感器等,就不需要设置占空比。这些传感器的输出通常是连续的或者是特定的数字信号,不涉及到占空比的概念。

from machine import Pin, PWM
import time

# 创建一个PWM对象
pwm = PWM(Pin(2))

# 设置PWM信号的频率为50Hz
# 每秒50个周期,所以每个周期的时间是1秒/50,即20ms。
pwm.freq(50)

# 一般来说,当PWM信号的高电平时间为1ms时,舵机转到0度;
# 当高电平时间为2ms时,舵机转到最大角度。
# 这个范围内的其他高电平时间对应的是0到180度之间的其他角度。

# 转到0度()
pwm.duty(52) # 1ms / 20ms * 1024 = 51.2 取不低于最小值的整:52
time.sleep(1) # 等待一段时间让舵机转到指定位置

# 转到180度
pwm.duty(102) # 2ms / 20ms * 1024 = 102.4 取不高于最大值的整:102
time.sleep(1) # 等待一段时间让舵机转到指定位置

# 关闭PWM
pwm.deinit()

数字量传感器读取

import machine

pin0 = machine.Pin(0, machine.Pin.IN)
while True:
print(pin0.value())

数字量传感器输出

import machine
import time

pin13 = machine.Pin(13, machine.Pin.OUT)
while True:
pin13.value(0)
time.sleep_ms(50)
pin13.value(1)
time.sleep_ms(50)

单片机网络通信

获取天气

心知天气 API 分为免费版、付费版等多个坂本,不同的版本返回的数据数量有所不同。

免费版仅返回三种基本数据,付费版可以返回多种数据。mixly 中默认的 KEY 为高级付费版,可返回全部数据。

数据返回的格式为字典,因此可以通过如下方式进行解包,下面的代码提供了部分数据解包的方法。

需要注意的是,该功能为联网功能,需要在联网环境下使用,确保 wifi 名和密码正确。

import mixiot
import machine
import seniverse_api


mixiot.wlan_connect('wifiname','wifipassword')
print(seniverse_api.weather_now('SGJl0ExVN-4j27msR','北京'))

onenet 物联网传输数据至云端

onenet 物联网是中国移动推出的物联网交互平台,主要面向一般开发者,因此 AIbox 这款设备可以使用 onenet 物联网平台进行数据传输。

相比于 mixio 这样专注于单片机的物联网平台来说,onenet 的文档与接口可能会频繁变动,如有出入以官网教程为准。

onenet 物联网平台网址:https://open.iot.10086.cn/doc/

文档中提供了传输文本与文件 2 种方式

import json
import asyncio
import websockets
from uuid import uuid4

# 音频文件测试路径。
audioFile = "test.mp3"
# 使用自己产品Id和apikey替换下列参数。
productId = "x"
apikey = "x"

#发送文本请求
async def textRequest(ws):
content = {
"aiType":"dm",
"topic": 'nlu.input.text',
"recordId": uuid4().hex,
"refText": "测试" #修改文本请求的输入
}
try:
await ws.send(json.dumps(content))
resp = await ws.recv()
print(resp)
except websockets.exceptions.ConnectionClosed as exp:
print(exp)

#发送音频请求
async def audioRequest(ws):
content = {
"aiType": "dm", #可选dm/asr, dm获取对话结果,asr只获取asr结果
"topic": "recorder.stream.start",
"recordId": uuid4().hex,
"audio": {
"audioType": "mp3", #修改为测试文件的类型
"sampleRate": 16000, #修改为测试文件的sampleRate
"channel": 1, #修改为测试文件的channel
"sampleBytes": 2 #修改为测试文件的sampleBytes
},
"asrParams": {
"realBack": True, #实时返回asr结果
"enableVAD": True, #启动VAD
"enablePunctuation": True, #返回结果是否带拼音
"enableTone": True, #返回结果是否带声调
"enableConfidence": True, #返回结果是否带置信度
"enableNumberConvert": True, #返回结果是否进行数字转换
},
}
try:
#发送文本消息
await ws.send(json.dumps(content))
# 发送音频消息
with open(audioFile, 'rb') as f:
while True:
chunk = f.read(400) #wav buffsize=3200 其他的400
if not chunk:
await ws.send(bytes("", encoding="utf-8"))
break
print(len(chunk))
await ws.send(chunk)
async for message in ws:
print(message)
resp = json.loads(message)
if 'dm' in resp:
break
except websockets.exceptions.ConnectionClosed as exp:
print(exp)
ws.close()

async def dds_demo():
url = f"ws://botai-dsg.and-home.cn:4443/dsg/v1/prod?productId={productId}&apikey={apikey}"
print(url)
async with websockets.connect(url) as websocket:
#await textRequest(websocket) #发送文本请求
await audioRequest(websocket) #发送音频请求
asyncio.get_event_loop().run_until_complete(dds_demo())

后话

最后,通过大量的学习和试错打样,你发现米家雨水传感器,淘宝 46 包邮,搞活动更便宜,这大概是你最后的选择。