Files
daohang/scripts/run_simulation.py
2026-02-21 10:33:09 +08:00

449 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
MuJoCo 全向小车导航(前向射线扫描 + 势场避障 + 目标点导航)
- vx, vy, wz 全向移动(车体系输出 -> 世界系执行器)
- mj_ray 扇形前向扫描
- 避障:排斥 + 朝开阔方向绕行
- 卡住检测:连续位移太小 -> 强制脱困策略
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Optional, Tuple, List
import mujoco
import mujoco.viewer
import numpy as np
SCENE_PATH = os.path.join(os.path.dirname(__file__), "..", "mujoco_scenes", "cylinder_obstacles.xml")
# ===================== 参数配置 =====================
@dataclass
class NavCfg:
# ray scan
num_rays: int = 19
ray_angle_span: float = np.pi # -span/2 ~ +span/2
# distances
obstacle_threshold: float = 0.45
safe_distance: float = 0.65
goal_threshold: float = 0.20
# limits
max_linear: float = 0.60
max_angular: float = 0.80
min_linear: float = 0.18 # 防止“只转不走”或输出趋零卡死
# gains
k_attract: float = 1.0
k_repel: float = 0.6
k_yaw_goal: float = 0.8
# stuck detect
stuck_thresh: float = 0.03
stuck_steps: int = 100
# goal set
goal_ahead_dist: float = 2.0
goal_max_r: float = 5.0
# output smoothing
ema_alpha: float = 0.70
max_dv: float = 0.18
max_dw: float = 0.20
max_dv_stuck: float = 0.25
max_dw_stuck: float = 0.28
CFG = NavCfg()
# ===================== 工具函数 =====================
def clamp_goal(gx: float, gy: float, max_r: float) -> Tuple[float, float]:
r = float(np.hypot(gx, gy)) + 1e-9
if r > max_r:
s = max_r / r
return float(gx * s), float(gy * s)
return float(gx), float(gy)
def wrap_pi(a: float) -> float:
# 更稳定的 [-pi, pi]
return float(np.arctan2(np.sin(a), np.cos(a)))
def get_agent_state(data: mujoco.MjData) -> Tuple[float, float, float]:
# 假设 qpos: x, y, yaw 在 0,1,2
qpos = data.qpos
return float(qpos[0]), float(qpos[1]), float(qpos[2])
def goal_from_camera(viewer_handle) -> Optional[Tuple[float, float]]:
"""相机视线与 z=0 平面交点(旋转相机后按 C"""
cam = viewer_handle.cam
lookat = np.array(cam.lookat, dtype=float)
dist = float(cam.distance)
az = np.deg2rad(float(cam.azimuth))
el = np.deg2rad(float(cam.elevation))
# MuJoCo viewer 的相机方向约定:这里延续你原实现
fx = np.cos(el) * np.sin(az)
fy = -np.cos(el) * np.cos(az)
fz = np.sin(el)
cam_pos = lookat - dist * np.array([fx, fy, fz], dtype=float)
if abs(fz) < 1e-8:
return None
t = -cam_pos[2] / fz
if t < 0:
return None
pt = cam_pos + t * np.array([fx, fy, fz], dtype=float)
return float(pt[0]), float(pt[1])
# ===================== 射线扫描(预计算优化) =====================
class RayScanner:
"""
预计算扇形扫描的 body-frame 方向(基于 forward=[1,0,0], left=[0,1,0]
每帧用 body rotation (xmat) 转到 world减少每帧 cos/sin + 临时数组。
"""
def __init__(self, num_rays: int, angle_span: float):
self.num_rays = int(num_rays)
self.angle_span = float(angle_span)
self.angles = np.linspace(-self.angle_span / 2, self.angle_span / 2, self.num_rays)
c = np.cos(self.angles)
s = np.sin(self.angles)
# body-frame ray directions: dir_b = c*[1,0,0] + s*[0,1,0]
# shape (N,3)
self.ray_dirs_body = np.stack([c, s, np.zeros_like(c)], axis=1).astype(np.float64)
# buffers to avoid per-step alloc
self.distances = np.full(self.num_rays, 10.0, dtype=np.float64)
self._geomid = np.array([-1], dtype=np.int32)
def scan(
self,
model: mujoco.MjModel,
data: mujoco.MjData,
body_id: int,
site_id: int,
) -> Tuple[np.ndarray, np.ndarray]:
site_xpos = data.site_xpos[site_id] # view, no copy
xmat = np.array(data.xmat[body_id], dtype=np.float64).reshape(3, 3)
# world dirs: dir_w = xmat @ dir_b
# shape (N,3)
ray_dirs_world = (xmat @ self.ray_dirs_body.T).T
# normalize (理论上已单位,但数值上更稳一点)
norms = np.linalg.norm(ray_dirs_world, axis=1) + 1e-9
ray_dirs_world /= norms[:, None]
self.distances.fill(10.0)
for i in range(self.num_rays):
d = mujoco.mj_ray(
model, data,
site_xpos, ray_dirs_world[i],
None, 1, body_id, self._geomid
)
if d >= 0:
self.distances[i] = float(d)
return self.distances, self.angles
# ===================== 速度融合与滤波 =====================
class VelocityFilter:
"""EMA + 速率限制,输出更平滑,避免 ctrl 抖动。"""
def __init__(self, alpha: float, max_dv: float, max_dw: float):
self.alpha = float(alpha)
self.max_dv = float(max_dv)
self.max_dw = float(max_dw)
self.vx = 0.0
self.vy = 0.0
self.wz = 0.0
def reset(self):
self.vx = self.vy = self.wz = 0.0
def update(self, vx_cmd: float, vy_cmd: float, wz_cmd: float) -> Tuple[float, float, float]:
vx_f = self.alpha * self.vx + (1.0 - self.alpha) * vx_cmd
vy_f = self.alpha * self.vy + (1.0 - self.alpha) * vy_cmd
wz_f = self.alpha * self.wz + (1.0 - self.alpha) * wz_cmd
dvx = float(np.clip(vx_f - self.vx, -self.max_dv, self.max_dv))
dvy = float(np.clip(vy_f - self.vy, -self.max_dv, self.max_dv))
dwz = float(np.clip(wz_f - self.wz, -self.max_dw, self.max_dw))
self.vx += dvx
self.vy += dvy
self.wz += dwz
return self.vx, self.vy, self.wz
def goal_attraction_vel(x: float, y: float, yaw: float, goal_x: float, goal_y: float, cfg: NavCfg):
dx = goal_x - x
dy = goal_y - y
dist = float(np.hypot(dx, dy)) + 1e-6
# 世界系单位方向
gx_w = dx / dist
gy_w = dy / dist
# 转车体系(只关心 yaw
c = np.cos(-yaw)
s = np.sin(-yaw)
gx_b = c * gx_w - s * gy_w
gy_b = s * gx_w + c * gy_w
# 线速度(随距离平滑)
scale = float(np.tanh(dist) * 0.8 + 0.2)
vx = cfg.k_attract * scale * gx_b
vy = cfg.k_attract * scale * gy_b
# 角速度(弱对准)
target_yaw = float(np.arctan2(dy, dx))
yaw_err = wrap_pi(target_yaw - yaw)
wz = cfg.k_yaw_goal * float(np.tanh(yaw_err))
return vx, vy, wz
def obstacle_avoidance_vel(distances: np.ndarray, angles: np.ndarray, cfg: NavCfg):
min_d = float(np.min(distances))
if min_d > cfg.obstacle_threshold:
return 0.0, 0.0, 0.0, min_d
# 最开阔方向
safe_i = int(np.argmax(distances))
best_theta = float(angles[safe_i])
vx = 0.0
vy = 0.0
wz = 0.0
# 排斥 + 轻微“扭转”朝开阔方向
# (这里保持你原逻辑,但数值上更稳一点)
for d, th in zip(distances, angles):
d = float(d)
th = float(th)
if d < cfg.obstacle_threshold:
ratio = 1.0 - d / cfg.obstacle_threshold
strength = cfg.k_repel * (ratio * ratio)
vx -= strength * np.cos(th)
vy -= strength * np.sin(th)
elif d < cfg.safe_distance:
ratio = (cfg.safe_distance - d) / (cfg.safe_distance - cfg.obstacle_threshold)
strength = cfg.k_repel * 0.15 * ratio
wz += strength * float(np.clip(best_theta - th, -1.0, 1.0))
# 绕行:朝最开阔方向“推一把”
bypass = 0.35
vx += bypass * np.cos(best_theta)
vy += bypass * np.sin(best_theta)
wz += 0.4 * best_theta
return vx, vy, wz, min_d
def blend_and_clamp(
vx_a: float, vy_a: float, wz_a: float,
vx_g: float, vy_g: float, wz_g: float,
dist_to_goal: float,
stuck: bool,
cfg: NavCfg
):
vx = vx_a + vx_g
vy = vy_a + vy_g
wz = wz_a + wz_g
lin = float(np.hypot(vx, vy))
# 强制最小速度:未到目标且输出太小
if dist_to_goal > cfg.goal_threshold and lin < cfg.min_linear:
if lin > 1e-6:
s = cfg.min_linear / lin
vx *= s
vy *= s
else:
# 彻底趋零时:优先用目标方向(若也为零就不动)
ax = vx_g + vx_a
ay = vy_g + vy_a
an = float(np.hypot(ax, ay)) + 1e-9
vx = cfg.min_linear * ax / an
vy = cfg.min_linear * ay / an
# 卡住脱困:额外加一点“侧向+偏航”,更容易摆脱夹角/凹障碍
if stuck:
# 让它更愿意横移并转一点(避免只顶着障碍)
vx *= 0.85
vy = vy * 1.15 + 0.06 * np.sign(vy if abs(vy) > 1e-3 else 1.0)
wz += 0.10 * np.sign(wz if abs(wz) > 1e-3 else 1.0)
# 限幅
lin = float(np.hypot(vx, vy))
if lin > cfg.max_linear:
s = cfg.max_linear / lin
vx *= s
vy *= s
wz = float(np.clip(wz, -cfg.max_angular, cfg.max_angular))
return vx, vy, wz
# ===================== 主程序 =====================
def main():
model = mujoco.MjModel.from_xml_path(SCENE_PATH)
data = mujoco.MjData(model)
body_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, "cylinder_agent")
site_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_SITE, "ray_origin")
# actuators
act_vel_x = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_x")
act_vel_y = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_y")
act_vel_yaw = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_ACTUATOR, "vel_yaw")
# goal marker joint
goal_joint_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_JOINT, "goal_joint")
goal_qposadr = int(model.jnt_qposadr[goal_joint_id])
def hide_goal_marker():
data.qpos[goal_qposadr: goal_qposadr + 3] = [-99.0, -99.0, 0.1]
def set_goal_marker(gx: float, gy: float):
data.qpos[goal_qposadr] = gx
data.qpos[goal_qposadr + 1] = gy
data.qpos[goal_qposadr + 2] = 0.15
hide_goal_marker()
scanner = RayScanner(CFG.num_rays, CFG.ray_angle_span)
vel_filter = VelocityFilter(alpha=CFG.ema_alpha, max_dv=CFG.max_dv, max_dw=CFG.max_dw)
goals: List[Tuple[float, float]] = []
goal_active = False
last_x, last_y = 0.0, 0.0
stuck_cnt = 0
# key flags
add_goal_ahead = False
add_goal_camera = False
print("=" * 60)
print("MuJoCo 全向导航:射线扫描 + 避障 + 目标点")
print(f"G: 车头前方 {CFG.goal_ahead_dist:.1f}m | C: 相机视线落点 | 目标边界 R<{CFG.goal_max_r:.1f}m")
print("ESC 退出")
print("=" * 60)
def key_cb(keycode: int):
nonlocal add_goal_ahead, add_goal_camera
if keycode == 71: # G
add_goal_ahead = True
elif keycode == 67: # C
add_goal_camera = True
def stop_output():
data.ctrl[act_vel_x] = 0.0
data.ctrl[act_vel_y] = 0.0
data.ctrl[act_vel_yaw] = 0.0
with mujoco.viewer.launch_passive(model, data, key_callback=key_cb) as viewer:
while viewer.is_running():
mujoco.mj_forward(model, data)
# ========== goal set ==========
nonlocal_vars = None # 仅为了强调这里不用闭包 nonlocal
if add_goal_ahead:
add_goal_ahead = False
x, y, yaw = get_agent_state(data)
gx = x + CFG.goal_ahead_dist * float(np.cos(yaw))
gy = y + CFG.goal_ahead_dist * float(np.sin(yaw))
gx, gy = clamp_goal(gx, gy, CFG.goal_max_r)
goals[:] = [(gx, gy)]
goal_active = True
vel_filter.reset()
print(f" 目标: ({gx:.2f}, {gy:.2f}) [G=车头前方]")
elif add_goal_camera:
add_goal_camera = False
g = goal_from_camera(viewer)
if g is None:
print(" [C] 相机未指向地面,请调整视角后重试")
else:
gx, gy = clamp_goal(g[0], g[1], CFG.goal_max_r)
goals[:] = [(gx, gy)]
goal_active = True
vel_filter.reset()
print(f" 目标: ({gx:.2f}, {gy:.2f}) [C=相机视线]")
if not goal_active or not goals:
stop_output()
vel_filter.reset()
hide_goal_marker()
mujoco.mj_step(model, data)
viewer.sync()
continue
# ========== state ==========
x, y, yaw = get_agent_state(data)
goal_x, goal_y = goals[0]
dist_to_goal = float(np.hypot(goal_x - x, goal_y - y))
if dist_to_goal < CFG.goal_threshold:
print(" 已到达,停止。按 G / C 设置新目标")
goal_active = False
goals.clear()
vel_filter.reset()
stop_output()
hide_goal_marker()
mujoco.mj_step(model, data)
viewer.sync()
continue
set_goal_marker(goal_x, goal_y)
# ========== scan ==========
distances, angles = scanner.scan(model, data, body_id, site_id)
# ========== stuck detect ==========
moved = float(np.hypot(x - last_x, y - last_y))
if moved < CFG.stuck_thresh and dist_to_goal > CFG.goal_threshold:
stuck_cnt += 1
else:
stuck_cnt = 0
last_x, last_y = x, y
stuck = stuck_cnt > CFG.stuck_steps
# ========== velocities ==========
vx_a, vy_a, wz_a, _min_d = obstacle_avoidance_vel(distances, angles, CFG)
vx_g, vy_g, wz_g = goal_attraction_vel(x, y, yaw, goal_x, goal_y, CFG)
vx_b, vy_b, wz_b = blend_and_clamp(vx_a, vy_a, wz_a, vx_g, vy_g, wz_g, dist_to_goal, stuck, CFG)
# ========== filter ==========
vel_filter.max_dv = CFG.max_dv_stuck if stuck else CFG.max_dv
vel_filter.max_dw = CFG.max_dw_stuck if stuck else CFG.max_dw
vx, vy, wz = vel_filter.update(vx_b, vy_b, wz_b)
# ========== body -> world for actuator ==========
c = float(np.cos(yaw))
s = float(np.sin(yaw))
vx_w = vx * c - vy * s
vy_w = vx * s + vy * c
data.ctrl[act_vel_x] = vx_w
data.ctrl[act_vel_y] = vy_w
data.ctrl[act_vel_yaw] = wz
mujoco.mj_step(model, data)
viewer.sync()
if __name__ == "__main__":
main()