Files
daohang/scripts/run_simulation.py
2026-02-21 10:47:13 +08:00

594 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
MuJoCo 全向小车导航(仿真深度图 + DDA 避障 + 目标点导航)
- 输入仿真深度图mj_ray 网格)或可替换为真实深感相机的 depth/pointcloud
- depth_to_sectors深度 → 1D 扇区最近距离(鲁棒统计)
- dda_avoidance_from_depthDirectional Distance Avoidance 避障
- vx, vy, wz 全向移动(车体系 → 世界系执行器)
- 便于 ROS1 + RealSense/Orbbec/ZED 等真实相机落地
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Optional, Tuple, List
import mujoco
import mujoco.viewer
import numpy as np
SCENE_PATH = os.path.join(os.path.dirname(__file__), "..", "mujoco_scenes", "cylinder_obstacles.xml")
# ===================== 参数配置 =====================
@dataclass
class NavCfg:
# depth sector仿真/真实深度图统一接口)
hfov_deg: float = 86.0 # 水平视场角(如 RealSense D435
num_sectors: int = 61 # 扇区数
depth_z_min: float = 0.1
depth_z_max: float = 6.0
depth_percentile: float = 5.0 # 鲁棒最小值5% 分位
# DDA 避障
obstacle_threshold: float = 0.45 # stop_dist
safe_distance: float = 0.65 # safe_dist
dda_window: int = 2 # move_theta 附近 ±N 扇区
speed_adaptive_k: float = 0.15 # 速度自适应safe += k*speed
goal_threshold: float = 0.20
# limits
max_linear: float = 0.60
max_angular: float = 0.80
min_linear: float = 0.18 # 防止“只转不走”或输出趋零卡死
# gains
k_attract: float = 1.0
k_repel: float = 0.6
k_yaw_goal: float = 0.8
# 转向策略:大角度先转后走,小角度边走边调
yaw_turn_first_deg: float = 35.0 # 超过此角度:先转向,几乎不走
# stuck detect
stuck_thresh: float = 0.03
stuck_steps: int = 100
# goal set
goal_ahead_dist: float = 2.0
goal_max_r: float = 5.0
# output smoothing
ema_alpha: float = 0.70
max_dv: float = 0.18
max_dw: float = 0.20
max_dv_stuck: float = 0.25
max_dw_stuck: float = 0.28
CFG = NavCfg()
# ===================== 工具函数 =====================
def clamp_goal(gx: float, gy: float, max_r: float) -> Tuple[float, float]:
r = float(np.hypot(gx, gy)) + 1e-9
if r > max_r:
s = max_r / r
return float(gx * s), float(gy * s)
return float(gx), float(gy)
def wrap_pi(a: float) -> float:
# 更稳定的 [-pi, pi]
return float(np.arctan2(np.sin(a), np.cos(a)))
def get_agent_state(data: mujoco.MjData) -> Tuple[float, float, float]:
# 假设 qpos: x, y, yaw 在 0,1,2
qpos = data.qpos
return float(qpos[0]), float(qpos[1]), float(qpos[2])
def goal_from_camera(viewer_handle) -> Optional[Tuple[float, float]]:
"""相机视线与 z=0 平面交点(旋转相机后按 C"""
cam = viewer_handle.cam
lookat = np.array(cam.lookat, dtype=float)
dist = float(cam.distance)
az = np.deg2rad(float(cam.azimuth))
el = np.deg2rad(float(cam.elevation))
# MuJoCo viewer 的相机方向约定:这里延续你原实现
fx = np.cos(el) * np.sin(az)
fy = -np.cos(el) * np.cos(az)
fz = np.sin(el)
cam_pos = lookat - dist * np.array([fx, fy, fz], dtype=float)
if abs(fz) < 1e-8:
return None
t = -cam_pos[2] / fz
if t < 0:
return None
pt = cam_pos + t * np.array([fx, fy, fz], dtype=float)
return float(pt[0]), float(pt[1])
# ===================== 仿真深度图 + 扇区 + DDA =====================
def depth_to_sectors(
depth: np.ndarray,
hfov_rad: float,
num_sectors: int,
z_min: float = 0.1,
z_max: float = 6.0,
percentile: float = 5.0,
) -> Tuple[np.ndarray, np.ndarray]:
"""
深度图 → 1D 扇区最近距离(可直接用于真实深感相机)
depth: H×W (m)0/NaN 表示无效
"""
H, W = depth.shape
angles = np.linspace(-hfov_rad / 2, hfov_rad / 2, num_sectors, dtype=np.float64)
cols = np.linspace(0, W, num_sectors + 1).astype(int)
dists = np.full(num_sectors, z_max, dtype=np.float64)
for i in range(num_sectors):
c0, c1 = cols[i], cols[i + 1]
strip = depth[:, c0:c1].reshape(-1)
strip = strip[np.isfinite(strip)]
strip = strip[(strip > z_min) & (strip < z_max)]
if strip.size == 0:
continue
dists[i] = float(np.percentile(strip, percentile))
dists = np.minimum(dists, np.roll(dists, 1))
dists = np.minimum(dists, np.roll(dists, -1))
return dists, angles
class DepthSimulator:
"""用 mj_ray 生成仿真深度图(模拟深感相机输出)"""
def __init__(self, num_sectors: int, hfov_rad: float):
self.num_sectors = int(num_sectors)
self.angles = np.linspace(-hfov_rad / 2, hfov_rad / 2, self.num_sectors)
c = np.cos(self.angles)
s = np.sin(self.angles)
self.ray_dirs_body = np.stack([c, s, np.zeros_like(c)], axis=1).astype(np.float64)
self.depth = np.full((1, self.num_sectors), 10.0, dtype=np.float64)
self._geomid = np.array([-1], dtype=np.int32)
def render(
self,
model: mujoco.MjModel,
data: mujoco.MjData,
body_id: int,
site_id: int,
) -> np.ndarray:
site_xpos = data.site_xpos[site_id]
xmat = np.array(data.xmat[body_id], dtype=np.float64).reshape(3, 3)
ray_dirs_world = (xmat @ self.ray_dirs_body.T).T
norms = np.linalg.norm(ray_dirs_world, axis=1) + 1e-9
ray_dirs_world /= norms[:, None]
self.depth.fill(10.0)
for i in range(self.num_sectors):
d = mujoco.mj_ray(
model, data,
site_xpos, ray_dirs_world[i],
None, 1, body_id, self._geomid
)
if d >= 0:
self.depth[0, i] = float(d)
return self.depth
def dda_avoidance_from_depth(
dists: np.ndarray,
angles: np.ndarray,
vx_cmd: float,
vy_cmd: float,
cfg: NavCfg,
) -> Tuple[float, float, float]:
"""
Directional Distance Avoidance (DDA)
只关心「你要去的方向」是否过近;过近则削掉该方向分量 + 朝开阔方向侧移/转向
"""
speed = float(np.hypot(vx_cmd, vy_cmd))
if speed < 1e-6:
return 0.0, 0.0, 0.0
move_theta = float(np.arctan2(vy_cmd, vx_cmd))
N = len(dists)
i0 = int(np.argmin(np.abs(angles - move_theta)))
win = cfg.dda_window
idxs = [(i0 + k) % N for k in range(-win, win + 1)]
dmin_dir = float(np.min(dists[idxs]))
best_i = int(np.argmax(dists))
best_theta = float(angles[best_i])
stop_dist = cfg.obstacle_threshold
safe_dist = cfg.safe_distance
if hasattr(cfg, "speed_adaptive_k") and cfg.speed_adaptive_k > 0:
safe_dist = safe_dist + cfg.speed_adaptive_k * speed
stop_dist = stop_dist + 0.5 * cfg.speed_adaptive_k * speed
if dmin_dir >= safe_dist:
return 0.0, 0.0, 0.0
danger = (safe_dist - dmin_dir) / max(safe_dist - stop_dist, 1e-6)
danger = float(np.clip(danger, 0.0, 1.0))
vx_a = 0.0
vy_a = 0.0
wz_a = 0.0
mx = float(np.cos(move_theta))
my = float(np.sin(move_theta))
proj = vx_cmd * mx + vy_cmd * my
cut = (0.6 + 0.4 * danger) * proj
vx_a -= cut * mx
vy_a -= cut * my
bypass = 0.25 + 0.25 * danger
vx_a += bypass * np.cos(best_theta)
vy_a += bypass * np.sin(best_theta)
wz_a += (0.35 + 0.45 * danger) * best_theta
if dmin_dir < stop_dist:
vx_a -= vx_cmd * 0.9
vy_a -= vy_cmd * 0.9
wz_a += 0.6 * np.sign(best_theta if abs(best_theta) > 1e-3 else 1.0)
return vx_a, vy_a, wz_a
# ===================== 射线扫描旧接口DDA 下可弃用) =====================
class RayScanner:
"""
预计算扇形扫描的 body-frame 方向(基于 forward=[1,0,0], left=[0,1,0]
每帧用 body rotation (xmat) 转到 world减少每帧 cos/sin + 临时数组。
"""
def __init__(self, num_rays: int, angle_span: float):
self.num_rays = int(num_rays)
self.angle_span = float(angle_span)
self.angles = np.linspace(-self.angle_span / 2, self.angle_span / 2, self.num_rays)
c = np.cos(self.angles)
s = np.sin(self.angles)
# body-frame ray directions: dir_b = c*[1,0,0] + s*[0,1,0]
# shape (N,3)
self.ray_dirs_body = np.stack([c, s, np.zeros_like(c)], axis=1).astype(np.float64)
# buffers to avoid per-step alloc
self.distances = np.full(self.num_rays, 10.0, dtype=np.float64)
self._geomid = np.array([-1], dtype=np.int32)
def scan(
self,
model: mujoco.MjModel,
data: mujoco.MjData,
body_id: int,
site_id: int,
) -> Tuple[np.ndarray, np.ndarray]:
site_xpos = data.site_xpos[site_id] # view, no copy
xmat = np.array(data.xmat[body_id], dtype=np.float64).reshape(3, 3)
# world dirs: dir_w = xmat @ dir_b
# shape (N,3)
ray_dirs_world = (xmat @ self.ray_dirs_body.T).T
# normalize (理论上已单位,但数值上更稳一点)
norms = np.linalg.norm(ray_dirs_world, axis=1) + 1e-9
ray_dirs_world /= norms[:, None]
self.distances.fill(10.0)
for i in range(self.num_rays):
d = mujoco.mj_ray(
model, data,
site_xpos, ray_dirs_world[i],
None, 1, body_id, self._geomid
)
if d >= 0:
self.distances[i] = float(d)
return self.distances, self.angles
# ===================== 速度融合与滤波 =====================
class VelocityFilter:
"""EMA + 速率限制,输出更平滑,避免 ctrl 抖动。"""
def __init__(self, alpha: float, max_dv: float, max_dw: float):
self.alpha = float(alpha)
self.max_dv = float(max_dv)
self.max_dw = float(max_dw)
self.vx = 0.0
self.vy = 0.0
self.wz = 0.0
def reset(self):
self.vx = self.vy = self.wz = 0.0
def update(self, vx_cmd: float, vy_cmd: float, wz_cmd: float) -> Tuple[float, float, float]:
vx_f = self.alpha * self.vx + (1.0 - self.alpha) * vx_cmd
vy_f = self.alpha * self.vy + (1.0 - self.alpha) * vy_cmd
wz_f = self.alpha * self.wz + (1.0 - self.alpha) * wz_cmd
dvx = float(np.clip(vx_f - self.vx, -self.max_dv, self.max_dv))
dvy = float(np.clip(vy_f - self.vy, -self.max_dv, self.max_dv))
dwz = float(np.clip(wz_f - self.wz, -self.max_dw, self.max_dw))
self.vx += dvx
self.vy += dvy
self.wz += dwz
return self.vx, self.vy, self.wz
def goal_attraction_vel(x: float, y: float, yaw: float, goal_x: float, goal_y: float, cfg: NavCfg):
"""
大角度:先转向后前进;小角度:前进时顺带转向
"""
dx = goal_x - x
dy = goal_y - y
dist = float(np.hypot(dx, dy)) + 1e-6
target_yaw = float(np.arctan2(dy, dx))
yaw_err = wrap_pi(target_yaw - yaw)
yaw_err_abs = float(np.abs(yaw_err))
thresh_rad = np.deg2rad(cfg.yaw_turn_first_deg)
# 线速度:大角度时抑制,小角度时正常
gx_w = dx / dist
gy_w = dy / dist
c = np.cos(-yaw)
s = np.sin(-yaw)
gx_b = c * gx_w - s * gy_w
gy_b = s * gx_w + c * gy_w
scale = float(np.tanh(dist) * 0.8 + 0.2)
vx_raw = cfg.k_attract * scale * gx_b
vy_raw = cfg.k_attract * scale * gy_b
if yaw_err_abs > thresh_rad:
# 大角度:先转,几乎不走(留 0.05 防止完全不动时滤波器卡死)
linear_scale = 0.05
else:
# 小角度:边走边调,角度越小线速越足
linear_scale = 1.0 - 0.85 * (yaw_err_abs / thresh_rad) ** 2
vx = vx_raw * linear_scale
vy = vy_raw * linear_scale
# 角速度:大角度时略加强,小角度时弱增益边走边调
if yaw_err_abs > thresh_rad:
wz = cfg.k_yaw_goal * 1.3 * float(np.tanh(yaw_err))
else:
wz = cfg.k_yaw_goal * 0.7 * float(np.tanh(yaw_err))
return vx, vy, wz
def blend_and_clamp(
vx_a: float, vy_a: float, wz_a: float,
vx_g: float, vy_g: float, wz_g: float,
dist_to_goal: float,
stuck: bool,
cfg: NavCfg,
dists: Optional[np.ndarray] = None,
angles: Optional[np.ndarray] = None,
):
vx = vx_a + vx_g
vy = vy_a + vy_g
wz = wz_a + wz_g
lin = float(np.hypot(vx, vy))
# 前方扇区限速:只用前方 ±20° 最小距离
if dists is not None and angles is not None and len(dists) > 0:
front = np.abs(angles) < np.deg2rad(20)
if np.any(front):
d_front = float(np.min(dists[front]))
ratio = np.clip(
(d_front - cfg.obstacle_threshold) / max(cfg.safe_distance - cfg.obstacle_threshold, 1e-6),
0.0, 1.0
)
speed_limit = ratio * cfg.max_linear
if lin > speed_limit + 1e-6:
s = speed_limit / lin
vx *= s
vy *= s
lin = float(np.hypot(vx, vy))
# 强制最小速度:未到目标且输出太小
if dist_to_goal > cfg.goal_threshold and lin < cfg.min_linear:
if lin > 1e-6:
s = cfg.min_linear / lin
vx *= s
vy *= s
else:
# 彻底趋零时:优先用目标方向(若也为零就不动)
ax = vx_g + vx_a
ay = vy_g + vy_a
an = float(np.hypot(ax, ay)) + 1e-9
vx = cfg.min_linear * ax / an
vy = cfg.min_linear * ay / an
# 卡住脱困:额外加一点“侧向+偏航”,更容易摆脱夹角/凹障碍
if stuck:
# 让它更愿意横移并转一点(避免只顶着障碍)
vx *= 0.85
vy = vy * 1.15 + 0.06 * np.sign(vy if abs(vy) > 1e-3 else 1.0)
wz += 0.10 * np.sign(wz if abs(wz) > 1e-3 else 1.0)
# 限幅
lin = float(np.hypot(vx, vy))
if lin > cfg.max_linear:
s = cfg.max_linear / lin
vx *= s
vy *= s
wz = float(np.clip(wz, -cfg.max_angular, cfg.max_angular))
return vx, vy, wz
# ===================== 主程序 =====================
def main():
model = mujoco.MjModel.from_xml_path(SCENE_PATH)
data = mujoco.MjData(model)
body_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, "cylinder_agent")
site_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_SITE, "ray_origin")
# actuators
act_vel_x = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_x")
act_vel_y = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_y")
act_vel_yaw = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_yaw")
# goal marker joint
goal_joint_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_JOINT, "goal_joint")
goal_qposadr = int(model.jnt_qposadr[goal_joint_id])
def hide_goal_marker():
data.qpos[goal_qposadr: goal_qposadr + 3] = [-99.0, -99.0, 0.1]
def set_goal_marker(gx: float, gy: float):
data.qpos[goal_qposadr] = gx
data.qpos[goal_qposadr + 1] = gy
data.qpos[goal_qposadr + 2] = 0.15
hide_goal_marker()
hfov_rad = np.deg2rad(CFG.hfov_deg)
depth_sim = DepthSimulator(CFG.num_sectors, hfov_rad)
vel_filter = VelocityFilter(alpha=CFG.ema_alpha, max_dv=CFG.max_dv, max_dw=CFG.max_dw)
goals: List[Tuple[float, float]] = []
goal_active = False
last_x, last_y = 0.0, 0.0
stuck_cnt = 0
# key flags
add_goal_ahead = False
add_goal_camera = False
print("=" * 60)
print("MuJoCo 全向导航:仿真深度图 + DDA 避障 + 目标点")
print(f"G: 车头前方 {CFG.goal_ahead_dist:.1f}m | C: 相机视线落点 | 目标边界 R<{CFG.goal_max_r:.1f}m")
print("ESC 退出")
print("=" * 60)
def key_cb(keycode: int):
nonlocal add_goal_ahead, add_goal_camera
if keycode == 71: # G
add_goal_ahead = True
elif keycode == 67: # C
add_goal_camera = True
def stop_output():
data.ctrl[act_vel_x] = 0.0
data.ctrl[act_vel_y] = 0.0
data.ctrl[act_vel_yaw] = 0.0
with mujoco.viewer.launch_passive(model, data, key_callback=key_cb) as viewer:
while viewer.is_running():
mujoco.mj_forward(model, data)
# ========== goal set ==========
nonlocal_vars = None # 仅为了强调这里不用闭包 nonlocal
if add_goal_ahead:
add_goal_ahead = False
x, y, yaw = get_agent_state(data)
gx = x + CFG.goal_ahead_dist * float(np.cos(yaw))
gy = y + CFG.goal_ahead_dist * float(np.sin(yaw))
gx, gy = clamp_goal(gx, gy, CFG.goal_max_r)
goals[:] = [(gx, gy)]
goal_active = True
vel_filter.reset()
print(f" 目标: ({gx:.2f}, {gy:.2f}) [G=车头前方]")
elif add_goal_camera:
add_goal_camera = False
g = goal_from_camera(viewer)
if g is None:
print(" [C] 相机未指向地面,请调整视角后重试")
else:
gx, gy = clamp_goal(g[0], g[1], CFG.goal_max_r)
goals[:] = [(gx, gy)]
goal_active = True
vel_filter.reset()
print(f" 目标: ({gx:.2f}, {gy:.2f}) [C=相机视线]")
if not goal_active or not goals:
stop_output()
vel_filter.reset()
hide_goal_marker()
mujoco.mj_step(model, data)
viewer.sync()
continue
# ========== state ==========
x, y, yaw = get_agent_state(data)
goal_x, goal_y = goals[0]
dist_to_goal = float(np.hypot(goal_x - x, goal_y - y))
if dist_to_goal < CFG.goal_threshold:
print(" 已到达,停止。按 G / C 设置新目标")
goal_active = False
goals.clear()
vel_filter.reset()
stop_output()
hide_goal_marker()
mujoco.mj_step(model, data)
viewer.sync()
continue
set_goal_marker(goal_x, goal_y)
# ========== 仿真深度图 → 扇区 → DDA ==========
depth_img = depth_sim.render(model, data, body_id, site_id)
dists, angles = depth_to_sectors(
depth_img, hfov_rad, CFG.num_sectors,
CFG.depth_z_min, CFG.depth_z_max, CFG.depth_percentile
)
# ========== stuck detect ==========
moved = float(np.hypot(x - last_x, y - last_y))
if moved < CFG.stuck_thresh and dist_to_goal > CFG.goal_threshold:
stuck_cnt += 1
else:
stuck_cnt = 0
last_x, last_y = x, y
stuck = stuck_cnt > CFG.stuck_steps
# ========== velocities ==========
vx_g, vy_g, wz_g = goal_attraction_vel(x, y, yaw, goal_x, goal_y, CFG)
vx_a, vy_a, wz_a = dda_avoidance_from_depth(dists, angles, vx_g, vy_g, CFG)
vx_b, vy_b, wz_b = blend_and_clamp(
vx_a, vy_a, wz_a, vx_g, vy_g, wz_g, dist_to_goal, stuck, CFG,
dists=dists, angles=angles
)
# ========== filter ==========
vel_filter.max_dv = CFG.max_dv_stuck if stuck else CFG.max_dv
vel_filter.max_dw = CFG.max_dw_stuck if stuck else CFG.max_dw
vx, vy, wz = vel_filter.update(vx_b, vy_b, wz_b)
# ========== body -> world for actuator ==========
c = float(np.cos(yaw))
s = float(np.sin(yaw))
vx_w = vx * c - vy * s
vy_w = vx * s + vy * c
data.ctrl[act_vel_x] = vx_w
data.ctrl[act_vel_y] = vy_w
data.ctrl[act_vel_yaw] = wz
mujoco.mj_step(model, data)
viewer.sync()
if __name__ == "__main__":
main()