Skip to main content

One post tagged with "ROS2"

View All Tags

ROS2视觉运动算法仿真

· 38 min read
Allen
normal software engineer
此内容根据文章生成,仅用于文章内容的解释与总结

本章节通过ROS2官方提供的Gazebo虚拟仿真,实现视觉运动算法仿真。

info

什么是视觉运动控制算法?

运动控制算法主要是通过传感器,将采集到的数据,转化为运动控制指令。

传感器包含:视觉、重力、力反馈、加速度、测距等多种类型。特斯拉的自动驾驶使用的方案中视觉是最主要的信息来源。

运动控制从易到难有:履带结构、差速结构、阿克曼结构、全向结构、足腿结构。

info

为什么要虚拟仿真?

目前较为先进的腿足结构机器人想要完成高难度动作,例如:后空翻、前空翻、韦伯斯特空翻。一旦失败会对本不充裕的原型机造成严重的损坏。

所以通过仿真软件训练,有如下好处:

  • 可以在低风险的情况下完成算法的调试和优化。
  • 对于复杂算法,可以同时启动多个仿真软件,进行并行训练,提高训练效率。
warning

虚拟仿真需要一定的算力。本地使用前,首先确保你的电脑包含显卡。

  • 高算力的电脑可以使用wsl安装Ubuntu。

  • ❌使用VM虚拟机会产生较大的延迟。

  • ✔普通电脑可以使用原生Ubuntu最大限度发挥硬件性能。

ROS2框架

ROS 2需要依赖于Ubuntu系统,其微控制器相较于普通MCU成本更高。主要用于高级机器人。

通信优势

丰富的通信类型

ROS 2节省自己搭建网络、定义通信类型、管理连接的时间。

通信模式作用特点应用场景
话题:发布-订阅 (Publish-Subscribe)单向、异步数据流高效、解耦,适用于传感器数据流、状态更新传感器数据流(如相机图像、激光雷达数据)、机器人状态更新(如位置、速度)
服务:服务端-客户端 (Service-Client)同步、请求-响应阻塞式通信,用于需要明确结果的任务触发特定动作(如抓取物体)、查询物体 ID
动作 (Actions)长时任务处理异步、支持进度反馈和取消移动机器人到指定位置、机械臂执行复杂任务
参数 (Parameters)动态配置节点行为非实时通信,用于启动时或运行时的配置调整调整 PID 控制器增益、修改传感器发布频率

自动发现机制

同为ROS 2,在同一网段下的不同设备可以自动发现

info

自动发现前提:指要求中继设备允许子设备自由通信。

中继设备可能是路由器、或者手机的热点、随身WIFI等。

允许子设备自由通信有利有弊,如果你在商场或者咖啡馆连接他们的WIFI,你的电脑可能提醒你:你的网络活动可能被其他设备发现。自由通信会带来一定的安全隐患。因此并不是所有的路由器都默认允许允许子设备自由通信。

有的设备会有防火墙,临时关闭防火墙:sudo ufw disable,另外要允许UDP多播端口(默认7400~7500)。

如果一切正常,你应该可以通过ping <其他机器的IP>并联通成功。

此时你可以通过ros2运行节点发布消息,其他设备的接收者会收到你的消息。

隔离机制

如果你同个网段下,有无人机和机器狗。你想将其编队为:空军 + 陆军;又或者5台无人机 + 机器狗为一组。

ROS 2提供了通信隔离机制

在同个编队的所有设备终端中执行:export ROS_DOMAIN_ID=1。(值范围为0~101,0为默认值)

这个命令是设置了一个环境变量。ROS 2通过判断这个环境变量来识别是否为一组。不同ID节点无法通信。

因此如果你想将其分为两组,只需要在一半的设备上改为非0的,相同的ID即可。

多设备跨域通信

如果你的设备分别在两个城市,可以通过配置一台公网下的服务器(也可以在两个网段下,配置一个都能访问的设备作为中继),将两个城市的设备连接到这台服务器上。这种通信方式被称为Fast-DDS

在服务器上运行:fastdds discovery --server-id 0

服务器开放11888端口。sudo ufw allow 11888/tcp

在设备上运行:export ROS_DISCOVERY_SERVERS=<服务器IP>:<服务器端口>。(端口默认11888)

设置ROS中间件为rmw_fastrtps_cpp(别人写好的包):export RMW_IMPLEMENTATION=rmw_fastrtps_cpp 然后重新监听:ros2 daemon stop,再重新启动:ros2 daemon start

模块仓库

ROS 2有大量写好的机器人相关的包,包也称:模块、功能包、库。

安装相机标定: sudo apt install ros-jazzy-camera-calibration

安装SLAM: sudo apt install ros-jazzy-slam-toolbox

你会发现你只需要安装一个包,就可以获得一个功能。和Python通过pip安装包一样。安装后即可实现"一行代码"实现某个功能。

ROS 2安装

安装过程中注意:ROS 2的版本往往与ubuntu的版本有关。查看https://docs.ros.org/可获得目前主流的版本。

设置编码

sudo apt update && sudo apt install locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8

添加源

sudo apt update && sudo apt install curl gnupg lsb-release 
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(source /etc/os-release && echo $UBUNTU_CODENAME) main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null

安装ROS 2

sudo apt update

sudo apt upgrade

# 如果你是ubuntu 24.04,那么安装jazzy版本
sudo apt install ros-jazzy-desktop

设置环境变量

# 立刻执行脚本
source /opt/ros/jazzy/setup.bash

# 之后每次开启终端执行执行脚本
echo " source /opt/ros/jazzy/setup.bash" >> ~/.bashrc

你也可以使用Docker简化这个过程。

复杂的功能需要很多依赖,系统中默认的Python3出于安全考虑,可能不让你在全局环境中安装包。代码的提示给了很多选项,例如安装在虚拟环境中,删除安全文件。加上关键字,你应该学会通过翻译软件,翻译报错信息。它让你看某个文件,也请学会使用翻译软件,翻译文件内容。

error: externally-managed-environment

× This environment is externally managed
╰─> To install Python packages system-wide, try apt install
python3-xyz, where xyz is the package you are trying to
install.

If you wish to install a non-Debian-packaged Python package,
create a virtual environment using python3 -m venv path/to/venv.
Then use path/to/venv/bin/python and path/to/venv/bin/pip. Make
sure you have python3-full installed.

If you wish to install a non-Debian packaged Python application,
it may be easiest to use pipx install xyz, which will manage a
virtual environment for you. Make sure you have pipx installed.

See /usr/share/doc/python3.12/README.venv for more information.

note: If you believe this is a mistake, please contact your Python installation or OS distribution provider. You can override this, at the risk of breaking your Python installation or OS, by passing --break-system-packages.
hint: See PEP 668 for the detailed specification.

根据提示,得到的解决方式之一:pip3 install 你的包 --break-system-packages

或者可以把externally managed文件删除。

sudo apt install -y python3-pip

sudo pip3 install rosdepc --break-system-packages

sudo rosdepc init

rosdepc update

节点基操

节点是通信的基本单位,节点可以变成:发布者、订阅者、服务端、客户端、动作、参数。

创建节点

创建节点前可以先创建一个目录

mkdir -p ~/workspace/src

如何在ROS2中创建一个功能包呢?我们可以使用这个指令:

ros2 pkg create --build-type <build-type> <package_name>

ros2命令中:

  • pkg:表示功能包相关的功能;
  • create:表示创建功能包;
  • build-type:表示新创建的功能包是C++还是Python的,如果使用C++或者C,那这里就跟ament_cmake,如果使用Python,就跟ament_python;
  • package_name:新建功能包的名字。
# 创建Python功能包
ros2 pkg create --build-type ament_python learning_node

# 创建C++功能包
ros2 pkg create --build-type ament_cmake learning_node

只需执行其中一个命令即可,如果你会Python,就执行创建Python节点的命令。命令创建一个名为learning_node文件夹。

这里我推荐大家创建节点名的时候,使用有辨识性的名字。例如:ros2 pkg create --build-type ament_python learning_node

info

注意:对于节点同名,ROS 2在编译时的处理方式不是使用第一个节点或最后一个节点,而是会报错。

编写代码

接着在learning_node文件夹我们就可以准备编写代码了,此时你的路径应该是:

~/workspace
|_src
|_learning_node/
|_learning_node
|_ __init__.py
|_package.xml
|_setup.py
|_setup.cfg
|_test/
|_resource/

此时你就可以编写代码了,在__init__.py同级下创建一个.py文件。

符合文件命名规则前提下,你想叫什么都可以,但是最好有一定的含义。

如果你的功能是打印Hello World,那么你就可以叫node_helloworld.py

python的函数通常是需要调用的,例如abc()函数,你需要在代码中输入abc(),才会调用这个函数。和C++有且仅有一个main函数不同,那么如何确定哪个函数是入口?更进一步思考:不同的指令对应不同的函数。

因此有了setup.py最下方的入口点定义。编写完成后记得修改setup.py最下方的入口点。

详细内容参可以参考现成的仓库:

info

拉取git仓库,可免去自己从0开始手敲代码。如果你喜欢自己从0开始抄写代码,也可以跳过本部分。

git安装:sudo apt install git

拉取命令:git clone https://gitee.com/guyuehome/ros2_21_tutorials.git

这个命令会把仓库克隆到本地。执行后,你应该能在当前文件夹下看到名为ros2_21_tutorials的文件夹,内含多个子文件夹。每个子文件夹对应一个功能包。因为ros2是模块化的,不同的功能被拆解为不同的功能包后期更好维护。

注意:如果你之前在src里创建了一些Python节点之后,又把ros2_21_tutorials文件夹也放置在src里,那么你需要检查你的创建的节点名,是否与ros2_21_tutorials内已有节点重名。

编译

# 安装colcon(此步全局仅在第一次执行时需要执行,如果失败会导致colcon build 命令不可用)
sudo apt install python3-colcon-ros

# 进入工作空间
cd ~/workspace

# 编译工作空间所有功能包
# 不论其是直接在src下,还是在src下的子文件夹下
colcon build

# 当你只有一个包更改,不希望全编译,仅编译learning_node
colcon build --packages-select learning_node

colcon build命令会将所有功能包,编译成可执行文件。

编译成功后,就可以在工作空间中看到自动生产的build、log、install文件夹了。

  • src,代码空间,未来编写的代码、脚本,都需要人为的放置到这里;
  • build,编译空间,保存编译过程中产生的中间文件;
  • install,安装空间,放置编译得到的可执行文件和脚本;
  • log,日志空间,编译和运行过程中,保存各种警告、错误、信息等日志。

我们把编译后,含有build、log、install、src文件夹的workspace称为工作空间

如果是Python这样无需编译的,会将代码直接拷贝到install文件夹下。

编译过程可能出现的报错:

  • 节点重名:对与节点同名,ros的处理方式不是使用第一个节点或最后一个节点,而是会报错。应该删除多余节点。确保你src文件夹下,所有的xml文件中,节点的名称是唯一的。

  • 语法错误:如果代码语法错误,则会编译失败,需要手动解决。

  • 缺少依赖包:如果你严格跟随参考资料且为新系统,首先检查你是否安装过程网络异常。如果是曾用系统,检查是否含有多个python环境。

# 删除缓存
rm -rf build/ install/ log/

# 修改环境变量PATH的值,把系统默认的环境提到最前
export PATH="/usr/bin:$PATH"

# PYTHON_EXECUTABLE 是一个CMake变量,用于指定构建过程中应该使用哪个Python解释器
export PYTHON_EXECUTABLE=/usr/bin/python3 && colcon build --cmake-args -DPYTHON_EXECUTABLE=/usr/bin/python3

在不更换工作空间的情况下,以下命令仅在第一次执行时需要执行。如果你的工作空间路径不是~/workspace,请将~/workspace替换为你的工作空间路径。

# 立刻执行脚本
source ~/workspace/install/setup.sh

# 之后每次开启终端执行执行脚本
echo "source ~/workspace/install/setup.sh" >> ~/.bashrc

运行

运行脚本 : ros2 run learning_node node_helloworld

查看正在运行的node : ros2 node list

rclpy

rclpy模块,ROS 2 Python标准接口 (ROS 2 canonical API with Python)

官网:https://docs.ros.org/en/rolling/p/rclpy/index.html

受限于篇幅,只简单介绍其中初始化部分

入门体验

下面是一个标准的初始化代码init支持传入多种参数。

import rclpy                                     # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
import time

def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = Node("node_helloworld") # 创建ROS2节点对象并进行初始化

while rclpy.ok(): # ROS2系统是否正常运行
node.get_logger().info("Hello World") # ROS2日志输出
time.sleep(0.5) # 休眠控制循环时间
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
初始化

rclpy.init(*, args: List[str] | None = None, context: Context | None = None, domain_id: int | None = None, signal_handler_options: rpyutils.import_c_library.SignalHandlerOptions | None = None)→ InitContextManager

直接初始化比上下文初始化拥有更多的参数。

Parameters:

  • args – 命令行参数列表。
  • context – 要初始化的上下文。如果为 None,则使用默认上下文。
  • domain_id – ROS 域 ID。
  • signal_handler_options – 指示要安装的信号处理程序。如果 None,则在初始化默认上下文时将安装 SIGINT 和 SIGTERM。【类似于 Python 的信号(signal)机制,是操作系统发出的异步事件。而非异常,异常是程序内部发生的同步事件。】

Returns:

  • 一个 InitContextManager,可以与 Python 上下文管理器一起使用进行清理。
循环执行

ROS 2 中的 Executor(执行器)是用来管理和运行 ROS 2 节点中所有回调函数的核心组件。你可以把它看作是一个调度器,它决定了节点里的各种任务(例如接收新消息、处理服务请求、定时器触发)何时执行。

Executor 负责调度和执行节点中的回调函数。一个 ROS 2 节点可以有多个回调函数,比如:

  • 订阅者的回调函数:每当收到一个新消息时被调用。

  • 服务的请求回调函数:每当收到一个服务请求时被调用。

  • 定时器的回调函数:每隔一定时间被调用。

节点本身并不会自动运行这些回调函数,它需要一个 Executor 来 "spin"(自旋),也就是不断地检查是否有就绪的任务并执行它们。ROS 2 将节点和执行器分开,提供了灵活性。你可以选择不同的 Executor 来适应你的应用需求:

  • SingleThreadedExecutor:这是最简单的执行器,它在一个单独的线程里顺序执行所有就绪的回调函数。

  • MultiThreadedExecutor:这个执行器使用一个线程池,可以同时执行多个回调函数,这对于需要高并发处理的节点非常有用。

rclpy.spin(node: Node, executor: Executor | None = None)→ None:执行工作,直到与执行器关联的上下文关闭。

  • node – 要添加到执行器以检查工作的节点。
  • 要使用的执行器,如果是 None,则为全局执行器。

rclpy.spin_once(node: Node, *, executor: Executor | None = None, timeout_sec: float | None = None)→ None:执行一项工作或等待超时到期。

只要该回调在超时到期之前准备就绪,提供的执行器就会执行一个回调。如果未提供执行器(即 None),则使用全局执行器。如果全局执行器具有部分完成的协程,则完成的工作可能是针对提供的节点以外的节点。

  • node – 要添加到执行器以检查工作的节点。
  • executor – 要使用的执行器,如果是 None,则为全局执行器。
  • timeout_sec – 等待几秒钟。如果为 “无” 或“负数,则永久阻止”。如果为 0,请不要等待。

rclpy.spin_until_future_complete(node: Node, future: Future[Any], executor: Executor | None = None, timeout_sec: float | None = None)→ None : 执行工作,直到Future完成。

  • node – 要添加到执行器以检查工作的节点。
  • 要等待的 future 对象。直到 future.done() 返回 True 或与执行器关联的上下文为 shutdown。
  • 要使用的执行器,如果是 None,则为全局执行器。
  • 等待几秒钟。如果为 None 或负数,则阻止直到未来完成。如果为 0,请不要等待。
关闭

rclpy.shutdown(*, context: Context | None = None, uninstall_handlers: bool | None = None)→ None

关闭以前初始化的上下文(如果尚未关闭)。这也将关闭全局执行器,如果已关闭会报错。

  • context – 要失效的上下文。如果为 None,则使用默认上下文
  • uninstall_handlers – 如果为 None,则在关闭默认上下文时将卸载信号处理程序。如果为 True,则将卸载信号处理程序。如果为 False,则不会卸载信号处理程序。和前面signal_handler_options对应上。

rclpy.try_shutdown(*, context: Context | None = None, uninstall_handlers: bool | None = None)→ None

关闭以前初始化的上下文(如果尚未关闭)。这也将关闭全局执行器,如果已关闭不会报错。

  • context – 要失效的上下文。如果为 None,则使用默认上下文
  • uninstall_handlers – 如果为 None,则在关闭默认上下文时将卸载信号处理程序。如果为 True,则将卸载信号处理程序。如果为 False,则不会卸载信号处理程序。和前面signal_handler_options对应上。

上下文

上下文管理器Context与直接使用rclpy基本一致,但背后其实是线程管理,每个独立的上下文管理器都是独立的线程。

如果使用rclpy.init(args=args)则无法完成创建2个不同的domian_id的节点。

from rclpy.node import Node                      # ROS2 节点类
from rclpy.context import Context # ROS2 上下文类
import time

def main(args=None): # ROS2节点主入口main函数
# 使用Context上下文管理器的正确方式
context = Context()
context.init(args,domain_id=1)
node = Node("node_helloworld", context=context) # 创建节点并关联上下文

context2 = Context()
context2.init(args,domain_id=2)
node2 = Node("node_helloworld", context=context2) # 创建节点并关联上下文

while context.ok() and context2.ok(): # 检查上下文是否正常
node.get_logger().info("{}".format(context.get_domain_id())) # ROS2日志输出
node2.get_logger().info("{}".format(context2.get_domain_id())) # ROS2日志输出
time.sleep(0.5) # 休眠控制循环时间
context2.shutdown()
context.shutdown()

使用上下文管理器Context 的with语法可以自动init(不支持参数),并在with语句结束时调用shutdown,代码更加简洁。

类似于Python的 with openclose

from rclpy.node import Node                      # ROS2 节点类
from rclpy.context import Context # ROS2 上下文类
import time

def main(args=None): # ROS2节点主入口main函数
# 使用Context上下文管理器的正确方式
with Context() as context:
node = Node("node_helloworld", context=context) # 创建节点并关联上下文
while context.ok(): # 检查上下文是否正常
# 输出当前的domain id ,默认是0
# 修改domain id方式1 :修改当前终端变量
# export ROS_DOMAIN_ID=1
node.get_logger().info("{}".format(context.get_domain_id()))
time.sleep(0.5) # 休眠控制循环时间
初始化

context.init(args: List[str] | None = None, *, initialize_logging: bool = True, domain_id: int | None = None)→ None

为给定上下文初始化 ROS 通信。

  • args – 命令行参数列表。
  • initialize_logging – 是否初始化整个过程的日志记录。默认值是初始化日志记录。
  • domain_id – 要用于此上下文的域 ID。如果为无(默认值),则使用域 ID 0。
状态检察

context.get_domain_id()→ int:获取上下文的domain ID。

context.ok()→ bool:检查上下文是否未关闭。

跟踪

track_node(node: Node)→ None:跟踪与此上下文关联的节点。当上下文被销毁时,它将销毁它跟踪的每个节点。

untrack_node(node: Node)→ None:如果一个节点在上下文之前被销毁,我们不再需要跟踪它是否销毁了上下文,所以在此处将其删除。

关闭上下文

context.shutdown()→ None:关闭此上下文,会等待正在进行的操作完成,然后清理资源。好比执行系统关机,如果已关闭会报错。

on_shutdown(callback: Callable[[], None])→ None:添加关机时调用的回调。

try_shutdown()→ None:尝试关闭此上下文(如果尚未关闭),如果已关闭也不会报错。

context.destroy()→ None:销毁上下文,立即释放资源,不等待正在进行的操作。好比直接断电。

通信示例

ROS2的基础通信类型的示例代码都在ros2_21_tutorials仓库中。拉取到src文件夹中,参考上文编译、运行即可。

话题

  • 运行话题的发布者节点 : ros2 run learning_topic topic_helloworld_pub
  • 运行话题的订阅者节点 : ros2 run learning_topic topic_helloworld_sub

可以看到发布者循环发布“Hello World”字符串消息,订阅者也以几乎同样的频率收到该话题的消息数据。

服务

  • 运行服务的服务端节点 : ros2 run learning_service service_adder_server
  • 运行服务的客户端节点 : ros2 run learning_service service_adder_client 2 3

动作

  • 运行动作的服务端节点 : ros2 run learning_action action_move_server
  • 运行动作的客户端节点 : ros2 run learning_action action_move_client

参数

等价于机器学习中的超参数即:在开始训练前设置的参数,例如:学习率、隐藏层大小、迭代次数等等。

ROS机器人系统中的参数是全局字典,可以运行多个节点中共享数据。
  • 运行节点 :ros2 run learning_parameter param_declare
  • 在另外终端设置参数 :ros2 param set param_declare robot_name turtle

Gazebo

Gazebo是一款开源的3D仿真软件,用于模拟机器人在复杂环境中的行为。它提供了丰富的物理引擎和传感器模型,可以模拟机器人的运动、碰撞、传感器数据等。

# 安装gz
sudo apt-get install ros-jazzy-ros-gz

# 测试(参数 empty.sdf 表示空场景)
# 测试能否正常启动且无卡顿,gz_args:=empty.sdf表示使用空场景
ros2 launch ros_gz_sim gz_sim.launch.py gz_args:=empty.sdf

gazebosim有大量下载好的模型(models)与场景(worlds)可以使用。

想简单点就选一个自带摄像头的机器人模型,如果有中意的机器人模型没摄像头的,可以自己加一个摄像头。

选赛道类的场景,有明显的车道线,可以较好的测试算法。如果场景过于简单可以适当的添加一些模型,例如:路灯、树木、箱子等。

URDF

URDF(Unified Robot Description Format)是一种用于描述机器人模型的XML格式。

一个带摄像头的机器人模型示例:

my_robot.sdf
<?xml version="1.0" ?>
<robot name="my_robot">

<!-- 机器人底盘 - 正方体 -->
<link name="base_link">
<visual>
<geometry>
<box size="0.2 0.2 0.1" />
</geometry>
<material name="blue">
<color rgba="0 0 1 1"/>
</material>
</visual>
<collision>
<geometry>
<box size="0.2 0.2 0.1" />
</geometry>
</collision>
<inertial>
<mass value="1.0"/>
<inertia ixx="0.01" ixy="0" ixz="0" iyy="0.01" iyz="0" izz="0.01"/>
</inertial>
</link>

<!-- 摄像头 -->
<link name="camera_link">
<visual>
<geometry>
<box size="0.05 0.05 0.03" />
</geometry>
<material name="red">
<color rgba="1 0 0 1"/>
</material>
</visual>
<collision>
<geometry>
<box size="0.05 0.05 0.03" />
</geometry>
</collision>
<inertial>
<mass value="0.1"/>
<inertia ixx="0.001" ixy="0" ixz="0" iyy="0.001" iyz="0" izz="0.001"/>
</inertial>
</link>

<!-- 连接底盘和摄像头 -->
<joint name="base_to_camera" type="fixed">
<parent link="base_link"/>
<child link="camera_link"/>
<origin xyz="0.1 0 0.08" rpy="0 0 0"/>
</joint>

<!-- Gazebo摄像头传感器插件 - 关键部分:发布图像数据到ROS话题 -->
<gazebo reference="camera_link">
<sensor type="camera" name="camera1">
<update_rate>30.0</update_rate>
<camera name="head">
<horizontal_fov>1.3962634</horizontal_fov>
<image>
<width>640</width>
<height>480</height>
<format>R8G8B8</format>
</image>
<clip>
<near>0.02</near>
<far>300</far>
</clip>
</camera>
<!-- 重要:这个插件将摄像头数据发布到ROS2话题 -->
<plugin name="camera_controller" filename="libgazebo_ros_camera.so">
<alwaysOn>true</alwaysOn>
<updateRate>0.0</updateRate>
<!-- 重要:摄像头名称,可以修改,需要与算法节点中的摄像头名称一致 -->
<cameraName>camera</cameraName>
<!-- 重要:图像话题名称,可以修改,需要与算法节点中的话题名称一致 -->
<imageTopicName>image_raw</imageTopicName>
<cameraInfoTopicName>camera_info</cameraInfoTopicName>
<frameName>camera_link</frameName>
</plugin>
</sensor>
</gazebo>

<!-- 差速驱动插件 - 接收运动控制指令 -->
<gazebo>
<plugin name="differential_drive_controller" filename="libgazebo_ros_diff_drive.so">
<alwaysOn>true</alwaysOn>
<updateRate>20</updateRate>
<leftJoint>left_wheel_joint</leftJoint>
<rightJoint>right_wheel_joint</rightJoint>
<wheelSeparation>0.2</wheelSeparation>
<wheelDiameter>0.1</wheelDiameter>
<torque>20</torque>
<!-- 重要:运动控制话题名称,可以修改,需要与算法节点中的话题名称一致 -->
<commandTopic>cmd_vel</commandTopic>
<odometryTopic>odom</odometryTopic>
<odometryFrame>odom</odometryFrame>
<robotBaseFrame>base_link</robotBaseFrame>
</plugin>
</gazebo>

</robot>
  1. 摄像头插件libgazebo_ros_camera.so 插件会自动将摄像头数据发布到 /camera/image_raw 话题

  2. 运动控制插件libgazebo_ros_diff_drive.so 插件会监听 /cmd_vel 话题来控制机器人运动

.so 是一个共享库文件(Shared Library),这是Linux系统中的动态链接库格式,类似于Windows下的.dll文件。这个文件来自于ROS2的Gazebo集成包,随着Gazebo被一同安装。

  1. 数据流向
    • 摄像头 → /camera/image_raw 话题 → 你的视觉算法节点
    • 你的算法节点 → /cmd_vel 话题 → 机器人运动控制

以上所有内容,如果你不会做或者不想做,可以直接使用别人提供的模型和场景。重点是编写算法节点。

算法节点

这个项目中涉及到2种通信信息格式的转换:

从ROS2_Gazebo流出的传感器数据,需要转为Python能处理的数据格式。

从算法节点流入ROS2_Gazebo的控制信息,需要是ROS2_Gazebo能理解的数据格式。

图像格式

ROS2中的图像是格式为sensor_msgs/Image的消息类型,Python开发中需要图像数据类型为数组。

数据流向:ROS消息 → cv_bridge → numpy数组 → cv2 → 处理结果 → 控制算法

运动控制格式

Gazebo中支持的运动控制消息类型为geometry_msgs/Twist。因此只要将Python处理后的数据,转为geometry_msgs/Twist,就可以发布到Gazebo中。

代码示例

下面代码实现了一个视觉运动控制节点,从/camera/image_raw话题订阅图像数据,并处理后发布到/cmd_vel话题。

主要功能:基于颜色的简单目标检测。

  • 目标在左侧 → 向左转
  • 目标在右侧 → 向右转
  • 目标在中央且较远 → 前进
  • 目标在中央且较近 → 后退
  • 没有目标 → 旋转搜索
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np
from geometry_msgs.msg import Twist

class VisualMotionController(Node):
def __init__(self):
super().__init__('visual_motion_controller')

# 创建图像订阅者 - 接收指定节点数据
self.image_subscription = self.create_subscription(
Image,
'/camera/image_raw', # 可配置的输入话题
self.image_callback,
10
)

# 创建运动控制发布者 - 发送到指定节点
self.velocity_publisher = self.create_publisher(
Twist,
'/cmd_vel', # 可配置的输出话题
10
)

# OpenCV桥接器
self.bridge = CvBridge()

# 算法参数
self.image_center_x = 320 # 图像中心X坐标(假设640x480图像)
self.target_area_threshold = 500 # 目标最小面积

# 运动状态
self.move_speed = 0.2 # 基础移动速度
self.turn_speed = 0.5 # 基础转向速度

self.get_logger().info('视觉运动控制节点已启动')

def image_callback(self, msg):
"""接收并处理图像数据"""
try:
# 将ROS图像消息转换为OpenCV格式
cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")

# 图像处理并生成控制指令
self.process_and_control(cv_image)

except Exception as e:
self.get_logger().error(f'图像处理错误: {e}')

def process_and_control(self, image):
"""处理图像并生成运动控制指令"""
# 获取图像尺寸
height, width = image.shape[:2]
center_x = width // 2
center_y = height // 2

# 简单的颜色检测 - 检测绿色目标
target_detected, target_x, target_y, target_area = self.detect_green_target(image)

cmd = Twist()

if target_detected:
# 根据目标位置决定运动方向
self.get_logger().info(f'检测到目标: 位置({target_x}, {target_y}), 面积: {target_area}')

# 水平方向判断:左右移动
if target_x < center_x - 50: # 目标在左侧
self.move_left(cmd)
self.get_logger().info('向左转')
elif target_x > center_x + 50: # 目标在右侧
self.move_right(cmd)
self.get_logger().info('向右转')
else:
# 目标在中央,根据面积决定前进或后退
if target_area < 1000: # 目标较远
self.move_forward(cmd)
self.get_logger().info('前进')
elif target_area > 5000: # 目标较近
self.move_backward(cmd)
self.get_logger().info('后退')
else:
self.stop(cmd)
self.get_logger().info('停止 - 目标在合适距离')
else:
# 没有检测到目标,原地旋转搜索
self.search_rotate(cmd)
self.get_logger().info('搜索目标中...')

# 发布运动指令
self.publish_control_command(cmd)

def detect_green_target(self, image):
"""检测绿色目标"""
# 转换到HSV色彩空间
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

# 绿色的HSV范围
lower_green = np.array([40, 50, 50])
upper_green = np.array([80, 255, 255])

# 创建掩码
mask = cv2.inRange(hsv, lower_green, upper_green)

# 形态学处理,去除噪声
kernel = np.ones((5,5), np.uint8)
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)

# 查找轮廓
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

if contours:
# 找到最大轮廓
largest_contour = max(contours, key=cv2.contourArea)
area = cv2.contourArea(largest_contour)

# 过滤太小的目标
if area > self.target_area_threshold:
# 计算目标中心
M = cv2.moments(largest_contour)
if M["m00"] != 0:
cx = int(M["m10"] / M["m00"])
cy = int(M["m01"] / M["m00"])
return True, cx, cy, area

return False, 0, 0, 0

def move_forward(self, cmd):
"""前进"""
cmd.linear.x = self.move_speed
cmd.angular.z = 0.0

def move_backward(self, cmd):
"""后退"""
cmd.linear.x = -self.move_speed
cmd.angular.z = 0.0

def move_left(self, cmd):
"""向左转"""
cmd.linear.x = 0.0
cmd.angular.z = self.turn_speed

def move_right(self, cmd):
"""向右转"""
cmd.linear.x = 0.0
cmd.angular.z = -self.turn_speed

def stop(self, cmd):
"""停止"""
cmd.linear.x = 0.0
cmd.angular.z = 0.0

def search_rotate(self, cmd):
"""搜索模式 - 原地旋转"""
cmd.linear.x = 0.0
cmd.angular.z = 0.3 # 慢速旋转搜索

def publish_control_command(self, cmd):
"""发布控制指令到指定节点"""
self.velocity_publisher.publish(cmd)


def main(args=None):
"""主函数"""
rclpy.init(args=args)

# 创建视觉运动控制节点
controller = VisualMotionController()

try:
# 启动节点
rclpy.spin(controller)
except KeyboardInterrupt:
controller.get_logger().info('节点被用户中断')
finally:
# 发送停止指令
stop_cmd = Twist()
controller.publish_control_command(stop_cmd)
# 清理资源
controller.destroy_node()
rclpy.shutdown()


启动命令:

# 编译工作空间
colcon build --packages-select visual_navigation

# 刷新环境
source install/setup.bash

# 启动Gazebo仿真环境
ros2 launch turtlebot3_gazebo turtlebot3_world.launch.py

# 在另一个终端启动视觉控制节点
ros2 run visual_navigation visual_motion_controller

# 或者如果你创建了launch文件
ros2 launch visual_navigation visual_control.launch.py

调试命令

# 查看活跃的话题
ros2 topic list

# 查看图像话题数据
ros2 topic echo /camera/image_raw

# 查看速度控制话题
ros2 topic echo /cmd_vel

# 手动发送控制指令测试
ros2 topic pub /cmd_vel geometry_msgs/Twist "linear: {x: 0.2, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}"

# 查看节点信息
ros2 node info /visual_motion_controller