Skip to main content

ROS2视觉运动算法仿真

· 45 min read
Allen
normal software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

本章节通过 ROS2 官方提供的 Gazebo 虚拟仿真,实现视觉运动算法仿真。

info

什么是视觉运动控制算法?

运动控制算法主要是通过传感器,将采集到的数据,转化为运动控制指令。

传感器包含:视觉、重力、力反馈、加速度、测距等多种类型。特斯拉的自动驾驶使用的方案中视觉是最主要的信息来源。

运动控制从易到难有:履带结构、差速结构、阿克曼结构、全向结构、足腿结构。

info

为什么要虚拟仿真?

目前较为先进的腿足结构机器人想要完成高难度动作,例如:后空翻、前空翻、韦伯斯特空翻。一旦失败会对本不充裕的原型机造成严重的损坏。

所以通过仿真软件训练,有如下好处:

  • 可以在低风险的情况下完成算法的调试和优化。
  • 对于复杂算法,可以同时启动多个仿真软件,进行并行训练,提高训练效率。
warning

虚拟仿真需要一定的算力。本地使用前,首先确保你的电脑包含显卡。

  • ✔推荐使用原生Ubuntu最大限度发挥硬件性能。非原生环境下可能出现消息异常、时间异常等问题。

  • 高算力的电脑可以使用wsl安装Ubuntu。

  • ❌使用VM虚拟机会产生较大的延迟。

info

用机器学习中的概念来类比,Gazebo 好比是 PyTorch,ROS2 好比是 Python。

你希望使用 PyTorch 开始深度学习,所以要查看最新版 PyTorch 适配的 Python 版本(3.10+),需要英伟达显卡才能加速,再查看 Python 支持的操作系统(Mac、Linux、Windows)。

搞清楚之后,再从系统开始,安装 Python,最后 PyTorch 和显卡驱动安装成功。谋定而后动。

ROS2框架

ROS 2 需要依赖于 Ubuntu 系统,其微控制器相较于普通 MCU 成本更高。主要用于高级机器人。

通信优势

丰富的通信类型

ROS 2 节省自己搭建网络、定义通信类型、管理连接的时间。

通信模式作用特点应用场景
话题:发布-订阅 (Publish-Subscribe)单向、异步数据流高效、解耦,适用于传感器数据流、状态更新传感器数据流(如相机图像、激光雷达数据)、机器人状态更新(如位置、速度)
服务:服务端-客户端 (Service-Client)同步、请求-响应阻塞式通信,用于需要明确结果的任务触发特定动作(如抓取物体)、查询物体 ID
动作 (Actions)长时任务处理异步、支持进度反馈和取消移动机器人到指定位置、机械臂执行复杂任务
参数 (Parameters)动态配置节点行为非实时通信,用于启动时或运行时的配置调整调整 PID 控制器增益、修改传感器发布频率

自动发现机制

同为 ROS 2,在同一网段下的不同设备可以自动发现

info

自动发现前提:指要求中继设备允许子设备自由通信。

中继设备可能是路由器、或者手机的热点、随身WIFI等。

允许子设备自由通信有利有弊,如果你在商场或者咖啡馆连接他们的WIFI,你的电脑可能提醒你:你的网络活动可能被其他设备发现。自由通信会带来一定的安全隐患。因此并不是所有的路由器都默认允许允许子设备自由通信。

有的设备会有防火墙,临时关闭防火墙:sudo ufw disable,另外要允许UDP多播端口(默认7400~7500)。

如果一切正常,你应该可以通过 ping <其他机器的 IP> 并联通成功。

此时你可以通过 ros2 运行节点发布消息,其他设备的接收者会收到你的消息。

隔离机制

如果你同个网段下,有无人机和机器狗。你想将其编队为:空军 + 陆军;又或者 5 台无人机 + 机器狗为一组。

ROS 2 提供了通信隔离机制

在同个编队的所有设备终端中执行:export ROS_DOMAIN_ID=1。(值范围为 0~101,0 为默认值)

这个命令是设置了一个环境变量。ROS 2 通过判断这个环境变量来识别是否为一组。不同 ID 节点无法通信。

因此如果你想将其分为两组,只需要在一半的设备上改为非 0 的,相同的 ID 即可。

多设备跨域通信

如果你的设备分别在两个城市,可以通过配置一台公网下的服务器(也可以在两个网段下,配置一个都能访问的设备作为中继),将两个城市的设备连接到这台服务器上。这种通信方式被称为Fast-DDS

在服务器上运行:fastdds discovery --server-id 0

服务器开放11888端口。sudo ufw allow 11888/tcp

在设备上运行:export ROS_DISCOVERY_SERVERS=<服务器IP>:<服务器端口>。(端口默认11888)

设置ROS中间件为rmw_fastrtps_cpp(别人写好的包):export RMW_IMPLEMENTATION=rmw_fastrtps_cpp 然后重新监听:ros2 daemon stop,再重新启动:ros2 daemon start

模块仓库

ROS 2 有大量写好的机器人相关的包,包也称:模块、功能包、库。

安装相机标定: sudo apt install ros-jazzy-camera-calibration

安装 SLAM: sudo apt install ros-jazzy-slam-toolbox

你会发现你只需要安装一个包,就可以获得一个功能。和 Python 通过 pip 安装包一样。安装后即可实现"一行代码"实现某个功能。

ROS 2安装

安装过程中注意:ROS 2 的版本往往与 Ubuntu 的版本有关。查看 https://docs.ros.org/ 可获得目前主流的版本。

设置编码

sudo apt update && sudo apt install locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8

添加源

sudo apt update && sudo apt install curl gnupg lsb-release 
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(source /etc/os-release && echo $UBUNTU_CODENAME) main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null

安装ROS 2

sudo apt update

sudo apt upgrade

# 如果你是 Ubuntu 24.04,那么安装 Jazzy 版本
sudo apt install ros-jazzy-desktop

设置环境变量

# 立刻执行脚本
source /opt/ros/jazzy/setup.bash

# 之后每次开启终端执行执行脚本
echo " source /opt/ros/jazzy/setup.bash" >> ~/.bashrc

你也可以使用Docker简化这个过程。

复杂的功能需要很多依赖,系统中默认的 Python3 出于安全考虑,可能不让你在全局环境中安装包。代码的提示给了很多选项,例如安装在虚拟环境中,删除安全文件。加上关键字,你应该学会通过翻译软件,翻译报错信息。它让你看某个文件,也请学会使用翻译软件,翻译文件内容。

error: externally-managed-environment

× This environment is externally managed
╰─> To install Python packages system-wide, try apt install
python3-xyz, where xyz is the package you are trying to
install.

If you wish to install a non-Debian-packaged Python package,
create a virtual environment using python3 -m venv path/to/venv.
Then use path/to/venv/bin/python and path/to/venv/bin/pip. Make
sure you have python3-full installed.

If you wish to install a non-Debian packaged Python application,
it may be easiest to use pipx install xyz, which will manage a
virtual environment for you. Make sure you have pipx installed.

See /usr/share/doc/python3.12/README.venv for more information.

note: If you believe this is a mistake, please contact your Python installation or OS distribution provider. You can override this, at the risk of breaking your Python installation or OS, by passing --break-system-packages.
hint: See PEP 668 for the detailed specification.

根据提示,得到的解决方式之一:pip3 install 你的包 --break-system-packages

或者可以把externally managed文件删除。

sudo apt install -y python3-pip

sudo pip3 install rosdepc --break-system-packages

sudo rosdepc init

rosdepc update

节点基操

节点是通信的基本单位,节点可以变成:发布者、订阅者、服务端、客户端、动作、参数。

创建节点

创建节点前可以先创建一个目录

mkdir -p ~/workspace/src

如何在 ROS2 中创建一个功能包呢?我们可以使用这个指令:

ros2 pkg create --build-type <build-type> <package_name>

ros2 命令中:

  • pkg:表示功能包相关的功能;
  • create:表示创建功能包;
  • build-type:表示新创建的功能包是 C++ 还是 Python 的,如果使用 C++ 或者 C,那这里就跟 ament_cmake,如果使用 Python,就跟 ament_python;
  • package_name:新建功能包的名字。
# 创建 Python 功能包
ros2 pkg create --build-type ament_python learning_node

# 创建 C++ 功能包
ros2 pkg create --build-type ament_cmake learning_node

只需执行其中一个命令即可,如果你会 Python,就执行创建 Python 节点的命令。命令创建一个名为 learning_node 文件夹。

这里我推荐大家创建节点名的时候,使用有辨识性的名字。例如:ros2 pkg create --build-type ament_python learning_node

info

注意:对于节点同名,ROS 2 在编译时的处理方式不是使用第一个节点或最后一个节点,而是会报错。

编写代码

接着在learning_node文件夹我们就可以准备编写代码了,此时你的路径应该是:

~/workspace
|_src
|_learning_node/
|_learning_node
|_ __init__.py
|_package.xml
|_setup.py
|_setup.cfg
|_test/
|_resource/

此时你就可以编写代码了,在 __init__.py 同级下创建一个 .py 文件。

符合文件命名规则前提下,你想叫什么都可以,但是最好有一定的含义。

如果你的功能是打印 Hello World,那么你就可以叫 node_helloworld.py

Python 的函数通常是需要调用的,例如 abc() 函数,你需要在代码中输入 abc(),才会调用这个函数。和 C++ 有且仅有一个 main 函数不同,那么如何确定哪个函数是入口?更进一步思考:不同的指令对应不同的函数。

因此有了 setup.py 最下方的入口点定义。编写完成后记得修改 setup.py 最下方的入口点。

详细内容参可以参考现成的仓库:

info

拉取 Git 仓库,可免去自己从 0 开始手敲代码。如果你喜欢自己从 0 开始抄写代码,也可以跳过本部分。

Git 安装:sudo apt install git

拉取命令:git clone https://gitee.com/guyuehome/ros2_21_tutorials.git

这个命令会把仓库克隆到本地。执行后,你应该能在当前文件夹下看到名为 ros2_21_tutorials 的文件夹,内含多个子文件夹。每个子文件夹对应一个功能包。因为 ROS2 是模块化的,不同的功能被拆解为不同的功能包后期更好维护。

注意:如果你之前在 src 里创建了一些 Python 节点之后,又把 ros2_21_tutorials 文件夹也放置在 src 里,那么你需要检查你的创建的节点名,是否与 ros2_21_tutorials 内已有节点重名。

编译

# 安装 colcon(此步全局仅在第一次执行时需要执行,如果失败会导致 colcon build 命令不可用)
sudo apt install python3-colcon-ros

# 进入工作空间
cd ~/workspace

# 编译工作空间所有功能包
# 不论其是直接在src下,还是在src下的子文件夹下
colcon build

# 当你只有一个包更改,不希望全编译,仅编译learning_node
colcon build --packages-select learning_node

colcon build 命令会将所有功能包,编译成可执行文件。

编译成功后,就可以在工作空间中看到自动生产的build、log、install文件夹了。

  • src,代码空间,未来编写的代码、脚本,都需要人为的放置到这里;
  • build,编译空间,保存编译过程中产生的中间文件;
  • install,安装空间,放置编译得到的可执行文件和脚本;
  • log,日志空间,编译和运行过程中,保存各种警告、错误、信息等日志。

我们把编译后,含有build、log、install、src文件夹的workspace称为工作空间

如果是 Python 这样无需编译的,会将代码直接拷贝到 install 文件夹下。

编译过程可能出现的报错:

  • 节点重名:对于节点同名,ROS 的处理方式不是使用第一个节点或最后一个节点,而是会报错。应该删除多余节点。确保你 src 文件夹下,所有的 xml 文件中,节点的名称是唯一的。

  • 语法错误:如果代码语法错误,则会编译失败,需要手动解决。

  • 缺少依赖包:如果你严格跟随参考资料且为新系统,首先检查你是否安装过程网络异常。如果是曾用系统,检查是否含有多个 Python 环境。

# 删除缓存
rm -rf build/ install/ log/

# 修改环境变量PATH的值,把系统默认的环境提到最前
export PATH="/usr/bin:$PATH"

# PYTHON_EXECUTABLE 是一个 CMake 变量,用于指定构建过程中应该使用哪个 Python 解释器
export PYTHON_EXECUTABLE=/usr/bin/python3 && colcon build --cmake-args -DPYTHON_EXECUTABLE=/usr/bin/python3

在不更换工作空间的情况下,以下命令仅在第一次执行时需要执行。如果你的工作空间路径不是~/workspace,请将~/workspace替换为你的工作空间路径。

# 立刻执行脚本
source ~/workspace/install/setup.sh

# 之后每次开启终端执行执行脚本
echo "source ~/workspace/install/setup.sh" >> ~/.bashrc

运行

运行脚本:ros2 run learning_node node_helloworld

查看正在运行的 node:ros2 node list

rclpy

rclpy 模块,ROS 2 Python 标准接口(ROS 2 canonical API with Python)

官网:https://docs.ros.org/en/rolling/p/rclpy/index.html

受限于篇幅,只简单介绍其中初始化部分

入门体验

下面是一个标准的初始化代码 init 支持传入多种参数。

import rclpy                                     # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
import time

def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = Node("node_helloworld") # 创建ROS2节点对象并进行初始化

while rclpy.ok(): # ROS2系统是否正常运行
node.get_logger().info("Hello World") # ROS2日志输出
time.sleep(0.5) # 休眠控制循环时间
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
初始化

rclpy.init(*, args: List[str] | None = None, context: Context | None = None, domain_id: int | None = None, signal_handler_options: rpyutils.import_c_library.SignalHandlerOptions | None = None)→ InitContextManager

直接初始化比上下文初始化拥有更多的参数。

Parameters:

  • args – 命令行参数列表。
  • context – 要初始化的上下文。如果为 None,则使用默认上下文。
  • domain_id – ROS 域 ID。
  • signal_handler_options – 指示要安装的信号处理程序。如果 None,则在初始化默认上下文时将安装 SIGINT 和 SIGTERM。【类似于 Python 的信号(signal)机制,是操作系统发出的异步事件。而非异常,异常是程序内部发生的同步事件。】

Returns:

  • 一个 InitContextManager,可以与 Python 上下文管理器一起使用进行清理。
循环执行

ROS 2 中的 Executor(执行器)是用来管理和运行 ROS 2 节点中所有回调函数的核心组件。你可以把它看作是一个调度器,它决定了节点里的各种任务(例如接收新消息、处理服务请求、定时器触发)何时执行。

Executor 负责调度和执行节点中的回调函数。一个 ROS 2 节点可以有多个回调函数,比如:

  • 订阅者的回调函数:每当收到一个新消息时被调用。

  • 服务的请求回调函数:每当收到一个服务请求时被调用。

  • 定时器的回调函数:每隔一定时间被调用。

节点本身并不会自动运行这些回调函数,它需要一个 Executor 来 "spin"(自旋),也就是不断地检查是否有就绪的任务并执行它们。ROS 2 将节点和执行器分开,提供了灵活性。你可以选择不同的 Executor 来适应你的应用需求:

  • SingleThreadedExecutor:这是最简单的执行器,它在一个单独的线程里顺序执行所有就绪的回调函数。

  • MultiThreadedExecutor:这个执行器使用一个线程池,可以同时执行多个回调函数,这对于需要高并发处理的节点非常有用。

rclpy.spin(node: Node, executor: Executor | None = None)→ None:执行工作,直到与执行器关联的上下文关闭。

  • node – 要添加到执行器以检查工作的节点。
  • 要使用的执行器,如果是 None,则为全局执行器。

rclpy.spin_once(node: Node, *, executor: Executor | None = None, timeout_sec: float | None = None)→ None:执行一项工作或等待超时到期。

只要该回调在超时到期之前准备就绪,提供的执行器就会执行一个回调。如果未提供执行器(即 None),则使用全局执行器。如果全局执行器具有部分完成的协程,则完成的工作可能是针对提供的节点以外的节点。

  • node – 要添加到执行器以检查工作的节点。
  • executor – 要使用的执行器,如果是 None,则为全局执行器。
  • timeout_sec – 等待几秒钟。如果为 “无” 或“负数,则永久阻止”。如果为 0,请不要等待。

rclpy.spin_until_future_complete(node: Node, future: Future[Any], executor: Executor | None = None, timeout_sec: float | None = None)→ None : 执行工作,直到Future完成。

  • node – 要添加到执行器以检查工作的节点。
  • 要等待的 future 对象。直到 future.done() 返回 True 或与执行器关联的上下文为 shutdown。
  • 要使用的执行器,如果是 None,则为全局执行器。
  • 等待几秒钟。如果为 None 或负数,则阻止直到未来完成。如果为 0,请不要等待。
关闭

rclpy.shutdown(*, context: Context | None = None, uninstall_handlers: bool | None = None)→ None

关闭以前初始化的上下文(如果尚未关闭)。这也将关闭全局执行器,如果已关闭会报错。

  • context – 要失效的上下文。如果为 None,则使用默认上下文
  • uninstall_handlers – 如果为 None,则在关闭默认上下文时将卸载信号处理程序。如果为 True,则将卸载信号处理程序。如果为 False,则不会卸载信号处理程序。和前面signal_handler_options对应上。

rclpy.try_shutdown(*, context: Context | None = None, uninstall_handlers: bool | None = None)→ None

关闭以前初始化的上下文(如果尚未关闭)。这也将关闭全局执行器,如果已关闭不会报错。

  • context – 要失效的上下文。如果为 None,则使用默认上下文
  • uninstall_handlers – 如果为 None,则在关闭默认上下文时将卸载信号处理程序。如果为 True,则将卸载信号处理程序。如果为 False,则不会卸载信号处理程序。和前面signal_handler_options对应上。

上下文

上下文管理器 Context 与直接使用 rclpy 基本一致,但背后其实是线程管理,每个独立的上下文管理器都是独立的线程。

如果使用 rclpy.init(args=args) 则无法完成创建 2 个不同的 domain_id 的节点。

from rclpy.node import Node                      # ROS2 节点类
from rclpy.context import Context # ROS2 上下文类
import time

def main(args=None): # ROS2节点主入口main函数
# 使用Context上下文管理器的正确方式
context = Context()
context.init(args,domain_id=1)
node = Node("node_helloworld", context=context) # 创建节点并关联上下文

context2 = Context()
context2.init(args,domain_id=2)
node2 = Node("node_helloworld", context=context2) # 创建节点并关联上下文

while context.ok() and context2.ok(): # 检查上下文是否正常
node.get_logger().info("{}".format(context.get_domain_id())) # ROS2日志输出
node2.get_logger().info("{}".format(context2.get_domain_id())) # ROS2日志输出
time.sleep(0.5) # 休眠控制循环时间
context2.shutdown()
context.shutdown()

使用上下文管理器 Context 的 with 语法可以自动 init(不支持参数),并在 with 语句结束时调用 shutdown,代码更加简洁。

类似于 Python 的 with openclose

from rclpy.node import Node                      # ROS2 节点类
from rclpy.context import Context # ROS2 上下文类
import time

def main(args=None): # ROS2节点主入口main函数
# 使用Context上下文管理器的正确方式
with Context() as context:
node = Node("node_helloworld", context=context) # 创建节点并关联上下文
while context.ok(): # 检查上下文是否正常
# 输出当前的domain id ,默认是0
# 修改domain id方式1 :修改当前终端变量
# export ROS_DOMAIN_ID=1
node.get_logger().info("{}".format(context.get_domain_id()))
time.sleep(0.5) # 休眠控制循环时间
初始化

context.init(args: List[str] | None = None, *, initialize_logging: bool = True, domain_id: int | None = None)→ None

为给定上下文初始化 ROS 通信。

  • args – 命令行参数列表。
  • initialize_logging – 是否初始化整个过程的日志记录。默认值是初始化日志记录。
  • domain_id – 要用于此上下文的域 ID。如果为无(默认值),则使用域 ID 0。
状态检察

context.get_domain_id()→ int:获取上下文的 domain ID。

context.ok()→ bool:检查上下文是否未关闭。

跟踪

track_node(node: Node)→ None:跟踪与此上下文关联的节点。当上下文被销毁时,它将销毁它跟踪的每个节点。

untrack_node(node: Node)→ None:如果一个节点在上下文之前被销毁,我们不再需要跟踪它是否销毁了上下文,所以在此处将其删除。

关闭上下文

context.shutdown()→ None:关闭此上下文,会等待正在进行的操作完成,然后清理资源。好比执行系统关机,如果已关闭会报错。

on_shutdown(callback: Callable[[], None])→ None:添加关机时调用的回调。

try_shutdown()→ None:尝试关闭此上下文(如果尚未关闭),如果已关闭也不会报错。

context.destroy()→ None:销毁上下文,立即释放资源,不等待正在进行的操作。好比直接断电。

通信示例

ROS2的基础通信类型的示例代码都在ros2_21_tutorials仓库中。拉取到src文件夹中,参考上文编译、运行即可。

话题

  • 运行话题的发布者节点 : ros2 run learning_topic topic_helloworld_pub
  • 运行话题的订阅者节点 : ros2 run learning_topic topic_helloworld_sub

可以看到发布者循环发布“Hello World”字符串消息,订阅者也以几乎同样的频率收到该话题的消息数据。

服务

  • 运行服务的服务端节点 : ros2 run learning_service service_adder_server
  • 运行服务的客户端节点 : ros2 run learning_service service_adder_client 2 3

动作

  • 运行动作的服务端节点 : ros2 run learning_action action_move_server
  • 运行动作的客户端节点 : ros2 run learning_action action_move_client

参数

等价于机器学习中的超参数即:在开始训练前设置的参数,例如:学习率、隐藏层大小、迭代次数等等。

ROS 机器人系统中的参数是全局字典,可以运行多个节点中共享数据。
  • 运行节点:ros2 run learning_parameter param_declare
  • 在另外终端设置参数:ros2 param set param_declare robot_name turtle

Gazebo

Gazebo 是一款开源的 3D 仿真软件,用于模拟机器人在复杂环境中的行为。它提供了丰富的物理引擎和传感器模型,可以模拟机器人的运动、碰撞、传感器数据等。

# 安装 gz
sudo apt-get install ros-jazzy-ros-gz

# 测试(参数 empty.sdf 表示空场景)
# 测试能否正常启动且无卡顿,gz_args:=empty.sdf 表示使用空场景
ros2 launch ros_gz_sim gz_sim.launch.py gz_args:=empty.sdf

# 我的 Gazebo 版本
"""
Gazebo Sim, version 8.9.0
Copyright (C) 2018 Open Source Robotics Foundation.
Released under the Apache 2.0 License.
"""

gazebosim 有大量下载好的模型(models)与场景(worlds)可以使用。

想简单点就选一个自带摄像头的机器人模型(model),如果有中意的机器人模型没摄像头的,可以自己加一个摄像头。

选赛道类的,有明显的车道线,可以较好的测试算法。如果场景过于简单可以适当的添加一些模型,例如:路灯、树木、箱子等。

从结构上来说,场景(world)包含 N 个模型(model),模型包含 M 个传感器(sensor)。

虚拟仿真的世界观和 Blender 的世界观类似,一个世界需要有物理引擎、光源、模型等等。首先我们需要简单了解下这样的世界观。

SDF 格式中世界可接收的子元素:http://sdformat.org/spec?ver=1.4&elem=world

SDF 格式中模型可接收的子元素:http://sdformat.org/spec?ver=1.4&elem=model

Gazebo 的传感器库:https://gazebosim.org/libs/sensors/

SDF 格式中传感器可接收的子元素http://sdformat.org/spec?ver=1.4&elem=sensor

这种结构化的文档可读性要比传统的纯文档观感好很多。

URDF

URDF(Unified Robot Description Format)是一种用于描述机器人模型的 XML 格式。

一个两轮机器人运动示例 官方教程

通过这个部分的学习,你可以学会如何操纵你的小车运动。

其中 pose 标签有 6 个参数,官方文档认为比较简单没讲,这里提一下:

  • 前三个参数表示三维空间坐标(取值范围为空间内即可)
  • 后三个参数表示绕某个轴的角度(控制物体面朝什么方向)
方向正值含义颜色标识
X 轴前后方向向前(North)🔴 红色
Y 轴左右方向向左(West)🟢 绿色
Z 轴上下方向向上🔵 蓝色
角度旋转方向常用值
Roll绕 X 轴旋转翻滚通常为 0
Pitch绕 Y 轴旋转俯仰通常为 0
Yaw绕 Z 轴旋转左右转头0, 1.57, 3.14, -1.57

Yaw 角度对照表

Yaw (弧度)角度朝向示意
0朝向 +X(前)
1.5790°朝向 +Y(左)
3.14180°朝向 -X(后)
-1.57-90°朝向 -Y(右)

传感器

基于不同的目的,我们会选择不同的传感器结合不同的算法来实现。

传感器应用场景优势劣势
激光雷达SLAM建图、避障精度高,不受光照影响价格贵,无颜色信息
摄像头目标识别、视觉SLAM成本低,信息丰富受光照影响,计算量大
深度相机室内SLAM、3D重建提供RGB+深度,实时性好仅室内可用,范围有限(5m)
IMU姿态估计、传感器融合高频输出,不依赖环境存在漂移,需融合其他传感器

机器人专业可以参考 官方文档教程,其中使用的是 IMU 传感器、接触式传感器和激光雷达传感器。

人工智能专业可以下载我设计的 简易地图

ros_gz_bridge

此部分代码来自官方文档 https://gazebosim.org/docs/latest/ros2_integration/

数据流向

  • 摄像头 → /camera 话题 → 你的视觉算法节点
  • 你的算法节点 → /cmd_vel 话题 → 机器人运动控制

这个项目中涉及到 2 种通信信息格式的转换:

从 ROS2_Gazebo 流出的传感器数据,需要转为 Python 能处理的数据格式。

内置摄像头插件:会自动将摄像头数据发布到 /camera 话题

启动后可以通过 rviz2 查看接收到的图片。

从算法节点流入 ROS2_Gazebo 的控制信息,需要是 ROS2_Gazebo 能理解的数据格式。

运动控制插件gz::sim::systems::DiffDrive 插件会监听 /cmd_vel 话题来控制机器人运动

我们把需要执行的命令写入到 Makefile 里,便于后续执行。

Makefile
.PHONY: world sim img cam vel bridge go left right stop rv node start

# 启动完整世界
world:
gz sim -r world.sdf


# 桥接摄像头和控制话题 (simple.sdf 的 /camera和/cmd_vel)
bridge:
ros2 run ros_gz_bridge parameter_bridge /camera@sensor_msgs/msg/Image@gz.msgs.Image /cmd_vel@geometry_msgs/msg/Twist]gz.msgs.Twist

start:
colcon build --packages-select learning_node
ros2 run learning_node node_helloworld

# 单独桥接摄像头 (simple.sdf 的 /camera)
cam:
ros2 run ros_gz_bridge parameter_bridge /camera@sensor_msgs/msg/Image@gz.msgs.Image
# 启动 rviz2
rv:
rviz2

# 单独桥接控制话题 (simple.sdf 的 /cmd_vel)
vel:
ros2 run ros_gz_bridge parameter_bridge /cmd_vel@geometry_msgs/msg/Twist]gz.msgs.Twist

# 前进
go:
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 1.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}"
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}"

# 左转
left:
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.5, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.5}}"
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}"

# 右转
right:
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.5, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: -0.5}}"
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}"

# 停止
stop:
ros2 topic pub -1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}}"

调试命令

# 查看 GZ 中的话题
gz topic -l

# 查看 ROS2 中活跃的话题
ros2 topic list

# 查看图像话题数据
ros2 topic echo /camera/image
# 查看速度控制话题
ros2 topic echo /model/my_robot/cmd_vel

算法节点

tip

你是否需要去了解一个算法?

从功利角度来说:

  • 当你需要改进这个算法时,需要了解它的工作原理和实现细节。
  • 当你需要调参时,需要了解参数背后的意义。

如果某个算法默认参数表现足够,其实可以先不用知道它是怎么实现的。

下面代码实现了一个视觉运动控制节点,从 /camera 话题订阅图像数据,并处理后发布到 /cmd_vel 话题。

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np
from geometry_msgs.msg import Twist

class VisualMotionController(Node):
def __init__(self):
super().__init__('visual_motion_controller')

# 订阅摄像头话题 (simple.sdf 中是 /camera)
self.image_subscription = self.create_subscription(
Image,
'/camera',
self.process_and_control,
10
)

# 发布控制话题 (simple.sdf 中是 /cmd_vel)
self.velocity_publisher = self.create_publisher(
Twist,
'/cmd_vel',
10
)

self.bridge = CvBridge()
self.move_speed = 0.5
self.turn_speed = 0.5
self.step = 0 # 计步器
self.get_logger().info('视觉运动控制节点已启动')


def process_and_control(self, msg):
"""处理图像并生成控制命令"""
try:
# 将ROS图像消息转换为OpenCV格式
cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
cv2.imshow('car',cv_image)
cv2.waitKey(1)
self.step += 1

# 这里可以添加你的视觉处理算法
# 例如:检测障碍物、识别路径等
# gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
# edges = cv2.Canny(gray, 50, 150)
# 在这里编写你的运动控制算法

# 简单的控制逻辑示例
cmd = Twist()
if self.step < 100:
# 前进100帧
cmd.linear.x = self.move_speed
cmd.angular.z = 0.0
self.get_logger().info(f'前进中... 步数: {self.step}')
elif self.step < 160:
# 左转60帧
cmd.linear.x = 0.0
cmd.angular.z = self.turn_speed
self.get_logger().info(f'左转中... 步数: {self.step}')
else:
# 重置计步器
self.step = 0

self.velocity_publisher.publish(cmd)

except Exception as e:
self.get_logger().error(f'处理图像时出错: {str(e)}')


def main(args=None):
"""主函数"""
rclpy.init(args=args)

# 创建视觉运动控制节点
controller = VisualMotionController()

try:
# 启动节点
rclpy.spin(controller)
except KeyboardInterrupt:
controller.get_logger().info('节点被用户中断')
finally:
# 发送停止指令
stop_cmd = Twist()
controller.velocity_publisher.publish(stop_cmd)
# 清理资源
controller.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

启动命令:

# 编译工作空间(以learning_node为例)
colcon build --packages-select learning_node

# 启动节点(以learning_node为例)
ros2 run learning_node node_helloworld

项目目标是在一个凹字形的地图上完成循迹运动,同时注意避开斑马线上的行人。

不是一个很难的项目,但是一个和实际项目流程一致要独立完成思路设计、算法选型、参数可视化的项目。

任务分析与思路:

预处理

预处理是基本流程,和数据分析前先数据清洗一样。有固定的流程和套路,任何一本Opencv教材都包含这部分内容。

感兴趣区域

好处:

  • 减少无关信息的干扰
  • 加快计算速度

地平线以上的内容其实和我们关系不大,只有检测行人的时候才会有影响。可以在检测到斑马线的时候计算完整图片。

所以我们先设置 roi(感兴趣区域)为图像的下面的 50%

def extract_roi(image ,roi_start_ratio=0.5):
"""提取感兴趣区域(ROI)

Args:
image: 输入的BGR图像
roi_start_ratio: 感兴趣区域。(0表示整张都感兴趣、0.5表示下半部分感兴趣)

Returns:
roi_image: ROI区域图像
"""
# 获取图像尺寸
height, width = image.shape[:2]

# 计算ROI起始行(从图像中部开始,关注前方地面)
roi_start = int(height * roi_start_ratio)

# 提取ROI区域(图像下半部分)
roi_image = image[roi_start:height, 0:width]

return roi_image

灰度化与二值化

这一步一般是为后面的边缘检测做铺垫,简单的算法也可以直接基于灰度化和二值化的结果做处理。

二值化的算法就很多了,我们的世界是有光源和树木的投影的,另外计算速度要快,所以排除 固定阈值法Otsu's 方法自适应高斯法

这里 全局平均值法自适应均值法 和我们可以都写出来,后面看情况选择。


def preprocess_and_binarize(bgr_image, method='mean', **kwargs):
"""完整的预处理和二值化流程

invert: 是否反向二值化(True=低于阈值变白,False=高于阈值变白)

Args:
bgr_image: 输入的BGR彩色图像
method: 二值化方法,可选 'mean', 'adaptive'
**kwargs: 各方法的额外参数
- invert: 是否反向二值化
- block_size: 自适应方法的邻域大小
- C: 自适应方法的常数偏移

Returns:
binary_image: 二值化结果
"""
# Step 1: 灰度化
gray_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2GRAY)

# Step 2: 根据选择的方法进行二值化
invert = kwargs.get('invert', False)

if method == 'adaptive':
# 选择二值化类型
thresh_type = cv2.THRESH_BINARY_INV if invert else cv2.THRESH_BINARY

# 自适应均值法:每个像素的阈值由其邻域的均值决定
binary_image = cv2.adaptiveThreshold(
gray_image,
255, # 最大值
cv2.ADAPTIVE_THRESH_MEAN_C, # 使用邻域均值
thresh_type, # 二值化类型(正向或反向)
11, # 邻域大小(必须是奇数),越大对光照变化越不敏感
2 # 常数偏移从均值中减去的常数,用于微调阈值
)
elif method == 'mean':
thresh_type = cv2.THRESH_BINARY_INV if invert else cv2.THRESH_BINARY
_, binary_image = cv2.threshold(gray_image, gray_image.mean(), 255, thresh_type)
else:
raise ValueError(f"未知的二值化方法: {method}")

return binary_image
去噪

这个项目中都是纯色,只有斑马线附近有异常色块。考虑到实际摄像头会有噪点。做个简单的开运算(先腐蚀后膨胀,用于去除图像中的小噪声)

def denoise_binary_image(binary_image, kernel_size=(5, 5)):
"""使用形态学开运算去除二值图像中的噪声

Args:
binary_image: 输入的二值化图像(0或255)
kernel_size: 结构元素大小,默认为(5, 5)

Returns:
denoised_image: 去噪后的二值图像
"""
# 创建结构元素(矩形核)
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)

# 开运算 = 先腐蚀后膨胀
# 作用:去除小的白色噪点(孤立的亮点)
denoised_image = cv2.morphologyEx(binary_image, cv2.MORPH_OPEN, kernel)

return denoised_image

可视化

预处理完成之后就可以可视化展示了,这里我觉得可以展示感兴趣的原图、去噪后的图。

def visualize_preprocessing(roi_image, binary_image):
"""可视化预处理的各个阶段

Args:
roi_image: 感兴趣区域的原图(ROI)
denoised_image: 去噪后的二值化图像
"""
# 显示ROI区域原图
cv2.imshow('ROI', roi_image)

# 显示去噪后的结果
cv2.imshow('ROI-II', binary_image)

cv2.waitKey(1)

运动算法设计

这里我学习 CV 的时候学习了边缘检测、轮廓检测、光流法。我自己可以想到直接计算车道像素中心、像素逐行扫描。

OpenCV 的算法 API 有成百上千种(没开玩笑),这里我记住了常见场景的主流算法。

之前我们常说:"talk is cheap, show me your code."

现在 AI 时代,人人都能写代码,写好提示词变得更重要了,所以是:"code is cheap, show me your talk."

这里是我给 AI 的提示词:

  • 头脑风暴一下车道线跟随算法,适合弯道较多,有斑马线的。简单列出原理、优缺点。

以下是 Claude-4.5-sonnet 的答案:


### 🎯 适合弯道多+斑马线场景的算法(按推荐度排序)

**⭐⭐⭐ 推荐方案1:滑动窗口法 + 多项式拟合**
- **原理**:从图像底部开始,用小窗口逐层向上搜索黑色像素 → 记录窗口中心 → 用二次/三次多项式拟合曲线
- **优点**:
- ✅ 对弯道适应性极好(多项式天然支持曲线)
- ✅ 对斑马线有很强抗干扰性(逐层搜索,局部异常不影响整体)
- ✅ 可以处理车道线断裂
- ✅ 业界成熟方案(自动驾驶常用)
- **缺点**:
- ❌ 实现复杂度较高
- ❌ 计算量大(需要多个窗口迭代)
- **适用场景**:复杂环境、弯道、遮挡、斑马线

---

**⭐⭐⭐ 推荐方案2:像素列扫描法 + 边界追踪**
- **原理**:逐列扫描图像 → 统计每列的黑色像素分布 → 识别左右边界 → 计算中心线 → 过滤异常列(斑马线)
- **优点**:
- ✅ 实现简单,计算量适中
- ✅ 对弯道适应好(每列独立处理)
- ✅ 可以检测车道宽度变化
- ✅ 容易过滤斑马线(通过连续性检查)
- **缺点**:
- ❌ 需要假设车道在视野中
- ❌ 对噪声敏感(需要平滑处理)
- **适用场景**:中等复杂度,对实时性要求高

---

**⭐⭐ 推荐方案3:直方图投影法 + 峰值追踪**
- **原理**:垂直投影ROI → 找到左右车道的双峰 → 逐层向上追踪峰值移动 → 拟合曲线
- **优点**:
- ✅ 对光照不敏感
- ✅ 可以明确区分左右车道
- ✅ 斑马线在垂直投影中容易识别(横向特征)
- ✅ 计算效率高
- **缺点**:
- ❌ 急弯时左右车道可能重叠,峰值不明显
- ❌ 需要车道相对对称
- **适用场景**:车道清晰、对称性好、中等弯道

---

**⭐⭐ 推荐方案4:分区域重心法 + 轨迹连接**
- **原理**:将ROI分成多个水平带 → 每个带计算黑色像素重心 → 连接各带重心形成轨迹 → 过滤异常点(斑马线)
- **优点**:
- ✅ 对弯道适应好(分段处理)
- ✅ 实现简单,是重心法的改进版
- ✅ 可以检测局部异常(斑马线导致的重心跳变)
- ✅ 计算量小
- **优点**:
- ❌ 仍然对噪声敏感
- ❌ 需要设计异常点过滤规则
- **适用场景**:资源受限、需要快速原型

---

**⭐ 推荐方案5:边缘检测 + 分段霍夫变换**
- **原理**:Canny边缘检测 → 将ROI分成多段 → 每段独立做霍夫直线检测 → 连接直线段近似曲线 → 过滤斑马线(横向线)
- **优点**:
- ✅ 经典方法,鲁棒性好
- ✅ 可以用线段方向过滤斑马线(横向vs纵向)
- ✅ 分段处理可适应弯道
- **缺点**:
- ❌ 参数调节复杂
- ❌ 计算量大
- ❌ 对断裂车道线处理困难
- **适用场景**:车道线清晰、对准确度要求高

---

**⭐ 推荐方案6:形态学骨架提取 + 中轴线追踪**
- **原理**:二值化 → 形态学细化(skeletonization) → 提取黑色区域的中轴线 → 追踪中轴线
- **优点**:
- ✅ 直接得到车道中心,无需计算
- ✅ 对弯道自然适应
- ✅ 斑马线会被细化但不影响主路径
- **缺点**:
- ❌ 细化算法计算量大
- ❌ 对断裂敏感
- ❌ 可能产生分支(需要后处理)
- **适用场景**:车道连续、形状复杂

---

### 🔥 针对你的场景(弯道多+斑马线)最佳组合方案

**方案A:滑动窗口法(最优效果)**

二值化 → 去噪 → 从底部开始滑动窗口 →
逐层找车道像素 → 多项式拟合 →
选取前视点 → Pure Pursuit控制


**方案B:分区域重心法(平衡方案)** ⭐ 推荐快速实现

二值化 → 去噪 → ROI分成5-10个水平带 →
每带计算重心 → 过滤异常点(斑马线) →
拟合平滑曲线 → 跟随中心点

**方案C:像素列扫描法(实时性强)**

二值化 → 去噪 → 逐列扫描 →
找左右边界 → 连续性检查过滤斑马线 →
计算中心 → 控制

如果以上方法都不合适,可以选择寻找新的算法和算法调参。现在的目的是验证方案是否可行,只有大致可用我们才进一步追求理解这个代码。

我们接着输入提示词:

  • 帮我实现:滑动窗口法 + 多项式拟合。要求:函数式编程,添加function docstring,无需逐行注释。
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np
from geometry_msgs.msg import Twist

def extract_roi(image ,roi_start_ratio=0.5):
"""提取感兴趣区域(ROI)

Args:
image: 输入的BGR图像
roi_start_ratio: 感兴趣区域。(0表示整张都感兴趣、0.5表示下半部分感兴趣)

Returns:
roi_image: ROI区域图像
"""
# 获取图像尺寸
height, width = image.shape[:2]

# 计算ROI起始行(从图像中部开始,关注前方地面)
roi_start = int(height * roi_start_ratio)

# 提取ROI区域(图像下半部分)
roi_image = image[roi_start:height, 0:width]

return roi_image

def preprocess_and_binarize(bgr_image, method ,invert):
"""完整的预处理和二值化流程

otsu方法会自动计算最佳阈值,适合双峰分布的图像(如黑色车道+灰色地面)

adaptive自适应均值法适应局部光照变化,适合有阴影和光照不均的场景

invert: 是否反向二值化(True=低于阈值变白,False=高于阈值变白)

Args:
bgr_image: 输入的BGR彩色图像
method: 二值化方法,可选 'mean', 'adaptive'
**kwargs: 各方法的额外参数
- invert: 是否反向二值化
- block_size: 自适应方法的邻域大小
- C: 自适应方法的常数偏移

Returns:
binary_image: 二值化结果
"""
# 灰度化
gray_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2GRAY)


if method == 'adaptive':
# 选择二值化类型
thresh_type = cv2.THRESH_BINARY_INV if invert else cv2.THRESH_BINARY

# 自适应均值法:每个像素的阈值由其邻域的均值决定
binary_image = cv2.adaptiveThreshold(
gray_image,
255, # 最大值
cv2.ADAPTIVE_THRESH_MEAN_C, # 使用邻域均值
thresh_type, # 二值化类型(正向或反向)
11, # 邻域大小(必须是奇数),越大对光照变化越不敏感
2 # 常数偏移从均值中减去的常数,用于微调阈值
)
elif method == 'mean':
thresh_type = cv2.THRESH_BINARY_INV if invert else cv2.THRESH_BINARY
_, binary_image = cv2.threshold(gray_image, gray_image.mean(), 255, thresh_type)
else:
raise ValueError(f"未知的二值化方法: {method}")

return binary_image

def denoise_binary_image(binary_image, kernel_size=(5, 5)):
"""使用形态学开运算去除二值图像中的噪声

Args:
binary_image: 输入的二值化图像(0或255)
kernel_size: 结构元素大小,默认为(5, 5)

Returns:
denoised_image: 去噪后的二值图像
"""
# 创建结构元素(矩形核)
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, kernel_size)

# 开运算 = 先腐蚀后膨胀
# 作用:去除小的白色噪点(孤立的亮点)
denoised_image = cv2.morphologyEx(binary_image, cv2.MORPH_OPEN, kernel)

return denoised_image

def visualize_preprocessing(roi_image, binary_image):
"""可视化预处理的各个阶段

Args:
roi_image: 感兴趣区域的原图(ROI)
denoised_image: 去噪后的二值化图像
"""
cv2.imshow('ROI', roi_image)
cv2.imshow('ROI-II', binary_image)
cv2.waitKey(1)


def find_lane_base_positions(binary_image, n_windows=9, edge_detection=True):
"""通过直方图找到左右车道线的起始位置

Args:
binary_image: 二值化图像(白色为车道线)
n_windows: 滑动窗口数量
edge_detection: 是否检测边缘模式(适合宽道路)

Returns:
left_base: 左车道线起始x坐标(None表示未检测到)
right_base: 右车道线起始x坐标(None表示未检测到)
"""
histogram = np.sum(binary_image[binary_image.shape[0]//2:, :], axis=0)
midpoint = histogram.shape[0] // 2

if edge_detection:
threshold = np.max(histogram) * 0.3
white_pixels = np.where(histogram > threshold)[0]

if len(white_pixels) == 0:
return None, None

left_candidates = white_pixels[white_pixels < midpoint]
right_candidates = white_pixels[white_pixels >= midpoint]

left_base = np.min(left_candidates) if len(left_candidates) > 0 else None
right_base = np.max(right_candidates) if len(right_candidates) > 0 else None
else:
left_half = histogram[:midpoint]
right_half = histogram[midpoint:]

left_base = np.argmax(left_half) if np.max(left_half) > 0 else None
right_base = np.argmax(right_half) + midpoint if np.max(right_half) > 0 else None

return left_base, right_base


def detect_road_edges(binary_image):
"""检测宽道路的左右边缘

Args:
binary_image: 二值化图像(白色为道路)

Returns:
left_edge_x: 左边缘x坐标数组
left_edge_y: 左边缘y坐标数组
right_edge_x: 右边缘x坐标数组
right_edge_y: 右边缘y坐标数组
"""
height, width = binary_image.shape
left_edge_x, left_edge_y = [], []
right_edge_x, right_edge_y = [], []

for y in range(height):
row = binary_image[y, :]
white_pixels = np.where(row > 127)[0]

if len(white_pixels) > 10:
left_edge_x.append(white_pixels[0])
left_edge_y.append(y)
right_edge_x.append(white_pixels[-1])
right_edge_y.append(y)

return (np.array(left_edge_x), np.array(left_edge_y)), (np.array(right_edge_x), np.array(right_edge_y))


def sliding_window_detection(binary_image, n_windows=9, margin=80, min_pixels=50):
"""使用滑动窗口法检测车道线像素

Args:
binary_image: 二值化图像(白色为车道线,0或255)
n_windows: 滑动窗口数量
margin: 窗口宽度的一半(像素)
min_pixels: 重新定位窗口所需的最小像素数

Returns:
left_lane_pixels: 左车道线像素坐标 (x_coords, y_coords),未检测到则为 ([], [])
right_lane_pixels: 右车道线像素坐标 (x_coords, y_coords),未检测到则为 ([], [])
window_info: 窗口信息列表,用于可视化 [(x_center, y_center, width, height), ...]
"""
height, width = binary_image.shape
window_height = height // n_windows

nonzero = binary_image.nonzero()
nonzero_y = np.array(nonzero[0])
nonzero_x = np.array(nonzero[1])

left_base, right_base = find_lane_base_positions(binary_image, n_windows, edge_detection=True)

left_current = left_base
right_current = right_base

left_lane_indices = []
right_lane_indices = []
window_info = []

for window in range(n_windows):
win_y_low = height - (window + 1) * window_height
win_y_high = height - window * window_height

if left_current is not None:
win_xleft_low = left_current - margin
win_xleft_high = left_current + margin
window_info.append(('left', win_xleft_low, win_y_low, win_xleft_high, win_y_high))

good_left_indices = ((nonzero_y >= win_y_low) & (nonzero_y < win_y_high) &
(nonzero_x >= win_xleft_low) & (nonzero_x < win_xleft_high)).nonzero()[0]
left_lane_indices.append(good_left_indices)

if len(good_left_indices) > min_pixels:
left_current = int(np.mean(nonzero_x[good_left_indices]))

if right_current is not None:
win_xright_low = right_current - margin
win_xright_high = right_current + margin
window_info.append(('right', win_xright_low, win_y_low, win_xright_high, win_y_high))

good_right_indices = ((nonzero_y >= win_y_low) & (nonzero_y < win_y_high) &
(nonzero_x >= win_xright_low) & (nonzero_x < win_xright_high)).nonzero()[0]
right_lane_indices.append(good_right_indices)

if len(good_right_indices) > min_pixels:
right_current = int(np.mean(nonzero_x[good_right_indices]))

left_lane_indices = np.concatenate(left_lane_indices) if left_lane_indices else np.array([])
right_lane_indices = np.concatenate(right_lane_indices) if right_lane_indices else np.array([])

left_x = nonzero_x[left_lane_indices] if len(left_lane_indices) > 0 else np.array([])
left_y = nonzero_y[left_lane_indices] if len(left_lane_indices) > 0 else np.array([])
right_x = nonzero_x[right_lane_indices] if len(right_lane_indices) > 0 else np.array([])
right_y = nonzero_y[right_lane_indices] if len(right_lane_indices) > 0 else np.array([])

return (left_x, left_y), (right_x, right_y), window_info


def fit_polynomial(lane_pixels, degree=2):
"""对车道线像素进行多项式拟合

Args:
lane_pixels: 车道线像素坐标 (x_coords, y_coords)
degree: 多项式阶数,默认为2(二次多项式)

Returns:
poly_coeffs: 多项式系数 [a, b, c, ...],使得 x = a*y^2 + b*y + c
如果拟合失败则返回 None
"""
x_coords, y_coords = lane_pixels

if len(x_coords) < degree + 1:
return None

poly_coeffs = np.polyfit(y_coords, x_coords, degree)
return poly_coeffs


def calculate_lane_center_line(left_coeffs, right_coeffs, image_height):
"""根据左右车道线的多项式系数计算中心线

Args:
left_coeffs: 左车道线多项式系数 [a, b, c]
right_coeffs: 右车道线多项式系数 [a, b, c]
image_height: 图像高度

Returns:
center_coeffs: 中心线多项式系数 [a, b, c]
center_points: 中心线采样点 [(x, y), ...]
"""
if left_coeffs is None or right_coeffs is None:
return None, []

y_vals = np.linspace(0, image_height - 1, image_height)

left_x = np.polyval(left_coeffs, y_vals)
right_x = np.polyval(right_coeffs, y_vals)
center_x = (left_x + right_x) / 2

center_coeffs = np.polyfit(y_vals, center_x, 2)

sample_points = 50
y_samples = np.linspace(0, image_height - 1, sample_points, dtype=int)
x_samples = np.polyval(center_coeffs, y_samples)
center_points = list(zip(x_samples.astype(int), y_samples))

return center_coeffs, center_points


def calculate_control_command(center_coeffs, image_width, image_height,
lookahead_ratio=0.7, k_p=2.0,
v_straight=1.0, v_curve=0.5, curve_threshold=0.0005):
"""基于中心线多项式系数计算运动控制指令

Args:
center_coeffs: 中心线多项式系数 [a, b, c],x = a*y^2 + b*y + c
image_width: 图像宽度
image_height: 图像高度
lookahead_ratio: 前视点位置比例(0-1),越大越远
k_p: 比例增益
v_straight: 直道速度 (m/s)
v_curve: 弯道速度 (m/s)
curve_threshold: 曲率阈值,超过此值认为是弯道

Returns:
linear_velocity: 线速度 (m/s)
angular_velocity: 角速度 (rad/s)
lateral_error: 横向偏差(像素)
curvature: 曲率
"""
if center_coeffs is None:
return 0.0, 0.0, 0.0, 0.0

lookahead_y = int(image_height * lookahead_ratio)
lookahead_x = np.polyval(center_coeffs, lookahead_y)

image_center_x = image_width / 2
lateral_error = lookahead_x - image_center_x
normalized_error = lateral_error / (image_width / 2)

a, b, c = center_coeffs
curvature = abs(2 * a)

if curvature > curve_threshold or abs(normalized_error) > 0.2:
linear_velocity = v_curve
else:
linear_velocity = v_straight

angular_velocity = -k_p * normalized_error
angular_velocity = np.clip(angular_velocity, -2.0, 2.0)

return linear_velocity, angular_velocity, lateral_error, curvature


def visualize_lane_detection(original_image, binary_image, left_pixels, right_pixels,
left_coeffs, right_coeffs, center_coeffs, window_info,
lookahead_point=None, lateral_error=0.0,
linear_vel=0.0, angular_vel=0.0):
"""可视化车道线检测结果

Args:
original_image: 原始彩色图像
binary_image: 二值化图像
left_pixels: 左车道线像素 (x_coords, y_coords)
right_pixels: 右车道线像素 (x_coords, y_coords)
left_coeffs: 左车道线多项式系数
right_coeffs: 右车道线多项式系数
center_coeffs: 中心线多项式系数
window_info: 滑动窗口信息
lookahead_point: 前视点坐标 (x, y)
lateral_error: 横向偏差
linear_vel: 线速度
angular_vel: 角速度

Returns:
vis_image: 可视化图像
"""
vis_image = cv2.cvtColor(binary_image, cv2.COLOR_GRAY2BGR)
height = vis_image.shape[0]
width = vis_image.shape[1]

for window in window_info:
lane_type, x_low, y_low, x_high, y_high = window
color = (0, 255, 0) if lane_type == 'left' else (255, 0, 0)
cv2.rectangle(vis_image, (x_low, y_low), (x_high, y_high), color, 2)

left_x, left_y = left_pixels
right_x, right_y = right_pixels

if len(left_x) > 0:
for x, y in zip(left_x, left_y):
cv2.circle(vis_image, (int(x), int(y)), 2, (0, 255, 255), -1)

if len(right_x) > 0:
for x, y in zip(right_x, right_y):
cv2.circle(vis_image, (int(x), int(y)), 2, (255, 255, 0), -1)

y_vals = np.linspace(0, height - 1, height).astype(int)

if left_coeffs is not None:
left_fit_x = np.polyval(left_coeffs, y_vals).astype(int)
for i in range(len(y_vals) - 1):
cv2.line(vis_image, (left_fit_x[i], y_vals[i]),
(left_fit_x[i+1], y_vals[i+1]), (0, 255, 0), 3)

if right_coeffs is not None:
right_fit_x = np.polyval(right_coeffs, y_vals).astype(int)
for i in range(len(y_vals) - 1):
cv2.line(vis_image, (right_fit_x[i], y_vals[i]),
(right_fit_x[i+1], y_vals[i+1]), (255, 0, 0), 3)

if center_coeffs is not None:
center_fit_x = np.polyval(center_coeffs, y_vals).astype(int)
for i in range(len(y_vals) - 1):
cv2.line(vis_image, (center_fit_x[i], y_vals[i]),
(center_fit_x[i+1], y_vals[i+1]), (0, 0, 255), 4)

image_center_x = width // 2
cv2.line(vis_image, (image_center_x, 0), (image_center_x, height), (255, 255, 255), 2)

if lookahead_point is not None:
lx, ly = lookahead_point
cv2.circle(vis_image, (int(lx), int(ly)), 10, (255, 0, 255), -1)
cv2.arrowedLine(vis_image, (image_center_x, int(ly)), (int(lx), int(ly)),
(255, 0, 255), 3)

font = cv2.FONT_HERSHEY_SIMPLEX
cv2.putText(vis_image, f'Error: {lateral_error:.1f}px', (10, 30),
font, 0.6, (255, 255, 255), 2)

turn_info = "LEFT" if angular_vel > 0 else "RIGHT" if angular_vel < 0 else "STRAIGHT"
cv2.putText(vis_image, f'Angular: {angular_vel:.3f} ({turn_info})', (10, 60),
font, 0.6, (255, 255, 255), 2)

cv2.putText(vis_image, f'Speed: {linear_vel:.2f} m/s', (10, 90),
font, 0.6, (255, 255, 255), 2)

return vis_image


class VisualMotionController(Node):
"""基于视觉的车道线跟随控制节点

功能:
- 接收摄像头图像
- 滑动窗口法检测车道线
- 多项式拟合车道线
- 计算运动控制指令
- 发布速度控制命令
"""

def __init__(self):
super().__init__('visual_motion_controller')

# 订阅摄像头话题
self.image_subscription = self.create_subscription(
Image,
'/camera',
self.process_and_control,
10
)

# 发布控制话题
self.velocity_publisher = self.create_publisher(
Twist,
'/cmd_vel',
10
)

self.bridge = CvBridge()

# ========== 图像处理参数 ==========
self.roi_start_ratio = 0.5

# ========== 运动控制参数 ==========
self.lookahead_ratio = 0.7
self.k_p = 2.0
self.v_straight = 1.0
self.v_curve = 0.5
self.curve_threshold = 0.0005

# ========== 调试模式 ==========
self.debug_mode = False


def process_and_control(self, msg):
"""处理图像并计算运动控制指令(主回调函数)

Args:
msg: ROS2图像消息
"""
try:
# Step 1: 图像格式转换
cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')

# Step 2: 提取ROI
roi_image = extract_roi(cv_image, self.roi_start_ratio)

# Step 3: 图像预处理和二值化
binary_image = preprocess_and_binarize(roi_image, method='mean', invert=True)

# Step 4: 去噪
denoised = denoise_binary_image(binary_image)

# 可视化
visualize_preprocessing(roi_image, denoised)

# Step 5: 滑动窗口检测车道线
left_pixels, right_pixels, windows = sliding_window_detection(denoised)

# Step 6: 多项式拟合
left_fit = fit_polynomial(left_pixels)
right_fit = fit_polynomial(right_pixels)

# Step 7: 计算中心线
center_fit, center_pts = calculate_lane_center_line(
left_fit, right_fit, denoised.shape[0]
)

# Step 8: 计算运动控制指令
linear_vel, angular_vel, lateral_error, curvature = calculate_control_command(
center_fit,
denoised.shape[1],
denoised.shape[0],
lookahead_ratio=self.lookahead_ratio,
k_p=self.k_p,
v_straight=self.v_straight,
v_curve=self.v_curve,
curve_threshold=self.curve_threshold
)

# Step 9: 计算前视点用于可视化
lookahead_point = None
if center_fit is not None:
lookahead_y = int(denoised.shape[0] * self.lookahead_ratio)
lookahead_x = np.polyval(center_fit, lookahead_y)
lookahead_point = (lookahead_x, lookahead_y)

# Step 10: 可视化
vis = visualize_lane_detection(
roi_image, denoised,
left_pixels, right_pixels,
left_fit, right_fit, center_fit,
windows,
lookahead_point=lookahead_point,
lateral_error=lateral_error,
linear_vel=linear_vel,
angular_vel=angular_vel
)
cv2.imshow('Lane Detection', vis)
cv2.waitKey(1)

# Step 11: 发布控制命令
cmd = Twist()
if self.debug_mode:
cmd.linear.x = 0.0
cmd.angular.z = 0.0
self.get_logger().info('调试模式:运动已暂停', throttle_duration_sec=2.0)
else:
cmd.linear.x = linear_vel
cmd.angular.z = angular_vel
self.velocity_publisher.publish(cmd)

except Exception as e:
self.get_logger().error(f'处理图像时出错: {str(e)}')
import traceback
self.get_logger().error(traceback.format_exc())

def main(args=None):
"""主函数"""
rclpy.init(args=args)

# 创建视觉运动控制节点
controller = VisualMotionController()

try:
# 启动节点
rclpy.spin(controller)
except KeyboardInterrupt:
controller.get_logger().info('节点被用户中断')
finally:
# 发送停止指令
stop_cmd = Twist()
controller.velocity_publisher.publish(stop_cmd)
# 清理资源
cv2.destroyAllWindows()
controller.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()

注意:由于不同设备算力不同,因此对于低算力设备来说,运算时间过长可能会冲出车道线,需要降低运动速度。