#!/usr/bin/env python3 """ MuJoCo 全向小车导航(仿真深度图 + DDA 避障 + 目标点导航) - 输入:仿真深度图(mj_ray 网格)或可替换为真实深感相机的 depth/pointcloud - depth_to_sectors:深度 → 1D 扇区最近距离(鲁棒统计) - dda_avoidance_from_depth:Directional Distance Avoidance 避障 - vx, vy, wz 全向移动(车体系 → 世界系执行器) - 便于 ROS1 + RealSense/Orbbec/ZED 等真实相机落地 """ from __future__ import annotations import os from dataclasses import dataclass from typing import Optional, Tuple, List import mujoco import mujoco.viewer import numpy as np SCENE_PATH = os.path.join(os.path.dirname(__file__), "..", "mujoco_scenes", "cylinder_obstacles.xml") # ===================== 参数配置 ===================== @dataclass class NavCfg: # depth sector(仿真/真实深度图统一接口) hfov_deg: float = 86.0 # 水平视场角(如 RealSense D435) num_sectors: int = 61 # 扇区数 depth_z_min: float = 0.1 depth_z_max: float = 6.0 depth_percentile: float = 5.0 # 鲁棒最小值:5% 分位 # DDA 避障 obstacle_threshold: float = 0.45 # stop_dist safe_distance: float = 0.65 # safe_dist dda_window: int = 2 # move_theta 附近 ±N 扇区 speed_adaptive_k: float = 0.15 # 速度自适应:safe += k*speed goal_threshold: float = 0.20 # limits max_linear: float = 0.60 max_angular: float = 0.80 min_linear: float = 0.18 # 防止“只转不走”或输出趋零卡死 # gains k_attract: float = 1.0 k_repel: float = 0.6 k_yaw_goal: float = 0.8 # 转向策略:大角度先转后走,小角度边走边调 yaw_turn_first_deg: float = 35.0 # 超过此角度:先转向,几乎不走 # stuck detect stuck_thresh: float = 0.03 stuck_steps: int = 100 # goal set goal_ahead_dist: float = 2.0 goal_max_r: float = 5.0 # output smoothing ema_alpha: float = 0.70 max_dv: float = 0.18 max_dw: float = 0.20 max_dv_stuck: float = 0.25 max_dw_stuck: float = 0.28 CFG = NavCfg() # ===================== 工具函数 ===================== def clamp_goal(gx: float, gy: float, max_r: float) -> Tuple[float, float]: r = float(np.hypot(gx, gy)) + 1e-9 if r > max_r: s = max_r / r return float(gx * s), float(gy * s) return float(gx), float(gy) def wrap_pi(a: float) -> float: # 更稳定的 [-pi, pi] return float(np.arctan2(np.sin(a), np.cos(a))) def get_agent_state(data: mujoco.MjData) -> Tuple[float, float, float]: # 假设 qpos: x, y, yaw 在 0,1,2 qpos = data.qpos return float(qpos[0]), float(qpos[1]), float(qpos[2]) def goal_from_camera(viewer_handle) -> Optional[Tuple[float, float]]: """相机视线与 z=0 平面交点(旋转相机后按 C)。""" cam = viewer_handle.cam lookat = np.array(cam.lookat, dtype=float) dist = float(cam.distance) az = np.deg2rad(float(cam.azimuth)) el = np.deg2rad(float(cam.elevation)) # MuJoCo viewer 的相机方向约定:这里延续你原实现 fx = np.cos(el) * np.sin(az) fy = -np.cos(el) * np.cos(az) fz = np.sin(el) cam_pos = lookat - dist * np.array([fx, fy, fz], dtype=float) if abs(fz) < 1e-8: return None t = -cam_pos[2] / fz if t < 0: return None pt = cam_pos + t * np.array([fx, fy, fz], dtype=float) return float(pt[0]), float(pt[1]) # ===================== 仿真深度图 + 扇区 + DDA ===================== def depth_to_sectors( depth: np.ndarray, hfov_rad: float, num_sectors: int, z_min: float = 0.1, z_max: float = 6.0, percentile: float = 5.0, ) -> Tuple[np.ndarray, np.ndarray]: """ 深度图 → 1D 扇区最近距离(可直接用于真实深感相机) depth: H×W (m),0/NaN 表示无效 """ H, W = depth.shape angles = np.linspace(-hfov_rad / 2, hfov_rad / 2, num_sectors, dtype=np.float64) cols = np.linspace(0, W, num_sectors + 1).astype(int) dists = np.full(num_sectors, z_max, dtype=np.float64) for i in range(num_sectors): c0, c1 = cols[i], cols[i + 1] strip = depth[:, c0:c1].reshape(-1) strip = strip[np.isfinite(strip)] strip = strip[(strip > z_min) & (strip < z_max)] if strip.size == 0: continue dists[i] = float(np.percentile(strip, percentile)) dists = np.minimum(dists, np.roll(dists, 1)) dists = np.minimum(dists, np.roll(dists, -1)) return dists, angles class DepthSimulator: """用 mj_ray 生成仿真深度图(模拟深感相机输出)""" def __init__(self, num_sectors: int, hfov_rad: float): self.num_sectors = int(num_sectors) self.angles = np.linspace(-hfov_rad / 2, hfov_rad / 2, self.num_sectors) c = np.cos(self.angles) s = np.sin(self.angles) self.ray_dirs_body = np.stack([c, s, np.zeros_like(c)], axis=1).astype(np.float64) self.depth = np.full((1, self.num_sectors), 10.0, dtype=np.float64) self._geomid = np.array([-1], dtype=np.int32) def render( self, model: mujoco.MjModel, data: mujoco.MjData, body_id: int, site_id: int, ) -> np.ndarray: site_xpos = data.site_xpos[site_id] xmat = np.array(data.xmat[body_id], dtype=np.float64).reshape(3, 3) ray_dirs_world = (xmat @ self.ray_dirs_body.T).T norms = np.linalg.norm(ray_dirs_world, axis=1) + 1e-9 ray_dirs_world /= norms[:, None] self.depth.fill(10.0) for i in range(self.num_sectors): d = mujoco.mj_ray( model, data, site_xpos, ray_dirs_world[i], None, 1, body_id, self._geomid ) if d >= 0: self.depth[0, i] = float(d) return self.depth def dda_avoidance_from_depth( dists: np.ndarray, angles: np.ndarray, vx_cmd: float, vy_cmd: float, cfg: NavCfg, ) -> Tuple[float, float, float]: """ Directional Distance Avoidance (DDA) 只关心「你要去的方向」是否过近;过近则削掉该方向分量 + 朝开阔方向侧移/转向 """ speed = float(np.hypot(vx_cmd, vy_cmd)) if speed < 1e-6: return 0.0, 0.0, 0.0 move_theta = float(np.arctan2(vy_cmd, vx_cmd)) N = len(dists) i0 = int(np.argmin(np.abs(angles - move_theta))) win = cfg.dda_window idxs = [(i0 + k) % N for k in range(-win, win + 1)] dmin_dir = float(np.min(dists[idxs])) best_i = int(np.argmax(dists)) best_theta = float(angles[best_i]) stop_dist = cfg.obstacle_threshold safe_dist = cfg.safe_distance if hasattr(cfg, "speed_adaptive_k") and cfg.speed_adaptive_k > 0: safe_dist = safe_dist + cfg.speed_adaptive_k * speed stop_dist = stop_dist + 0.5 * cfg.speed_adaptive_k * speed if dmin_dir >= safe_dist: return 0.0, 0.0, 0.0 danger = (safe_dist - dmin_dir) / max(safe_dist - stop_dist, 1e-6) danger = float(np.clip(danger, 0.0, 1.0)) vx_a = 0.0 vy_a = 0.0 wz_a = 0.0 mx = float(np.cos(move_theta)) my = float(np.sin(move_theta)) proj = vx_cmd * mx + vy_cmd * my cut = (0.6 + 0.4 * danger) * proj vx_a -= cut * mx vy_a -= cut * my bypass = 0.25 + 0.25 * danger vx_a += bypass * np.cos(best_theta) vy_a += bypass * np.sin(best_theta) wz_a += (0.35 + 0.45 * danger) * best_theta if dmin_dir < stop_dist: vx_a -= vx_cmd * 0.9 vy_a -= vy_cmd * 0.9 wz_a += 0.6 * np.sign(best_theta if abs(best_theta) > 1e-3 else 1.0) return vx_a, vy_a, wz_a # ===================== 射线扫描(旧接口,DDA 下可弃用) ===================== class RayScanner: """ 预计算扇形扫描的 body-frame 方向(基于 forward=[1,0,0], left=[0,1,0]), 每帧用 body rotation (xmat) 转到 world,减少每帧 cos/sin + 临时数组。 """ def __init__(self, num_rays: int, angle_span: float): self.num_rays = int(num_rays) self.angle_span = float(angle_span) self.angles = np.linspace(-self.angle_span / 2, self.angle_span / 2, self.num_rays) c = np.cos(self.angles) s = np.sin(self.angles) # body-frame ray directions: dir_b = c*[1,0,0] + s*[0,1,0] # shape (N,3) self.ray_dirs_body = np.stack([c, s, np.zeros_like(c)], axis=1).astype(np.float64) # buffers to avoid per-step alloc self.distances = np.full(self.num_rays, 10.0, dtype=np.float64) self._geomid = np.array([-1], dtype=np.int32) def scan( self, model: mujoco.MjModel, data: mujoco.MjData, body_id: int, site_id: int, ) -> Tuple[np.ndarray, np.ndarray]: site_xpos = data.site_xpos[site_id] # view, no copy xmat = np.array(data.xmat[body_id], dtype=np.float64).reshape(3, 3) # world dirs: dir_w = xmat @ dir_b # shape (N,3) ray_dirs_world = (xmat @ self.ray_dirs_body.T).T # normalize (理论上已单位,但数值上更稳一点) norms = np.linalg.norm(ray_dirs_world, axis=1) + 1e-9 ray_dirs_world /= norms[:, None] self.distances.fill(10.0) for i in range(self.num_rays): d = mujoco.mj_ray( model, data, site_xpos, ray_dirs_world[i], None, 1, body_id, self._geomid ) if d >= 0: self.distances[i] = float(d) return self.distances, self.angles # ===================== 速度融合与滤波 ===================== class VelocityFilter: """EMA + 速率限制,输出更平滑,避免 ctrl 抖动。""" def __init__(self, alpha: float, max_dv: float, max_dw: float): self.alpha = float(alpha) self.max_dv = float(max_dv) self.max_dw = float(max_dw) self.vx = 0.0 self.vy = 0.0 self.wz = 0.0 def reset(self): self.vx = self.vy = self.wz = 0.0 def update(self, vx_cmd: float, vy_cmd: float, wz_cmd: float) -> Tuple[float, float, float]: vx_f = self.alpha * self.vx + (1.0 - self.alpha) * vx_cmd vy_f = self.alpha * self.vy + (1.0 - self.alpha) * vy_cmd wz_f = self.alpha * self.wz + (1.0 - self.alpha) * wz_cmd dvx = float(np.clip(vx_f - self.vx, -self.max_dv, self.max_dv)) dvy = float(np.clip(vy_f - self.vy, -self.max_dv, self.max_dv)) dwz = float(np.clip(wz_f - self.wz, -self.max_dw, self.max_dw)) self.vx += dvx self.vy += dvy self.wz += dwz return self.vx, self.vy, self.wz def goal_attraction_vel(x: float, y: float, yaw: float, goal_x: float, goal_y: float, cfg: NavCfg): """ 大角度:先转向后前进;小角度:前进时顺带转向 """ dx = goal_x - x dy = goal_y - y dist = float(np.hypot(dx, dy)) + 1e-6 target_yaw = float(np.arctan2(dy, dx)) yaw_err = wrap_pi(target_yaw - yaw) yaw_err_abs = float(np.abs(yaw_err)) thresh_rad = np.deg2rad(cfg.yaw_turn_first_deg) # 线速度:大角度时抑制,小角度时正常 gx_w = dx / dist gy_w = dy / dist c = np.cos(-yaw) s = np.sin(-yaw) gx_b = c * gx_w - s * gy_w gy_b = s * gx_w + c * gy_w scale = float(np.tanh(dist) * 0.8 + 0.2) vx_raw = cfg.k_attract * scale * gx_b vy_raw = cfg.k_attract * scale * gy_b if yaw_err_abs > thresh_rad: # 大角度:先转,几乎不走(留 0.05 防止完全不动时滤波器卡死) linear_scale = 0.05 else: # 小角度:边走边调,角度越小线速越足 linear_scale = 1.0 - 0.85 * (yaw_err_abs / thresh_rad) ** 2 vx = vx_raw * linear_scale vy = vy_raw * linear_scale # 角速度:大角度时略加强,小角度时弱增益边走边调 if yaw_err_abs > thresh_rad: wz = cfg.k_yaw_goal * 1.3 * float(np.tanh(yaw_err)) else: wz = cfg.k_yaw_goal * 0.7 * float(np.tanh(yaw_err)) return vx, vy, wz def blend_and_clamp( vx_a: float, vy_a: float, wz_a: float, vx_g: float, vy_g: float, wz_g: float, dist_to_goal: float, stuck: bool, cfg: NavCfg, dists: Optional[np.ndarray] = None, angles: Optional[np.ndarray] = None, ): vx = vx_a + vx_g vy = vy_a + vy_g wz = wz_a + wz_g lin = float(np.hypot(vx, vy)) # 前方扇区限速:只用前方 ±20° 最小距离 if dists is not None and angles is not None and len(dists) > 0: front = np.abs(angles) < np.deg2rad(20) if np.any(front): d_front = float(np.min(dists[front])) ratio = np.clip( (d_front - cfg.obstacle_threshold) / max(cfg.safe_distance - cfg.obstacle_threshold, 1e-6), 0.0, 1.0 ) speed_limit = ratio * cfg.max_linear if lin > speed_limit + 1e-6: s = speed_limit / lin vx *= s vy *= s lin = float(np.hypot(vx, vy)) # 强制最小速度:未到目标且输出太小 if dist_to_goal > cfg.goal_threshold and lin < cfg.min_linear: if lin > 1e-6: s = cfg.min_linear / lin vx *= s vy *= s else: # 彻底趋零时:优先用目标方向(若也为零就不动) ax = vx_g + vx_a ay = vy_g + vy_a an = float(np.hypot(ax, ay)) + 1e-9 vx = cfg.min_linear * ax / an vy = cfg.min_linear * ay / an # 卡住脱困:额外加一点“侧向+偏航”,更容易摆脱夹角/凹障碍 if stuck: # 让它更愿意横移并转一点(避免只顶着障碍) vx *= 0.85 vy = vy * 1.15 + 0.06 * np.sign(vy if abs(vy) > 1e-3 else 1.0) wz += 0.10 * np.sign(wz if abs(wz) > 1e-3 else 1.0) # 限幅 lin = float(np.hypot(vx, vy)) if lin > cfg.max_linear: s = cfg.max_linear / lin vx *= s vy *= s wz = float(np.clip(wz, -cfg.max_angular, cfg.max_angular)) return vx, vy, wz # ===================== 主程序 ===================== def main(): model = mujoco.MjModel.from_xml_path(SCENE_PATH) data = mujoco.MjData(model) body_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, "cylinder_agent") site_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_SITE, "ray_origin") # actuators act_vel_x = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_x") act_vel_y = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_y") act_vel_yaw = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_yaw") # goal marker joint goal_joint_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_JOINT, "goal_joint") goal_qposadr = int(model.jnt_qposadr[goal_joint_id]) def hide_goal_marker(): data.qpos[goal_qposadr: goal_qposadr + 3] = [-99.0, -99.0, 0.1] def set_goal_marker(gx: float, gy: float): data.qpos[goal_qposadr] = gx data.qpos[goal_qposadr + 1] = gy data.qpos[goal_qposadr + 2] = 0.15 hide_goal_marker() hfov_rad = np.deg2rad(CFG.hfov_deg) depth_sim = DepthSimulator(CFG.num_sectors, hfov_rad) vel_filter = VelocityFilter(alpha=CFG.ema_alpha, max_dv=CFG.max_dv, max_dw=CFG.max_dw) goals: List[Tuple[float, float]] = [] goal_active = False last_x, last_y = 0.0, 0.0 stuck_cnt = 0 # key flags add_goal_ahead = False add_goal_camera = False print("=" * 60) print("MuJoCo 全向导航:仿真深度图 + DDA 避障 + 目标点") print(f"G: 车头前方 {CFG.goal_ahead_dist:.1f}m | C: 相机视线落点 | 目标边界 R<{CFG.goal_max_r:.1f}m") print("ESC 退出") print("=" * 60) def key_cb(keycode: int): nonlocal add_goal_ahead, add_goal_camera if keycode == 71: # G add_goal_ahead = True elif keycode == 67: # C add_goal_camera = True def stop_output(): data.ctrl[act_vel_x] = 0.0 data.ctrl[act_vel_y] = 0.0 data.ctrl[act_vel_yaw] = 0.0 with mujoco.viewer.launch_passive(model, data, key_callback=key_cb) as viewer: while viewer.is_running(): mujoco.mj_forward(model, data) # ========== goal set ========== nonlocal_vars = None # 仅为了强调这里不用闭包 nonlocal if add_goal_ahead: add_goal_ahead = False x, y, yaw = get_agent_state(data) gx = x + CFG.goal_ahead_dist * float(np.cos(yaw)) gy = y + CFG.goal_ahead_dist * float(np.sin(yaw)) gx, gy = clamp_goal(gx, gy, CFG.goal_max_r) goals[:] = [(gx, gy)] goal_active = True vel_filter.reset() print(f" 目标: ({gx:.2f}, {gy:.2f}) [G=车头前方]") elif add_goal_camera: add_goal_camera = False g = goal_from_camera(viewer) if g is None: print(" [C] 相机未指向地面,请调整视角后重试") else: gx, gy = clamp_goal(g[0], g[1], CFG.goal_max_r) goals[:] = [(gx, gy)] goal_active = True vel_filter.reset() print(f" 目标: ({gx:.2f}, {gy:.2f}) [C=相机视线]") if not goal_active or not goals: stop_output() vel_filter.reset() hide_goal_marker() mujoco.mj_step(model, data) viewer.sync() continue # ========== state ========== x, y, yaw = get_agent_state(data) goal_x, goal_y = goals[0] dist_to_goal = float(np.hypot(goal_x - x, goal_y - y)) if dist_to_goal < CFG.goal_threshold: print(" 已到达,停止。按 G / C 设置新目标") goal_active = False goals.clear() vel_filter.reset() stop_output() hide_goal_marker() mujoco.mj_step(model, data) viewer.sync() continue set_goal_marker(goal_x, goal_y) # ========== 仿真深度图 → 扇区 → DDA ========== depth_img = depth_sim.render(model, data, body_id, site_id) dists, angles = depth_to_sectors( depth_img, hfov_rad, CFG.num_sectors, CFG.depth_z_min, CFG.depth_z_max, CFG.depth_percentile ) # ========== stuck detect ========== moved = float(np.hypot(x - last_x, y - last_y)) if moved < CFG.stuck_thresh and dist_to_goal > CFG.goal_threshold: stuck_cnt += 1 else: stuck_cnt = 0 last_x, last_y = x, y stuck = stuck_cnt > CFG.stuck_steps # ========== velocities ========== vx_g, vy_g, wz_g = goal_attraction_vel(x, y, yaw, goal_x, goal_y, CFG) vx_a, vy_a, wz_a = dda_avoidance_from_depth(dists, angles, vx_g, vy_g, CFG) vx_b, vy_b, wz_b = blend_and_clamp( vx_a, vy_a, wz_a, vx_g, vy_g, wz_g, dist_to_goal, stuck, CFG, dists=dists, angles=angles ) # ========== filter ========== vel_filter.max_dv = CFG.max_dv_stuck if stuck else CFG.max_dv vel_filter.max_dw = CFG.max_dw_stuck if stuck else CFG.max_dw vx, vy, wz = vel_filter.update(vx_b, vy_b, wz_b) # ========== body -> world for actuator ========== c = float(np.cos(yaw)) s = float(np.sin(yaw)) vx_w = vx * c - vy * s vy_w = vx * s + vy * c data.ctrl[act_vel_x] = vx_w data.ctrl[act_vel_y] = vy_w data.ctrl[act_vel_yaw] = wz mujoco.mj_step(model, data) viewer.sync() if __name__ == "__main__": main()