diff --git a/docs/DDA_KNOWLEDGE.md b/docs/DDA_KNOWLEDGE.md
new file mode 100644
index 0000000..cfe0b31
--- /dev/null
+++ b/docs/DDA_KNOWLEDGE.md
@@ -0,0 +1,122 @@
+# DDA Notes: Directional Distance Avoidance
+
+## 1. From "discrete rays" to a "depth image"
+
+| Old approach | New approach |
+|--------------|--------------|
+| Discrete mj_ray casts | Depth image / point cloud (a field of view with range) |
+| N rays give distances directly | 1D distance sectors binned by bearing from the depth image |
+| Each ray is independent | Each sector keeps the **nearest obstacle distance** (robust statistic) |
+
+**Key point**: a depth camera already gives you **Depth / PointCloud**, so discrete mj_ray-style rays are unnecessary; first convert depth into per-sector nearest distances, then run avoidance on top of that.
+
+---
+
+## 2. Core idea of DDA
+
+1. **Input**: `dist[sector]` -- the nearest obstacle distance (meters) in each direction
+2. **Check**: whether the current desired motion direction `move_theta = atan2(vy_cmd, vx_cmd)` is too close to an obstacle
+3. **Action**:
+   - If too close: **block/attenuate** the velocity component along that direction
+   - At the same time generate a lateral/yaw "escape" command toward the most open direction
+
+---
+
+## 3. Depth -> per-sector nearest distance (depth_to_sectors)
+
+- Horizontal field of view `HFOV` (e.g. RealSense D435 ≈ 86°)
+- Number of sectors `N` (e.g. 31, 61)
+- For each sector, take a **robust minimum** over the corresponding column range of the depth image:
+  - `percentile(depth, 5%)`, or
+  - drop 0/invalid pixels + median filter
+
+This yields:
+- `angles[i]`: sector angle (body frame, left positive, right negative)
+- `d[i]`: nearest obstacle distance (meters) in that direction
+
+---
+
+## 4. DDA avoidance control law
+
+1. Compute the desired motion direction: `move_theta = atan2(vy_cmd, vx_cmd)`
+2. Take the minimum distance `dmin_dir` within a window around `move_theta`
+3. Threshold logic:
+   - `dmin_dir >= safe_dist`: drive normally
+   - `dmin_dir < stop_dist`: no forward motion + forced sidestep/turn
+   - In between: attenuate proportionally + add a slight yaw
+4. Pick the "most open direction" `best_theta` as the bypass/yaw target
+
+---
+
+## 5. ROS1 + depth camera deployment notes
+
+### 5.1 Depth input
+
+- **Depth Image**: `sensor_msgs/Image`, subscribe to `/camera/depth/image_rect` or similar
+- **PointCloud**: `sensor_msgs/PointCloud2`, convert to a depth image or project directly into sectors
+
+### 5.2 Common depth camera parameters
+
+| Camera | HFOV | Resolution |
+|--------|------|------------|
+| RealSense D435 | ~87° | 640×480 |
+| Orbbec Astra | ~60° | 640×480 |
+| ZED | configurable | multiple |
+| OAK-D | ~73° | 640×400 |
+
+### 5.3 Two recommended enhancements (already enabled in simulation)
+
+1. **Speed-adaptive safety distance**: the faster you drive, the farther you look
+   ```py
+   safe_dist = base_safe + k * speed
+   stop_dist = base_stop + 0.5 * k * speed
+   ```
+
+2. **Front-sector speed limit**: cap the speed using only the minimum distance within ±20° ahead
+   ```py
+   front = (np.abs(angles) < np.deg2rad(20))
+   d_front = np.min(dists[front])
+   speed_limit = np.clip((d_front - stop_dist) / (safe_dist - stop_dist), 0, 1) * MAX_LINEAR
+   ```
+
+---
+
+## 6. ROS1 depth input integration (to be implemented)
+
+### 6.1 Depth Image subscription
+
+```python
+# Pseudocode: ROS1 subscriber
+import numpy as np
+import rospy
+from sensor_msgs.msg import Image
+from cv_bridge import CvBridge
+
+bridge = CvBridge()
+
+def depth_cb(msg):
+    depth = bridge.imgmsg_to_cv2(msg, desired_encoding='passthrough')
+    depth = depth.astype(np.float32) / 1000.0  # mm -> m (assumes a 16UC1 depth image)
+    dists, angles = depth_to_sectors(depth, hfov_rad, num_sectors, ...)
+    # Store into shared variables for the control callback to use
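+    # (Illustrative hand-off sketch, not part of the original pseudocode: `scan_lock` and
+    # `latest_scan` are assumed module-level names, e.g. scan_lock = threading.Lock(),
+    # latest_scan = {}.)
+    with scan_lock:
+        latest_scan['dists'] = dists
+        latest_scan['angles'] = angles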
+
+rospy.Subscriber('/camera/depth/image_rect', Image, depth_cb)
+```
+
+### 6.2 PointCloud subscription
+
+Project `sensor_msgs/PointCloud2` points into polar sectors and keep the nearest distance per sector.
+
+---
+
+## 7. Control-flow integration
+
+**Simulation**: `mj_ray` grid → simulated depth image `(1, num_sectors)` → `depth_to_sectors()`
+**Real robot**: depth camera depth/pointcloud → `depth_to_sectors()`
+
+```
+depth (H×W) or pointcloud
+  → depth_to_sectors() → dists, angles
+  → dda_avoidance_from_depth(dists, angles, vx_g, vy_g)
+  → vx_a, vy_a, wz_a
+  → blend_and_clamp(vx_a, vy_a, wz_a, vx_g, vy_g, wz_g)
+```
+
+After blending, the rest of the chain stays the same: filtering → body frame → world frame → actuators.
diff --git a/scripts/run_simulation.py b/scripts/run_simulation.py
index 5ab3e1d..db15891 100644
--- a/scripts/run_simulation.py
+++ b/scripts/run_simulation.py
@@ -1,10 +1,12 @@
 #!/usr/bin/env python3
 """
-MuJoCo omnidirectional cart navigation (forward ray scan + potential-field avoidance + goal navigation)
-- vx, vy, wz omnidirectional motion (body-frame output -> world-frame actuators)
-- Fan-shaped forward scan with mj_ray
-- Avoidance: repulsion + detour toward the most open direction
-- Stuck detection: consecutive displacement too small -> forced escape strategy
+MuJoCo omnidirectional cart navigation (simulated depth image + DDA avoidance + goal navigation)
+
+- Input: a simulated depth image (mj_ray grid), replaceable by depth/pointcloud from a real depth camera
+- depth_to_sectors: depth -> 1D per-sector nearest distance (robust statistics)
+- dda_avoidance_from_depth: Directional Distance Avoidance
+- vx, vy, wz omnidirectional motion (body frame -> world-frame actuators)
+- Eases deployment on ROS1 with real cameras such as RealSense/Orbbec/ZED
 """
 from __future__ import annotations
@@ -23,13 +25,18 @@ SCENE_PATH = os.path.join(os.path.dirname(__file__), "..", "mujoco_scenes", "cyl
 # ===================== Parameter config =====================
 @dataclass
 class NavCfg:
-    # ray scan
-    num_rays: int = 19
-    ray_angle_span: float = np.pi  # -span/2 ~ +span/2
+    # depth sectors (shared interface for simulated / real depth images)
+    hfov_deg: float = 86.0          # horizontal FOV (e.g. RealSense D435)
+    num_sectors: int = 61           # number of sectors
+    depth_z_min: float = 0.1
+    depth_z_max: float = 6.0
+    depth_percentile: float = 5.0   # robust minimum: 5th percentile
 
-    # distances
-    obstacle_threshold: float = 0.45
-    safe_distance: float = 0.65
+    # DDA avoidance
+    obstacle_threshold: float = 0.45   # stop_dist
+    safe_distance: float = 0.65        # safe_dist
+    dda_window: int = 2                # ±N sectors around move_theta
+    speed_adaptive_k: float = 0.15     # speed-adaptive: safe += k*speed
 
     goal_threshold: float = 0.20
 
     # limits
@@ -41,6 +48,8 @@ class NavCfg:
     k_attract: float = 1.0
     k_repel: float = 0.6
     k_yaw_goal: float = 0.8
+    # Turning strategy: large yaw error -> turn first, small error -> adjust while driving
+    yaw_turn_first_deg: float = 35.0   # above this angle: turn in place, barely translate
 
     # stuck detect
     stuck_thresh: float = 0.03
@@ -105,7 +114,137 @@ def goal_from_camera(viewer_handle) -> Optional[Tuple[float, float]]:
     return float(pt[0]), float(pt[1])
 
 
-# ===================== Ray scan (precomputed) =====================
+# ===================== Simulated depth image + sectors + DDA =====================
+def depth_to_sectors(
+    depth: np.ndarray,
+    hfov_rad: float,
+    num_sectors: int,
+    z_min: float = 0.1,
+    z_max: float = 6.0,
+    percentile: float = 5.0,
+) -> Tuple[np.ndarray, np.ndarray]:
+    """
+    Depth image -> 1D per-sector nearest distance (also usable with a real depth camera).
+    depth: H×W (m); 0/NaN means invalid.
+    """
+    H, W = depth.shape
+    angles = np.linspace(-hfov_rad / 2, hfov_rad / 2, num_sectors, dtype=np.float64)
+    cols = np.linspace(0, W, num_sectors + 1).astype(int)
+    dists = np.full(num_sectors, z_max, dtype=np.float64)
+
+    for i in range(num_sectors):
+        c0, c1 = cols[i], cols[i + 1]
+        strip = depth[:, c0:c1].reshape(-1)
+        strip = strip[np.isfinite(strip)]
+        strip = strip[(strip > z_min) & (strip < z_max)]
+        if strip.size == 0:
+            continue
+        dists[i] = float(np.percentile(strip, percentile))
+
+    dists = np.minimum(dists, np.roll(dists, 1))
+    dists = np.minimum(dists, np.roll(dists, -1))
+    return dists, angles
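+
+
+# Illustrative usage (assumed numbers; mirrors docs/DDA_KNOWLEDGE.md, section 3): for a 480x640
+# depth image from a D435-class camera one might call
+#   dists, angles = depth_to_sectors(depth, np.deg2rad(86.0), 61, 0.1, 6.0, 5.0)
+# dists[i] is then the robust nearest range in meters for sector i (z_max where a sector has no
+# valid pixels), and angles[i] runs from -HFOV/2 to +HFOV/2 in the body frame.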
+
+
+class DepthSimulator:
+    """Generate a simulated depth image with mj_ray (mimics a depth camera output)."""
+
+    def __init__(self, num_sectors: int, hfov_rad: float):
+        self.num_sectors = int(num_sectors)
+        self.angles = np.linspace(-hfov_rad / 2, hfov_rad / 2, self.num_sectors)
+        c = np.cos(self.angles)
+        s = np.sin(self.angles)
+        self.ray_dirs_body = np.stack([c, s, np.zeros_like(c)], axis=1).astype(np.float64)
+        self.depth = np.full((1, self.num_sectors), 10.0, dtype=np.float64)
+        self._geomid = np.array([-1], dtype=np.int32)
+
+    def render(
+        self,
+        model: mujoco.MjModel,
+        data: mujoco.MjData,
+        body_id: int,
+        site_id: int,
+    ) -> np.ndarray:
+        site_xpos = data.site_xpos[site_id]
+        xmat = np.array(data.xmat[body_id], dtype=np.float64).reshape(3, 3)
+        ray_dirs_world = (xmat @ self.ray_dirs_body.T).T
+        norms = np.linalg.norm(ray_dirs_world, axis=1) + 1e-9
+        ray_dirs_world /= norms[:, None]
+
+        self.depth.fill(10.0)
+        for i in range(self.num_sectors):
+            d = mujoco.mj_ray(
+                model, data,
+                site_xpos, ray_dirs_world[i],
+                None, 1, body_id, self._geomid
+            )
+            if d >= 0:
+                self.depth[0, i] = float(d)
+        return self.depth
+
+
+def dda_avoidance_from_depth(
+    dists: np.ndarray,
+    angles: np.ndarray,
+    vx_cmd: float,
+    vy_cmd: float,
+    cfg: NavCfg,
+) -> Tuple[float, float, float]:
+    """
+    Directional Distance Avoidance (DDA).
+    Only the direction you intend to move in matters; if it is too close, cut the velocity
+    component along that direction and sidestep/turn toward the most open direction.
+    """
+    speed = float(np.hypot(vx_cmd, vy_cmd))
+    if speed < 1e-6:
+        return 0.0, 0.0, 0.0
+
+    move_theta = float(np.arctan2(vy_cmd, vx_cmd))
+    N = len(dists)
+    i0 = int(np.argmin(np.abs(angles - move_theta)))
+    win = cfg.dda_window
+    idxs = [(i0 + k) % N for k in range(-win, win + 1)]
+    dmin_dir = float(np.min(dists[idxs]))
+
+    best_i = int(np.argmax(dists))
+    best_theta = float(angles[best_i])
+
+    stop_dist = cfg.obstacle_threshold
+    safe_dist = cfg.safe_distance
+    if hasattr(cfg, "speed_adaptive_k") and cfg.speed_adaptive_k > 0:
+        safe_dist = safe_dist + cfg.speed_adaptive_k * speed
+        stop_dist = stop_dist + 0.5 * cfg.speed_adaptive_k * speed
+
+    if dmin_dir >= safe_dist:
+        return 0.0, 0.0, 0.0
+
+    danger = (safe_dist - dmin_dir) / max(safe_dist - stop_dist, 1e-6)
+    danger = float(np.clip(danger, 0.0, 1.0))
+
+    vx_a = 0.0
+    vy_a = 0.0
+    wz_a = 0.0
+
+    mx = float(np.cos(move_theta))
+    my = float(np.sin(move_theta))
+    proj = vx_cmd * mx + vy_cmd * my
+    cut = (0.6 + 0.4 * danger) * proj
+    vx_a -= cut * mx
+    vy_a -= cut * my
+
+    bypass = 0.25 + 0.25 * danger
+    vx_a += bypass * np.cos(best_theta)
+    vy_a += bypass * np.sin(best_theta)
+    wz_a += (0.35 + 0.45 * danger) * best_theta
+
+    if dmin_dir < stop_dist:
+        vx_a -= vx_cmd * 0.9
+        vy_a -= vy_cmd * 0.9
+        wz_a += 0.6 * np.sign(best_theta if abs(best_theta) > 1e-3 else 1.0)
+
+    return vx_a, vy_a, wz_a
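+
+
+# Worked example (illustrative numbers, ignoring the speed-adaptive term): with safe_dist=0.65 and
+# stop_dist=0.45, an obstacle at dmin_dir=0.55 m along move_theta gives
+# danger = (0.65 - 0.55) / (0.65 - 0.45) = 0.5, so 80% of the commanded component along move_theta
+# is cut (0.6 + 0.4*0.5), a bypass velocity of 0.25 + 0.25*0.5 = 0.375 is pushed toward the most
+# open sector best_theta, and the yaw term is (0.35 + 0.45*0.5) * best_theta.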
+
+
+# ===================== Ray scan (legacy interface; can be retired under DDA) =====================
 class RayScanner:
     """
     Precomputes the body-frame directions of the fan scan (based on forward=[1,0,0], left=[0,1,0]),
@@ -190,75 +329,57 @@ class VelocityFilter:
 def goal_attraction_vel(x: float, y: float, yaw: float, goal_x: float, goal_y: float, cfg: NavCfg):
+    """
+    Large yaw error: turn first, then drive; small error: adjust heading while driving.
+    """
     dx = goal_x - x
     dy = goal_y - y
     dist = float(np.hypot(dx, dy)) + 1e-6
 
-    # World-frame unit direction
+    target_yaw = float(np.arctan2(dy, dx))
+    yaw_err = wrap_pi(target_yaw - yaw)
+    yaw_err_abs = float(np.abs(yaw_err))
+
+    thresh_rad = np.deg2rad(cfg.yaw_turn_first_deg)
+
+    # Linear velocity: suppressed at large yaw error, normal at small error
     gx_w = dx / dist
     gy_w = dy / dist
-
-    # To body frame (only yaw matters)
     c = np.cos(-yaw)
     s = np.sin(-yaw)
     gx_b = c * gx_w - s * gy_w
     gy_b = s * gx_w + c * gy_w
-
-    # Linear velocity (smoothed with distance)
     scale = float(np.tanh(dist) * 0.8 + 0.2)
-    vx = cfg.k_attract * scale * gx_b
-    vy = cfg.k_attract * scale * gy_b
+    vx_raw = cfg.k_attract * scale * gx_b
+    vy_raw = cfg.k_attract * scale * gy_b
+
+    if yaw_err_abs > thresh_rad:
+        # Large error: turn first, barely translate (keep 0.05 so the filter does not stall at zero)
+        linear_scale = 0.05
+    else:
+        # Small error: drive while adjusting; the smaller the error, the more linear speed
+        linear_scale = 1.0 - 0.85 * (yaw_err_abs / thresh_rad) ** 2
+
+    vx = vx_raw * linear_scale
+    vy = vy_raw * linear_scale
+
+    # Angular velocity: slightly stronger at large error, weaker gain while driving at small error
+    if yaw_err_abs > thresh_rad:
+        wz = cfg.k_yaw_goal * 1.3 * float(np.tanh(yaw_err))
+    else:
+        wz = cfg.k_yaw_goal * 0.7 * float(np.tanh(yaw_err))
 
-    # Angular velocity (weak alignment)
-    target_yaw = float(np.arctan2(dy, dx))
-    yaw_err = wrap_pi(target_yaw - yaw)
-    wz = cfg.k_yaw_goal * float(np.tanh(yaw_err))
     return vx, vy, wz
 
 
-def obstacle_avoidance_vel(distances: np.ndarray, angles: np.ndarray, cfg: NavCfg):
-    min_d = float(np.min(distances))
-    if min_d > cfg.obstacle_threshold:
-        return 0.0, 0.0, 0.0, min_d
-
-    # Most open direction
-    safe_i = int(np.argmax(distances))
-    best_theta = float(angles[safe_i])
-
-    vx = 0.0
-    vy = 0.0
-    wz = 0.0
-
-    # Repulsion + a slight "twist" toward the open direction
-    # (keeps your original logic, just numerically a bit more stable)
-    for d, th in zip(distances, angles):
-        d = float(d)
-        th = float(th)
-        if d < cfg.obstacle_threshold:
-            ratio = 1.0 - d / cfg.obstacle_threshold
-            strength = cfg.k_repel * (ratio * ratio)
-            vx -= strength * np.cos(th)
-            vy -= strength * np.sin(th)
-        elif d < cfg.safe_distance:
-            ratio = (cfg.safe_distance - d) / (cfg.safe_distance - cfg.obstacle_threshold)
-            strength = cfg.k_repel * 0.15 * ratio
-            wz += strength * float(np.clip(best_theta - th, -1.0, 1.0))
-
-    # Bypass: give a push toward the most open direction
-    bypass = 0.35
-    vx += bypass * np.cos(best_theta)
-    vy += bypass * np.sin(best_theta)
-    wz += 0.4 * best_theta
-
-    return vx, vy, wz, min_d
-
-
 def blend_and_clamp(
     vx_a: float, vy_a: float, wz_a: float,
     vx_g: float, vy_g: float, wz_g: float,
     dist_to_goal: float, stuck: bool,
-    cfg: NavCfg
+    cfg: NavCfg,
+    dists: Optional[np.ndarray] = None,
+    angles: Optional[np.ndarray] = None,
 ):
     vx = vx_a + vx_g
     vy = vy_a + vy_g
@@ -266,6 +387,22 @@
     lin = float(np.hypot(vx, vy))
 
+    # Front-sector speed limit: use only the minimum distance within ±20° ahead
+    if dists is not None and angles is not None and len(dists) > 0:
+        front = np.abs(angles) < np.deg2rad(20)
+        if np.any(front):
+            d_front = float(np.min(dists[front]))
+            ratio = np.clip(
+                (d_front - cfg.obstacle_threshold) / max(cfg.safe_distance - cfg.obstacle_threshold, 1e-6),
+                0.0, 1.0
+            )
+            speed_limit = ratio * cfg.max_linear
+            if lin > speed_limit + 1e-6:
+                s = speed_limit / lin
+                vx *= s
+                vy *= s
+                lin = float(np.hypot(vx, vy))
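+
+    # Example with the defaults above (illustrative): obstacle_threshold=0.45, safe_distance=0.65;
+    # a front obstacle at d_front = 0.55 m gives ratio = (0.55 - 0.45) / (0.65 - 0.45) = 0.5,
+    # so the linear speed is capped at 0.5 * cfg.max_linear.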
 
     # Enforce a minimum speed: not yet at the goal but the output is too small
     if dist_to_goal > cfg.goal_threshold and lin < cfg.min_linear:
         if lin > 1e-6:
@@ -324,7 +461,8 @@ def main():
     hide_goal_marker()
 
-    scanner = RayScanner(CFG.num_rays, CFG.ray_angle_span)
+    hfov_rad = np.deg2rad(CFG.hfov_deg)
+    depth_sim = DepthSimulator(CFG.num_sectors, hfov_rad)
     vel_filter = VelocityFilter(alpha=CFG.ema_alpha, max_dv=CFG.max_dv, max_dw=CFG.max_dw)
 
     goals: List[Tuple[float, float]] = []
@@ -338,7 +476,7 @@ def main():
     add_goal_camera = False
 
     print("=" * 60)
-    print("MuJoCo omnidirectional navigation: ray scan + avoidance + goal points")
+    print("MuJoCo omnidirectional navigation: simulated depth image + DDA avoidance + goal points")
     print(f"G: goal {CFG.goal_ahead_dist:.1f}m ahead of the cart | C: camera line-of-sight point | goal bound R<{CFG.goal_max_r:.1f}m")
     print("ESC to quit")
     print("=" * 60)
@@ -409,8 +547,12 @@ def main():
             set_goal_marker(goal_x, goal_y)
 
-        # ========== scan ==========
-        distances, angles = scanner.scan(model, data, body_id, site_id)
+        # ========== simulated depth image -> sectors -> DDA ==========
+        depth_img = depth_sim.render(model, data, body_id, site_id)
+        dists, angles = depth_to_sectors(
+            depth_img, hfov_rad, CFG.num_sectors,
+            CFG.depth_z_min, CFG.depth_z_max, CFG.depth_percentile
+        )
 
         # ========== stuck detect ==========
         moved = float(np.hypot(x - last_x, y - last_y))
@@ -422,9 +564,12 @@ def main():
         stuck = stuck_cnt > CFG.stuck_steps
 
         # ========== velocities ==========
-        vx_a, vy_a, wz_a, _min_d = obstacle_avoidance_vel(distances, angles, CFG)
         vx_g, vy_g, wz_g = goal_attraction_vel(x, y, yaw, goal_x, goal_y, CFG)
-        vx_b, vy_b, wz_b = blend_and_clamp(vx_a, vy_a, wz_a, vx_g, vy_g, wz_g, dist_to_goal, stuck, CFG)
+        vx_a, vy_a, wz_a = dda_avoidance_from_depth(dists, angles, vx_g, vy_g, CFG)
+        vx_b, vy_b, wz_b = blend_and_clamp(
+            vx_a, vy_a, wz_a, vx_g, vy_g, wz_g, dist_to_goal, stuck, CFG,
+            dists=dists, angles=angles
+        )
 
         # ========== filter ==========
         vel_filter.max_dv = CFG.max_dv_stuck if stuck else CFG.max_dv