Files
daohang/scripts/run_simulation.py
2026-02-21 10:25:43 +08:00

362 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
ROS1 + MuJoCo VLA 3D 导航 — 全向移动 + 前向扫描 + 避障 + 目标点导航
圆柱体小车: vx, vy, wz 全向移动
前向范围扫描: mj_ray 射线检测
避障: 基于距离的势场/向量场
"""
import mujoco
import mujoco.viewer
import numpy as np
import os
SCENE_PATH = os.path.join(os.path.dirname(__file__), "..", "mujoco_scenes", "cylinder_obstacles.xml")
# ============ 可配置参数 ============
NUM_RAYS = 19 # 前向扫描射线数量(-90 ~ +90 deg
RAY_ANGLE_SPAN = np.pi # 扫描角度跨度 (rad)
OBSTACLE_THRESHOLD = 0.45 # 障碍判定距离 (m)
SAFE_DISTANCE = 0.65 # 安全距离
GOAL_THRESHOLD = 0.2 # 到达目标判定距离
MAX_LINEAR = 0.6 # 最大线速度
MAX_ANGULAR = 0.8 # 最大角速度
K_ATTRACT = 1.0 # 目标吸引力
K_REPEL = 0.6 # 障碍排斥力
K_YAW_GOAL = 0.8 # 目标对准角速度增益
MIN_LINEAR = 0.18 # 最小线速度,避免卡住
STUCK_THRESH = 0.03 # 位移阈值,低于此认为卡住 (m)
STUCK_STEPS = 100 # 连续卡住步数触发绕行
GOAL_POINTS = [] # 初始无目标
GOAL_AHEAD_DIST = 2.0 # G 键:目标 = 车头前方 N 米
GOAL_MAX_R = 5.0 # 目标点边界:距离原点不超过此值 (m)
def _clamp_goal(gx, gy):
"""目标点边界:限制在距离原点 GOAL_MAX_R 以内"""
r = np.sqrt(gx * gx + gy * gy) + 1e-8
if r > GOAL_MAX_R:
scale = GOAL_MAX_R / r
return (float(gx * scale), float(gy * scale))
return (float(gx), float(gy))
def get_agent_state(data, body_id):
"""获取小车位姿: x, y, yaw"""
qpos = data.qpos
x, y = qpos[0], qpos[1]
yaw = qpos[2]
return x, y, yaw
def forward_ray_scan(model, data, body_id, site_id, num_rays, angle_span):
"""
前向范围扫描: 在车体前方扇形区域内发射射线
返回: (距离数组, 角度数组)
"""
site_xpos = data.site_xpos[site_id].copy()
xmat = np.array(data.xmat[body_id]).reshape(3, 3)
forward = xmat[:, 0] # 前向 (body +X)
angles = np.linspace(-angle_span / 2, angle_span / 2, num_rays)
distances = np.full(num_rays, 10.0)
for i, theta in enumerate(angles):
c, s = np.cos(theta), np.sin(theta)
ray_dir = c * forward + s * xmat[:, 1] # 绕 body Z 旋转
ray_dir = ray_dir / (np.linalg.norm(ray_dir) + 1e-8)
geomid = np.array([-1], dtype=np.int32)
d = mujoco.mj_ray(model, data, site_xpos, ray_dir, None, 1, body_id, geomid)
if d >= 0:
distances[i] = d
return distances, angles
def obstacle_avoidance_vel(distances, angles):
"""
避障速度 + 绕行分量。被挡时朝开阔方向加速,而非只排斥
返回: (vx_avoid, vy_avoid, wz_avoid)
"""
min_d = np.min(distances)
if min_d > OBSTACLE_THRESHOLD:
return 0.0, 0.0, 0.0
vx, vy, wz = 0.0, 0.0, 0.0
safe_sector = np.argmax(distances)
best_theta = angles[safe_sector]
for d, theta in zip(distances, angles):
if d < OBSTACLE_THRESHOLD:
ratio = 1.0 - d / OBSTACLE_THRESHOLD
strength = K_REPEL * (ratio ** 2)
vx -= strength * np.cos(theta)
vy -= strength * np.sin(theta)
elif d < SAFE_DISTANCE:
ratio = (SAFE_DISTANCE - d) / (SAFE_DISTANCE - OBSTACLE_THRESHOLD)
strength = K_REPEL * 0.15 * ratio
wz += strength * np.clip(best_theta - theta, -1, 1)
# 绕行:朝最开阔方向加正向速度,避免只退不绕
bypass = 0.35
vx += bypass * np.cos(best_theta)
vy += bypass * np.sin(best_theta)
wz += 0.4 * best_theta
return vx, vy, wz
def goal_attraction_vel(x, y, yaw, goal_x, goal_y):
"""
目标吸引力: 在车体坐标系下计算朝目标的 vx, vy, wz
优先保持线速度,角速度仅用于微调朝向
"""
dx = goal_x - x
dy = goal_y - y
dist = np.sqrt(dx * dx + dy * dy) + 1e-6
# 世界系下期望方向
gx_w = dx / dist
gy_w = dy / dist
# 转到车体系
c, s = np.cos(-yaw), np.sin(-yaw)
gx_b = c * gx_w - s * gy_w
gy_b = s * gx_w + c * gy_w
# 线速度:朝目标,随距离平滑
scale = np.tanh(dist) * 0.8 + 0.2
vx = K_ATTRACT * scale * gx_b
vy = K_ATTRACT * scale * gy_b
# 角速度:弱增益,避免只转不走
target_yaw = np.arctan2(dy, dx)
yaw_err = np.arctan2(np.sin(target_yaw - yaw), np.cos(target_yaw - yaw))
wz = K_YAW_GOAL * np.tanh(yaw_err)
return vx, vy, wz
def blend_and_clamp(vx_a, vy_a, wz_a, vx_g, vy_g, wz_g, dist_to_goal, min_d, stuck):
"""融合避障与目标速度,限幅,卡住时强制最小速度"""
vx = vx_a + vx_g
vy = vy_a + vy_g
wz = wz_a + wz_g
lin = np.sqrt(vx * vx + vy * vy)
# 未到目标且线速度过小:强制最小速度
if dist_to_goal > GOAL_THRESHOLD and lin < MIN_LINEAR:
if lin > 1e-6:
vx *= MIN_LINEAR / lin
vy *= MIN_LINEAR / lin
else:
# 融合后接近零:优先朝目标,被挡时朝避障方向
ax, ay = vx_g + vx_a, vy_g + vy_a
anorm = np.sqrt(ax * ax + ay * ay) + 1e-8
vx = MIN_LINEAR * ax / anorm
vy = MIN_LINEAR * ay / anorm
lin = np.sqrt(vx * vx + vy * vy)
if lin > MAX_LINEAR:
scale = MAX_LINEAR / lin
vx *= scale
vy *= scale
wz = np.clip(wz, -MAX_ANGULAR, MAX_ANGULAR)
return vx, vy, wz
class VelocityFilter:
"""EMA 滤波 + 速率限制,平滑控制输出"""
def __init__(self, alpha=0.75, max_dv=0.15, max_dw=0.2):
self.alpha = alpha # 滤波系数,越大越平滑
self.max_dv = max_dv # 每步最大线速度变化
self.max_dw = max_dw # 每步最大角速度变化
self.vx, self.vy, self.wz = 0.0, 0.0, 0.0
def update(self, vx_cmd, vy_cmd, wz_cmd):
# 1. EMA 滤波
vx_f = self.alpha * self.vx + (1 - self.alpha) * vx_cmd
vy_f = self.alpha * self.vy + (1 - self.alpha) * vy_cmd
wz_f = self.alpha * self.wz + (1 - self.alpha) * wz_cmd
# 2. 速率限制
dvx = np.clip(vx_f - self.vx, -self.max_dv, self.max_dv)
dvy = np.clip(vy_f - self.vy, -self.max_dv, self.max_dv)
dwz = np.clip(wz_f - self.wz, -self.max_dw, self.max_dw)
self.vx += dvx
self.vy += dvy
self.wz += dwz
return self.vx, self.vy, self.wz
def main():
model = mujoco.MjModel.from_xml_path(SCENE_PATH)
data = mujoco.MjData(model)
body_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, "cylinder_agent")
site_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_SITE, "ray_origin")
# 执行器索引
act_ids = [
mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_x"),
mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_y"),
mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_yaw"),
]
goals = GOAL_POINTS.copy()
goal_idx = 0 if goals else -1
vel_filter = VelocityFilter(alpha=0.70, max_dv=0.18, max_dw=0.2)
last_x, last_y = 0.0, 0.0
stuck_cnt = 0
goal_joint_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_JOINT, "goal_joint")
goal_qposadr = model.jnt_qposadr[goal_joint_id]
data.qpos[goal_qposadr : goal_qposadr + 3] = [-99, -99, 0.1] # 初始隐藏
print("=" * 55)
print("VLA 3D 导航 - 全向移动 + 前向扫描 + 避障")
print("=" * 55)
print("G: 车头前方 {:.0f}m | C: 相机视线 | 目标边界 R<{:.0f}m".format(GOAL_AHEAD_DIST, GOAL_MAX_R))
print("ESC 退出")
print("=" * 55)
add_goal_ahead = [False]
add_goal_camera = [False]
def key_cb(keycode):
if keycode == 71: # G
add_goal_ahead[0] = True
elif keycode == 67: # C: 相机视线与地面交点
add_goal_camera[0] = True
def _goal_from_camera(viewer_handle):
"""相机视线与 z=0 地面交点(旋转相机后按 C"""
cam = viewer_handle.cam
lookat = np.array(cam.lookat)
dist = float(cam.distance)
az = np.deg2rad(float(cam.azimuth))
el = np.deg2rad(float(cam.elevation))
# 相机位置 = lookat - dist * 前向单位向量
fx = np.cos(el) * np.sin(az)
fy = -np.cos(el) * np.cos(az)
fz = np.sin(el)
cam_pos = lookat - dist * np.array([fx, fy, fz])
# 射线与 z=0 交点: cam_pos + t*[fx,fy,fz], 令 z=0
if abs(fz) < 1e-6:
return None
t = -cam_pos[2] / fz
if t < 0:
return None
pt = cam_pos + t * np.array([fx, fy, fz])
return (float(pt[0]), float(pt[1]))
def control_callback():
nonlocal goal_idx, last_x, last_y, stuck_cnt
if add_goal_ahead[0]:
add_goal_ahead[0] = False
x, y, yaw = get_agent_state(data, body_id)
gx = x + GOAL_AHEAD_DIST * np.cos(yaw)
gy = y + GOAL_AHEAD_DIST * np.sin(yaw)
gx, gy = _clamp_goal(gx, gy)
goals.clear()
goals.append((gx, gy))
goal_idx = 0
vel_filter.vx = vel_filter.vy = vel_filter.wz = 0.0
print(" 目标: ({:.2f}, {:.2f}) [G=车头前方]".format(gx, gy))
elif add_goal_camera[0]:
add_goal_camera[0] = False
g = _goal_from_camera(viewer)
if g is not None:
g = _clamp_goal(g[0], g[1])
goals.clear()
goals.append(g)
goal_idx = 0
vel_filter.vx = vel_filter.vy = vel_filter.wz = 0.0
print(" 目标: ({:.2f}, {:.2f}) [C=相机视线]".format(g[0], g[1]))
else:
print(" [C] 相机未指向地面,请调整视角后重试")
if goal_idx < 0 or not goals:
data.ctrl[act_ids[0]] = 0
data.ctrl[act_ids[1]] = 0
data.ctrl[act_ids[2]] = 0
vel_filter.vx = vel_filter.vy = vel_filter.wz = 0.0
data.qpos[goal_qposadr : goal_qposadr + 3] = [-99, -99, 0.1] # 藏起标记
return
x, y, yaw = get_agent_state(data, body_id)
goal_x, goal_y = goals[goal_idx]
dist_to_goal = np.sqrt((goal_x - x) ** 2 + (goal_y - y) ** 2)
if dist_to_goal < GOAL_THRESHOLD:
goals.clear()
goal_idx = -1
vel_filter.vx = vel_filter.vy = vel_filter.wz = 0.0
print(" 已到达,停止。按 G 设置新目标")
data.ctrl[act_ids[0]] = 0
data.ctrl[act_ids[1]] = 0
data.ctrl[act_ids[2]] = 0
data.qpos[goal_qposadr : goal_qposadr + 3] = [-99, -99, 0.1]
return
# 更新目标点标记位置
data.qpos[goal_qposadr] = goal_x
data.qpos[goal_qposadr + 1] = goal_y
data.qpos[goal_qposadr + 2] = 0.15
# 1. 前向范围扫描
distances, angles = forward_ray_scan(
model, data, body_id, site_id, NUM_RAYS, RAY_ANGLE_SPAN
)
min_d = np.min(distances)
# 2. 卡住检测
moved = np.sqrt((x - last_x) ** 2 + (y - last_y) ** 2)
if moved < STUCK_THRESH and dist_to_goal > GOAL_THRESHOLD:
stuck_cnt += 1
else:
stuck_cnt = 0
last_x, last_y = x, y
stuck = stuck_cnt > STUCK_STEPS
# 3. 避障速度(含绕行分量)
vx_a, vy_a, wz_a = obstacle_avoidance_vel(distances, angles)
# 4. 目标吸引速度
vx_g, vy_g, wz_g = goal_attraction_vel(x, y, yaw, goal_x, goal_y)
# 5. 融合、最小速度、限幅
vx, vy, wz = blend_and_clamp(
vx_a, vy_a, wz_a, vx_g, vy_g, wz_g, dist_to_goal, min_d, stuck
)
# 6. 滤波 + 速率限制(卡住时放宽)
max_dv = 0.25 if stuck else 0.18
max_dw = 0.28 if stuck else 0.2
vel_filter.max_dv = max_dv
vel_filter.max_dw = max_dw
vx, vy, wz = vel_filter.update(vx, vy, wz)
# 车体系 -> 世界系slide_x/y 沿世界轴)
c, s = np.cos(yaw), np.sin(yaw)
vx_w = vx * c - vy * s
vy_w = vx * s + vy * c
data.ctrl[act_ids[0]] = vx_w
data.ctrl[act_ids[1]] = vy_w
data.ctrl[act_ids[2]] = wz
with mujoco.viewer.launch_passive(model, data, key_callback=key_cb) as viewer:
while viewer.is_running():
mujoco.mj_forward(model, data)
control_callback()
mujoco.mj_step(model, data)
viewer.sync()
if __name__ == "__main__":
main()