本章节通过 ROS2 官方提供的 Gazebo 虚拟仿真,实现视觉运动算法仿真。
什么是视觉运动控制算法?
运动控制算法主要是通过传感器,将采集到的数据,转化为运动控制指令。
传感器包含:视觉、重力、力反馈、加速度、测距等多种类型。特斯拉的自动驾驶使用的方案中视觉是最主要的信息来源。
运动控制从易到难有:履带结构、差速结构、阿克曼结构、全向结构、足腿结构。
为什么要虚拟仿真?
目前较为先进的腿足结构机器人想要完成高难度动作,例如:后空翻、前空翻、韦伯斯特空翻。一旦失败会对本不充裕的原型机造成严重的损坏。
所以通过仿真软件训练,有如下好处:
- 可以在低风险的情况下完成算法的调试和优化。
- 对于复杂算法,可以同时启动多个仿真软件,进行并行训练,提高训练效率。
虚拟仿真需要一定的算力。本地使用前,首先确保你的电脑包含显卡。
-
✔推荐使用原生Ubuntu最大限度发挥硬件性能。非原生环境下可能出现消息异常、时间异常等问题。
-
高算力的电脑可以使用wsl安装Ubuntu。
-
❌使用VM虚拟机会产生较大的延迟。
用机器学习中的概念来类比,Gazebo 好比是 PyTorch,ROS2 好比是 Python。
你希望使用 PyTorch 开始深度学习,所以要查看最新版 PyTorch 适配的 Python 版本(3.10+),需要英伟达显卡才能加速,再查看 Python 支持的操作系统(Mac、Linux、Windows)。
搞清楚之后,再从系统开始,安装 Python,最后 PyTorch 和显卡驱动安装成功。谋定而后动。
ROS2框架
ROS 2 需要依赖于 Ubuntu 系统,其微控制器相较于普通 MCU 成本更高。主要用于高级机器人。
通信优势
丰富的通信类型
ROS 2 节省自己搭建网络、定义通信类型、管理连接的时间。
通信模式 | 作用 | 特点 | 应用场景 |
---|---|---|---|
话题:发布-订阅 (Publish-Subscribe) | 单向、异步数据流 | 高效、解耦,适用于传感器数据流、状态更新 | 传感器数据流(如相机图像、激光雷达数据)、机器人状态更新(如位置、速度) |
服务:服务端-客户端 (Service-Client) | 同步、请求-响应 | 阻塞式通信,用于需要明确结果的任务 | 触发特定动作(如抓取物体) 、查询物体 ID |
动作 (Actions) | 长时任务处理 | 异步、支持进度反馈和取消 | 移动机器人到指定位置、机械臂执行复杂任务 |
参数 (Parameters) | 动态配置节点行为 | 非实时通信,用于启动时或运行时的配置调整 | 调整 PID 控制器增益、修改传感器发布频率 |
自动发现机制
同为 ROS 2,在同一网段下的不同设备可以自动发现。
自动发现前提:指要求中继设备允许子设备自由通信。
中继设备可能是路由器、或者手机的热点、随身WIFI等。
允许子设备自由通信有利有弊,如果你在商场或者咖啡馆连接他们的WIFI,你的电脑可能提醒你:你的网络活动可能被其他设备发现。自由通信会带来一定的安全隐患。因此并不是所有的路由器都默认允许允许子设备自由通信。
有的设备会有防火墙,临时关闭防火墙:sudo ufw disable
,另外要允许UDP
多播端口(默认7400~7500)。
如果一切正常,你应该可以通过 ping <其他机器的 IP>
并联通成功。
此时你可以通过 ros2
运行节点发布消息,其他设备的接收者会收到你的消息。
隔离机制
如果你同个网段下,有无人机和机器狗。你想将其编队为:空军 + 陆军;又或者 5 台无人机 + 机器狗为一组。
ROS 2 提供了通信隔离机制。
在同个编队的所有设备终端中执行:export ROS_DOMAIN_ID=1
。(值范围为 0~101,0 为默认值)
这个命令是设置了一个环境变量。ROS 2 通过判断这个环境变量来识别是否为一组。不同 ID 节点无法通信。
因此如果你想将其分为两组,只需要在一半的设备上改为非 0 的,相同的 ID 即可。
多设备跨域通信
如果你的设备分别在两个城市,可以通过配置一台公网下的服务器(也可以在两个网段下,配置一个都能访问的设备作为中继),将两个城市的设备连接到这台服务器上。这种通信方式被称为Fast-DDS。
在服务器上运行:fastdds discovery --server-id 0
。
服务器开放11888端口。sudo ufw allow 11888/tcp
在设备上运行:export ROS_DISCOVERY_SERVERS=<服务器IP>:<服务器端口>
。(端口默认11888)
设置ROS中间件为rmw_fastrtps_cpp
(别人写好的包):export RMW_IMPLEMENTATION=rmw_fastrtps_cpp
然后重新监听:ros2 daemon stop
,再重新启动:ros2 daemon start
。
模块仓库
ROS 2 有大量写好的机器人相关的包,包也称:模块、功能包、库。
安装相机标定:
sudo apt install ros-jazzy-camera-calibration
安装 SLAM:
sudo apt install ros-jazzy-slam-toolbox
你会发现你只需要安装一个包,就可以获得一个功能。和 Python 通过 pip 安装包一样。安装后即可实现"一行代码"实现某个功能。
ROS 2安装
安装过程中注意:ROS 2 的版本往往与 Ubuntu 的版本有关。查看 https://docs.ros.org/ 可获得目前主流的版本。
设置编码
sudo apt update && sudo apt install locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8
添加源
sudo apt update && sudo apt install curl gnupg lsb-release
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(source /etc/os-release && echo $UBUNTU_CODENAME) main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null
安装ROS 2
sudo apt update
sudo apt upgrade
# 如果你是 Ubuntu 24.04,那么安装 Jazzy 版本
sudo apt install ros-jazzy-desktop
设置环境变量
# 立刻执行脚本
source /opt/ros/jazzy/setup.bash
# 之后每次开启终端执行执行脚本
echo " source /opt/ros/jazzy/setup.bash" >> ~/.bashrc
你也可以使用Docker简化这个过程。
复杂的功能需要很多依赖,系统中默认的 Python3 出于安全考虑,可能不让你在全局环境中安装包。代码的提示给了很多选项,例如安装在虚拟环境中,删除安全文件。加上关键字,你应该学会通过翻译软件,翻译报错信息。它让你看某个文件,也请学会使用翻译软件,翻译文件内容。
error: externally-managed-environment
× This environment is externally managed
╰─> To install Python packages system-wide, try apt install
python3-xyz, where xyz is the package you are trying to
install.
If you wish to install a non-Debian-packaged Python package,
create a virtual environment using python3 -m venv path/to/venv.
Then use path/to/venv/bin/python and path/to/venv/bin/pip. Make
sure you have python3-full installed.
If you wish to install a non-Debian packaged Python application,
it may be easiest to use pipx install xyz, which will manage a
virtual environment for you. Make sure you have pipx installed.
See /usr/share/doc/python3.12/README.venv for more information.
note: If you believe this is a mistake, please contact your Python installation or OS distribution provider. You can override this, at the risk of breaking your Python installation or OS, by passing --break-system-packages.
hint: See PEP 668 for the detailed specification.
根据提示,得到的解决方式之一:pip3 install 你的包 --break-system-packages
或者可以把externally managed
文件删除。
sudo apt install -y python3-pip
sudo pip3 install rosdepc --break-system-packages
sudo rosdepc init
rosdepc update
节点基操
节点是通信的基本单位,节点可以变成:发布者、订阅者、服务端、客户端、动作、参数。
创建节点
创建节点前可以先创 建一个目录
mkdir -p ~/workspace/src
如何在 ROS2 中创建一个功能包呢?我们可以使用这个指令:
ros2 pkg create --build-type <build-type> <package_name>
ros2 命令中:
- pkg:表示功能包相关的功能;
- create:表示创建功能包;
- build-type:表示新创建的功能包是 C++ 还是 Python 的,如果使用 C++ 或者 C,那这里就跟 ament_cmake,如果使用 Python,就跟 ament_python;
- package_name:新建功能包的名字。
# 创建 Python 功能包
ros2 pkg create --build-type ament_python learning_node
# 创建 C++ 功能包
ros2 pkg create --build-type ament_cmake learning_node
只需执行其中一个命令即可,如果你会 Python,就执行创建 Python 节点的命令。命令创建一个名为 learning_node
文件夹。
这里我推荐大家创建节点名的时候,使用有辨识性的名字。例如:ros2 pkg create --build-type ament_python learning_node
。
注意:对于节点同名,ROS 2 在编译时的处理方式不是使用第一个节点或最后一个节点,而是会报错。
编写代码
接着在learning_node
文件夹我们就可以准备编写代码了,此时你的路径应该是:
~/workspace
|_src
|_learning_node/
|_learning_node
|_ __init__.py
|_package.xml
|_setup.py
|_setup.cfg
|_test/
|_resource/
此时你就可以编写代码了,在 __init__.py
同级下创建一个 .py
文件。
符合文件命名规则前提下,你想叫什么都可以,但是最好有一定的含义。
如果你的功能是打印 Hello World
,那么你就可以叫 node_helloworld.py
。
Python 的函数通常是需要调用的,例如 abc()
函数,你需要在代码中输入 abc()
,才会调用这个函数。和 C++ 有且仅有一个 main
函数不同,那么如何确定哪个函数是入口?更进一步思考:不同的指令对应不同的函数。
因此有了 setup.py
最下方的入口点定义。编写完成后记得修改 setup.py
最下方的入口点。
详细内容参可以参考现成的仓库:
拉取 Git 仓库,可免去自己从 0 开始手敲代码。如果你喜欢自己从 0 开始抄写代码,也可以跳过本部分。
Git 安装:sudo apt install git
拉取命令:git clone https://gitee.com/guyuehome/ros2_21_tutorials.git
这个命令会把仓库克隆到本地。执行后,你应该能在当前文件夹下看到名为 ros2_21_tutorials
的文件夹,内含多个子文件夹。每个子文件夹对应一个功能包。因为 ROS2 是模块化的,不同的功能被拆解为不同的功能包后期更好维护。
注意:如果你之前在 src
里创建了一些 Python 节点之后,又把 ros2_21_tutorials
文件夹也放置在 src
里,那么你需要检查你的创建的节点名,是否与 ros2_21_tutorials
内已有节点重名。
编译
# 安装 colcon(此步全局仅在第一次执行时需要执行,如果失败会导致 colcon build 命令不可用)
sudo apt install python3-colcon-ros
# 进入工作空间
cd ~/workspace
# 编译工作空间所有功能包
# 不论其是直接在src下,还是在src下的子文件夹下
colcon build
# 当你只有一个包更改,不希望全编译,仅编译learning_node
colcon build --packages-select learning_node
colcon build
命令会将所有功能包,编译成可执行文件。
编译成功后,就可以在工作空间中看到自动生产的build、log、install文件夹了 。
- src,代码空间,未来编写的代码、脚本,都需要人为的放置到这里;
- build,编译空间,保存编译过程中产生的中间文件;
- install,安装空间,放置编译得到的可执行文件和脚本;
- log,日志空间,编译和运行过程中,保存各种警告、错误、信息等日志。
我们把编译后,含有build、log、install、src
文件夹的workspace
称为工作空间。
如果是 Python 这样无需编译的,会将代码直接拷贝到 install
文件夹下。
编译过程可能出现的报错:
-
节点重名:对于节点同名,ROS 的处理方式不是使用第一个节点或最后一个节点,而是会报错。应该删除多余节点。确保你
src
文件夹下,所有的xml
文件中,节点的名称是唯一的。 -
语法错误:如果代码语法错误,则会编译失败,需要手动解决。
-
缺少依赖包:如果你严格跟随参考资料且为新系统,首先检查你是否安装过程网络异常。如果是曾用系统,检查是否含有多个 Python 环境。
# 删除缓存
rm -rf build/ install/ log/
# 修改环境变量PATH的值,把系统默认的环境提到最前
export PATH="/usr/bin:$PATH"
# PYTHON_EXECUTABLE 是一个 CMake 变量,用于指定构建过程中应该使用哪个 Python 解释器
export PYTHON_EXECUTABLE=/usr/bin/python3 && colcon build --cmake-args -DPYTHON_EXECUTABLE=/usr/bin/python3
在不更换工作空间的情况下,以下命令仅在第一次执行时需要执行。如果你的工作空间路径不是~/workspace
,请将~/workspace
替换为你的工作空间路径。
# 立刻执行脚本
source ~/workspace/install/setup.sh
# 之后每次开启终端执行执行脚本
echo "source ~/workspace/install/setup.sh" >> ~/.bashrc
运行
运行脚本:ros2 run learning_node node_helloworld
查看正在运行的 node:ros2 node list
rclpy
rclpy 模块,ROS 2 Python 标准接口(ROS 2 canonical API with Python)
官网:https://docs.ros.org/en/rolling/p/rclpy/index.html
受限于篇幅,只简单介绍其中初始化部分
入门体验
下面是一个标准的初始化代码 init
支持传入多种参数。
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
import time
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = Node("node_helloworld") # 创建ROS2节点对象并进行初始化
while rclpy.ok(): # ROS2系统是否正常运行
node.get_logger().info("Hello World") # ROS2日志输出
time.sleep(0.5) # 休眠控制循环时间
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
初始化
rclpy.init(*, args: List[str] | None = None, context: Context | None = None, domain_id: int | None = None, signal_handler_options: rpyutils.import_c_library.SignalHandlerOptions | None = None)→ InitContextManager
直接初始化比上下文初始化拥有更多的参数。
Parameters:
- args – 命令行参数列表。
- context – 要初始化的上下文。如果为 None,则使用默认上下文。
- domain_id – ROS 域 ID。
- signal_handler_options – 指示要安装的信号处理程序。如果 None,则在初始化默认上下文时将安装 SIGINT 和 SIGTERM。【类似于 Python 的信号(signal)机制,是操作系统发出的异步事件。而非异常,异常是程序内部发生的同步事件。】
Returns:
- 一个 InitContextManager,可以与 Python 上下文管理器一起使用进行清理。
循环执行
ROS 2 中的 Executor(执行器)是用来管理和运行 ROS 2 节点中所有回调函数的核心组件。你可以把它看作是一个调度器,它决定了节点里的各种任务(例如接收新消息、处理服务请求、定时器触发)何时执行。
Executor 负责调度和执行节点中的回调函数。一个 ROS 2 节点可以有多个回调函数,比如:
-
订阅者的回调函数:每当收到一个新消息时被调用。
-
服务的请求回调函数:每当收到一个服务请求时被调用。
-
定时器的回调函数:每隔一定时间被调用。
节点本身并不会自动运行这些回调函数,它需要一个 Executor 来 "spin"(自旋),也就是不断地检查是否有就绪的任务并执行它们。ROS 2 将节点和执行器分开,提供了灵活性。你可以选择不同的 Executor 来适应你的应用需求:
-
SingleThreadedExecutor:这是最简单的执行器,它在一个单独的线程里顺序执行所有就绪的回调函数。
-
MultiThreadedExecutor:这个执行器使用一个线程池,可以同时执行多个回调函数,这对于需要高并发处理的节点非常有用。
rclpy.spin(node: Node, executor: Executor | None = None)→ None
:执行工作,直到与执行器关联的上下文关闭。
- node – 要添加到执行器以检查工作的节点。
- 要使用的执行器,如果是 None,则为全局执行器。
rclpy.spin_once(node: Node, *, executor: Executor | None = None, timeout_sec: float | None = None)→ None
:执行一项工作或等待超时到期。
只要该回调在超时到期之前准备就绪,提供的执行器就会执行一个回调。如果未提供执行器(即 None),则使用全局执行器。如果全局执行器具有部分完成的协程,则完成的工作可能是针对提供的节点以外的节点。
- node – 要添加到执行器以检查工作的节点。
- executor – 要使用的执行器,如果是 None,则为全局执行器。
- timeout_sec – 等待几秒钟。如果为 “无” 或“负数,则永久阻止”。如果为 0,请不要等待。
rclpy.spin_until_future_complete(node: Node, future: Future[Any], executor: Executor | None = None, timeout_sec: float | None = None)→ None
: 执行工作,直到Future完成。
- node – 要添加到执行器以检查工作的节点。
- 要等待的 future 对象。直到 future.done() 返回 True 或与执行器关联的上下文为 shutdown。
- 要使用的执行器,如果是 None,则为全局执行器。
- 等待几秒钟。如果为 None 或负数,则阻止直到未来完成。如果为 0,请不要等待。
关闭
rclpy.shutdown(*, context: Context | None = None, uninstall_handlers: bool | None = None)→ None
关闭以前初始化的上下文(如果尚未关闭)。这也将关闭全局执行器,如果已关闭会报错。
- context – 要失效的上下文。如果为 None,则使用默认上下文
- uninstall_handlers – 如果为 None,则在关闭默认上下文时将卸载信号处理程序。如果为 True,则将卸载信号处理程序。如果为 False,则不会卸载信号处理程序。和前面signal_handler_options对应上。
rclpy.try_shutdown(*, context: Context | None = None, uninstall_handlers: bool | None = None)→ None
关闭以前初始化的上下文(如果尚未关闭)。这也将关闭全局执行器,如果已关闭不会报错。
- context – 要失效的上下文。如果为 None,则使用默认上下文
- uninstall_handlers – 如果为 None,则在关闭默认上下文时将卸载信号处理程序。如果为 True,则将卸载信号处理程序。如果为 False,则不会卸载信号处理程序。和前面signal_handler_options对应上。
上下文
上下文管理器 Context 与直接使用 rclpy 基本一致,但背后其实是线程管理,每个独立的上下文管理器都是独立的线程。
如果使用 rclpy.init(args=args)
则无法完成创建 2 个不同的 domain_id 的节点。
from rclpy.node import Node # ROS2 节点类
from rclpy.context import Context # ROS2 上下文类
import time
def main(args=None): # ROS2节点主入口main函数
# 使用Context上下文管理器的正确方式
context = Context()
context.init(args,domain_id=1)
node = Node("node_helloworld", context=context) # 创建节点并关联上下文
context2 = Context()
context2.init(args,domain_id=2)
node2 = Node("node_helloworld", context=context2) # 创建节点并关联上下文
while context.ok() and context2.ok(): # 检查上下文是否正常
node.get_logger().info("{}".format(context.get_domain_id())) # ROS2日志输出
node2.get_logger().info("{}".format(context2.get_domain_id())) # ROS2日志输出
time.sleep(0.5) # 休眠控制循环时间
context2.shutdown()
context.shutdown()
使用上下文管理器 Context 的 with
语 法可以自动 init
(不支持参数),并在 with 语句结束时调用 shutdown
,代码更加简洁。
类似于 Python 的 with open
与 close
。
from rclpy.node import Node # ROS2 节点类
from rclpy.context import Context # ROS2 上下文类
import time
def main(args=None): # ROS2节点主入口main函数
# 使用Context上下文管理器的正确方式
with Context() as context:
node = Node("node_helloworld", context=context) # 创建节点并关联上下文
while context.ok(): # 检查上下文是否正常
# 输出当前的domain id ,默认是0
# 修改domain id方式1 :修改当前终端变量
# export ROS_DOMAIN_ID=1
node.get_logger().info("{}".format(context.get_domain_id()))
time.sleep(0.5) # 休眠控制循环时间
初始化
context.init(args: List[str] | None = None, *, initialize_logging: bool = True, domain_id: int | None = None)→ None
为给定上下文初始化 ROS 通信。
- args – 命令行参数列表。
- initialize_logging – 是否初始化 整个过程的日志记录。默认值是初始化日志记录。
- domain_id – 要用于此上下文的域 ID。如果为无(默认值),则使用域 ID 0。
状态检察
context.get_domain_id()→ int
:获取上下文的 domain ID。
context.ok()→ bool
:检查上下文是否未关闭。
跟踪
track_node(node: Node)→ None
:跟踪与此上下文关联的节点。当上下文被销毁时,它将销毁它跟踪的每个节点。
untrack_node(node: Node)→ None
:如果一个节点在上下文之前被销毁,我们不再需要跟踪它是否销毁了上下文,所以在此处将其删除。
关闭上下文
context.shutdown()→ None
:关闭此上下文,会等待正在进行的操作完成,然后清理资源。好比执行系统关机,如果已关闭会报错。
on_shutdown(callback: Callable[[], None])→ None
:添加关机时调用的回调。
try_shutdown()→ None
:尝试关闭此上下文(如果尚未关闭),如果已关闭也不会报错。
context.destroy()→ None
:销毁上下文,立即释放资源,不等待正在进行的操作。好比直接断电。
通信示例
ROS2的基础通信类型的示例代码都在ros2_21_tutorials仓库中。拉取到src
文件夹中,参考上文编译、运行即可。
话题
- 运行话题的发布者节点 :
ros2 run learning_topic topic_helloworld_pub
- 运行话题的订阅者节点 :
ros2 run learning_topic topic_helloworld_sub
可以看到发布者循环发布“Hello World”字符串消息,订阅者也以几乎同样的频率收到该话题的消息数据。
服务
- 运行服务的服务端节点 :
ros2 run learning_service service_adder_server
- 运行服务的客户端节点 :
ros2 run learning_service service_adder_client 2 3
动作
- 运行动作的服务端节点 :
ros2 run learning_action action_move_server
- 运行动作的客户端节点 :
ros2 run learning_action action_move_client
参数
等价于机器学习中的超参数
即:在开始训练前设 置的参数,例如:学习率、隐藏层大小、迭代次数等等。
- 运行节点:
ros2 run learning_parameter param_declare
- 在另外终端设置参数:
ros2 param set param_declare robot_name turtle
Gazebo
Gazebo 是一款开源的 3D 仿真软件,用于模拟机器人在复杂环境中的行为。它提供了丰富的物理引擎和传感器模型,可以模拟机器人的运动、碰撞、传感器数据等。
# 安装 gz
sudo apt-get install ros-jazzy-ros-gz
# 测试(参数 empty.sdf 表示空场景)
# 测试能否正常启动且无卡顿,gz_args:=empty.sdf 表示使用空场景
ros2 launch ros_gz_sim gz_sim.launch.py gz_args:=empty.sdf
# 我的 Gazebo 版本
"""
Gazebo Sim, version 8.9.0
Copyright (C) 2018 Open Source Robotics Foundation.
Released under the Apache 2.0 License.
"""
gazebosim 有大量下载好的模型(models)与场景(worlds)可以使用。
想简单点就选一个自带摄像头的机器人模型(model),如果有中意的机器人模型没摄像头的,可以自己加一个摄像头。
选赛道类的,有明显的车道线,可以较好的测试算法。如果场景过于简单可以适当的添加一些模型,例如:路灯、树木、箱子等。
从结构上来说,场景(world)包含 N 个模型(model),模型包含 M 个传感器(sensor)。
虚拟仿真的世界观和 Blender 的世界观类似,一个世界需要有物理引擎、光源、模型等等。首先我们需要简单了解下这样的世界观。
SDF 格式中世界可接收的子元素:http://sdformat.org/spec?ver=1.4&elem=world
SDF 格式中模型可接收的子元素:http://sdformat.org/spec?ver=1.4&elem=model
Gazebo 的传感器库:https://gazebosim.org/libs/sensors/
SDF 格式中传感器可接收的子元素:http://sdformat.org/spec?ver=1.4&elem=sensor
这种结构化的文档可读性要比传统的纯文档观感好很多。
URDF
URDF(Unified Robot Description Format)是一种用于描述机器人模型的 XML 格式。
一个两轮机器人运动示例 官方教程
通过这个部分的学习,你可以学会如何操纵你的小车运动。
其中 pose
标签有 6 个参数,官方文档认为比较简单没讲,这里提一下:
- 前三个参数表示三维空间坐标(取值范围为空间内即可)
- 后三个参数表示绕某个轴的角度(控制物体面朝什么方向)
轴 | 方向 | 正值含义 | 颜色标识 |
---|---|---|---|
X 轴 | 前后方向 | 向前(North) | 🔴 红色 |
Y 轴 | 左右方向 | 向左(West) | 🟢 绿色 |
Z 轴 | 上下方向 | 向上 | 🔵 蓝色 |
角度 | 轴 | 旋转方向 | 常用值 |
Roll | 绕 X 轴旋转 | 翻滚 | 通常为 0 |
Pitch | 绕 Y 轴旋转 | 俯仰 | 通常为 0 |
Yaw | 绕 Z 轴旋转 | 左右转头 | 0, 1.57, 3.14, -1.57 |
Yaw 角度对照表:
Yaw (弧度) | 角度 | 朝向 | 示意 |
---|---|---|---|
0 | 0° | 朝向 +X(前) | → |
1.57 | 90° | 朝向 +Y(左) | ↑ |
3.14 | 180° | 朝向 -X(后) | ← |
-1.57 | -90° | 朝向 -Y(右) | ↓ |
传感器
基于不同的目的,我们会选择不同的传感器结合不同的算法来实现。
传感器 | 应用场景 | 优势 | 劣势 |
---|---|---|---|
激光雷达 | SLAM建图、避障 | 精度高,不受光照影响 | 价格贵,无颜色信息 |
摄像头 | 目标识别、视觉SLAM | 成本低,信息丰富 | 受光照影响,计算量大 |
深度相机 | 室内SLAM、3D重建 | 提供RGB+深度,实时性好 | 仅室内可用,范围有限(5m) |
IMU | 姿态估计、传感器融合 | 高频输出,不依赖环境 | 存在漂移,需融合其他传感器 |
机器人专业可以参考 官方文档教程,其中使用的是 IMU 传感器、接触式传感器和激光雷达传感器。
人工智能专业可以下载我设计的 简易地图
ros_gz_bridge
此部分代码来自官方文档 https://gazebosim.org/docs/latest/ros2_integration/
数据流向:
- 摄像头 →
/camera
话题 → 你的视觉算法节点 - 你的算法节点 →
/cmd_vel
话题 → 机器人运动控制
这个项目中涉及到 2 种通信信息格式的转换:
从 ROS2_Gazebo 流出的传感器数据,需要转为 Python 能处理的数据格式。
内置摄像头插件:会自动将摄像头数据发布到 /camera
话题
启动后可以通过 rviz2
查看接收到的图片。
从算法节点流入 ROS2_Gazebo 的控制信息,需要是 ROS2_Gazebo 能理解的数据格式。
运动控制插件:gz::sim::systems::DiffDrive
插件会监听 /cmd_vel
话题来控制机器人运动
我们把需要执行的命令写入到 Makefile 里,便于后续执行。
.PHONY: world sim img cam vel bridge go left right stop rv node start
# 启动完整世界
world:
gz sim -r world.sdf
# 桥接摄像头和控制话题 (simple.sdf 的 /camera和/cmd_vel)
bridge:
ros2 run ros_gz_bridge parameter_bridge /camera@sensor_msgs/msg/Image@gz.msgs.Image /cmd_vel@geometry_msgs/msg/Twist]gz.msgs.Twist
start:
colcon build --packages-select learning_node
ros2 run learning_node node_helloworld
# 单独桥接摄像头 (simple.sdf 的 /camera)
cam:
ros2 run ros_gz_bridge parameter_bridge /camera@sensor_msgs/msg/Image@gz.msgs.Image
# 启动 rviz2
rv:
rviz2
# 单独桥接控制话题 (simple.sdf 的 /cmd_vel)
vel:
ros2 run ros_gz_bridge parameter_bridge /cmd_vel@geometry_msgs/msg/Twist]gz.msgs.Twist
# 前进
go:
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 1.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}"
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}"
# 左转
left:
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.5, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.5}}"
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}"
# 右转
right:
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.5, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: -0.5}}"
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}"
# 停止
stop:
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}"
调试命令
# 查看 GZ 中的话题
gz topic -l
# 查看 ROS2 中活跃的话题
ros2 topic list
# 查看图像话题数据
ros2 topic echo /camera/image
# 查看速度控制话题
ros2 topic echo /model/my_robot/cmd_vel
算法节点
你是否需要去了解一个算法?
从功利角度来说:
- 当你需要改进这个算法时,需要了解它的工作原理和实现细节。
- 当你需要调参时,需要了解参数背后的意义。
如果某个算法默认参数表现足够,其实可以先不用知道它是怎么实现的。
下面代码实现了一个视觉运动控制节点,从 /camera
话题订阅图像数据,并处理后发布到 /cmd_vel
话题。
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np
from geometry_msgs.msg import Twist
class VisualMotionController(Node):
def __init__(self):
super().__init__('visual_motion_controller')
# 订阅摄像头话题 (simple.sdf 中是 /camera)
self.image_subscription = self.create_subscription(
Image,
'/camera',
self.process_and_control,
10
)
# 发布控制话题 (simple.sdf 中是 /cmd_vel)
self.velocity_publisher = self.create_publisher(
Twist,
'/cmd_vel',
10
)
self.bridge = CvBridge()
self.move_speed = 0.5
self.turn_speed = 0.5
self.step = 0 # 计步器
self.get_logger().info('视觉运动控制节点已启动')
def process_and_control(self, msg):
"""处理图像并生成控制命令"""
try:
# 将ROS图像消息转换为OpenCV格式
cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
cv2.imshow('car',cv_image)
cv2.waitKey(1)
self.step += 1
# 这里可以添加你的视觉处理算法
# 例如:检测障碍物、识别路径等
# gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
# edges = cv2.Canny(gray, 50, 150)
# 在这里编写你的运动控制算法
# 简单的控制逻辑示例
cmd = Twist()
if self.step < 100:
# 前进100帧
cmd.linear.x = self.move_speed
cmd.angular.z = 0.0
self.get_logger().info(f'前进中... 步数: {self.step}')
elif self.step < 160:
# 左转60帧
cmd.linear.x = 0.0
cmd.angular.z = self.turn_speed
self.get_logger().info(f'左转中... 步数: {self.step}')
else:
# 重置计步器
self.step = 0
self.velocity_publisher.publish(cmd)
except Exception as e:
self.get_logger().error(f'处理图像时出错: {str(e)}')
def main(args=None):
"""主函数"""
rclpy.init(args=args)
# 创建视觉运动控制节点
controller = VisualMotionController()
try:
# 启动节点
rclpy.spin(controller)
except KeyboardInterrupt:
controller.get_logger().info('节点被用户中断')
finally:
# 发送停止指令
stop_cmd = Twist()
controller.velocity_publisher.publish(stop_cmd)
# 清理资源
controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
启动命令:
# 编译工作空间(以learning_node为例)
colcon build --packages-select learning_node
# 启动节点(以learning_node为例)
ros2 run learning_node node_helloworld
项目目标是在一个凹字形的地图上完成循迹运动,同时注意避开斑马线上的行人。
不是一个很难的项目,但是一个和实际项目流程一致 要独立完成思路设计、算法选型、参数可视化的项目。
任务分析与思路:
预处理
预处理是基本流程,和数据分析前先数据清洗一样。有固定的流程和套路,任何一本Opencv教材都包含这部分内容。
感兴趣区域
好处:
- 减少无关信息的干扰
- 加快计算速度
地平线以上的内容其实和我们关系不大,只有检测行人的时候才会有影响。可以在检测到斑马线的时候计算完整图片。
所以我们先设置 roi
(感兴趣区域)为图像的下面的 50%
def extract_roi(image ,roi_start_ratio=0.5):
"""提取感兴趣区域(ROI)
Args:
image: 输入的BGR图像
roi_start_ratio: 感兴趣区域。(0表示整张都感兴趣、0.5表示下半部分感兴趣)
Returns:
roi_image: ROI区域图像
"""
# 获取图像尺寸
height, width = image.shape[:2]
# 计算ROI起始行(从图像中部开始,关注前方地面)
roi_start = int(height * roi_start_ratio)
# 提取ROI区域(图像下半部分)
roi_image = image[roi_start:height, 0:width]
return roi_image
灰度化与二值化
这一步一般是为后面的边缘检测做铺垫,简单的算法也可以直接基于灰度化和二值化的结果做处理。
二值化的算法就很多了,我们的世界是有光源和树木的投影的,另外计算速度要快,所以排除 固定阈值法
、Otsu's 方法
、自适应高斯法
。
这里 全局平均值法
、自适应均值法
和我们可以都写出来,后面看情况选择。
def preprocess_and_binarize(bgr_image, method='mean', **kwargs):
"""完整的预处理和二值化流程
invert: 是否反向二值化(True=低于阈值变白,False=高于阈值变白)
Args:
bgr_image: 输入的BGR彩色图像
method: 二值化方法,可选 'mean', 'adaptive'
**kwargs: 各方法的额外参数
- invert: 是否反向二值化
- block_size: 自适应方法的邻域大小
- C: 自适应方法的常数偏移
Returns:
binary_image: 二值化结果
"""
# Step 1: 灰度化
gray_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2GRAY)
# Step 2: 根据选择的方法进行二值化
invert = kwargs.get('invert', False)
if method == 'adaptive':
# 选择二值化类型
thresh_type = cv2.THRESH_BINARY_INV if invert else cv2.THRESH_BINARY
# 自适应均值法:每个像素的阈值由其邻域的均值决定
binary_image = cv2.adaptiveThreshold(
gray_image,
255, # 最大值
cv2.ADAPTIVE_THRESH_MEAN_C, # 使用邻域均值
thresh_type, # 二值化类型(正向或反向)
11, # 邻域大小(必须是奇数),越大对光照变化越不敏感
2 # 常数偏移从均值中减去的常数,用于微调阈值
)
elif method == 'mean':
thresh_type = cv2.THRESH_BINARY_INV if invert else cv2.THRESH_BINARY
_, binary_image = cv2.threshold(gray_image, gray_image.mean(), 255, thresh_type)
else:
raise ValueError(f"未知的二值化方法: {method}")
return binary_image
去噪
这个项目中都是纯色,只有斑马线附近有异常色块。考虑到实际摄像头会有噪点。做个简单的开运算(先腐蚀后膨胀,用于去除图像中的小噪声)
def denoise_binary_image(binary_image, kernel_size=(5, 5)):
"""使用形态学开运算去除二值图像中的噪声
Args:
binary_image: 输入的二值化图像(0或255)
kernel_size: 结构元素大小,默认为(5, 5)
Returns:
denoised_image: 去噪后的二值图像
"""
# 创建结构元素(矩形核)
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)
# 开运算 = 先腐蚀后膨胀
# 作用:去除小的白色噪点(孤立的亮点)
denoised_image = cv2.morphologyEx(binary_image, cv2.MORPH_OPEN, kernel)
return denoised_image
可视化
预处理完成之后就可以可视化展示了,这里我觉得可以展示感兴趣的原图、去噪后的图。
def visualize_preprocessing(roi_image, binary_image):
"""可视化预处理的各个阶段
Args:
roi_image: 感兴趣区域的原图(ROI)
denoised_image: 去噪后的二值化图像
"""
# 显示ROI区域原图
cv2.imshow('ROI', roi_image)
# 显示去噪后的结果
cv2.imshow('ROI-II', binary_image)
cv2.waitKey(1)
运动算法设计
这里我学习 CV 的时候学习了边缘检测、轮廓检测、光流法。我自己可以想到直接计算车道像素中心、像素逐行扫描。
OpenCV 的算法 API 有成百上千种(没开玩笑),这里我记住了常见场景的主流算法。
之前我们常说:"talk is cheap, show me your code."
现在 AI 时代,人人都能写代码,写好提示词变得更重要了,所以是:"code is cheap, show me your talk."
这里是我给 AI 的提示词:
- 头脑风暴一下车道线跟随算法,适合弯道较多,有斑马线的。简单列出原理、优缺点。
以下是 Claude-4.5-sonnet 的答案:
### 🎯 适合弯道多+斑马线场景的算法(按推荐度排序)
**⭐⭐⭐ 推荐方案1:滑动窗口法 + 多项式拟合**
- **原理**:从图像底部开始,用小窗口逐层向上搜索黑色像素 → 记录窗口中心 → 用二次/三次多项式拟合曲线
- **优点**:
- ✅ 对弯道适应性极好(多项式天然支持曲线)
- ✅ 对斑马线有很强抗干扰性(逐层搜索,局部异常不影响整体)
- ✅ 可以处理车道线断裂
- ✅ 业界成熟方案(自动驾驶常用)
- **缺点**:
- ❌ 实现复杂度较高
- ❌ 计算量大(需要多个窗口迭代)
- **适用场景**:复杂环境、弯道、遮挡、斑马线
---
**⭐⭐⭐ 推荐方案2:像素列扫描法 + 边界追踪**
- **原理**:逐列扫描图像 → 统计每列的黑色像素分布 → 识别左右边界 → 计算中心线 → 过滤异常列(斑马线)
- **优点**:
- ✅ 实现简单,计算量适中
- ✅ 对弯道适应好(每列独立处理)
- ✅ 可以检测车道宽度变化
- ✅ 容易过滤斑马线(通过连续性检查)
- **缺点**:
- ❌ 需要假设车道在视野中
- ❌ 对噪声敏感(需要平滑处理)
- **适用场景**:中等复杂度,对实时性要求高
---
**⭐⭐ 推荐方案3:直方图投影法 + 峰值追踪**
- **原理**:垂直投影ROI → 找到左右车道的双峰 → 逐层向上追踪峰值移动 → 拟合曲线
- **优点**:
- ✅ 对光照不敏感
- ✅ 可以明确区分左右车道
- ✅ 斑马线在垂直投影中容易识别(横向特征)
- ✅ 计算效率高
- **缺点**:
- ❌ 急弯时左右车道可能重叠,峰值不明显
- ❌ 需要车道相对对称
- **适用场景**:车道清晰、对称性好、中等弯道
---
**⭐⭐ 推荐方案4:分区域重心法 + 轨迹连接**
- **原理**:将ROI分成多个水平带 → 每个带计算黑色像素重心 → 连接各带重心形成轨迹 → 过滤异常点(斑马线)
- **优点**:
- ✅ 对弯道适应好(分段处理)
- ✅ 实现简单,是重心法的改进版
- ✅ 可以检测局部异常(斑马线导致的重心跳变)
- ✅ 计算量小
- **优点**:
- ❌ 仍然对噪声敏感
- ❌ 需要设计异常点过滤规则
- **适用场景**:资源受限、需要快速原型
---
**⭐ 推荐方案5:边缘检测 + 分段霍夫变换**
- **原理**:Canny边缘检测 → 将ROI分成多段 → 每段独立做霍夫直线检测 → 连接直线段近似曲线 → 过滤斑马线(横向线)
- **优点**:
- ✅ 经典方法,鲁棒性好
- ✅ 可以用线段方向过滤斑马线(横向vs纵向)
- ✅ 分段处理可适应弯道
- **缺点**:
- ❌ 参数调节复杂
- ❌ 计算量大
- ❌ 对断裂车道线处理困难
- **适用场景**:车道线清晰、对准确度要求高
---
**⭐ 推荐方案6:形态学骨架提取 + 中轴线追踪**
- **原理**:二值化 → 形态学细化(skeletonization) → 提取黑色区域的中轴线 → 追踪中轴线
- **优点**:
- ✅ 直接得到车道中心,无需计算
- ✅ 对弯道自然适应
- ✅ 斑马线会被细化但不影响主路径
- **缺点**:
- ❌ 细化算法计算量大
- ❌ 对断裂敏感
- ❌ 可能产生分支(需要后处理)
- **适用场景**:车道连续、形状复杂
---
### 🔥 针对你的场景(弯道多+斑马线)最佳组合方案
**方案A:滑动窗口法(最优效果)**
二值化 → 去噪 → 从底部开始滑动窗口 →
逐层找车道像素 → 多项式拟合 →
选取前视点 → Pure Pursuit控制
**方案B:分区域重心法(平衡方案)** ⭐ 推荐快速实现
二值化 → 去噪 → ROI分成5-10个水平带 →
每带计算重心 → 过滤异常点(斑马线) →
拟合平滑曲线 → 跟随中心点
**方案C:像素列扫描法(实时性强)**
二值化 → 去噪 → 逐列扫描 →
找左右边界 → 连续性检查过滤斑马线 →
计算中心 → 控制
如果以上方法都不合适,可以选择寻找新的算法和算法调参。现在的目的是验证方案是否可行,只有大致可用我们才进一步追求理解这个代码。
我们接着输入提示词:
- 帮我实现:滑动窗口法 + 多项式拟合。要求:函数式编程,添加function docstring,无需逐行注释。
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np
from geometry_msgs.msg import Twist
def extract_roi(image ,roi_start_ratio=0.5):
"""提取感兴趣区域(ROI)
Args:
image: 输入的BGR图像
roi_start_ratio: 感兴趣区域。(0表示整张都感兴趣、0.5表示下半部分感兴趣)
Returns:
roi_image: ROI区域图像
"""
# 获取图像尺寸
height, width = image.shape[:2]
# 计算ROI起始行(从图像中部开始,关注前方地面)
roi_start = int(height * roi_start_ratio)
# 提取ROI区域(图像下半部分)
roi_image = image[roi_start:height, 0:width]
return roi_image
def preprocess_and_binarize(bgr_image, method ,invert):
"""完整的预处理和二值化流程
otsu方法会自动计算最佳阈值,适合双峰分布的图像(如黑色车道+灰色地面)
adaptive自适应均值法适应局部光照变化,适合有阴影和光照不均的场景
invert: 是否反向二值化(True=低于阈值变白,False=高于阈值变白)
Args:
bgr_image: 输入的BGR彩色图像
method: 二值化方法,可选 'mean', 'adaptive'
**kwargs: 各方法的额外参数
- invert: 是否反向二值化
- block_size: 自适应方法的邻域大小
- C: 自适应方法的常数偏移
Returns:
binary_image: 二值化结果
"""
# 灰度化
gray_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2GRAY)
if method == 'adaptive':
# 选择二值化类型
thresh_type = cv2.THRESH_BINARY_INV if invert else cv2.THRESH_BINARY
# 自适应均值法:每个像素的阈值由其邻域的均值决定
binary_image = cv2.adaptiveThreshold(
gray_image,
255, # 最大值
cv2.ADAPTIVE_THRESH_MEAN_C, # 使用邻域均值
thresh_type, # 二值化类型(正向或反向)
11, # 邻域大小(必须是奇数),越大对光照变化越不敏感
2 # 常数偏移从均值中减去的常数,用于微调阈值
)
elif method == 'mean':
thresh_type = cv2.THRESH_BINARY_INV if invert else cv2.THRESH_BINARY
_, binary_image = cv2.threshold(gray_image, gray_image.mean(), 255, thresh_type)
else:
raise ValueError(f"未知的二值化方法: {method}")
return binary_image
def denoise_binary_image(binary_image, kernel_size=(5, 5)):
"""使用形态学开运算去除二值图像中的噪声
Args:
binary_image: 输入的二值化图像(0或255)
kernel_size: 结构元素大小,默认为(5, 5)
Returns:
denoised_image: 去噪后的二值图像
"""
# 创建结构元素(矩形核)
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)
# 开运算 = 先腐蚀后膨胀
# 作用:去除小的白色噪点(孤立的亮点)
denoised_image = cv2.morphologyEx(binary_image, cv2.MORPH_OPEN, kernel)
return denoised_image
def visualize_preprocessing(roi_image, binary_image):
"""可视化预处理的各个阶段
Args:
roi_image: 感兴趣区域的原图(ROI)
denoised_image: 去噪后的二值化图像
"""
cv2.imshow('ROI', roi_image)
cv2.imshow('ROI-II', binary_image)
cv2.waitKey(1)
def find_lane_base_positions(binary_image, n_windows=9, edge_detection=True):
"""通过直方图找到左右车道线的起始位置
Args:
binary_image: 二值化图像(白色为车道线)
n_windows: 滑动窗口数量
edge_detection: 是否检测边缘模式(适合宽道路)
Returns:
left_base: 左车道线起始x坐标(None表示未检测到)
right_base: 右车道线起始x坐标(None表示未检测到)
"""
histogram = np.sum(binary_image[binary_image.shape[0]//2:, :], axis=0)
midpoint = histogram.shape[0] // 2
if edge_detection:
threshold = np.max(histogram) * 0.3
white_pixels = np.where(histogram > threshold)[0]
if len(white_pixels) == 0:
return None, None
left_candidates = white_pixels[white_pixels < midpoint]
right_candidates = white_pixels[white_pixels >= midpoint]
left_base = np.min(left_candidates) if len(left_candidates) > 0 else None
right_base = np.max(right_candidates) if len(right_candidates) > 0 else None
else:
left_half = histogram[:midpoint]
right_half = histogram[midpoint:]
left_base = np.argmax(left_half) if np.max(left_half) > 0 else None
right_base = np.argmax(right_half) + midpoint if np.max(right_half) > 0 else None
return left_base, right_base
def detect_road_edges(binary_image):
"""检测宽道路的左右边缘
Args:
binary_image: 二值化图像(白色为道路)
Returns:
left_edge_x: 左边缘x坐标数组
left_edge_y: 左边缘y坐标数组
right_edge_x: 右边缘x坐标数组
right_edge_y: 右边缘y坐标数组
"""
height, width = binary_image.shape
left_edge_x, left_edge_y = [], []
right_edge_x, right_edge_y = [], []
for y in range(height):
row = binary_image[y, :]
white_pixels = np.where(row > 127)[0]
if len(white_pixels) > 10:
left_edge_x.append(white_pixels[0])
left_edge_y.append(y)
right_edge_x.append(white_pixels[-1])
right_edge_y.append(y)
return (np.array(left_edge_x), np.array(left_edge_y)), (np.array(right_edge_x), np.array(right_edge_y))
def sliding_window_detection(binary_image, n_windows=9, margin=80, min_pixels=50):
"""使用滑动窗口法检测车道线像素
Args:
binary_image: 二值化图像(白色为车道线,0或255)
n_windows: 滑动窗口数量
margin: 窗口宽度的一半(像素)
min_pixels: 重新定位窗口所需的最小像素数
Returns:
left_lane_pixels: 左车道线像素坐标 (x_coords, y_coords),未检测到则为 ([], [])
right_lane_pixels: 右车道线像素坐标 (x_coords, y_coords),未检测到则为 ([], [])
window_info: 窗口信息列表,用于可视化 [(x_center, y_center, width, height), ...]
"""
height, width = binary_image.shape
window_height = height // n_windows
nonzero = binary_image.nonzero()
nonzero_y = np.array(nonzero[0])
nonzero_x = np.array(nonzero[1])
left_base, right_base = find_lane_base_positions(binary_image, n_windows, edge_detection=True)
left_current = left_base
right_current = right_base
left_lane_indices = []
right_lane_indices = []
window_info = []
for window in range(n_windows):
win_y_low = height - (window + 1) * window_height
win_y_high = height - window * window_height
if left_current is not None:
win_xleft_low = left_current - margin
win_xleft_high = left_current + margin
window_info.append(('left', win_xleft_low, win_y_low, win_xleft_high, win_y_high))
good_left_indices = ((nonzero_y >= win_y_low) & (nonzero_y < win_y_high) &
(nonzero_x >= win_xleft_low) & (nonzero_x < win_xleft_high)).nonzero()[0]
left_lane_indices.append(good_left_indices)
if len(good_left_indices) > min_pixels:
left_current = int(np.mean(nonzero_x[good_left_indices]))
if right_current is not None:
win_xright_low = right_current - margin
win_xright_high = right_current + margin
window_info.append(('right', win_xright_low, win_y_low, win_xright_high, win_y_high))
good_right_indices = ((nonzero_y >= win_y_low) & (nonzero_y < win_y_high) &
(nonzero_x >= win_xright_low) & (nonzero_x < win_xright_high)).nonzero()[0]
right_lane_indices.append(good_right_indices)
if len(good_right_indices) > min_pixels:
right_current = int(np.mean(nonzero_x[good_right_indices]))
left_lane_indices = np.concatenate(left_lane_indices) if left_lane_indices else np.array([])
right_lane_indices = np.concatenate(right_lane_indices) if right_lane_indices else np.array([])
left_x = nonzero_x[left_lane_indices] if len(left_lane_indices) > 0 else np.array([])
left_y = nonzero_y[left_lane_indices] if len(left_lane_indices) > 0 else np.array([])
right_x = nonzero_x[right_lane_indices] if len(right_lane_indices) > 0 else np.array([])
right_y = nonzero_y[right_lane_indices] if len(right_lane_indices) > 0 else np.array([])
return (left_x, left_y), (right_x, right_y), window_info
def fit_polynomial(lane_pixels, degree=2):
"""对车道线像素进行多项式拟合
Args:
lane_pixels: 车道线像素坐标 (x_coords, y_coords)
degree: 多项式阶数,默认为2(二次多项式)
Returns:
poly_coeffs: 多项式系数 [a, b, c, ...],使得 x = a*y^2 + b*y + c
如果拟合失败则返回 None
"""
x_coords, y_coords = lane_pixels
if len(x_coords) < degree + 1:
return None
poly_coeffs = np.polyfit(y_coords, x_coords, degree)
return poly_coeffs
def calculate_lane_center_line(left_coeffs, right_coeffs, image_height):
"""根据左右车道线的多项式系数计算中心线
Args:
left_coeffs: 左车道线多项式系数 [a, b, c]
right_coeffs: 右车道线多项式系数 [a, b, c]
image_height: 图像高度
Returns:
center_coeffs: 中心线多项式系数 [a, b, c]
center_points: 中心线采样点 [(x, y), ...]
"""
if left_coeffs is None or right_coeffs is None:
return None, []
y_vals = np.linspace(0, image_height - 1, image_height)
left_x = np.polyval(left_coeffs, y_vals)
right_x = np.polyval(right_coeffs, y_vals)
center_x = (left_x + right_x) / 2
center_coeffs = np.polyfit(y_vals, center_x, 2)
sample_points = 50
y_samples = np.linspace(0, image_height - 1, sample_points, dtype=int)
x_samples = np.polyval(center_coeffs, y_samples)
center_points = list(zip(x_samples.astype(int), y_samples))
return center_coeffs, center_points
def calculate_control_command(center_coeffs, image_width, image_height,
lookahead_ratio=0.7, k_p=2.0,
v_straight=1.0, v_curve=0.5, curve_threshold=0.0005):
"""基于中心线多项式系数计算运动控制指令
Args:
center_coeffs: 中心线多项式系数 [a, b, c],x = a*y^2 + b*y + c
image_width: 图像宽度
image_height: 图像高度
lookahead_ratio: 前视点位置比例(0-1),越大越远
k_p: 比例增益
v_straight: 直道速度 (m/s)
v_curve: 弯道速度 (m/s)
curve_threshold: 曲率阈值,超过此值认为是弯道
Returns:
linear_velocity: 线速度 (m/s)
angular_velocity: 角速度 (rad/s)
lateral_error: 横向偏差(像素)
curvature: 曲率
"""
if center_coeffs is None:
return 0.0, 0.0, 0.0, 0.0
lookahead_y = int(image_height * lookahead_ratio)
lookahead_x = np.polyval(center_coeffs, lookahead_y)
image_center_x = image_width / 2
lateral_error = lookahead_x - image_center_x
normalized_error = lateral_error / (image_width / 2)
a, b, c = center_coeffs
curvature = abs(2 * a)
if curvature > curve_threshold or abs(normalized_error) > 0.2:
linear_velocity = v_curve
else:
linear_velocity = v_straight
angular_velocity = -k_p * normalized_error
angular_velocity = np.clip(angular_velocity, -2.0, 2.0)
return linear_velocity, angular_velocity, lateral_error, curvature
def visualize_lane_detection(original_image, binary_image, left_pixels, right_pixels,
left_coeffs, right_coeffs, center_coeffs, window_info,
lookahead_point=None, lateral_error=0.0,
linear_vel=0.0, angular_vel=0.0):
"""可视化车道线检测结果
Args:
original_image: 原始彩色图像
binary_image: 二值化图像
left_pixels: 左车道线像素 (x_coords, y_coords)
right_pixels: 右车道线像素 (x_coords, y_coords)
left_coeffs: 左车道线多项式系数
right_coeffs: 右车道线多项式系数
center_coeffs: 中心线多项式系数
window_info: 滑动窗口信息
lookahead_point: 前视点坐标 (x, y)
lateral_error: 横向偏差
linear_vel: 线速度
angular_vel: 角速度
Returns:
vis_image: 可视化图像
"""
vis_image = cv2.cvtColor(binary_image, cv2.COLOR_GRAY2BGR)
height = vis_image.shape[0]
width = vis_image.shape[1]
for window in window_info:
lane_type, x_low, y_low, x_high, y_high = window
color = (0, 255, 0) if lane_type == 'left' else (255, 0, 0)
cv2.rectangle(vis_image, (x_low, y_low), (x_high, y_high), color, 2)
left_x, left_y = left_pixels
right_x, right_y = right_pixels
if len(left_x) > 0:
for x, y in zip(left_x, left_y):
cv2.circle(vis_image, (int(x), int(y)), 2, (0, 255, 255), -1)
if len(right_x) > 0:
for x, y in zip(right_x, right_y):
cv2.circle(vis_image, (int(x), int(y)), 2, (255, 255, 0), -1)
y_vals = np.linspace(0, height - 1, height).astype(int)
if left_coeffs is not None:
left_fit_x = np.polyval(left_coeffs, y_vals).astype(int)
for i in range(len(y_vals) - 1):
cv2.line(vis_image, (left_fit_x[i], y_vals[i]),
(left_fit_x[i+1], y_vals[i+1]), (0, 255, 0), 3)
if right_coeffs is not None:
right_fit_x = np.polyval(right_coeffs, y_vals).astype(int)
for i in range(len(y_vals) - 1):
cv2.line(vis_image, (right_fit_x[i], y_vals[i]),
(right_fit_x[i+1], y_vals[i+1]), (255, 0, 0), 3)
if center_coeffs is not None:
center_fit_x = np.polyval(center_coeffs, y_vals).astype(int)
for i in range(len(y_vals) - 1):
cv2.line(vis_image, (center_fit_x[i], y_vals[i]),
(center_fit_x[i+1], y_vals[i+1]), (0, 0, 255), 4)
image_center_x = width // 2
cv2.line(vis_image, (image_center_x, 0), (image_center_x, height), (255, 255, 255), 2)
if lookahead_point is not None:
lx, ly = lookahead_point
cv2.circle(vis_image, (int(lx), int(ly)), 10, (255, 0, 255), -1)
cv2.arrowedLine(vis_image, (image_center_x, int(ly)), (int(lx), int(ly)),
(255, 0, 255), 3)
font = cv2.FONT_HERSHEY_SIMPLEX
cv2.putText(vis_image, f'Error: {lateral_error:.1f}px', (10, 30),
font, 0.6, (255, 255, 255), 2)
turn_info = "LEFT" if angular_vel > 0 else "RIGHT" if angular_vel < 0 else "STRAIGHT"
cv2.putText(vis_image, f'Angular: {angular_vel:.3f} ({turn_info})', (10, 60),
font, 0.6, (255, 255, 255), 2)
cv2.putText(vis_image, f'Speed: {linear_vel:.2f} m/s', (10, 90),
font, 0.6, (255, 255, 255), 2)
return vis_image
class VisualMotionController(Node):
"""基于视觉的车道线跟随控制节点
功能:
- 接收摄像头图像
- 滑动窗口法检测车道线
- 多项式拟合车道线
- 计算运动控制指令
- 发布速度控制命令
"""
def __init__(self):
super().__init__('visual_motion_controller')
# 订阅摄像头话题
self.image_subscription = self.create_subscription(
Image,
'/camera',
self.process_and_control,
10
)
# 发布控制话题
self.velocity_publisher = self.create_publisher(
Twist,
'/cmd_vel',
10
)
self.bridge = CvBridge()
# ========== 图像处理参数 ==========
self.roi_start_ratio = 0.5
# ========== 运动控制参数 ==========
self.lookahead_ratio = 0.7
self.k_p = 2.0
self.v_straight = 1.0
self.v_curve = 0.5
self.curve_threshold = 0.0005
# ========== 调试模式 ==========
self.debug_mode = False
def process_and_control(self, msg):
"""处理图像并计算运动控制指令(主回调函数)
Args:
msg: ROS2图像消息
"""
try:
# Step 1: 图像格式转换
cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
# Step 2: 提取ROI
roi_image = extract_roi(cv_image, self.roi_start_ratio)
# Step 3: 图像预处理和二值化
binary_image = preprocess_and_binarize(roi_image, method='mean', invert=True)
# Step 4: 去噪
denoised = denoise_binary_image(binary_image)
# 可视化
visualize_preprocessing(roi_image, denoised)
# Step 5: 滑动窗口检测车道线
left_pixels, right_pixels, windows = sliding_window_detection(denoised)
# Step 6: 多项式拟合
left_fit = fit_polynomial(left_pixels)
right_fit = fit_polynomial(right_pixels)
# Step 7: 计算中心线
center_fit, center_pts = calculate_lane_center_line(
left_fit, right_fit, denoised.shape[0]
)
# Step 8: 计算运动控制指令
linear_vel, angular_vel, lateral_error, curvature = calculate_control_command(
center_fit,
denoised.shape[1],
denoised.shape[0],
lookahead_ratio=self.lookahead_ratio,
k_p=self.k_p,
v_straight=self.v_straight,
v_curve=self.v_curve,
curve_threshold=self.curve_threshold
)
# Step 9: 计算前视点用于可视化
lookahead_point = None
if center_fit is not None:
lookahead_y = int(denoised.shape[0] * self.lookahead_ratio)
lookahead_x = np.polyval(center_fit, lookahead_y)
lookahead_point = (lookahead_x, lookahead_y)
# Step 10: 可视化
vis = visualize_lane_detection(
roi_image, denoised,
left_pixels, right_pixels,
left_fit, right_fit, center_fit,
windows,
lookahead_point=lookahead_point,
lateral_error=lateral_error,
linear_vel=linear_vel,
angular_vel=angular_vel
)
cv2.imshow('Lane Detection', vis)
cv2.waitKey(1)
# Step 11: 发布控制命令
cmd = Twist()
if self.debug_mode:
cmd.linear.x = 0.0
cmd.angular.z = 0.0
self.get_logger().info('调试模式:运动已暂停', throttle_duration_sec=2.0)
else:
cmd.linear.x = linear_vel
cmd.angular.z = angular_vel
self.velocity_publisher.publish(cmd)
except Exception as e:
self.get_logger().error(f'处 理图像时出错: {str(e)}')
import traceback
self.get_logger().error(traceback.format_exc())
def main(args=None):
"""主函数"""
rclpy.init(args=args)
# 创建视觉运动控制节点
controller = VisualMotionController()
try:
# 启动节点
rclpy.spin(controller)
except KeyboardInterrupt:
controller.get_logger().info('节点被用户中断')
finally:
# 发送停止指令
stop_cmd = Twist()
controller.velocity_publisher.publish(stop_cmd)
# 清理资源
cv2.destroyAllWindows()
controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
注意:由于不同设备算力不同,因此对于低算力设备来说,运算时间过长可能会冲出车道线,需要降低运动速度。