Skip to main content

3 posts tagged with "smarthome"

View All Tags

解析摄像头云台控制指令

· 12 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

购买了一款云台摄像头,你可以在淘宝搜这个关键词知道它长什么样子。

它默认能通过 RS232 协议控制云台转动,但是现在新的主板已经没有这种圆形的接口了,基本都是 USB。

他也可以通过遥控器控制,但是我想尝试自己编写逻辑代码通过键盘控制。

效果是通过监听键盘上下左右等事件,调用对应云台运动的指令,运动到合适角度之后可以按下空格停止云台运动,按下 ESC 退出控制程序。

也可以按下某个按键如数字1,直接运动到预设角度。

咨询商家后,商家提供了 RS232 协议的指令集,所以这里通过 python 的 serial 库尝试通过 USB 口对其云台调用。

RS232 协议说明书

CommandCommand PacketComments
Stop8x 01 06 01 VV WW 03 03 FFVV: Pan Speed
Left8x 01 06 01 VV WW 01 03 FFWW: Tilt Speed
Right8x 01 06 01 VV WW 02 03 FFYYYY: Pan Position
Up8x 01 06 01 VV WW 03 01 FFZZZZ: Tilt Position
Down8x 01 06 01 VV WW 03 02 FF
UpLeft8x 01 06 01 VV WW 01 01 FF
UpRight8x 01 06 01 VV WW 02 01 FF
DownLeft8x 01 06 01 VV WW 01 02 FF
DownRight8x 01 06 01 VV WW 02 02 FF
Absolute Position8x 01 06 02 VV WW 0Y 0Y 0Y 0Y 0Z 0Z 0Z 0Z FF
Relative Position8x 01 06 03 VV WW 0Y 0Y 0Y 0Y 0Z 0Z 0Z 0Z FF
Home8x 01 06 04 FF
Reset8x 01 06 05 FF

这里有一些复合指令,譬如 UpRight:向上的同时向右,如果是手柄控制比较好,键盘控制比较鸡肋,所以这里我们实现:上下左右、暂停、复位、绝对定位这几个能用到与可能会用到的。

基数转换

这里的绝对定位和相对定位部分,出现了0Y 0Y 0Y 0Y0Z 0Z 0Z 0Z,我希望传入一个 10 进制的角度,譬如0、90、180,怎么映射到其中呢?这就体现我们学完二进制之后的敏感度了,把 20 转成 2 进制的过程是:

20 ÷ 2 = 10 余数: 0
10 ÷ 2 = 5 余数: 0
5 ÷ 2= 2 余数: 1
2 ÷ 2=1 余数: 0
1 ÷ 2= 0 余数: 1
按余数倒序排列: 10100

这里我们观察范例,对数据做了拆分,即如果需要把10进制映射到16进制上,譬如17转成16进制是11,那么应该变成0101。每个位置之间插入0

所以可以写出如下代码:

def calculate_pan_position_bytes(pan_pos_value):
HEX_VALUES = [4096, 256, 16, 1] # 定义常量
pan_pos_ints = []
for i, value in enumerate(HEX_VALUES):
pan_pos_ints.append(pan_pos_value // value)
pan_pos_value %= value
# 转换为2位16进制字符串
pan_pos_strs = [f"{i:02X}" for i in pan_pos_ints]
return "".join(pan_pos_strs)
# 将17转化为16进制,应该是11,拆分加0,应该是0101
# 前方补0到总长度为8位,结果与我们预期一致
print(calculate_pan_position_bytes(17))
# 00000101

接下来通过代入0到6000这样的数值传输给串口后发现,只能向左转。

0对应居中,4500对应向左转180,数字再大也是转到底。

4500比180 = 25比1,所以我们输入角度,乘以25就得到了对应的信号值。

根据手册说明水平转动范围为355度,一半则是177.5度,与肉眼观察基本一致,Z轴的范围是上下各21度。

刚刚只能向左转,那么向右转的答案就呼之欲出了,要么是补码(异或运算后加1),要么是首位为符号位。我们添加上限位和映射,先用补码试试完成这个函数(结果直接成了)。

def calculate_pan_position_bytes(pan_pos_value, axis_type):
"""
计算轴(旋转)的位置字节。

参数:
pan_pos_value (int): 位置值,
axis_type (str): 轴的类型 ('y' or 'Y' for Y-axis, others for Z-axis)

返回:
pan_step_str (str): 计算得到的平移位置字节,格式为十六进制字符串。
"""
if axis_type.lower() == "y":
pan_pos_value = max(-177.5, min(pan_pos_value, 177.5)) # 限制取值范围
else:
pan_pos_value = max(-21, min(pan_pos_value, 21)) # 限制取值范围

pan_pos_value = int(pan_pos_value * 25) # 将角度转换为步长
pan_direction = "-" if pan_pos_value < 0 else "+" # 设定旋转方向
pan_pos_value = abs(pan_pos_value) # 取绝对值

HEX_VALUES = [4096, 256, 16, 1] # 定义常量

pan_pos_ints = []
for i, value in enumerate(HEX_VALUES):
if pan_direction == "+":
pan_pos_ints.append(pan_pos_value // value)
else: # 异或操作
pan_pos_ints.append((pan_pos_value // value)^ 0xF)
if i == 3 : # 最后一个数字,取反后加1
pan_pos_ints[-1] = pan_pos_ints[-1]+1
pan_pos_value %= value

# 转换为2位16进制字符串
pan_pos_strs = [f"{i:02X}" for i in pan_pos_ints]
return "".join(pan_pos_strs)

代码目标效果

希望具体的指令都可以通过 Python 函数来实现,同时暴露出所有可能需要修改的参数。最后关联键盘事件。例如:

camera_control.py
import keyboard
from usbcamera import *
from usbcamera import move_to_absolute_position
"""
设备 "/dev/ttyUSB0" 的云台旋转至绝对定位:
Y轴转到180度,速度为9.
Z轴转到30度,速度为10
"""
move_to_absolute_position(vv=9, ww=10, Y=180, Z=30, device="COM16")

# 关联键盘事件和控制函数
keyboard.on_press_key("up", lambda _: turn_up(device="COM16"))
keyboard.on_press_key("down", lambda _: turn_down(device="COM16"))
keyboard.on_press_key("left", lambda _: turn_left(device="COM16"))
keyboard.on_press_key("right", lambda _: turn_right(device="COM16"))
keyboard.on_press_key("enter", lambda _: move_home(device="COM16"))
keyboard.on_press_key("space", lambda _: turn_stop(device="COM16"))
# 按下数字1则转动到水平最左,垂直最下,可以根据自己需要多预设几个目标角度。
keyboard.on_press_key("1", lambda _: move_to_absolute_position(vv=10, ww=10, Y=180, Z=-30, device="COM16"))

# 让脚本保持运行状态以捕获事件
keyboard.wait("esc") # 按 'esc' 键退出

信号机制

  • 当收到左转信号时,摄像头会持续左转,直到到达限位位置或接收到新指令。

  • 如果想要提前结束左转,可以在发送左转信号一定时间后发送停止指令,摄像头收到停止指令时会停止。

  • 每个云台旋转操作会持续一定时间,如果在旋转期间收到其他指令,会终止旧指令,执行当前指令。

逻辑代码

通常在 Windows 系统上,串口名称通常是 COMx(如 COM1、COM2),而在 Linux 系统上通常是/dev/ttyUSBx(如/dev/ttyUSB0)。

usbcamera.py
#!/usr/bin/env python3
# coding:utf-8

import serial
import serial.tools.list_ports
import time

# VISCA命令集
commands = {
"stop": "81010601{vv}{ww}0303FF",
"left": "81010601{vv}{ww}0103FF",
"right": "81010601{vv}{ww}0203FF",
"up": "81010601{vv}{ww}0301FF",
"down": "81010601{vv}{ww}0302FF",
"upleft": "81010601{vv}{ww}0101FF",
"upright": "81010601{vv}{ww}0201FF",
"downleft": "81010601{vv}{ww}0102FF",
"downright": "81010601{vv}{ww}0202FF",
"absolute_position": "81010602{vv}{ww}{Y}{Z}FF",
"relative_position": "81010603{vv}{ww}{Y}{Z}FF",
"home": "81010604FF",
"reset": "81010605FF",
}


def send_visca_command(command, device):
"""
通过串口向摄像机发送VISCA命令。

参数:
command (str): 要发送的VISCA命令,格式为十六进制字符串。

返回:
response (bytes): 从摄像机接收到的响应。
"""
try:
ser = serial.Serial(device, 9600, timeout=1) # 初始化串口
command_bytes = bytearray.fromhex(command) # 将命令转换为字节
ser.write(command_bytes) # 发送命令
response = ser.read_all() # 读取响应
ser.close() # 关闭串口
return response
except:
ports_list = list(serial.tools.list_ports.comports())
if len(ports_list) <= 0:
print("未发现端口")
else:
for comport in ports_list:
if "USB" in str(comport):
print("发现USB端口:", comport.device, comport.description)


def calculate_pan_speed_bytes(pan_speed_value):
"""
计算轴(旋转)的位置字节。

参数:
pan_speed_value (int): 速度值,0-16

返回:
pan_step_str (str): 计算得到的平移位置字节,格式为十六进制字符串。
"""

pan_speed_value = max(0, min(pan_speed_value, 16)) # 限制取值范围

# 转为2位16进制
return f"{pan_speed_value:02X}"


def calculate_pan_position_bytes(pan_pos_value, axis_type):
"""
计算轴(旋转)的位置字节。

参数:
pan_pos_value (int): 位置值,
axis_type (str): 轴的类型 ('y' or 'Y' for Y-axis, others for Z-axis)

返回:
pan_step_str (str): 计算得到的平移位置字节,格式为十六进制字符串。
"""
if axis_type.lower() == "y":
pan_pos_value = max(-177.5, min(pan_pos_value, 177.5)) # 限制取值范围
else:
pan_pos_value = max(-21, min(pan_pos_value, 21)) # 限制取值范围

pan_pos_value = int(pan_pos_value * 25) # 将角度转换为步长
pan_direction = "-" if pan_pos_value < 0 else "+" # 设定旋转方向
pan_pos_value = abs(pan_pos_value) # 取绝对值

HEX_VALUES = [4096, 256, 16, 1] # 定义常量

pan_pos_ints = []
for i, value in enumerate(HEX_VALUES):
if pan_direction == "+":
pan_pos_ints.append(pan_pos_value // value)
else: # 异或操作
pan_pos_ints.append((pan_pos_value // value)^ 0xF)
if i == 3 : # 最后一个数字,取反后加1
pan_pos_ints[-1] = pan_pos_ints[-1]+1
pan_pos_value %= value

# 转换为2位16进制字符串
pan_pos_strs = [f"{i:02X}" for i in pan_pos_ints]
return "".join(pan_pos_strs)


def create_command(command_key, vv=10, ww=10, Y=None, Z=None):
"""
创建VISCA命令。

参数:
command_key (str): 命令键名。
vv (str): 水平方向速度,取值范围为0-16
ww (str): 垂直方向速度,取值范围为0-16
Y (str): 控制水平旋转的位置。
Z (str): 控制垂直旋转的位置。

返回:
command (str): 格式化后的VISCA命令字符串。

异常:
ValueError: 当命令需要Y和Z参数时,若未提供,则抛出异常。
"""
if command_key in ["home", "reset"]:
return commands[command_key]
if command_key in ["absolute_position", "relative_position"]:
if Y is None or Z is None:
raise ValueError("Y和Z为位置命令,必须提供")
return commands[command_key].format(
vv=calculate_pan_speed_bytes(vv),
ww=calculate_pan_speed_bytes(ww),
Y=calculate_pan_position_bytes(Y, "y"),
Z=calculate_pan_position_bytes(Z, "z"),
)

return commands[command_key].format(
vv=calculate_pan_speed_bytes(vv),
ww=calculate_pan_speed_bytes(ww),
)


# 控制函数示例
def turn_stop(vv=0, ww=0, device="/dev/ttyUSB0"):
return send_visca_command(create_command("stop", vv, ww), device)


def turn_left(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("left", vv, ww), device)


def turn_right(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("right", vv, ww), device)


def turn_up(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("up", vv, ww), device)


def turn_down(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("down", vv, ww), device)


def move_home(device="/dev/ttyUSB0"):
return send_visca_command(create_command("home"), device)


def move_to_absolute_position(vv=10, ww=10, Y=0, Z=0, device="/dev/ttyUSB0"):
return send_visca_command(create_command("absolute_position", vv, ww, Y, Z), device)

后话

硬件相比软件来说,资料比较少,所以编写过程主要靠经验。

猜测轴旋转的角度和 4 个参数对应关系是最有意思的过程,有趣的功能背后全是数学。

使用flask搭建个人共享屏幕工具

· 5 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结
  1. 当你的电脑无法通过视频线连接到电视机,可能是由于线缆长度不足或者接口不兼容,而你的电视机恰好支持浏览器功能。

  2. 当你在外面参加培训或交流活动,现场只提供了一个WiFi网络。大家刚刚熟悉,马上就要开始屏幕分享,你需要一个快速的方式让大家都能看到你的屏幕。

这个时候,你就需要一个工具来捕获和分享你的屏幕和音频(包括设备音频和麦克风输入),并通过网页形式与他人共享。这样,观众无需下载任何会议软件,仅需打开浏览器即可观看。

安装依赖

pip install opencv-python Pillow greenlet pyaudio Flask

代码

share.py
from flask import Flask, Response, render_template_string, stream_with_context
import time
import cv2
from PIL import ImageGrab, Image
import threading
from io import BytesIO
from greenlet import getcurrent as get_ident
import pyaudio


class Audio:
def __init__(self):
self.FORMAT = pyaudio.paInt16
self.CHANNELS = 2
self.RATE = 44100
self.CHUNK = 1024
self.bitsPerSample = 16
self.audio = pyaudio.PyAudio()
self.first_run = True
self.wav_header = self.genHeader(self.RATE, self.bitsPerSample, self.CHANNELS)
self.stream = self.audio.open(
format=self.FORMAT,
channels=self.CHANNELS,
rate=self.RATE,
input=True,
input_device_index=1,
frames_per_buffer=self.CHUNK,
)
self.get_audio = self._get_audio_first_time

def genHeader(self, sampleRate, bitsPerSample, channels):
datasize = 2000 * 10**6
o = bytes("RIFF", "ascii")
o += (datasize + 36).to_bytes(4, "little")
o += bytes("WAVE", "ascii")
o += bytes("fmt ", "ascii")
o += (16).to_bytes(4, "little")
o += (1).to_bytes(2, "little")
o += (channels).to_bytes(2, "little")
o += (sampleRate).to_bytes(4, "little")
o += (sampleRate * channels * bitsPerSample // 8).to_bytes(4, "little")
o += (channels * bitsPerSample // 8).to_bytes(2, "little")
o += (bitsPerSample).to_bytes(2, "little")
o += bytes("data", "ascii")
o += (datasize).to_bytes(4, "little")
return o

def _get_audio_first_time(self):
data = self.stream.read(self.CHUNK)
self.get_audio = self._get_audio_subsequent_times # switch the method
return self.wav_header + data

def _get_audio_subsequent_times(self):
return self.stream.read(self.CHUNK)

class CameraEvent(object):
def __init__(self):
self.events = {}

def wait(self):
ident = get_ident()
if ident not in self.events:
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait()

def set(self):
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].is_set():
event[0].set()
event[1] = now
else:
if now - event[1] > 5:
remove = ident
if remove:
del self.events[remove]

def clear(self):
self.events[get_ident()][0].clear()


class BaseCamera(object):
thread = None
frame = None
last_access = 0
event = CameraEvent()

def __init__(self):
if BaseCamera.thread is None:
BaseCamera.last_access = time.time()
BaseCamera.thread = threading.Thread(target=self._thread)
BaseCamera.thread.start()
while self.get_frame() is None:
time.sleep(0)

def get_frame(self):
BaseCamera.last_access = time.time()
BaseCamera.event.wait()
BaseCamera.event.clear()
return BaseCamera.frame

@staticmethod
def frames():
raise RuntimeError("Must be implemented by subclasses.")

@classmethod
def _thread(cls):
print("Starting camera thread.")
frames_iterator = cls.frames()
for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set()
time.sleep(0)
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
print("Stopping camera thread due to inactivity.")
break
BaseCamera.thread = None


class Camera(BaseCamera):
video_source = 0

@staticmethod
def set_video_source(source):
Camera.video_source = source

@staticmethod
def frames():
camera = cv2.VideoCapture(Camera.video_source)
if not camera.isOpened():
raise RuntimeError("Error")
while True:
image = ImageGrab.grab()
image = image.resize((1366, 750), Image.LANCZOS)
output_buffer = BytesIO()
image.save(output_buffer, format="JPEG", quality=100)
frame = output_buffer.getvalue()
yield frame
app = Flask(__name__)

def gen(camera):
while True:
frame = camera.get_frame()
yield (b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + frame + b"\r\n")

def gen_audio(audio):
while True:
data = audio.get_audio()
yield data

@app.route("/video_feed")
def video_feed():
return Response(gen(Camera()), mimetype="multipart/x-mixed-replace; boundary=frame")

@app.route("/audio_feed")
def audio_feed():
return Response(stream_with_context(gen_audio(Audio())))

@app.route("/")
def index():
global mode
video_tag = """<img src="{{ url_for('video_feed') }}">"""
audio_tag = """<audio autoplay style="display:none;"><source src="{{ url_for('audio_feed') }}" type="audio/x-wav; codec=pcm">Your browser does not support the audio element.</audio>"""

tags = {0: video_tag + audio_tag, 1: audio_tag, 2: video_tag}

content = tags[mode]

return render_template_string(
"""<html>
<head>
<title>{title}</title>
<link rel="icon" href="">
</head>
<body>{content}</body>
</html>""".format(
title=["Intranet Broadcast", "Audio Sharing", "Screen Sharing"][mode],
content=content,
)
)

if __name__ == "__main__":
local_host = "127.0.0.1"
ip_host = "0.0.0.0"
port = 8001
mode = int(input("Please select the mode: 0 for Intranet Broadcast, 1 for Audio Sharing, 2 for Screen Sharing: "))
app.run(threaded=True, host=ip_host, port=port)

运行程序后,程序会提示你输入一个数字:

0表示同时分享屏幕和音频
1表示仅分享音频
2表示仅分享屏幕

输入相应数字后按回车键即可。程序运行后,会在控制台输出一个URL。你只需在浏览器中输入这个URL,就可以看到你的屏幕和音频了。

后话

你可以在此项目的基础上进行扩展,增加更多功能,如:

  • 识别当前音频并将其转化为文本,与屏幕共享一起传输,这样观众就可以在屏幕上看到你的讲话内容。
  • 结合翻译API,实现实时翻译功能。
  • 压缩屏幕画面质量,获得更流畅的传输效果等等。

自制智能家居

· 17 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

如果你想要实现这样一个功能:当窗外开始下雨,窗户自动关闭

如果你想知道解决方案,可以直接跳到最后一部分。

我们仅看前半部分,那么你需要:检测窗外是否有雨水,并把这个信息传递给窗户控制器

这个过程中,你需要:

  • 传感器:检测窗外是否有雨水/或者获取网络上的天气信息
  • 单片机主控板:可以接收传感器的数据,并收发网络请求
  • 执行器:窗户控制器,可以接收单片机的指令,控制窗户的开关
  • 供电:保证传感器和主控板的正常工作

这个过程你需要知道以下内容:单片机如何烧录程序、传感器如何连接、如何获取传感器数据、如何发送网络数据、如何制作外壳(例如 3D 打印)。

传感器基础知识

  • 负极表示符号: - / G / Gnd / 黑色
  • 正极表示符号:+ / V / Vcc / 红色
  • 信号管脚: S 可以表示信号,根据传感器的不同,参数范围是 0-1023 或 0 1
  • 模拟量信号管脚表示符号: A 参数范围在 0-1023
  • 数字量信号管脚表示符号:D 参数为 0 或 1
tip

如果标识与颜色发生冲突,一般以标识为准:例如接口上写着 V,但连接线颜色为黑,一般当作正极处理。

当单片机通电时,所有的引脚都带电,但是只有信号管脚的电压会随着传感器的变化而变化。

因此传感器的正负极理论上可以任意连接,只需保证信号管脚连接指定的即可。

A 口的功能比 D 口更加强大,因为 A 口可以接收模拟信号,而 D 口只能接收数字信号。因此部分传感器 D 接 A 也可以正常读数。

两管脚

常见的有扬声器、电机(俗称马达)。

这类设备因为较为特殊,一般有专门的接口,或者占用 2 个信号端口,通过信号的变化来工作。

马达往往需要更大的工作电压,如果没有专门的连接口,有可能需要在板上使用跳针切换工作电压。

三管脚

这类传感器数量最多,往往由 GVA 或者 GVD 组成。

使用时,正负极与单片机正负极连接,信号线与板上标注的 A 或 D 进行连接。

四管脚

四管脚传感器分为很多类

特殊接口的,譬如:人体温度传感器

正负极与信号口一般都专门对应的位置供连接。

双信号接口的,譬如:超声波

一般有四个接口:GVTE,其中 GV 正常连接,T 和 E 都接在信号管脚上。

同时接收 AD 的,譬如:烟雾传感器

一般有四个接口:GVAD,其中 GV 正常连接,D 表示有没有烟雾,A 表示烟雾浓度。分别接在对应的信号管脚即可。

五管脚

譬如:摇杆传感器

一般五个接口为:GVXYB,其中 GV 正常连接,X 表示 X 轴(是模拟量接 A)、Y 表示 Y 轴(是模拟量接 A)、B 表示按钮(是数字量接 D)

其他特殊类

其他特殊的传感器一般有特殊接口,譬如:摄像头、屏幕等。

根据说明接入即可。

与单片机通信

单片机(Microcontroller Unit,简称 MCU)是指一个微型计算机集成在一个单独的微型芯片中,它包括处理器(CPU)、内存(通常包括 RAM 和 ROM)、以及各种输入/输出(I/O)接口等在内的完整计算设备。

单片机设计用于嵌入式应用,通常在硬件设备中执行特定任务。例如,你的电视遥控器可能就是由一个单片机控制的,它可以接收你的输入,然后发送相应的信号到电视上。其他常见的单片机应用包括玩具、家用电器、医疗设备、汽车等。

有的单片机可以使用完整的 Python,譬如华硕的 thinker edge R、部分树莓派,有的 Arduino 板子、ESP32 等只能使用简化的 MicroPython。

tip

当我们希望通过 windows 计算机的 USB 接口和单片机设备进行串口通信时,需要将 USB 接口转换为标准的串行接口,这个过程需要一个介于 USB 和串口之间的翻译,我下面的驱动就是这个翻译。

在搜索引擎中搜 CH341SER 驱动

过程中所有弹窗有下一步点下一步,有确认点确认

在编程软件中识别单片机

常用的编程软件有:Scratch、Mixly、Mixly2、MaixPy 等。

有的支持图形化编程与代码编程,有的需要仅支持代码编程。

下载对应的编程软件后,打开软件。

选择主控这个环节,不同软件的选择方式不同。

tip
  • Vegeta 这样基于 Scratch 的编程软件,需要从左下角选择添加对应的主控型号。

  • Mixly 从右下角,串口旁的下拉菜单选择对应的主控型号。

  • Mixly2 从登录菜单中主控型号后,进入代码编辑页,右上角选择串口旁可以选择更加详细的主控型号。

  • MaixPy 从上方的工具页面中选择开发板型号。

通过连接线连接电脑与单片机。此时可能会有多种情况:

  1. 会提示:有串口连接,并弹出且仅弹出 1 个串口。
  2. 识别计算机上的所有串口,需要自己选择(可以通过反复插拔确认新增的端口号)。
  3. 不弹出任何串口,需要主控通电启动后才识别串口。
  4. 也有的串口时有时无,此时可以考虑:连接线接触不良(更换连接线),或者是主控/USB 电压不稳定——常见于学校机房(主控或电脑独立供电)
  5. 还有的默认的波特率需要调整,否则无法识别传输信号。

连接成功后记得初始化固件,使其恢复到软件对应的固件版本。类似 Android 手机的刷机/恢复出厂设置。

单片机编程

这里的传感器特指狭义的通过半导体检测物理量的传感器,如温度传感器、湿度传感器、光敏传感器等。这些传感器的特点是:输出信号是数字/模拟信号。

数字量传感器的输出信号是数字信号,他的特点是只 返回/发出 两种状态:高电平和低电平。对应在代码中是 1 和 0 。

  • 如声音传感器如果是数字量传感器,当检测到声音时输出高电平,否则输出低电平。

  • 如小灯,输出高电平表示亮,输出低电平表示灭。

模拟量传感器的输出信号是模拟信号,他的特点是输出的电压值是连续变化的。对应在代码中是 0-1023(通常如此,并非绝对) 。

  • 还是以声音传感器为例,如果是模拟量传感器,当检测到声音时输出的电压值会随着声音的大小而变化。

  • 还是以小灯为例,输出最大值表示最亮,输出最小值表示最暗,亮度会随输出的电压值变化。

有的传感器同时支持数字量和模拟量输出,有的不是。

因此,对于不确认的传感器,我们一般先假设传感器是模拟量传感器,如果不是,再当作数据量处理。

模拟量传感器读取

下面以 32 接口为例

import machine
adc32 = machine.ADC(machine.Pin(32))
while True:
print(adc32.read_u16())

模拟量传感器输出

下面以 0 接口为例

import machine
pwm0 = machine.PWM(machine.Pin(0))
pwm0.duty_u16(0)
pwm0.duty_u16(255)

然而,有些动力类传感器需要设置占空比:占空比主要与脉冲宽度调制(Pulse Width Modulation,PWM)相关,它是一种模拟信号的数字化表示方法。在 PWM 中,一个周期内的高电平时间占总周期时间的比例就是占空比。

传感器的输出类型可以有多种,包括模拟电压、模拟电流、数字信号(如 I2C、SPI、UART 等)、频率、PWM 等。只有在使用 PWM 输出的传感器时,才需要设置占空比。例如,一些伺服电机会使用 PWM 信号来控制其位置,这时就需要设置占空比。

对于其他类型的传感器,如模拟电压输出的传感器、数字信号输出的传感器等,就不需要设置占空比。这些传感器的输出通常是连续的或者是特定的数字信号,不涉及到占空比的概念。

from machine import Pin, PWM
import time

# 创建一个PWM对象
pwm = PWM(Pin(2))

# 设置PWM信号的频率为50Hz
# 每秒50个周期,所以每个周期的时间是1秒/50,即20ms。
pwm.freq(50)

# 一般来说,当PWM信号的高电平时间为1ms时,舵机转到0度;
# 当高电平时间为2ms时,舵机转到最大角度。
# 这个范围内的其他高电平时间对应的是0到180度之间的其他角度。

# 转到0度()
pwm.duty(52) # 1ms / 20ms * 1024 = 51.2 取不低于最小值的整:52
time.sleep(1) # 等待一段时间让舵机转到指定位置

# 转到180度
pwm.duty(102) # 2ms / 20ms * 1024 = 102.4 取不高于最大值的整:102
time.sleep(1) # 等待一段时间让舵机转到指定位置

# 关闭PWM
pwm.deinit()

数字量传感器读取

import machine

pin0 = machine.Pin(0, machine.Pin.IN)
while True:
print(pin0.value())

数字量传感器输出

import machine
import time

pin13 = machine.Pin(13, machine.Pin.OUT)
while True:
pin13.value(0)
time.sleep_ms(50)
pin13.value(1)
time.sleep_ms(50)

单片机网络通信

获取天气

心知天气 API 分为免费版、付费版等多个坂本,不同的版本返回的数据数量有所不同。

免费版仅返回三种基本数据,付费版可以返回多种数据。mixly 中默认的 KEY 为高级付费版,可返回全部数据。

数据返回的格式为字典,因此可以通过如下方式进行解包,下面的代码提供了部分数据解包的方法。

需要注意的是,该功能为联网功能,需要在联网环境下使用,确保 wifi 名和密码正确。

import mixiot
import machine
import seniverse_api


mixiot.wlan_connect('wifiname','wifipassword')
print(seniverse_api.weather_now('SGJl0ExVN-4j27msR','北京'))

onenet 物联网传输数据至云端

onenet 物联网是中国移动推出的物联网交互平台,主要面向一般开发者,因此 AIbox 这款设备可以使用 onenet 物联网平台进行数据传输。

相比于 mixio 这样专注于单片机的物联网平台来说,onenet 的文档与接口可能会频繁变动,如有出入以官网教程为准。

onenet 物联网平台网址:https://open.iot.10086.cn/doc/

文档中提供了传输文本与文件 2 种方式

import json
import asyncio
import websockets
from uuid import uuid4

# 音频文件测试路径。
audioFile = "test.mp3"
# 使用自己产品Id和apikey替换下列参数。
productId = "x"
apikey = "x"

#发送文本请求
async def textRequest(ws):
content = {
"aiType":"dm",
"topic": 'nlu.input.text',
"recordId": uuid4().hex,
"refText": "测试" #修改文本请求的输入
}
try:
await ws.send(json.dumps(content))
resp = await ws.recv()
print(resp)
except websockets.exceptions.ConnectionClosed as exp:
print(exp)

#发送音频请求
async def audioRequest(ws):
content = {
"aiType": "dm", #可选dm/asr, dm获取对话结果,asr只获取asr结果
"topic": "recorder.stream.start",
"recordId": uuid4().hex,
"audio": {
"audioType": "mp3", #修改为测试文件的类型
"sampleRate": 16000, #修改为测试文件的sampleRate
"channel": 1, #修改为测试文件的channel
"sampleBytes": 2 #修改为测试文件的sampleBytes
},
"asrParams": {
"realBack": True, #实时返回asr结果
"enableVAD": True, #启动VAD
"enablePunctuation": True, #返回结果是否带拼音
"enableTone": True, #返回结果是否带声调
"enableConfidence": True, #返回结果是否带置信度
"enableNumberConvert": True, #返回结果是否进行数字转换
},
}
try:
#发送文本消息
await ws.send(json.dumps(content))
# 发送音频消息
with open(audioFile, 'rb') as f:
while True:
chunk = f.read(400) #wav buffsize=3200 其他的400
if not chunk:
await ws.send(bytes("", encoding="utf-8"))
break
print(len(chunk))
await ws.send(chunk)
async for message in ws:
print(message)
resp = json.loads(message)
if 'dm' in resp:
break
except websockets.exceptions.ConnectionClosed as exp:
print(exp)
ws.close()

async def dds_demo():
url = f"ws://botai-dsg.and-home.cn:4443/dsg/v1/prod?productId={productId}&apikey={apikey}"
print(url)
async with websockets.connect(url) as websocket:
#await textRequest(websocket) #发送文本请求
await audioRequest(websocket) #发送音频请求
asyncio.get_event_loop().run_until_complete(dds_demo())

后话

最后,通过大量的学习和试错打样,你发现米家雨水传感器,淘宝 46 包邮,搞活动更便宜,这大概是你最后的选择。