Skip to main content

九月生活与职业成长随想

· 11 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

需要避免的变化

随着年龄的增长,有两种变化需要避免:

  • 固执自己的世界观,忽视其他年龄段人群的需求与视角。
  • 夸夸其谈自己的想法,却对事情的真相一无所知。

商品与生活

酒吧是接近学生时代爱情的地方,色彩、旋律、烟雾、甘甜等元素交织在一起,带来感官上的迷惑。只看你是否幽默帅气,不考虑家庭环境,也不管你即将去往何方。酒吧的食物和酒不是最有利润的商品,尽管它溢价很高。最有利润的商品是人。

摘要

  • “躲天意,避因果,诸般枷锁困真我;顺天意,承因果,今日方知我是我。”——《阳明心学》

  • “愿意自由换取保障的人,既得不到自由,也得不到保障。”——哈耶克

  • “只有金钱会向穷人伸手,而权力不会。”——哈耶克

  • “如果允许人类自由迁徙,那么人流的方向,就是文明的方向。”——哈耶克

  • “我所热爱着的世界,绝不能交给混子!”——我自己

教育供应商

原本用于考核学生的标准被用来考核指导老师,导致老师倾向于选择提供答案的厂商。机构与教师的核心目标是消课,提供适合长度的PPT/教案。

在AI时代,可以采用AI智能课的形式辅助学生学习:将电脑屏幕二分,一侧显示课件,另一侧提供AI助手,便于学生提问。这样,机构和教师的工作量转变为考核学生及处理问题。

学习建模技术

最近我间歇性学习建模技术,发现自己更倾向于选择中间款软件,它能快速生成及格的作品。

通勤学习

指定学习计划是利用碎片化时间的首要目标,阅读书籍、观看TED演讲或自己的收藏会让学习过程更有成就感。应避免泛信息输入(如新闻、推荐刷屏),而应选择适当的半精准信息输入(如读书、博客、GitHub等)。通勤后及时整理记录。

随着职业地位的提升,不必要的内耗逐渐减少。高中时非常在意学校的排名,而如今在大厂面前都统一成了“双非”。想要进入世界一流的企业,需要有相应的世界排名。只有世界级的排名与项目才能证明你的价值。

每个阶段有不同的评价标准。从学生时期的院校评价,到工作初期的薪资待遇,再到中年的事业与家庭评价,均体现出人生的不同阶段。

下个阶段目标:

  • 精通Python(算法)
  • 熟悉JS/TS(Vue、React)
  • 熟悉Fushion360、Blender建模软件,了解新技术(如Rust、AI模型方向等)
  • 成为GitHub开源项目XX的作者(20k)
  • 在Discord、GitHub上使用英语/中文与优秀开发者友好合作

我努力推动全球团结,吸取各家所长以解决当今与未来的问题。在解决问题时,坚持实事求是,能解决问题的理论才是好理论。

平淡生活的解药

平淡生活源于外部世界缺乏新输入。当工作多年没有变化,内部的平淡便开始显现,只有不断成长,才能支撑更远的目标与更好的生活。

如果我们把感情看作0和游戏。那么总会会有人因长期利益受损而对这段关系感到不满。

因此,只有当一方的所求不是另一方的付出时,0和才会被打破。即:某一行为是二者都收益的。

企业与员工关系亦是如此,企业提供平台员工发挥,员工将在平台创造的利润与企业分成。

迷你工作室

最近我发现迷你工作室非常适合我。迷你工作室通常由1-5人组成,项目可以是软件、游戏或网站。

成立工作室是为了开发票,能接到更多的项目。在AI时代,有两种发展方向:加入大公司研究通用人工智能,或选择中小型公司做垂类人工智能,专注于特定领域的模型开发。

除了开发人工智能模型外,还可以拓展其应用,如语音控制家庭设备。技术的爆发得益于高速通信和丰富的硬件资源。

经济下行时创业或找工作的挑战包括投资回报难、企业扩张减慢及岗位的消失,传统岗位正在逐步被淘汰,需尽早转型。

项目开发

项目的高度重复性促使我创建一个通用的第三方库。在开发中,代码的简洁易读是首要任务。

购车后

买车了,买完车除了要交保险之外,还需买一些必要的物品:行车记录仪、实习车标、挪车号码牌、车载除甲醛香薰、手机导航支架。

侧方位停车复习:开至肩膀与库中齐平,向车库反向打成45度,倒车回方向直至左右两边距离合适停正。

电脑新配

最近新买了一台台式机,含键鼠屏11998元,主要用于编程、跑模型和3D建模。虽然组装电脑是个有趣且省钱的过程,但因加班较多,我选择花钱让别人组装。

目前显卡溢价严重,后期计划升级。有程序员读者计划新购电脑的可以参考以下配置:

配件描述价格 (元)
CPUAMD锐龙 R9 7950X 16核32线程3695
硬盘英睿达固态硬盘1T M.2 PCIE5.01767
显卡华硕RTX30601674
内存光威内存 龙武 32G*2 64G 6400 C32 马甲条1330
主板华硕 TUF GAMING B650M PLUS 蓝牙 WIFI DDR51170
屏幕乐视显示器32英寸 1K 75hz699
电源利民TG-1000 额定1000W 金牌全模组 ATX3.0认证 黑色520
机箱及风扇航嘉 GX750A掠夺者 黑色 ATX 360 + 链力无光风扇*6300
水冷散热钛钽 SJ-A080 360WH295
键盘GANSS高斯 GS87C 104C299
鼠标Razer雷蛇炼狱蝰蛇V2X极速版双模无线电池笔记本蓝牙电竞鼠标249

将Cursor添加到右键菜单

通过脚本读取当前用户名,并在桌面生成名为AddCursorToContextMenu.reg的注册表文件。可在桌面找到该文件,以管理员身份运行。

import os

username = os.getenv('USERNAME')
cursor_path = fr"C:\\Users\\{username}\\AppData\\Local\\Programs\\Cursor\\Cursor.exe"


reg_content = f"""Windows Registry Editor Version 5.00

[HKEY_CLASSES_ROOT\\*\\shell\\Open with Cursor]
@="Open with Cursor"
"Icon"="{cursor_path}"

[HKEY_CLASSES_ROOT\\*\\shell\\Open with Cursor\\command]
@="\\"{cursor_path}\\" \\"%1\\""

[HKEY_CLASSES_ROOT\\Directory\\shell\\Open with Cursor]
@="Open with Cursor"
"Icon"="{cursor_path}"

[HKEY_CLASSES_ROOT\\Directory\\shell\\Open with Cursor\\command]
@="\\"{cursor_path}\\" \\"%1\\""

[HKEY_CLASSES_ROOT\\Directory\\Background\\shell\\Open with Cursor]
@="Open with Cursor current folder"
"Icon"="{cursor_path}"

[HKEY_CLASSES_ROOT\\Directory\\Background\\shell\\Open with Cursor\\command]
@="\\"{cursor_path}\\" \\"%V\\""

"""

# 获取当前用户的桌面路径
desktop_path = os.path.join(os.path.expanduser("~"), 'Desktop')

# 构造 Cursor.exe 的路径

# 生成 .reg 文件的完整路径,保存在桌面
reg_file_path = os.path.join(desktop_path, "AddCursorToContextMenu.reg")

# 将注册表内容写入 .reg 文件,使用 UTF-8 编码
with open(reg_file_path, "w", encoding="utf-8") as file:
file.write(reg_content)

print(f".reg 文件已保存到: {reg_file_path}")

导入模块

现在大家都知道可以通过pip install -r requirements.txt安装模块,但是有些模块可能安装失败,需要人手动再次安装。不能做到自动化。这里提供一个脚本,可以自动安装失败的模块2次,并保存到failed_requirements.txt

import subprocess

def install_requirements(requirements_file, failed_file):
failed_packages = []

# 打开 requirements.txt 并读取模块名称
with open(requirements_file, 'r') as file:
packages = file.readlines()

# 遍历每个包名并尝试安装
for package in packages:
package = package.strip()
if package: # 跳过空行
print(f"正在安装: {package}")
try:
# 使用 subprocess 执行 pip 安装命令
result = subprocess.run(
['pip', 'install', package],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)

# 检查安装结果,如果返回码不为0则表示安装失败
if result.returncode != 0:
print(f"安装失败: {package}")
failed_packages.append(package)

except Exception as e:
print(f"发生错误: {e}")
failed_packages.append(package)

# 将安装失败的模块写入 failed_requirements.txt
if failed_packages:
with open(failed_file, 'w') as f:
f.write("\n".join(failed_packages))
print(f"安装失败的模块已保存到 {failed_file}")
else:
print("所有模块均安装成功")
def remove_versions(requirements_file, output_file):
with open(requirements_file, 'r') as file:
lines = file.readlines()

cleaned_lines = []
for line in lines:
# 去除版本号部分(==后面的内容)
package = line.split('==')[0].strip()
if package: # 跳过空行
cleaned_lines.append(package)

# 将去掉版本号的包名写入新的文件
with open(output_file, 'w') as output:
output.write("\n".join(cleaned_lines))

print(f"已将无版本号的依赖项写入 {output_file}")

for i in range(2):
install_requirements('requirements.txt', 'failed_requirements.txt')
remove_versions('failed_requirements.txt', 'requirements.txt')

机器学习与LLM舆情分析

· 12 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

该项目适合作为小型的项目原型,适合教学和练手。最初这个项目的灵感源于我的个人需求,我需要一个工具来查看主流话题,同时又不想下载一堆 APP 来接收推送。项目最终的数据使用github pages来展示。

可以结合我的另一个项目pocwatchdog实现定时执行加发送邮件。

项目开源地址:https://github.com/jiangyangcreate/SocialMood

项目查看地址:https://jiangyangcreate.github.io/SocialMood/

使用流程

# 安装依赖
cd python
python setup.py

# 运行数据抓取,生成静态网页
python crawler.py

依赖安装

所需依赖可以通过 setup.py 下载安装。因为有些模块不是pip就算安装好的。

setup.py
import subprocess
import sys
import nltk

def install_requirements():
print("正在安装依赖...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"])

def install_browser():
print("正在安装浏览器...")
subprocess.check_call([sys.executable, "-m", "playwright", "install"])

def download_nltk_data():
print("正在下载NLTK词典文件...")
nltk.download('vader_lexicon')

def download_qwen2_model():
try:
print("正在下载Qwen2大模型(7B)...")
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "Qwen/Qwen2.5-7B-Instruct"

model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype="auto",
device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)

prompt = "请回复:你好,我是Qwen2大模型,有什么可以帮您的吗?"
messages = [
{"role": "system", "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant."},
{"role": "user", "content": prompt}
]
text = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

generated_ids = model.generate(
**model_inputs,
max_new_tokens=512
)
generated_ids = [
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]

response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

print('Qwen2大模型(7B)响应:', response)
except Exception as e:
print(f"跳过下载Qwen2大模型(7B),出现错误:{e}")

def main():
try:
install_requirements()
install_browser()
download_nltk_data()
download_qwen2_model()
print("所有安装步骤已完成!")
except subprocess.CalledProcessError as e:
print(f"安装过程中出现错误:{e}")
sys.exit(1)

if __name__ == "__main__":
main()

主要功能

该系统的主要功能包括:

  • 抓取热搜数据:从微博、抖音、B站等平台抓取热搜数据。

这里也可以通过API获取,爬取注意不要变成DDOS攻击。

  • 数据处理:使用 Pandas 进行数据清洗与处理。

使用pandas主要是处理一些文本型的数据,譬如10万要换算为100000。

使用jieba分词用于后续词云图生成,需要剔除一些单字与标点符号。当然,最近b站很喜欢在标题中加空格,所以要先去空格再分词。

有些数据的热度值还没计算出来,可以使用幂律分布的线性回归填补热度缺失值。这里使用指数回归、普通线性回归效果都不好。

幂律分布常见的样子:个人UP主粉丝排名前几名粉丝差距有百万,像指数分布。但是后续排名的up粉丝差距就很小,接近线性分布。如果我随机扣掉一个排名的up主的粉丝数,让你预测,你可以预测多准。我面对的大概就是这样的一个问题。

下面是我的解决方案:

def estimate_missing_heat(self):
"""估算缺失的热度值"""
features = ["排名"]
target = "处理后热度"

def impute_group(group):
X = group[features]
y = group[target]

# 如果没有缺失值,直接返回原始组
if not y.isnull().any():
return group

# 幂律分布拟合(通过对数变换实现)
X_train = X[y.notnull()]
y_train = y[y.notnull()]

# 将 X_train 转换为浮点数类型
X_train = X_train.astype(float)

# 对X和y进行对数变换,处理可能的零值
log_X_train = np.log(X_train + 1)
log_y_train = np.log(y_train + 1)

lr = LinearRegression()
lr.fit(log_X_train, log_y_train)

# 获取拟合的系数
slope = lr.coef_[0]
intercept = lr.intercept_
# print(f"拟合的幂律方程为: y = {np.exp(intercept):.2f} * x^{slope:.2f}")

# 预测缺失值
X_missing = X[y.isnull()]
if not X_missing.empty:
# 将 X_missing 转换为浮点数
X_missing = X_missing.astype(float)
log_X_missing = np.log(X_missing + 1)
log_predictions = lr.predict(log_X_missing)
predictions = np.exp(log_predictions) - 1 # 反向变换

# 确保预测值不小于0
predictions = np.maximum(predictions, 0)

# 确保预测值符合排名顺序
for i, pred in enumerate(predictions):
rank = X_missing.iloc[i]["排名"]
higher_ranks = y[(X["排名"] < rank) & y.notnull()]
lower_ranks = y[(X["排名"] > rank) & y.notnull()]

if len(higher_ranks) > 0:
pred = min(pred, higher_ranks.min())
if len(lower_ranks) > 0:
pred = max(pred, lower_ranks.max())

predictions[i] = pred

group.loc[y.isnull(), target] = predictions

return group

# 对每个信息来源分别进行缺失值填充
self.df = self.df.groupby("信息来源").apply(impute_group)

# 删除仍然为None的行(如果有的话)
self.df = self.df.dropna(subset=["处理后热度"])
self.df["处理后热度"] = self.df["处理后热度"].astype(int)
  • 情绪分析:监测和分析公众情绪。算出单条标题的情绪数值之后,标准化到 (-1,1) 这个区间之中。最后通过热度与排名计算出对社会的情感影响力。正数数则是积极影响,负数则是负面影响。

如果你的电脑性能还不错,可以使用 Qwen2.5 本地模型作为情绪分析的核心,根据自己的设备选择模型大小。

class Qwen2Model:
_model = None
_tokenizer = None
_device = None
model_name = "Qwen/Qwen2.5-7B-Instruct"
@classmethod
def _initialize(cls):
if cls._model is None or cls._tokenizer is None:
print("初始化Qwen2模型")
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
torch.cuda.empty_cache() # 清理缓存
cls._device = "cuda"
cls._model = AutoModelForCausalLM.from_pretrained(
cls.model_name,
torch_dtype=torch.float16,
device_map="auto" if torch.cuda.is_available() else None
).to(cls._device)
cls._tokenizer = AutoTokenizer.from_pretrained(cls.model_name)

@classmethod
def get_sentiment_score(cls,text):
prompt = "请仅对用户提供的句子进行情感评分,并返回介于-1到1之间的分数。-1表示非常负面,1表示非常正面,0表示中立。无需提供其他信息或上下文。"
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": text}
]
text = cls._tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
model_inputs = cls._tokenizer([text], return_tensors="pt").to(cls._model.device)

generated_ids = cls._model.generate(
**model_inputs,
max_new_tokens=512
)
generated_ids = [
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]

response = cls._tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

return float(response)

以上代码均在 crawler.py

数据存储

系统使用 sqlite3 保存中间数据,相关代码在 database.py 中,数据文件为 news.db,包含两张表:

  • news:清洗后的可用数据。
  • raw_html:原始网页数据。

这样可以保证速度的同时,可以保持文件夹的整洁,如果数据量大可以直接平滑的迁移到正式数据库中。

database.py
# encoding: utf-8
'''
将数据存储到数据库
'''

import sqlite3
import pandas as pd
from collections import Counter
import json

class NewsDatabase:
def __init__(self, db_name='news.db'):
self.db_name = db_name
self.create_tables()

def create_tables(self):
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()

cursor.execute('''CREATE TABLE IF NOT EXISTS news
(id INTEGER PRIMARY KEY AUTOINCREMENT,
日期 TEXT,
时间 TEXT,
信息来源 TEXT,
标题 TEXT,
排名 INTEGER,
热度 TEXT,
处理后热度 TEXT,
链接 TEXT,
情感得分 REAL,
加权情感得分 REAL,
分词 TEXT)''')

cursor.execute('''CREATE TABLE IF NOT EXISTS raw_html
(id INTEGER PRIMARY KEY AUTOINCREMENT,
日期 TEXT,
时间 TEXT,
内容 TEXT)''')

conn.commit()
conn.close()

def save_data(self, df: pd.DataFrame, table_name='news'):
conn = sqlite3.connect(self.db_name)

if table_name == 'news':
df['分词'] = df['分词'].apply(json.dumps)

df.to_sql(table_name, conn, if_exists='append', index=False)

conn.close()

def save_raw_html(self, html_content: str):
df = pd.DataFrame({
'日期': [pd.Timestamp.now().strftime('%Y-%m-%d')],
'时间': [pd.Timestamp.now().strftime('%H:%M:%S')],
'内容': [html_content]
})
self.save_data(df, 'raw_html')

def read_data(self, query="SELECT * FROM news"):
conn = sqlite3.connect(self.db_name)

df = pd.read_sql_query(query, conn)
if '分词' in df.columns:
df['分词'] = df['分词'].apply(json.loads)

conn.close()
return df

def execute_query(self, query, params=None):
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()

if params:
cursor.execute(query, params)
else:
cursor.execute(query)

conn.commit()
conn.close()

def fetch_data(self, query, params=None):
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()

if params:
cursor.execute(query, params)
else:
cursor.execute(query)

rows = cursor.fetchall()
conn.close()
return rows

def delete_table(self,table_name:str):
"""删除news子表"""
conn = sqlite3.connect(self.db_name)
try:
with conn:
conn.execute(f"DROP TABLE IF EXISTS {table_name}")
print("news表已成功删除")
except sqlite3.Error as e:
print(f"删除news表时发生错误:{e}")

def clear_table(self, table_name):
"""清空指定表的所有数据"""
conn = sqlite3.connect(self.db_name)
try:
with conn:
conn.execute(f"DELETE FROM {table_name}")
print(f"{table_name}表已成功清空")
except sqlite3.Error as e:
print(f"清空{table_name}表时发生错误:{e}")

def export_word_cloud_data(self, query="SELECT 分词 FROM news WHERE 日期 = (SELECT MAX(日期) FROM news)"):
"""导出词云数据"""
rows = self.fetch_data(query)
words = []
for row in rows:
words.extend(eval(row[0])) # 使用 eval 来解析字符串列表
word_counts = Counter(words)
word_cloud_data = [(word,count) for word, count in word_counts.items()]
return word_cloud_data

def export_pie_chart_data(self, query="SELECT 信息来源, SUM(CAST(处理后热度 AS INTEGER)) FROM news WHERE 日期 = (SELECT MAX(日期) FROM news) GROUP BY 信息来源"):
"""导出饼图数据"""
rows = self.fetch_data(query)
pie_chart_data = [(row[0],row[1]) for row in rows]
return pie_chart_data

def export_line_chart_data(self, query = """
SELECT 日期, SUM(加权情感得分) as total_sentiment
FROM news
GROUP BY 日期
ORDER BY 日期
"""):
"""导出折线图数据"""

rows = self.fetch_data(query)
line_chart_data = [(row[0], int(row[1])) for row in rows]
return line_chart_data

def export_bar_chart_data(self, query="""
SELECT 信息来源, 加权情感得分
FROM news
WHERE 日期 = (SELECT MAX(日期) FROM news)
"""):
"""导出柱状图数据"""
rows = self.fetch_data(query)
bar_chart_data = {}
for row in rows:
source = row[0]
sentiment_score = row[1]
if source not in bar_chart_data:
bar_chart_data[source] = {"positive": 0, "negative": 0}
if sentiment_score > 0:
bar_chart_data[source]["positive"] += sentiment_score
elif sentiment_score < 0:
bar_chart_data[source]["negative"] += sentiment_score

formatted_data = [ (source, int(data["positive"]), abs(int(data["negative"])),abs(int(data["positive"])+int(data["negative"])))
for source, data in bar_chart_data.items()]
return formatted_data

数据可视化

接下来绘制一些图表,包括:

  • 各个平台的情绪红绿图。
  • 公众情绪的涨跌折线图。
  • 每日全网词云图。

通过pyecharts的脚手架,导出为静态网页。这里为了代码直观,封装为了类。

class ChartGenerator:
def create_bar_chart(self, data , y_label="Sentiment",title="Sentiment Scores by Source") -> Bar:
"""Generate a bar chart with positive, negative, and absolute sentiment scores."""

x_data, y_data_positive, y_data_negative, y_data_absolute = zip(*data)
bar = (
Bar()
.add_xaxis(x_data)
.add_yaxis("Positive Sentiment", y_data_positive, stack="stack1", category_gap="50%")
.add_yaxis("Negative Sentiment", y_data_negative, stack="stack2", category_gap="50%")
.add_yaxis("Absolute Sentiment", y_data_absolute, stack="stack3", category_gap="50%")
.set_global_opts(
title_opts=opts.TitleOpts(title=title, pos_left="center"),
xaxis_opts=opts.AxisOpts(name="Source"),
yaxis_opts=opts.AxisOpts(name="Sentiment Score"),
legend_opts=opts.LegendOpts(pos_top="10%", pos_right="10%"),
)
)
return bar

def create_line_chart(self,data,y_label="Weighted Sentiment Score Sum",title="Sum of Weighted Sentiment Scores Over Time") -> Line:
"""Generate a line chart."""
x_data, y_data = zip(*data)
line = (
Line()
.add_xaxis(x_data)
.add_yaxis(y_label, y_data, is_smooth=True)
.set_global_opts(
title_opts=opts.TitleOpts(title=title, pos_left="center"),
xaxis_opts=opts.AxisOpts(name="Date", axislabel_opts=opts.LabelOpts(rotate=45)),
yaxis_opts=opts.AxisOpts(name=y_label),
legend_opts=opts.LegendOpts(pos_bottom="0%")
)
)
return line

def create_pie_chart(self, data_pairs, title="Information Source Heat") -> Pie:
"""Generate a pie chart."""
pie = (
Pie()
.add("", data_pairs)
.set_global_opts(
title_opts=opts.TitleOpts(title=title, pos_left="center"),
legend_opts=opts.LegendOpts(pos_bottom="0%") # Move legend to the bottom
)
.set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {d}%"))
)
return pie

def create_wordcloud(self, words, title="Word Cloud") -> WordCloud:
"""Generate a word cloud."""
wordcloud = (
WordCloud()
.add("", words, word_size_range=[20, 100],shape="circle")
.set_global_opts(title_opts=opts.TitleOpts(title=title, pos_left="center"))
)
return wordcloud

def render_charts(self, charts, output_file=None):
"""Render multiple charts to a single HTML file."""
if output_file is None:
output_file = os.path.join("docs", "index.html")

page = Page(layout=Page.SimplePageLayout)
page.add(*charts)
page.render(output_file)



代码设置

随着时间的推移,爬虫部分的代码可能需要自己修改。你也可以在main函数中,将debug设为True,这样不会真的爬取,而是调用本地的已爬取的网页。生成后的内容在docs/index.html

微信小程序蓝牙通信示例

· 19 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

作为开发者,最讨厌的事情莫过于多平台适配,在手机端由于大家型号不同,编个APP通过蓝牙控制显然是不方便的,于是做了一个蓝牙小程序来与ESP32通信。

界面设计

这里我想设计成方块按键的格式,所以创建一个矩阵,然后在矩阵对应的位置添加上按钮。非常的简单,只是有一些差异需要注意。

wxml标签

蓝牙小程序标签与html略有不同,以下是小程序标签(即wxml标签)与 HTML 略有不同的标签的对比表:

wxml标签HTML 标签描述
<view><div>用于容器和布局,类似于 HTML 中的 <div>
<text><span>用于文本显示,类似于 HTML 中的 <span>
<button><button>用于创建按钮,与 HTML 中的 <button> 功能相同。
<image><img>用于显示图片,类似于 HTML 中的 <img>,但属性有所不同。
<navigator><a>用于页面导航,类似于 HTML 中的 <a> 标签。
<picker>N/A用于多种选择器,HTML 中无直接对应的标签。
<scroll-view>N/A用于可滚动的视图区域,HTML 中无直接对应的标签。
<swiper>N/A用于滑动视图容器,HTML 中无直接对应的标签。
<map><iframe>用于展示地图,类似于 HTML 中嵌入地图的方式。
<swiper-item>N/A<swiper> 配合使用,HTML 中无直接对应的标签。
<rich-text>N/A用于展示富文本,HTML 中无直接对应的标签。
<block>N/A无实际渲染效果,类似于 HTML 中的 <template>

JS

以下是微信小程序的 JavaScript(JS)与网页的 JavaScript 的对比表格:

特性/功能微信小程序 JS网页 JS描述
全局对象wxwindow微信小程序中使用 wx 对象来调用特定的 API,而在网页 JS 中,所有的全局对象都挂载在 window 对象下。
API 调用基于 wx 对象提供的 API,如 wx.request()wx.navigateTo()使用浏览器提供的 API,如 fetch()window.location微信小程序有一套独立的 API,专门用于微信环境下的开发,无法直接使用标准的浏览器 API。
页面与组件管理通过小程序的 PageComponent 函数定义页面和组件通过 HTML 文件和 JavaScript 结合使用前端框架或直接操作 DOM微信小程序使用特殊的 PageComponent 函数来定义页面和组件,网页 JS 则通过 DOM 结合 JavaScript 实现页面和组件管理。
数据绑定使用 this.setData() 进行数据绑定和更新通常使用 innerHTMLtextContent 或前端框架(如 React 的 setState微信小程序使用 this.setData() 来绑定和更新数据,而在网页 JS 中,常通过直接操作 DOM 或使用前端框架来更新数据。
生命周期函数提供页面与组件的生命周期函数,如 onLoadonShow通过事件绑定或框架提供的生命周期函数(如 React 的 componentDidMount微信小程序有特定的生命周期函数供开发者使用,而网页 JS 通常需要结合框架或事件来处理生命周期管理。
路由与导航使用 wx.navigateTo()wx.redirectTo() 等方法进行页面跳转通过改变 window.location 或使用 history.pushState() 进行路由小程序的路由机制是由微信管理的,开发者需要使用专门的 API 进行导航,而网页 JS 可以直接操作 URL。
模块化使用 require() 和模块化文件系统使用 ES6 import/export 或 CommonJS 模块系统小程序内置的模块化系统与 Node.js 类似,使用 require() 导入模块,而网页 JS 中可以使用 ES6 模块或 CommonJS 模块系统。
网络请求使用 wx.request() 发起 HTTP 请求使用 fetch()XMLHttpRequest 发起 HTTP 请求微信小程序提供了 wx.request() 方法用于网络请求,而网页 JS 通常使用 fetch()XMLHttpRequest
文件系统访问通过 wx.getFileSystemManager() 访问文件系统通过 File API、Blob、FileReader 等访问文件小程序提供了 wx.getFileSystemManager() 接口来管理文件系统,而网页 JS 可以使用浏览器提供的 File API。
样式与布局使用 WXML 和 WXSS 定义页面结构和样式使用 HTML 和 CSS 定义页面结构和样式微信小程序使用 WXML 和 WXSS 分别来代替 HTML 和 CSS,专门为小程序定制。
事件处理事件绑定使用 bindtapcatchtap 等绑定事件事件绑定使用 addEventListener 或内联 onclick微信小程序的事件处理是通过特定的属性绑定事件,而网页 JS 可以直接使用标准的事件绑定方法。
调试与工具使用微信开发者工具进行调试使用浏览器的开发者工具进行调试小程序开发和调试通常在微信开发者工具中进行,而网页开发则依赖于浏览器提供的开发者工具。
存储提供 wx.setStorage()wx.getStorage() 进行数据持久化存储使用 localStoragesessionStorage 进行数据存储微信小程序的存储 API 类似于浏览器的 localStorage,但使用 wx 提供的 API 进行调用。
原生 API 调用不支持直接调用浏览器或操作系统的原生 API可以使用浏览器 API 或通过插件访问系统 API微信小程序无法直接调用浏览器或操作系统的原生 API,而网页 JS 则可以直接使用这些 API。
平台限制运行于微信环境,仅支持在微信客户端中运行运行于浏览器环境,可以在任何支持的浏览器中运行小程序只能在微信客户端中运行,而网页 JS 则可以在任何现代浏览器中运行。

页面代码

界面部分因为手机的尺寸实在太多,所以我创建一个矩阵,然后把有按键的地方加上边框实现规则布局。中间输入设备名称。

在正常的网页开发过程之中。缺省是非常常见的,也就是某一个属性大门不去填写的时候浏览器会默认给一个属性。但是在小程序开发过程之中,他并不会给一个默认的属性或者是说他给的默认属性与浏览器的不同。这就会导致在模拟的时候看到的界面是一个样子(因为使用的是浏览器渲染),真机调试的时候又是另一个样子。因为在手机上跑的服务是小程序自己的编译后的,跑在手机上的小程序环境。

所以我们需要把必要的属性全部都填写上。

<scroll-view class="scrollarea" scroll-y type="list">
<view class="container">
<text>蓝牙连接状态:{{status}}</text>
<div class="button-lines">
<input type="text" placeholder="请输入设备名称" bindinput="onDeviceNameInput" />
</div>
<!-- 3x7 矩阵布局 -->
<view class="button-grid">
<view class="row">
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendUp" bindtouchend="handleTouchEnd"></button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="connectDevice">连接</button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendA">A</button></view>
<view class="cell"></view>
</view>
<view class="row">
<view class="cell"><button bindtouchstart="sendLeft" bindtouchend="handleTouchEnd"></button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendRight" bindtouchend="handleTouchEnd"></button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendB">B</button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendD">D</button></view>
</view>
<view class="row">
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendDown" bindtouchend="handleTouchEnd"></button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendData">文件</button></view>
<view class="cell"></view>
<view class="cell"><button bindtouchstart="sendC" >C</button></view>
<view class="cell"></view>
</view>
</view>
</view>
</scroll-view>

样式代码

/**index.wxss**/
page {
height: 100vh;
display: flex;
flex-direction: column;
}
.scrollarea {
flex: 1;
overflow-y: hidden;
}
.container {
padding: 20px;
}
.button-lines {
display: flex;
flex-wrap: wrap;
justify-content: center;
align-items: center;
width: 100%;
height: 100%;
}
input{
border: 1px solid #ccc;
padding: 8px;
margin-right: 5px;
width: 50%;
height: 30%;
}
.button-grid {
display: flex;
flex-wrap: wrap;
justify-content: center;
align-items: center;
width: 100%;
height: 100%;
}

.row {
width: 100%;
display: flex;
justify-content: space-between;
}

.cell{
width: 12%;
height: 50px;
margin: 5px;

display: flex;
align-items: center;
justify-content: center;
box-sizing: border-box;
}

.cell button{
border: 1px solid #ccc;
}
button {
width: 100%;
height: 100%;
box-sizing: border-box;
}

逻辑设计

考虑到不同设备的蓝牙名称不同,因此我在页面的中间设计一个输入框,输入对应的设备名称(大小写敏感)后,点击连接按钮,即可触发搜索接口。为了让自己知道是否已经连接上,我在输入框的上面添加了一个状态显示,考虑到部分用户不能理解红色绿色的默认含义,我使用了中文来描述连接状态。

底部做了一些按键发送数据的功能,包括:中文上下左右、英文ABCD、还有大文件一键传输(我设置了范围为txt和py)

tip

网页开发中,浏览器的渲染主线程会在解析DOM树的时候给所有HTML节点根据权重添加上属性,而小程序中,一旦缺省关键的属性,在开发界面会正常显示,上真机就会异常,这点尤其需要注意。

蓝牙设备的搜索、连接等功能由微信的API接口提供,其中蓝牙的权限上,如果使用的是:仅在使用中允许,在部分安卓手机上,会出现切后台再返回时蓝牙权限丢失的情况。因此改为:每次使用时询问权限。目前在官方论坛上留言了,我更倾向于是安卓设备的问题。

另外安卓中蓝牙权限与位置权限关联,因此仅开启蓝牙权限依然无法使用。

微信的蓝牙接口搜索到的设备便不再出现,假设我周边存在设备A、设备B、设备C

如果我首先输入了设备B,蓝牙搜索API根据信号强弱依次返回:设备A、设备B(判定成功,建立连接)

此时我再输入设备A,点击连接,就会出现搜不到设备的情况,当然这里是可以优化的,设置一个点击按钮:刷新。不过我右上角点击重新进入小程序也是可以的,所以这里就不是很有必要加这个逻辑判断。

蓝牙设备的连接非常简单,根据参考文档一步一步来即可,需要注意的是,发送中文时可能会乱码,JS原生的解码又不能用,所以我导入了一个包import TextEncoder from './miniprogram-text-encoder'来自动判断文本是中文还是英文执行对应的转化。

既然蓝牙可以通信,传输中文和英文,那么是不是可以传本书过去?首先尝试直接传输,发现接收方只收到了前20字节,后续数据丢失。那么修改程序,将文件分片、每次发20个字节,发送完成之后在发送一个END标记。

和之前发送数据的代码写在一起,就变成了这样:


import TextEncoder from './miniprogram-text-encoder'

Page({
data: {
status: '未连接',
deviceId: null,
serviceId: null,
characteristicId: null,
deviceName: 'None' // 默认设备名称
},
onLoad() {
this.initBluetooth();
},
onDeviceNameInput(e) {
console.log(e.detail.value);
this.setData({
deviceName: e.detail.value,
})
;
},
initBluetooth() {
const that = this;
wx.openBluetoothAdapter({
success(res) {
console.log('初始化蓝牙适配器成功');
that.startBluetoothDevicesDiscovery();
wx.showToast({
title: '蓝牙权限成功',
icon: 'success',
duration: 2000
});
},
fail(res) {
console.log('初始化蓝牙适配器失败', res);
wx.showToast({
title: '蓝牙权限失败',
icon: 'error',
duration: 2000
});
}
});
},
startBluetoothDevicesDiscovery() {
const that = this;
console.log(that.data.deviceName, '57');

// 如果 deviceName 是 "None",不进行蓝牙设备搜索
if (that.data.deviceName === "None") {
console.log('设备名称为 "None",不进行蓝牙设备搜索');
return;
}

wx.startBluetoothDevicesDiscovery({
success(res) {
console.log('开始搜索蓝牙设备');
that.onBluetoothDeviceFound();
},
fail(res) {
console.log('搜索蓝牙设备失败', res);
}
});
},
onBluetoothDeviceFound() {
const that = this;
wx.onBluetoothDeviceFound((devices) => {
devices.devices.forEach(device => {
console.log('发现设备名称:', device.name); // 打印所有发现的设备名称
if (device.name === that.data.deviceName) {
wx.showToast({
title: '发现蓝牙设备',
icon: 'success',
duration: 2000
});
that.createBLEConnection(device.deviceId);
}
});
});
},
createBLEConnection(deviceId) {
const that = this;
wx.createBLEConnection({
deviceId: deviceId,
success(res) {
console.log('连接蓝牙设备成功');
that.setData({
status: '已连接',
deviceId: deviceId
});
that.getBLEDeviceServices(deviceId);
},
fail(res) {
console.log('连接蓝牙设备失败', res);
}
});
},
getBLEDeviceServices(deviceId) {
const that = this;
wx.getBLEDeviceServices({
deviceId: deviceId,
success(res) {
console.log('获取服务成功:', res.services);
for (let i = 0; i < res.services.length; i++) {
if (res.services[i].isPrimary) {
that.getBLEDeviceCharacteristics(deviceId, res.services[i].uuid);
return;
}
}
}
});
},
getBLEDeviceCharacteristics(deviceId, serviceId) {
const that = this;
wx.getBLEDeviceCharacteristics({
deviceId: deviceId,
serviceId: serviceId,
success(res) {
console.log('获取特征值成功:', res.characteristics);
for (let i = 0; i < res.characteristics.length; i++) {
if (res.characteristics[i].properties.write) {
that.setData({
serviceId: serviceId,
characteristicId: res.characteristics[i].uuid
});
return;
}
}
}
});
},
connectDevice() {
this.startBluetoothDevicesDiscovery();
},
sendData() {
const that = this;
// 选择本地 TXT 或 PY 文件
wx.chooseMessageFile({
count: 1,
type: 'file',
extension: ['txt', 'py'],
success(res) {
const filePath = res.tempFiles[0].path;
const fileName = res.tempFiles[0].name;

// 读取文件内容为 ArrayBuffer
wx.getFileSystemManager().readFile({
filePath: filePath,
success(readRes) {
const fileBuffer = readRes.data;
console.log(readRes.data)
const chunkSize = 20; // 每次发送20字节
const totalChunks = Math.ceil(fileBuffer.byteLength / chunkSize);

// 发送文件名称和分片数
const fileInfo = `${fileName}|${totalChunks}`;
const fileInfoBuffer = that.stringToArrayBuffer(fileInfo);
wx.writeBLECharacteristicValue({
deviceId: that.data.deviceId,
serviceId: that.data.serviceId,
characteristicId: that.data.characteristicId,
value: fileInfoBuffer,
success(res) {
console.log('文件信息发送成功');
},
fail(res) {
console.error('文件信息发送失败', res);
}
});

// 逐块发送文件数据
for (let i = 0; i < totalChunks; i++) {
const start = i * chunkSize;
const end = Math.min(start + chunkSize, fileBuffer.byteLength);
const chunk = fileBuffer.slice(start, end);
const progress = ((i + 1) / totalChunks) * 100;

// 发送当前块数据
wx.writeBLECharacteristicValue({
deviceId: that.data.deviceId,
serviceId: that.data.serviceId,
characteristicId: that.data.characteristicId,
value: chunk,
success(res) {
console.log(`数据发送成功: ${i + 1}/${totalChunks} (${progress}%)`);
if (i === totalChunks - 1) {
// 发送结束标志
const endBuffer = that.stringToArrayBuffer('END');
wx.writeBLECharacteristicValue({
deviceId: that.data.deviceId,
serviceId: that.data.serviceId,
characteristicId: that.data.characteristicId,
value: endBuffer,
success(res) {
console.log('所有数据发送完成');
}
});
}
},
fail(res) {
console.error(`数据发送失败: ${i + 1}/${totalChunks}`, res);
}
});
}
},
fail(err) {
console.error('文件读取失败', err);
}
});
},
fail(err) {
console.error('文件选择失败', err);
}
});
},

// 将字符串转换为 ArrayBuffer
stringToArrayBuffer(str) {
const base64 = wx.arrayBufferToBase64(new TextEncoder().encode(str).buffer);
return wx.base64ToArrayBuffer(base64);
},

// 发送控制消息
sendMessage(message) {
const that = this;
const buffer = that.stringToArrayBuffer(message);
wx.writeBLECharacteristicValue({
deviceId: that.data.deviceId,
serviceId: that.data.serviceId,
characteristicId: that.data.characteristicId,
value: buffer,
success(res) {
console.log(`消息发送成功: ${message}`);
},
fail(res) {
console.error(`消息发送失败: ${message}`, res);
}
});
},
// 松开按钮时发送消息
handleTouchEnd() {
this.sendMessage('释放');
},
sendUp() {
this.sendMessage('上');
},
sendDown() {
this.sendMessage('下');
},
sendLeft() {
this.sendMessage('左');
},
sendRight() {
this.sendMessage('右');
},
sendA() {
this.sendMessage('A');
},
sendB() {
this.sendMessage('B');
},
sendC() {
this.sendMessage('C');
},
sendD() {
this.sendMessage('D');
}

安卓系统有一个"运行时允许权限",该权限在不可复现的场景下会出现后台程序还在运行,但权限未授予。可以改为本次使用允许,让每次使用时都询问获取权限。

后话

这里是完整代码

程序的优化是无穷无尽的,所以这里我只实现了最少的功能,如果项目对你有帮助,不用问我,直接拿去用。

摄像头云台控制指令解析

· 12 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

购买了一款云台摄像头,你可以在淘宝搜这个关键词知道它长什么样子。

它默认能通过 RS232 协议控制云台转动,但是现在新的主板已经没有这种圆形的接口了,基本都是 USB。

他也可以通过遥控器控制,但是我想尝试自己编写逻辑代码通过键盘控制。

效果是通过监听键盘上下左右等事件,调用对应云台运动的指令,运动到合适角度之后可以按下空格停止云台运动,按下 ESC 退出控制程序。

也可以按下某个按键如数字1,直接运动到预设角度。

咨询商家后,商家提供了 RS232 协议的指令集,所以这里通过 python 的 serial 库尝试通过 USB 口对其云台调用。

RS232 协议说明书

CommandCommand PacketComments
Stop8x 01 06 01 VV WW 03 03 FFVV: Pan Speed
Left8x 01 06 01 VV WW 01 03 FFWW: Tilt Speed
Right8x 01 06 01 VV WW 02 03 FFYYYY: Pan Position
Up8x 01 06 01 VV WW 03 01 FFZZZZ: Tilt Position
Down8x 01 06 01 VV WW 03 02 FF
UpLeft8x 01 06 01 VV WW 01 01 FF
UpRight8x 01 06 01 VV WW 02 01 FF
DownLeft8x 01 06 01 VV WW 01 02 FF
DownRight8x 01 06 01 VV WW 02 02 FF
Absolute Position8x 01 06 02 VV WW 0Y 0Y 0Y 0Y 0Z 0Z 0Z 0Z FF
Relative Position8x 01 06 03 VV WW 0Y 0Y 0Y 0Y 0Z 0Z 0Z 0Z FF
Home8x 01 06 04 FF
Reset8x 01 06 05 FF

这里有一些复合指令,譬如 UpRight:向上的同时向右,如果是手柄控制比较好,键盘控制比较鸡肋,所以这里我们实现:上下左右、暂停、复位、绝对定位这几个能用到与可能会用到的。

基数转换

这里的绝对定位和相对定位部分,出现了0Y 0Y 0Y 0Y0Z 0Z 0Z 0Z,我希望传入一个 10 进制的角度,譬如0、90、180,怎么映射到其中呢?这就体现我们学完二进制之后的敏感度了,把 20 转成 2 进制的过程是:

20 ÷ 2 = 10 余数: 0
10 ÷ 2 = 5 余数: 0
5 ÷ 2= 2 余数: 1
2 ÷ 2=1 余数: 0
1 ÷ 2= 0 余数: 1
按余数倒序排列: 10100

这里我们观察范例,对数据做了拆分,即如果需要把10进制映射到16进制上,譬如17转成16进制是11,那么应该变成0101。每个位置之间插入0

所以可以写出如下代码:

def calculate_pan_position_bytes(pan_pos_value):
HEX_VALUES = [4096, 256, 16, 1] # 定义常量
pan_pos_ints = []
for i, value in enumerate(HEX_VALUES):
pan_pos_ints.append(pan_pos_value // value)
pan_pos_value %= value
# 转换为2位16进制字符串
pan_pos_strs = [f"{i:02X}" for i in pan_pos_ints]
return "".join(pan_pos_strs)
# 将17转化为16进制,应该是11,拆分加0,应该是0101
# 前方补0到总长度为8位,结果与我们预期一致
print(calculate_pan_position_bytes(17))
# 00000101

接下来通过代入0到6000这样的数值传输给串口后发现,只能向左转。

0对应居中,4500对应向左转180,数字再大也是转到底。

4500比180 = 25比1,所以我们输入角度,乘以25就得到了对应的信号值。

根据手册说明水平转动范围为355度,一半则是177.5度,与肉眼观察基本一致,Z轴的范围是上下各21度。

刚刚只能向左转,那么向右转的答案就呼之欲出了,要么是补码(异或运算后加1),要么是首位为符号位。我们添加上限位和映射,先用补码试试完成这个函数(结果直接成了)。

def calculate_pan_position_bytes(pan_pos_value, axis_type):
"""
计算轴(旋转)的位置字节。

参数:
pan_pos_value (int): 位置值,
axis_type (str): 轴的类型 ('y' or 'Y' for Y-axis, others for Z-axis)

返回:
pan_step_str (str): 计算得到的平移位置字节,格式为十六进制字符串。
"""
if axis_type.lower() == "y":
pan_pos_value = max(-177.5, min(pan_pos_value, 177.5)) # 限制取值范围
else:
pan_pos_value = max(-21, min(pan_pos_value, 21)) # 限制取值范围

pan_pos_value = int(pan_pos_value * 25) # 将角度转换为步长
pan_direction = "-" if pan_pos_value < 0 else "+" # 设定旋转方向
pan_pos_value = abs(pan_pos_value) # 取绝对值

HEX_VALUES = [4096, 256, 16, 1] # 定义常量

pan_pos_ints = []
for i, value in enumerate(HEX_VALUES):
if pan_direction == "+":
pan_pos_ints.append(pan_pos_value // value)
else: # 异或操作
pan_pos_ints.append((pan_pos_value // value)^ 0xF)
if i == 3 : # 最后一个数字,取反后加1
pan_pos_ints[-1] = pan_pos_ints[-1]+1
pan_pos_value %= value

# 转换为2位16进制字符串
pan_pos_strs = [f"{i:02X}" for i in pan_pos_ints]
return "".join(pan_pos_strs)

代码目标效果

希望具体的指令都可以通过 Python 函数来实现,同时暴露出所有可能需要修改的参数。最后关联键盘事件。例如:

camera_control.py
import keyboard
from usbcamera import *
from usbcamera import move_to_absolute_position
"""
设备 "/dev/ttyUSB0" 的云台旋转至绝对定位:
Y轴转到180度,速度为9.
Z轴转到30度,速度为10
"""
move_to_absolute_position(vv=9, ww=10, Y=180, Z=30, device="COM16")

# 关联键盘事件和控制函数
keyboard.on_press_key("up", lambda _: turn_up(device="COM16"))
keyboard.on_press_key("down", lambda _: turn_down(device="COM16"))
keyboard.on_press_key("left", lambda _: turn_left(device="COM16"))
keyboard.on_press_key("right", lambda _: turn_right(device="COM16"))
keyboard.on_press_key("enter", lambda _: move_home(device="COM16"))
keyboard.on_press_key("space", lambda _: turn_stop(device="COM16"))
# 按下数字1则转动到水平最左,垂直最下,可以根据自己需要多预设几个目标角度。
keyboard.on_press_key("1", lambda _: move_to_absolute_position(vv=10, ww=10, Y=180, Z=-30, device="COM16"))

# 让脚本保持运行状态以捕获事件
keyboard.wait("esc") # 按 'esc' 键退出

信号机制

  • 当收到左转信号时,摄像头会持续左转,直到到达限位位置或接收到新指令。

  • 如果想要提前结束左转,可以在发送左转信号一定时间后发送停止指令,摄像头收到停止指令时会停止。

  • 每个云台旋转操作会持续一定时间,如果在旋转期间收到其他指令,会终止旧指令,执行当前指令。

逻辑代码

通常在 Windows 系统上,串口名称通常是 COMx(如 COM1、COM2),而在 Linux 系统上通常是/dev/ttyUSBx(如/dev/ttyUSB0)。

usbcamera.py
#!/usr/bin/env python3
# coding:utf-8

import serial
import serial.tools.list_ports
import time

# VISCA命令集
commands = {
"stop": "81010601{vv}{ww}0303FF",
"left": "81010601{vv}{ww}0103FF",
"right": "81010601{vv}{ww}0203FF",
"up": "81010601{vv}{ww}0301FF",
"down": "81010601{vv}{ww}0302FF",
"upleft": "81010601{vv}{ww}0101FF",
"upright": "81010601{vv}{ww}0201FF",
"downleft": "81010601{vv}{ww}0102FF",
"downright": "81010601{vv}{ww}0202FF",
"absolute_position": "81010602{vv}{ww}{Y}{Z}FF",
"relative_position": "81010603{vv}{ww}{Y}{Z}FF",
"home": "81010604FF",
"reset": "81010605FF",
}


def send_visca_command(command, device):
"""
通过串口向摄像机发送VISCA命令。

参数:
command (str): 要发送的VISCA命令,格式为十六进制字符串。

返回:
response (bytes): 从摄像机接收到的响应。
"""
try:
ser = serial.Serial(device, 9600, timeout=1) # 初始化串口
command_bytes = bytearray.fromhex(command) # 将命令转换为字节
ser.write(command_bytes) # 发送命令
response = ser.read_all() # 读取响应
ser.close() # 关闭串口
return response
except:
ports_list = list(serial.tools.list_ports.comports())
if len(ports_list) <= 0:
print("未发现端口")
else:
for comport in ports_list:
if "USB" in str(comport):
print("发现USB端口:", comport.device, comport.description)


def calculate_pan_speed_bytes(pan_speed_value):
"""
计算轴(旋转)的位置字节。

参数:
pan_speed_value (int): 速度值,0-16

返回:
pan_step_str (str): 计算得到的平移位置字节,格式为十六进制字符串。
"""

pan_speed_value = max(0, min(pan_speed_value, 16)) # 限制取值范围

# 转为2位16进制
return f"{pan_speed_value:02X}"


def calculate_pan_position_bytes(pan_pos_value, axis_type):
"""
计算轴(旋转)的位置字节。

参数:
pan_pos_value (int): 位置值,
axis_type (str): 轴的类型 ('y' or 'Y' for Y-axis, others for Z-axis)

返回:
pan_step_str (str): 计算得到的平移位置字节,格式为十六进制字符串。
"""
if axis_type.lower() == "y":
pan_pos_value = max(-177.5, min(pan_pos_value, 177.5)) # 限制取值范围
else:
pan_pos_value = max(-21, min(pan_pos_value, 21)) # 限制取值范围

pan_pos_value = int(pan_pos_value * 25) # 将角度转换为步长
pan_direction = "-" if pan_pos_value < 0 else "+" # 设定旋转方向
pan_pos_value = abs(pan_pos_value) # 取绝对值

HEX_VALUES = [4096, 256, 16, 1] # 定义常量

pan_pos_ints = []
for i, value in enumerate(HEX_VALUES):
if pan_direction == "+":
pan_pos_ints.append(pan_pos_value // value)
else: # 异或操作
pan_pos_ints.append((pan_pos_value // value)^ 0xF)
if i == 3 : # 最后一个数字,取反后加1
pan_pos_ints[-1] = pan_pos_ints[-1]+1
pan_pos_value %= value

# 转换为2位16进制字符串
pan_pos_strs = [f"{i:02X}" for i in pan_pos_ints]
return "".join(pan_pos_strs)


def create_command(command_key, vv=10, ww=10, Y=None, Z=None):
"""
创建VISCA命令。

参数:
command_key (str): 命令键名。
vv (str): 水平方向速度,取值范围为0-16
ww (str): 垂直方向速度,取值范围为0-16
Y (str): 控制水平旋转的位置。
Z (str): 控制垂直旋转的位置。

返回:
command (str): 格式化后的VISCA命令字符串。

异常:
ValueError: 当命令需要Y和Z参数时,若未提供,则抛出异常。
"""
if command_key in ["home", "reset"]:
return commands[command_key]
if command_key in ["absolute_position", "relative_position"]:
if Y is None or Z is None:
raise ValueError("Y和Z为位置命令,必须提供")
return commands[command_key].format(
vv=calculate_pan_speed_bytes(vv),
ww=calculate_pan_speed_bytes(ww),
Y=calculate_pan_position_bytes(Y, "y"),
Z=calculate_pan_position_bytes(Z, "z"),
)

return commands[command_key].format(
vv=calculate_pan_speed_bytes(vv),
ww=calculate_pan_speed_bytes(ww),
)


# 控制函数示例
def turn_stop(vv=0, ww=0, device="/dev/ttyUSB0"):
return send_visca_command(create_command("stop", vv, ww), device)


def turn_left(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("left", vv, ww), device)


def turn_right(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("right", vv, ww), device)


def turn_up(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("up", vv, ww), device)


def turn_down(vv=10, ww=10, device="/dev/ttyUSB0"):
return send_visca_command(create_command("down", vv, ww), device)


def move_home(device="/dev/ttyUSB0"):
return send_visca_command(create_command("home"), device)


def move_to_absolute_position(vv=10, ww=10, Y=0, Z=0, device="/dev/ttyUSB0"):
return send_visca_command(create_command("absolute_position", vv, ww, Y, Z), device)

后话

硬件相比软件来说,资料比较少,所以编写过程主要靠经验。

猜测轴旋转的角度和 4 个参数对应关系是最有意思的过程,有趣的功能背后全是数学。

用Flask搭建屏幕共享工具

· 7 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结
  1. 当你的电脑无法通过视频线连接到电视机,可能是由于线缆长度不足或者接口不兼容,而你的电视机恰好支持浏览器功能。

  2. 当你在外面参加培训或交流活动,现场只提供了一个WiFi网络。大家刚刚熟悉,马上就要开始屏幕分享,你需要一个快速的方式让大家都能看到你的屏幕。

这个时候,你就需要一个工具来捕获和分享你的屏幕和音频(包括设备音频和麦克风输入),并通过网页形式与他人共享。这样,观众无需下载任何会议软件,仅需打开浏览器即可观看。

当然有时候你需要提供必要的文件,比如代码,文档等。所以这个程序还允许你上传与下载文件。上传的文件会保存在当前目录下的upload文件夹中,你也可以从upload文件夹中下载文件。

例如一个伙伴上传了文件,另一个伙伴可以从这个地址下载文件。这个项目我做了一次较大的调整,原本是用python编写的屏幕和麦克风捕获,为了跨平台使用的同时保持代码的简洁,使用了JS重构了代码,使用通用的浏览器获取本地权限的方式,希望这个项目对你有帮助。

项目开源地址:https://github.com/jiangyangcreate/WebScreenShare.git

安装依赖

pip install flask

项目目录结构

项目下的文件夹:

templates 文件夹中包含一个 upload.html 文件,用于页面展示。

uploads 文件夹用于保存上传的文件。

static 文件夹用于放置js代码逻辑:检测摄像头是否存在。

your_project/
├── templates/
│ └── upload.html
│ └── stream.html
├── uploads/
├── static/
└── app.py

核心代码

替换上传表单的样式

upload.html
<!doctype html>
<html lang="zh-CN">
<head>
<style>
/* 美化上传按钮 */
.file-upload {
position: relative;
display: inline-block;
overflow: hidden;
cursor: pointer;
background-color: #ff6600;
color: #fff;
padding: 10px 20px;
border: none;
border-radius: 5px;
transition: background-color 0.3s;
font-size: 16px;
}
.file-upload:hover {
background-color: #e65c00;
}
.file-upload input[type="file"] {
position: absolute;
font-size: 18px;
right: 0;
top: 0;
opacity: 0;
cursor: pointer;
width: 100%;
height: 100%;
}
/* 显示文件名 */
.file-name {
margin-top: 10px;
font-size: 14px;
color: #f0f0f0;
}
input[type="submit"] {
background-color: #ff6600;
color: #fff;
border: none;
padding: 10px 20px;
cursor: pointer;
border-radius: 5px;
transition: background-color 0.3s;
font-size: 16px;
margin-top: 10px;
}
input[type="submit"]:hover {
background-color: #e65c00;
}
</style>
<!-- 引入设备检测的 JavaScript 文件 -->
<script src="{{ url_for('static', filename='js/deviceDetection.js') }}"></script>
<script>
// 显示选中的文件名
function displayFileName(input) {
const fileNameElement = document.getElementById('file-name');
if (input.files.length > 0) {
fileNameElement.textContent = `已选择文件: ${input.files[0].name}`;
} else {
fileNameElement.textContent = '';
}
}
// 显示上传表单
window.onload = function() {
const uploadContainer = document.getElementById('upload-form-container');
const uploadForm = uploadContainer.querySelector('form');
uploadForm.style.display = 'flex';
}
</script>
</head>
<body>
<h2>上传新文件</h2>
<div id="upload-form-container">
<form method="post" enctype="multipart/form-data">
<label class="file-upload">
上传文件
<input type="file" name="file" required accept="image/png, image/jpeg, image/gif, image/jpg" onchange="displayFileName(this)">
</label>
<div class="file-name" id="file-name"></div>
<input type="submit" value="确认">
</form>
</div>
</body>
</html>

映射路由与文件允许范围代码

app.py
# app.py

from flask import Flask, request, redirect, url_for, send_from_directory, render_template
import os

app = Flask(__name__)

# 设置上传文件夹
UPLOAD_FOLDER = 'uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# 允许的文件扩展名
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}

def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@app.route('/')
def index():
return render_template('index.html')

@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
if request.method == 'POST':
# 检查是否有文件上传
if 'file' not in request.files:
return redirect(request.url)
file = request.files['file']
if file.filename == '':
return redirect(request.url)
if file and allowed_file(file.filename):
filename = file.filename
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
return redirect(url_for('upload_file'))

# 显示上传表单和文件列表
files = os.listdir(app.config['UPLOAD_FOLDER'])
return render_template('upload.html', files=files)

@app.route('/uploads/<filename>')
def download_file(filename):
return send_from_directory(app.config['UPLOAD_FOLDER'], filename)

# 新增的路由:设备选择页面
@app.route('/media_selection')
def media_selection():
return render_template('media_selection.html')

# 新增的路由:媒体接口页面
@app.route('/media_interface')
def media_interface():
# 获取用户选择的参数
screen = request.args.get('screen') == 'on'
camera = request.args.get('camera') == 'on'
sound = request.args.get('sound') == 'on'

# 至少选择一项已经在前端验证,这里不再重复
return render_template('media_interface.html', screen=screen, camera=camera, sound=sound)

if __name__ == '__main__':
app.run(debug=True,host='0.0.0.0',port=8080)

使用浏览器接口设备检测

deviceDetection.js

// 检测摄像头是否存在
function hasCamera() {
return navigator.mediaDevices.enumerateDevices()
.then(devices => {
return devices.some(device => device.kind === 'videoinput');
})
.catch(err => {
console.error('Error detecting camera:', err);
return false;
});
}

// 检测麦克风是否存在
function hasMicrophone() {
return navigator.mediaDevices.enumerateDevices()
.then(devices => {
return devices.some(device => device.kind === 'audioinput');
})
.catch(err => {
console.error('Error detecting microphone:', err);
return false;
});
}

// 显示检测结果
function displayDeviceStatus() {
hasCamera().then(camera => {
const cameraStatus = document.getElementById('camera-status');
if (camera) {
cameraStatus.textContent = '摄像头: 已连接';
cameraStatus.style.color = 'green';
} else {
cameraStatus.textContent = '摄像头: 未连接';
cameraStatus.style.color = 'red';
}
});

hasMicrophone().then(microphone => {
const microphoneStatus = document.getElementById('microphone-status');
if (microphone) {
microphoneStatus.textContent = '麦克风: 已连接';
microphoneStatus.style.color = 'green';
} else {
microphoneStatus.textContent = '麦克风: 未连接';
microphoneStatus.style.color = 'red';
}
});
}

// 当页面加载完成后执行
window.addEventListener('load', displayDeviceStatus);

function detectDevices() {
if (!navigator.mediaDevices || !navigator.mediaDevices.enumerateDevices) {
console.log('浏览器不支持设备检测');
return;
}

navigator.mediaDevices.enumerateDevices()
.then(function(devices) {
let hasCamera = false;
let hasMicrophone = false;

devices.forEach(function(device) {
if (device.kind === 'videoinput') {
hasCamera = true;
}
if (device.kind === 'audioinput') {
hasMicrophone = true;
}
});

const cameraCheckbox = document.getElementById('camera-checkbox');
const soundCheckbox = document.getElementById('sound-checkbox');

if (hasCamera) {
cameraCheckbox.disabled = false;
} else {
cameraCheckbox.disabled = true;
cameraCheckbox.checked = false;
}

if (hasMicrophone) {
soundCheckbox.disabled = false;
} else {
soundCheckbox.disabled = true;
soundCheckbox.checked = false;
}
})
.catch(function(err) {
console.error('设备检测时出错:', err);
});
}

// 当页面内容加载完毕后执行设备检测
document.addEventListener('DOMContentLoaded', detectDevices);

总结

浏览器是现在网络应用的标配,使用浏览器接口获取设备权限,可以实现简单优雅的跨平台使用。每种语言都有自己的优势,Python的生态强大,JS的灵活性高,两者结合可以实现很多有趣的应用。

自制智能家居流程

· 17 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

如果你想要实现这样一个功能:当窗外开始下雨,窗户自动关闭

如果你想知道解决方案,可以直接跳到最后一部分。

我们仅看前半部分,那么你需要:检测窗外是否有雨水,并把这个信息传递给窗户控制器

这个过程中,你需要:

  • 传感器:检测窗外是否有雨水/或者获取网络上的天气信息
  • 单片机主控板:可以接收传感器的数据,并收发网络请求
  • 执行器:窗户控制器,可以接收单片机的指令,控制窗户的开关
  • 供电:保证传感器和主控板的正常工作

这个过程你需要知道以下内容:单片机如何烧录程序、传感器如何连接、如何获取传感器数据、如何发送网络数据、如何制作外壳(例如 3D 打印)。

传感器基础知识

  • 负极表示符号: - / G / Gnd / 黑色
  • 正极表示符号:+ / V / Vcc / 红色
  • 信号管脚: S 可以表示信号,根据传感器的不同,参数范围是 0-1023 或 0 1
  • 模拟量信号管脚表示符号: A 参数范围在 0-1023
  • 数字量信号管脚表示符号:D 参数为 0 或 1
tip

如果标识与颜色发生冲突,一般以标识为准:例如接口上写着 V,但连接线颜色为黑,一般当作正极处理。

当单片机通电时,所有的引脚都带电,但是只有信号管脚的电压会随着传感器的变化而变化。

因此传感器的正负极理论上可以任意连接,只需保证信号管脚连接指定的即可。

A 口的功能比 D 口更加强大,因为 A 口可以接收模拟信号,而 D 口只能接收数字信号。因此部分传感器 D 接 A 也可以正常读数。

两管脚

常见的有扬声器、电机(俗称马达)。

这类设备因为较为特殊,一般有专门的接口,或者占用 2 个信号端口,通过信号的变化来工作。

马达往往需要更大的工作电压,如果没有专门的连接口,有可能需要在板上使用跳针切换工作电压。

三管脚

这类传感器数量最多,往往由 GVA 或者 GVD 组成。

使用时,正负极与单片机正负极连接,信号线与板上标注的 A 或 D 进行连接。

四管脚

四管脚传感器分为很多类

特殊接口的,譬如:人体温度传感器

正负极与信号口一般都专门对应的位置供连接。

双信号接口的,譬如:超声波

一般有四个接口:GVTE,其中 GV 正常连接,T 和 E 都接在信号管脚上。

同时接收 AD 的,譬如:烟雾传感器

一般有四个接口:GVAD,其中 GV 正常连接,D 表示有没有烟雾,A 表示烟雾浓度。分别接在对应的信号管脚即可。

五管脚

譬如:摇杆传感器

一般五个接口为:GVXYB,其中 GV 正常连接,X 表示 X 轴(是模拟量接 A)、Y 表示 Y 轴(是模拟量接 A)、B 表示按钮(是数字量接 D)

其他特殊类

其他特殊的传感器一般有特殊接口,譬如:摄像头、屏幕等。

根据说明接入即可。

与单片机通信

单片机(Microcontroller Unit,简称 MCU)是指一个微型计算机集成在一个单独的微型芯片中,它包括处理器(CPU)、内存(通常包括 RAM 和 ROM)、以及各种输入/输出(I/O)接口等在内的完整计算设备。

单片机设计用于嵌入式应用,通常在硬件设备中执行特定任务。例如,你的电视遥控器可能就是由一个单片机控制的,它可以接收你的输入,然后发送相应的信号到电视上。其他常见的单片机应用包括玩具、家用电器、医疗设备、汽车等。

有的单片机可以使用完整的 Python,譬如华硕的 thinker edge R、部分树莓派,有的 Arduino 板子、ESP32 等只能使用简化的 MicroPython。

tip

当我们希望通过 windows 计算机的 USB 接口和单片机设备进行串口通信时,需要将 USB 接口转换为标准的串行接口,这个过程需要一个介于 USB 和串口之间的翻译,我下面的驱动就是这个翻译。

信号线的损坏的表现除了信号中断无法传输,也可能导致信号到达时间提前或滞后。

并非所有接口一样的数据线都具有相同功能:有的线仅能慢速充电,有的线可以快速充电,有的线只能充电不能传输数据。确保专线专用。

在搜索引擎中搜 CH341SER 驱动

过程中所有弹窗有下一步点下一步,有确认点确认

在编程软件中识别单片机

常用的编程软件有:Scratch、Mixly、Mixly2、MaixPy 等。

有的支持图形化编程与代码编程,有的需要仅支持代码编程。

下载对应的编程软件后,打开软件。

选择主控这个环节,不同软件的选择方式不同。

tip
  • Vegeta 这样基于 Scratch 的编程软件,需要从左下角选择添加对应的主控型号。

  • Mixly 从右下角,串口旁的下拉菜单选择对应的主控型号。

  • Mixly2 从登录菜单中主控型号后,进入代码编辑页,右上角选择串口旁可以选择更加详细的主控型号。

  • MaixPy 从上方的工具页面中选择开发板型号。

通过连接线连接电脑与单片机。此时可能会有多种情况:

  1. 会提示:有串口连接,并弹出且仅弹出 1 个串口。
  2. 识别计算机上的所有串口,需要自己选择(可以通过反复插拔确认新增的端口号)。
  3. 不弹出任何串口,需要主控通电启动后才识别串口。
  4. 也有的串口时有时无,此时可以考虑:连接线接触不良(更换连接线),或者是主控/USB 电压不稳定——常见于学校机房(主控或电脑独立供电)
  5. 还有的默认的波特率需要调整,否则无法识别传输信号。

连接成功后记得初始化固件,使其恢复到软件对应的固件版本。类似 Android 手机的刷机/恢复出厂设置。

单片机编程

这里的传感器特指狭义的通过半导体检测物理量的传感器,如温度传感器、湿度传感器、光敏传感器等。这些传感器的特点是:输出信号是数字/模拟信号。

数字量传感器的输出信号是数字信号,他的特点是只 返回/发出 两种状态:高电平和低电平。对应在代码中是 1 和 0 。

  • 如声音传感器如果是数字量传感器,当检测到声音时输出高电平,否则输出低电平。

  • 如小灯,输出高电平表示亮,输出低电平表示灭。

模拟量传感器的输出信号是模拟信号,他的特点是输出的电压值是连续变化的。对应在代码中是 0-1023(通常如此,并非绝对) 。

  • 还是以声音传感器为例,如果是模拟量传感器,当检测到声音时输出的电压值会随着声音的大小而变化。

  • 还是以小灯为例,输出最大值表示最亮,输出最小值表示最暗,亮度会随输出的电压值变化。

有的传感器同时支持数字量和模拟量输出,有的不是。

因此,对于不确认的传感器,我们一般先假设传感器是模拟量传感器,如果不是,再当作数据量处理。

单片机的运行内存往往很小,当创建一个非常复杂的代码时,有可能会导致内存问题,对应各种报错都有可能。

模拟量传感器读取

下面以 32 接口为例

import machine
adc32 = machine.ADC(machine.Pin(32))
while True:
print(adc32.read_u16())

模拟量传感器输出

下面以 0 接口为例

import machine
pwm0 = machine.PWM(machine.Pin(0))
pwm0.duty_u16(0)
pwm0.duty_u16(255)

然而,有些动力类传感器需要设置占空比:占空比主要与脉冲宽度调制(Pulse Width Modulation,PWM)相关,它是一种模拟信号的数字化表示方法。在 PWM 中,一个周期内的高电平时间占总周期时间的比例就是占空比。

传感器的输出类型可以有多种,包括模拟电压、模拟电流、数字信号(如 I2C、SPI、UART 等)、频率、PWM 等。只有在使用 PWM 输出的传感器时,才需要设置占空比。例如,一些伺服电机会使用 PWM 信号来控制其位置,这时就需要设置占空比。

对于其他类型的传感器,如模拟电压输出的传感器、数字信号输出的传感器等,就不需要设置占空比。这些传感器的输出通常是连续的或者是特定的数字信号,不涉及到占空比的概念。

from machine import Pin, PWM
import time

# 创建一个PWM对象
pwm = PWM(Pin(2))

# 设置PWM信号的频率为50Hz
# 每秒50个周期,所以每个周期的时间是1秒/50,即20ms。
pwm.freq(50)

# 一般来说,当PWM信号的高电平时间为1ms时,舵机转到0度;
# 当高电平时间为2ms时,舵机转到最大角度。
# 这个范围内的其他高电平时间对应的是0到180度之间的其他角度。

# 转到0度()
pwm.duty(52) # 1ms / 20ms * 1024 = 51.2 取不低于最小值的整:52
time.sleep(1) # 等待一段时间让舵机转到指定位置

# 转到180度
pwm.duty(102) # 2ms / 20ms * 1024 = 102.4 取不高于最大值的整:102
time.sleep(1) # 等待一段时间让舵机转到指定位置

# 关闭PWM
pwm.deinit()

数字量传感器读取

import machine

pin0 = machine.Pin(0, machine.Pin.IN)
while True:
print(pin0.value())

数字量传感器输出

import machine
import time

pin13 = machine.Pin(13, machine.Pin.OUT)
while True:
pin13.value(0)
time.sleep_ms(50)
pin13.value(1)
time.sleep_ms(50)

单片机网络通信

获取天气

心知天气 API 分为免费版、付费版等多个坂本,不同的版本返回的数据数量有所不同。

免费版仅返回三种基本数据,付费版可以返回多种数据。mixly 中默认的 KEY 为高级付费版,可返回全部数据。

数据返回的格式为字典,因此可以通过如下方式进行解包,下面的代码提供了部分数据解包的方法。

需要注意的是,该功能为联网功能,需要在联网环境下使用,确保 wifi 名和密码正确。

import mixiot
import machine
import seniverse_api


mixiot.wlan_connect('wifiname','wifipassword')
print(seniverse_api.weather_now('SGJl0ExVN-4j27msR','北京'))

onenet 物联网传输数据至云端

onenet 物联网是中国移动推出的物联网交互平台,主要面向一般开发者,因此 AIbox 这款设备可以使用 onenet 物联网平台进行数据传输。

相比于 mixio 这样专注于单片机的物联网平台来说,onenet 的文档与接口可能会频繁变动,如有出入以官网教程为准。

onenet 物联网平台网址:https://open.iot.10086.cn/doc/

文档中提供了传输文本与文件 2 种方式

import json
import asyncio
import websockets
from uuid import uuid4

# 音频文件测试路径。
audioFile = "test.mp3"
# 使用自己产品Id和apikey替换下列参数。
productId = "x"
apikey = "x"

#发送文本请求
async def textRequest(ws):
content = {
"aiType":"dm",
"topic": 'nlu.input.text',
"recordId": uuid4().hex,
"refText": "测试" #修改文本请求的输入
}
try:
await ws.send(json.dumps(content))
resp = await ws.recv()
print(resp)
except websockets.exceptions.ConnectionClosed as exp:
print(exp)

#发送音频请求
async def audioRequest(ws):
content = {
"aiType": "dm", #可选dm/asr, dm获取对话结果,asr只获取asr结果
"topic": "recorder.stream.start",
"recordId": uuid4().hex,
"audio": {
"audioType": "mp3", #修改为测试文件的类型
"sampleRate": 16000, #修改为测试文件的sampleRate
"channel": 1, #修改为测试文件的channel
"sampleBytes": 2 #修改为测试文件的sampleBytes
},
"asrParams": {
"realBack": True, #实时返回asr结果
"enableVAD": True, #启动VAD
"enablePunctuation": True, #返回结果是否带拼音
"enableTone": True, #返回结果是否带声调
"enableConfidence": True, #返回结果是否带置信度
"enableNumberConvert": True, #返回结果是否进行数字转换
},
}
try:
#发送文本消息
await ws.send(json.dumps(content))
# 发送音频消息
with open(audioFile, 'rb') as f:
while True:
chunk = f.read(400) #wav buffsize=3200 其他的400
if not chunk:
await ws.send(bytes("", encoding="utf-8"))
break
print(len(chunk))
await ws.send(chunk)
async for message in ws:
print(message)
resp = json.loads(message)
if 'dm' in resp:
break
except websockets.exceptions.ConnectionClosed as exp:
print(exp)
ws.close()

async def dds_demo():
url = f"ws://botai-dsg.and-home.cn:4443/dsg/v1/prod?productId={productId}&apikey={apikey}"
print(url)
async with websockets.connect(url) as websocket:
#await textRequest(websocket) #发送文本请求
await audioRequest(websocket) #发送音频请求
asyncio.get_event_loop().run_until_complete(dds_demo())

后话

最后,通过大量的学习和试错打样,你发现米家雨水传感器,淘宝 46 包邮,搞活动更便宜,这大概是你最后的选择。

乐高、激光切割与3D打印

· 18 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

内容为最近一年的个人硬件产品的结构设计的对比分析,作为 DIY 爱好者,必然会有不足之处,欢迎指正。

假设你想自己制作一个桥梁模型送给朋友,手工制作一个当然很好,使用木头、或者捏一个土坯然后烧制、又或者直接使用车床加工。但是这需要一定的技艺,以及各种器具。

但是除了手工制作之外,你依然有很多种方式可以选择:

  • 乐高搭建,你只需要知道乐高拼搭基础,设计出图纸后导出并购买零件(正版与非正版价格差距极大)。

  • 3D 打印,3D 打印有多种不同的原理,材料可选类型也比较多,设计出模型之后切片打印即可。

  • 激光切割,使用激光切割机切割亚克力板或者木板,之后拼搭。

简短对比 :

技术乐高3D 打印(FDM)激光切割
特点模块化设计,易于组装和拆解可以打印出复杂的三维结构可以精确地切割出复杂的二维结构
结构强度玩具级工具级工具级
设计难度小时级小时级小时级
调试难度儿童中等
设备价格千元级千元级
材料价格十元级十元级百元级
制作速度小时级小时级分钟级
气味有(根据材料不同,可能有塑料熔化的气味)有(切割过程可能产生烧焦的气味)
噪音有(打印过程可能会产生噪音)有(切割过程可能会产生噪音)
优点总结1. 适合所有年龄层,易于上手
2. 可重复使用,具有高度的灵活性
3. 无需特殊工具或设备
1. 可以制作出复杂的三维结构
2. 可以打印出定制的零件,适合个性化设计
1. 可以精确地切割出复杂的二维结构
2. 结构稳固,适合制作承受重负的结构
缺点总结1. 结构可能不够稳固,不适合制作大型或承受重负的结构
2. 设计和功能可能受到乐高模块种类和数量的限制
1. 打印速度较慢,大型结构可能需要很长时间
2. 需要一定的设计和操作技能,学习曲线较陡峭
1. 设备价格高昂,运行和维护成本也较高
2. 需要一定的设计和操作技能,学习曲线较陡峭

乐高类

优点

  • 设计快速、简单,易于拆卸,非常适合快速验证创新想法。

  • 乐高模型可以随时调整,拆解后的部件可再次使用,避免了材料的直接损耗。相比之下,3D 打印和激光切割等技术一旦出现错误,调整难度较大,材料利用率相对较低。

缺点

  • 乐高结构的强度有限,不适合承受较大的力,因此更适合轻量级的应用。

  • 乐高零件种类有限,对复杂结构的设计造成一定限制。

  • 乐高零件的精度也有限,主要的结构单位和半个单位,这意味着你的模型尺寸需要是 0.5 个单位长度的倍数。

学习心得

搜索一些搭建图纸,可以帮助你更好地理解乐高的设计原理和技巧。不用管书籍的语言是德语还是英语。

软件

乐高结构设计软件:https://studiohelp.bricklink.com/hc/en-us

优点:软件可以导出零件名称,一键跳转采购。

总结

乐高设计快,但是采购之后自己拼搭比较耗时,遇到问题时可以先思考我之前有没有见过类似的图纸,没有见过而且比较复杂的话,一般选择 3D 打印。

3D 打印

3D 打印是个人制造的一种重要方式,它可以将数字模型转化为实体物体。一张 A4 纸的大小大约是 210mm×297mm,可以在此基础上选购合适的成型尺寸。

3D 打印分为多种技术,常见的有 FDM(熔融沉积成型)、SLA(光固化成型)、SLS(激光烧结成型)等。其中 FDM 是最常见的 3D 打印技术,也是我使用的技术。

3D 打印目前主要有:

  • 定制手办方向

需要深入的学习建模软件

学习调色与上色

tip

目前桌面级打印机难以直接获得彩色精美手办。大多是光固化再上色或彩色 FDM 打印后打磨。

设备需要常维护:使用洗涤器清洗底板、挪动后重新调平。便宜的设备往往打印慢的同时需要调试:层纹错位、堵头、首层不平、拉丝翘边、悬空下垂等问题。

  • 工业设计

简单的学习建模软件

深入学习螺丝螺母磁铁钢针等材料的嵌入

tip

嵌入螺丝与螺母

暂停埋入螺母,预留 0.2mm 的间隙,然后再埋入螺母,可以避免打印头碰到螺母。

暂停层设置方法:切片后点右边的 z 轴预览条,右键可以选择:添加暂停打印

嵌入磁铁

普通磁铁与 N52 磁铁价格相差较大,但依旧推荐使用 N52 磁铁。因为普通磁铁在打印件中,磁吸力会有所衰减,而 N52 磁铁衰减后依然有不错的吸力。如果因为吸力不够导致打印件整体作废得不偿失。

设计孔位时磁铁最好下沉,齐平与突出都会让磁铁被撞碎。下沉太多会导致吸力下降。也侧水平设置槽位,让磁铁便于更换。

留出旷量给胶水(慢干胶与UHU胶水会腐蚀结构件,推荐快干胶502)、或者使用过盈、或者暂停+封顶(选择强磁,否则吸力下降严重)

不同尺寸、磁力的磁铁阵列排布时,距离过近会互相影响。常见的有:6mm直径 2mm 的N55磁铁、10mm直径的磁铁

分辨磁铁NS极可以使用一个3d打印件,一头标注N,一头标注S,放置磁铁后,吸起来哪个就放进去。

嵌入钢针

钢针是最为常见的材料,可以选择的尺寸最多,将钢针以不同角度垂直嵌入 3D 打印件中可以提高打印件的强度。

想要将钢针水平嵌入到 3D 打印件中同时避免磕碰打印头、保持与打印件接触紧密,可以通过将打印一个辅助件完成:长方体的内切圆柱(剪切掉内切圆柱),然后将钢针嵌入到辅助件中,再通过暂停层将辅助件嵌入到打印件中。

另外也可以利用钢针制作合页等结构,比纯3D打印更耐用。

在 3D 打印过程中,材料的选择对打印效果和性能有重要影响。下面是部分常见材料及其价格与特点。

tip

不同品牌价格不同,仅供参考,以下是一些材料的前缀与后缀的简要解释:

  • CR ,表示更多色彩
  • CF ,表示碳纤维复合
  • Matte ,表示哑光质感
  • Silk ,表示丝绸质感
  • Hyper/Fast ,表示高速打印
材料全称特点及应用说明价格
ABS丙烯腈-丁二烯-苯乙烯共聚物强度高、耐热性好、易加工,适用于一般性的 3D 打印项目。59 元/KG
PETG改性聚对苯二甲酸乙二醇酯耐热、透明度高、耐化学性强,适用于透明或耐热性能的打印项目。59 元/KG
PLA聚乳酸生物降解、环保、易于打印、无毒,适用于一般性的 3D 打印项目。69 元/KG
PA聚酰胺/尼龙 (Nylon)机械性能更强,耐热,用于制作普通机械强度要求的打印项目。79 元/KG
TPU热塑性聚氨酯弹性好、耐磨性强,常用于制作柔软的零件或弹性组件。98 元/KG
PET-CF碳纤维增强聚对苯二甲酸乙二醇酯更高的强度和刚性,适用于要求更高机械性能的打印项目。108 元/KG
PLA-CF碳纤维增强聚乳酸更高的强度和刚性,适用于要求更高机械性能的打印项目。129 元/KG
ASA聚酰胺酸酯工程塑料,耐热性和机械性能优异,适用于高强度和耐热性的打印项目。169 元/KG
PC聚碳酸酯抗冲击性和透明度优异,常用于制作耐用的零件或透明的构件。219 元/KG
PA-CF碳纤维增强聚酰胺/尼龙碳纤维更高的强度和耐热性,适用于要求更高机械性能的打印项目。249 元/KG

建模软件

建模软件是 3D 打印的基础,可选的非常多,譬如Fusion 360Blender

模型可以从 0 开始建,也在别人的基础上修改,由于家用3D打印机尺寸较小,工具物品难以一体成型,因此模块化设计思维较为重要。

成品模型库:

优点

  • 只需要一个晚上就可以验证你的复杂创意。

  • 提升空间大,可以选择一体成型,或者碳纤维材料,也可以使用暂停打印嵌入螺丝、螺母、磁石、金属棒等来创作一些强度极高的混合材质的工具。

缺点

  • 连接处的强度不够需要使用人工介入使用特殊手段提升。

  • 打印时间长而且偶尔成品有瑕疵。

学习心得

3D 打印的知识相对来说比较碎片化,需要持续学习与实战验证。可以关注一些 3D 打印博主、逛逛 3D 打印社区,可以学习到很多建模、切片、机器的调试、嵌入等技巧。

软件

从 0 开始 3D 打印需要用到 3D 建模软件与切片软件

建模软件就是构建出你要打印的物品的模型的软件。

切片软件则是把这个模型切割为一层一层,控制 3D 打印机的喷嘴运动路径等参数的软件。目前我的 3D 打印机只认切片后的文件。

这里建模软件推荐找个教程,教程上用什么跟着学也用什么。

切片软件一般厂家有提供。

激光切割

激光切割机主要用于切割和雕刻材料,如亚克力板、金属片、木板等。激光切割机通过激光束对材料进行加热,使其熔化或气化,然后通过气流将熔化或气化的材料吹走,从而实现切割。

可以通过调整激光头的运行速度和激光强度来实现穿透切割和表面雕刻。

激光强度大,激光头移动速度慢,可以实现穿透切割,即将材料完全切断(速度:10 分钟级别)。

激光强度小,激光头移动速度快,可以实现表面雕刻,即在材料表面刻出图案或文字(速度:秒级别)。

优点

  • 刻字速度极快,立等可取。

  • 支持材料多样。

缺点

  • 设备占地较大,气味明显需要单独通风,水冷型的需要偶尔换水。

  • 部分材料切割容易边缘不光滑、发黑。

学习心得

激光切割主要是调试激光能量强度与材料的关系。常见的材料有以下几种:

亚克力板,比较常见的像一块 30*30cm 5mm 厚的透明板材

个人很喜欢的材料,透明的材质用来装日用摆件类的单片机很有“探索版”的感觉。做工具用比较脆,有一定韧性,但保存不当会产生划痕。

金属片:强度比木板要好不少,相比于亚克力板的硬、脆,金属片可以在切割后折弯,用来制作需要弯曲且强度要求高的结构。

无可替代的优势,你永远可以相信金属。

木板:木板具有良好的硬度和强度,面积较大的区域有一定韧性,但细的地方极其容易断裂。另外木板在潮湿的环境下可能会变形,因此需要在适当的环境中存储和使用。

用来制作各种家居装饰、艺术品和模型。木板的颜色和纹理使得切割出来的产品具有自然的美感和温馨的氛围。此外,木板也可以通过砂纸或者涂料进行后期处理,以改变其颜色和质感,增加产品的美观性和耐用性。

有些材料理论上可行,但是我实际工作中没有使用,不便评价:玻璃和陶瓷、织物和皮革、塑料和橡胶

相比前面两项个人级别的制作,激光切割家用较少,其一是气味较大,需要单独通风,有些激光切割机还需要用水桶接冷凝水。其二是占地面积较大,3D 打印机可以放在桌子上,激光切割机需要放在地上,且需要有一定的安全距离。

在学校、工作室等场所,通常会有激光切割机,可以提供激光切割服务。

软件

厂家附赠非开源软件。

服务迁移群晖完整指南

· 7 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

群晖是一款非常优秀的 NAS 产品,它可以提供文件存储、多媒体服务、远程访问等功能。一般来说,一台群晖的寿命 4-6 年,折合下来比服务器便宜一点点。因此,我决定做云时代的逆行者,将服务器上的一些服务迁移到群晖上。

注意:云服务器的优点有很多,包括:更加安全、更加灵活、更加便宜等等,迁移需谨慎。

原环境分析

租用的服务器是阿里云产品,选择了宝塔面板进行管理。

网站使用了前后端分离的设计模式,前端基于 Vue 生成的静态页面,后端则基于 Java。后端允许上传一些文件,使用的是 3000 端口进行通信,文件传输则在 8000 端口,路径设置为/upload/。通过 Nginx 进行了反向代理,将/upload/路径的请求转发到 8080 端口,将其他请求(80 端口)转发到 3000 端口。因此,对于这个网站,通信主要通过 80 端口和 8080 端口进行。

tip

如果你遇到了类似的问题,只需要梳理出我们最终通过宝塔面板的哪些端口访问就可以了。

迁移策略

服务器迁移

创建一个 Docker 容器(如果原本用的是宝塔面板,那就继续使用宝塔面板),并按照原来的部署文档在容器内部重新运行服务。

将相关的命令设置为开机自启动。

另外,重要的文件等资料需要单独挂载,并可以设置为只读模式,以防止数据丢失同时更加安全。

端口映射

Docker 的镜像可以通过端口映射的方式,将容器内的端口映射到宿主机(群晖)的端口上。

路由器可以将外网流量转发到宿主机(群晖)上,因此只需要在路由器上设置端口转发即可。

tip

需要注意的是,群晖的部分端口可能已被群晖自身使用,可以通过群晖官网查询。外网的部分端口(如 80 端口)可能被运营商封掉,或者路由器自身需要使用。因此,我们需要选择一些不常用的端口,以避免出现服务异常。

因此有了以下的端口映射规则(你可以先停止容器再设置):

容器内端口(宝塔)宿主机端口(群晖)外网端口
8040804080
808080804880

域名解析

现在我们需要将域名解析到服务器上,由于 80 端口不能直接访问,并且小区和域名供应商都会要求备案,这就需要我们使用 Cloudflare 的 DNS 解析服务。Cloudflare 可以充当中间人,将流量转发到服务器上。

即无法通过 http://www.xxx.com 直接访问到服务器,只能通过域名+端口号如 http://www.xxx.com:8080 访问。

在 cloudflare 中添加域名,全程按照提示操作添加即可。

添加完成后,选择规则->回源(Origin Rules)。因为 Java 用到了两个端口,所以需要添加两个端口转发规则:

  • 当满足条件时(访问域名且路径不以/upload/开头),将流量转发到服务器的 4080 端口。

  • 当满足条件时(访问域名且路径以/upload/开头),将流量转发到服务器的 4880 端口。

结果

通过以上步骤,我们成功地将网站迁移到了新服务器上。用户访问网站时,流程如下:

  • 通过 DNS 解析,找到 cloudflare 的服务器。
  • cloudflare 根据规则,把 80 流量转发到路由器的 4080 上。
  • 服务器接收到流量,把流量转发到群晖的 4080 端口。
  • 群晖接收到流量,把流量转发到容器的 80 端口。
  • 容器接收到流量,通过 Nginx 分配数据。
  • Nginx 根据规则,把流量转发到容器内的 8080 端口。
  • 数据按照原本的路径逐一返回至 cloudflare。
  • cloudflare 把数据返回给访问者。

如法炮制可以继续迁移其他站点。

这种迁移方式不仅保证了原有服务的连续性,也确保了服务器不会被外网直接访问,从而提高了网络安全性。即使网站被黑,也不会影响到其他服务,只需重启这个容器即可恢复原状。

后话

这次的迁移过程体现了分层思想的重要性,这主要来自《白帽子讲 Web 安全》这本书。整个过程没有遇到什么问题,只需要对容器化和 Cloudflare 有一定的了解。我希望我的经验能给读者带来一些启示。

生成式AI提取博客精华

· 8 min read
Allen
software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

前段时间偶然间看到了一些生成式 AI 文本摘要项目,觉得很有意思。个人不太信任第三方服务,于是就加到待办里,想着自己也实现一个,最近终于有空了。

逻辑上的核心功能是:自动生成,无需人工干预,一次生成,再次生成消耗 key

样式上的核心功能是:逐字显示,好像是个机器人真的在实时生成。

本篇文章将记录如何实现这个功能。

原型

博客是基于 Docusaurus 搭建的,而 Docusaurus 是基于 React 的,文章内容是通过 markdown 文件写的,所以需要设计一个 React 组件,传入 markdown 文件内的文本内容,每次有请求时,将文章内容转换为文本摘要。

但是这样做一些问题,主要的是重复的每次请求都会消耗 key,因此需要储存已请求内容。

判断条件可以设为如果内容不存在,则直接调用,否则就重新生成,然后存储。

由此可知我们至少需要:内容(用来判断是否重复)、摘要(用来显示)

{
"This is the text to summarize": "This is the summary",
"This is the text to summarize 2": "This is the summary 2",
}

如果储存是需要成本的,我们可以使用hash值来判断内容是否相同,如果hash值相同,那么就不需要重新生成摘要了。这样不要存储一篇文章,只需要存储hash值和摘要就可以了。

{
"248ae1890a0084b3bbc30bd3c0c2e17e": "summary"
}

如果有多个文章如何每次请求只请求指定的文章呢?

我们可以使用路径来区分不同的文章,在服务器上我们的方法就太多了。

但是静态的话我使用文件名来区分不同的文章。将文章路径中的/替换为_,然后加上.json后缀,就可以了。

blog_1.json
{
"248ae1890a0084b3bbc30bd3c0c2e17e": "summary"
}

把这个代码逻辑插入到 React 组件中就可以实现了,根据你调用的API不同,你也许可以设置返回的摘要长度等参数。

记得别直接把key写在代码里,而是通过环境变量传入。如果你的项目通过github pages部署,那么可以在项目的setting中设置环境变量REACT_APP_API_KEY,然后在代码中通过process.env.REACT_APP_API_KEY来获取。

实现

当然,这只是一个比较粗糙的想法,接下来让我们完善下代码细节,让它优雅的同时,可以在博客中使用。

逻辑功能

我在reflex-chat#20里提交了关于百度API的实现,在这个仓库里你应该能找到其他API的操作方式。

main.py
import os
import json
import time
import hashlib
import pathlib
import requests
import feedparser
from parsel import Selector
from datetime import datetime
from jinja2 import Environment, FileSystemLoader
class BaiduAI:
def __init__(self):
self.BAIDU_API_KEY = os.getenv("BAIDU_API_KEY")
self.BAIDU_SECRET_KEY = os.getenv("BAIDU_SECRET_KEY")
self.token = self.get_access_token()

def get_access_token(self):
"""
:return: access_token
"""
url = "https://aip.baidubce.com/oauth/2.0/token"
params = {
"grant_type": "client_credentials",
"client_id": self.BAIDU_API_KEY,
"client_secret": self.BAIDU_SECRET_KEY,
}
return str(requests.post(url, params=params).json().get("access_token"))

def get_result(self, text: str):
messages = json.dumps(
{
"messages": [
{
"role": "user",
"content": "阅读下面的博文,然后尽可能接近50个词的范围内,提供一个总结。只需要回复总结后的文本:{}".format(
text
),
}
]
}
)
session = requests.request(
"POST",
"https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions_pro?access_token="
+ self.token,
headers={"Content-Type": "application/json"},
data=messages,
)
json_data = json.loads(session.text)
if "result" in json_data.keys():
answer_text = json_data["result"]
return answer_text


class Jsonsummary:
def __init__(self):
root = pathlib.Path(__file__).parent.resolve()
self.json_file_path = os.path.join(root,"summary")
self.url = "https://jiangmiemie.com/"
self