#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""3D scene visualization script.

Pipeline:
1. RANSAC plane fitting for the floor and the ceiling.
2. Door filtering based on the fitted floor (doors whose bottom is more
   than 0.1 m above the floor are rejected).
3. Doors and capture locations are translated up to ceiling height for
   visualization.
4. The entrance door is highlighted with a special color.
5. Results are written as PLY files viewable in any 3D viewer.
"""

import os
import sys
import json
import argparse
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
import open3d as o3d
from tqdm import tqdm
from ultralytics import YOLOE

from camera_spherical import Intrinsic_Spherical_NP
from scipy.spatial import Delaunay


# ============================================================================
# Data classes
# ============================================================================

@dataclass
class PoseData:
    """Camera pose of one sweep location."""
    uuid: str
    rotation: Dict[str, float]     # quaternion components: w, x, y, z
    translation: Dict[str, float]  # position components: x, y, z


@dataclass
class Door3D:
    """One merged 3D door instance."""
    id: int
    center: np.ndarray                 # door center [x, y, z]
    bbox_8points: np.ndarray           # 8 bbox corners, shape (8, 3)
    source_detections: List[Dict]      # 2D detections merged into this door
    source_uuid: Optional[str] = None  # UUID of the source sweep (for puck_z)


# ============================================================================
# Scene visualization processor
# ============================================================================

class SceneVisualizer:
    """3D scene visualization processor.

    - RANSAC plane fitting of floor / ceiling
    - door filtering based on distance to the floor
    - translation of doors and sweep locations to ceiling height
    - PLY output for visualization
    """

    # Text prompts used for YOLOE open-vocabulary door detection.
    DOOR_CLASSES = [
        "door", "indoor door", "exterior door", "wooden door", "metal door",
        "glass door", "double door", "single door", "open door", "closed door"
    ]

    def __init__(
        self,
        scene_folder: str,
        model_path: str = "yoloe-26x-seg.pt",
        conf: float = 0.35,
        iou: float = 0.45,
        voxel_size: float = 0.03,
        depth_scale: float = 256.0,
        depth_min: float = 0.02,
        # floor / ceiling fitting
        ground_ransac_dist: float = 0.05,   # RANSAC inlier distance threshold
        ground_ransac_prob: float = 0.99,   # RANSAC confidence
        ceiling_percentile: float = 95.0,   # percentile of points used as ceiling
        # door filtering
        door_ground_dist: float = 0.1,      # max distance from door bottom to floor
        door_height_min: float = 1.0,
        door_height_max: float = 3.0,
        door_width_min: float = 0.3,
        door_width_max: float = 3.0,
        door_thickness_max: float = 0.5,
        # 3D door merging
        merge_iou_thresh: float = 0.1,      # low IoU threshold -> eager merging
        merge_dist_thresh: float = 0.3,     # center distance threshold (meters)
        merge_z_overlap_thresh: float = 0.5,  # Z-overlap ratio in [0, 1]
        # entrance door
        entrance_method: str = "score",     # "score" or "json"
        entrance_json_path: Optional[str] = None,
    ):
        """Initialize the scene visualization processor.

        Args:
            scene_folder: scene folder path.
            model_path: YOLOE model path.
            conf: detection confidence threshold.
            iou: NMS IoU threshold.
            voxel_size: voxel size for point-cloud downsampling.
            depth_scale: depth map scale factor.
            depth_min: minimum valid depth.
            ground_ransac_dist: RANSAC distance threshold for floor fitting.
            ground_ransac_prob: RANSAC confidence.
            ceiling_percentile: percentile of highest points used for the ceiling.
            door_ground_dist: max door-bottom-to-floor distance (reject beyond).
            door_height_min/max: valid door height range.
            door_width_min/max: valid door width range.
            door_thickness_max: max door thickness.
            merge_iou_thresh: 3D door merge IoU threshold.
            merge_dist_thresh: 3D door merge center-distance threshold.
            entrance_method: entrance-door selection method ("score" or "json").
            entrance_json_path: entrance-door JSON path (when method == "json").
        """
        self.scene_folder = Path(scene_folder)
        self.conf = conf
        self.iou = iou
        self.voxel_size = voxel_size
        self.depth_scale = depth_scale
        self.depth_min = depth_min

        # floor / ceiling parameters
        self.ground_ransac_dist = ground_ransac_dist
        self.ground_ransac_prob = ground_ransac_prob
        self.ceiling_percentile = ceiling_percentile

        # door filtering parameters
        self.door_ground_dist = door_ground_dist
        self.door_height_min = door_height_min
        self.door_height_max = door_height_max
        self.door_width_min = door_width_min
        self.door_width_max = door_width_max
        self.door_thickness_max = door_thickness_max

        # 3D door merging parameters
        self.merge_iou_thresh = merge_iou_thresh
        self.merge_dist_thresh = merge_dist_thresh
        self.merge_z_overlap_thresh = merge_z_overlap_thresh
        # When merging doors, keep the lowest floor Z of the group.
        self.ground_z_fallback_low = True

        # entrance-door parameters
        self.entrance_method = entrance_method
        self.entrance_json_path = entrance_json_path
        self.entrance_door_id = None
        self.estimated_entrance_position = None  # fallback when no door detected

        # floor parameters (set during fitting)
        self.ground_d = None
        self.ground_z_from_puck = None  # floor Z estimated from puck data

        # sub-directories
        self.rgb_folder = self.scene_folder / "pano_img"
        self.depth_folder = self.scene_folder / "depth_img"
        self.pose_file = self.scene_folder / "vision.txt"

        # output directory
        self.output_folder = self.scene_folder / "output"

        # load poses
        self.poses = self._load_poses()

        # load YOLOE model
        print(f"加载 YOLOE 模型:{model_path}")
        self.model = YOLOE(model_path)
        self.model.set_classes(self.DOOR_CLASSES)
        print(f"检测类别:{self.DOOR_CLASSES}")

    def _load_poses(self) -> Dict[str, PoseData]:
        """Load sweep poses (and per-sweep puck floor heights) from vision.txt.

        Also computes ``self.ground_z_from_puck`` (median puck Z) as a fallback
        floor estimate when RANSAC data is unavailable.
        """
        if not self.pose_file.exists():
            raise FileNotFoundError(f"位姿文件不存在:{self.pose_file}")

        with open(self.pose_file, 'r') as f:
            data = json.load(f)

        poses = {}
        self.puck_z_dict = {}  # floor Z per sweep location
        for loc in data.get('sweepLocations', []):
            uuid = str(loc['uuid'])
            poses[uuid] = PoseData(
                uuid=uuid,
                rotation=loc['pose']['rotation'],
                translation=loc['pose']['translation']
            )
            # keep the puck Z of each location when present
            if 'puck' in loc and 'z' in loc['puck']:
                self.puck_z_dict[uuid] = loc['puck']['z']

        print(f"加载 {len(poses)} 个拍摄点位")

        # global floor Z (fallback for locations without puck data)
        if self.puck_z_dict:
            puck_z_values = list(self.puck_z_dict.values())
            self.ground_z_from_puck = np.median(puck_z_values)
            print(f"从 puck 参数估计地面 Z (中位数): {self.ground_z_from_puck:.4f}m")
            print(f"puck_z 范围:[{min(puck_z_values):.4f}, {max(puck_z_values):.4f}]")
        else:
            self.ground_z_from_puck = None
            print("⚠️ 未找到 puck 参数,将使用 RANSAC 拟合地面")

        return poses

    def _build_pose_matrix(self, pose: PoseData) -> np.ndarray:
        """Build a 4x4 homogeneous pose matrix from quaternion + translation."""
        R = o3d.geometry.get_rotation_matrix_from_quaternion(
            np.array([pose.rotation['w'], pose.rotation['x'],
                      pose.rotation['y'], pose.rotation['z']])
        )
        t = np.array([
            pose.translation['x'],
            pose.translation['y'],
            pose.translation['z']
        ])
        T = np.eye(4)
        T[:3, :3] = R
        T[:3, 3] = t
        return T

    def _mask_to_3d_points(
        self,
        mask: np.ndarray,
        depth: np.ndarray,
        pose_matrix: np.ndarray
    ) -> Optional[np.ndarray]:
        """Back-project a 2D mask into world-frame 3D points.

        Args:
            mask: binary mask (H, W).
            depth: depth map (H, W).
            pose_matrix: 4x4 pose matrix.

        Returns:
            World-frame points (N, 3), or None when no valid pixel exists.
        """
        H, W = depth.shape
        sph = Intrinsic_Spherical_NP(W, H)

        # pixels inside the mask
        ys, xs = np.where(mask > 0)
        if len(xs) == 0:
            return None

        # keep pixels with a valid depth reading
        valid = depth[ys, xs] > self.depth_min
        if not np.any(valid):
            return None
        xs, ys = xs[valid], ys[valid]
        depths = depth[ys, xs]

        # per-pixel bearing vectors from the spherical camera model
        bx, by, bz = sph.bearing([xs.astype(np.float64), ys.astype(np.float64)])
        bx, by, bz = np.array(bx), np.array(by), np.array(bz)

        # camera-frame points
        pts_cam = np.stack([bx * depths, by * depths, bz * depths], axis=1)

        # 180-degree rotation about Z (camera-to-world convention)
        R_z180 = np.diag([-1.0, -1.0, 1.0])
        pts_cam = pts_cam @ R_z180.T

        # world frame
        pts_w = (pose_matrix[:3, :3] @ pts_cam.T).T + pose_matrix[:3, 3]
        return pts_w

    def _extract_mask_contours(self, masks) -> Tuple[List[List[List[float]]], List[np.ndarray]]:
        """Extract 2D contours from YOLOE mask tensors.

        Args:
            masks: YOLOE mask tensor (N masks).

        Returns:
            (list of simplified contours, list of binary mask arrays).

        NOTE(review): masks whose ``findContours`` yields nothing are skipped
        entirely, and masks with a degenerate largest contour get a mask array
        but no contour — so the two returned lists (and any per-detection score
        list in the caller) may not be index-aligned. Confirm against callers.
        """
        contours = []
        mask_arrays = []
        if masks is None:
            return contours, mask_arrays

        masks_np = masks.cpu().numpy()
        for i in range(masks_np.shape[0]):
            mask = masks_np[i]
            # binarize
            mask_bin = (mask > 0.5).astype(np.uint8) * 255
            # extract contours
            cnts, _ = cv2.findContours(mask_bin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            if not cnts:
                continue
            # keep only the largest contour
            largest = max(cnts, key=cv2.contourArea)
            if len(largest) >= 3:
                # simplify the polygon
                epsilon = 0.02 * cv2.arcLength(largest, True)
                approx = cv2.approxPolyDP(largest, epsilon, True)
                contour = approx.reshape(-1, 2).astype(float).tolist()
                contours.append(contour)
                mask_arrays.append((mask_bin > 0).astype(np.uint8))
            else:
                mask_arrays.append((mask_bin > 0).astype(np.uint8))

        return contours, mask_arrays

    def detect_single_image(
        self,
        img_path: str,
        depth: np.ndarray,
        pose_matrix: np.ndarray,
        save_path: Optional[str] = None
    ) -> Tuple[List[np.ndarray], List[float]]:
        """Run door detection on one panorama and lift masks to 3D.

        Returns:
            (list of per-mask 3D point arrays, list of detection scores).
        """
        results = self.model.predict(
            img_path,
            imgsz=(1024, 2048),
            conf=self.conf,
            iou=self.iou,
            max_det=50,
            augment=True,
            retina_masks=True,
            half=False,
            verbose=False,
        )
        result = results[0]

        mask_3d_points = []
        scores = []
        if result.masks is not None:
            masks = result.masks.data
            contours, mask_arrays = self._extract_mask_contours(masks)
            scores = result.boxes.conf.cpu().numpy().tolist()
            H, W = depth.shape
            for mask_bin in mask_arrays:
                # resize mask to the depth resolution before back-projection
                mask_resized = cv2.resize(mask_bin, (W, H), interpolation=cv2.INTER_NEAREST)
                pts_3d = self._mask_to_3d_points(mask_resized, depth, pose_matrix)
                # require a minimal point count to reject spurious masks
                if pts_3d is not None and len(pts_3d) > 10:
                    mask_3d_points.append(pts_3d)

        if save_path:
            os.makedirs(os.path.dirname(save_path), exist_ok=True)
            result.save(save_path)

        return mask_3d_points, scores

    def _axis_aligned_bbox(self, points: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Return the axis-aligned bounding box of *points* as (min, max)."""
        lo = np.min(points, axis=0)
        hi = np.max(points, axis=0)
        return lo, hi

    def _filter_outliers(self, points: np.ndarray, std_thresh: float = 3.0) -> np.ndarray:
        """Remove per-axis statistical outliers from a point set.

        Args:
            points: 3D points (N, 3).
            std_thresh: standard-deviation threshold (default 3 sigma).

        Returns:
            Filtered points; the original set is returned untouched when fewer
            than 10 points exist or when filtering would drop more than half.
        """
        if len(points) < 10:
            return points

        # per-axis mean / std
        mean = np.mean(points, axis=0)
        std = np.std(points, axis=0)

        # keep points within std_thresh standard deviations on every axis
        mask = np.all(np.abs(points - mean) < std_thresh * std, axis=1)
        filtered = points[mask]

        if len(filtered) < len(points) * 0.5:
            # too aggressive — fall back to the original points
            return points
        return filtered

    def _filter_door_points_by_depth(self, points: np.ndarray) -> np.ndarray:
        """Filter door points by thickness-direction consistency.

        A door is thin along its plane normal; PCA finds that thinnest
        direction, and points projecting beyond 2 sigma along it are dropped.

        Args:
            points: 3D points (N, 3).

        Returns:
            Filtered points (original set when filtering would remove >50%).
        """
        if len(points) < 50:
            return points

        # PCA
        centered = points - np.mean(points, axis=0)
        cov = np.cov(centered.T)
        eigenvalues, eigenvectors = np.linalg.eigh(cov)

        # sort eigenvalues descending
        idx = np.argsort(eigenvalues)[::-1]
        eigenvalues = eigenvalues[idx]
        eigenvectors = eigenvectors[:, idx]

        # the eigenvector of the smallest eigenvalue is the door normal
        normal = eigenvectors[:, 2]

        # project points onto the normal
        projected = np.dot(points, normal)

        # drop points farther than 2 sigma along the normal
        mean_proj = np.mean(projected)
        std_proj = np.std(projected)
        mask = np.abs(projected - mean_proj) < 2.0 * std_proj
        filtered = points[mask]

        if len(filtered) < len(points) * 0.5:
            return points
        return filtered

    def _compute_door_dimensions_pca(self, points: np.ndarray) -> Tuple[float, float, float]:
        """Estimate door dimensions via PCA.

        A door is thin in one direction (thickness) and extended in the other
        two (height and width).

        Args:
            points: 3D points (N, 3).

        Returns:
            (width, thickness, height).
        """
        if len(points) < 10:
            # too few points for PCA; fall back to raw axis extents
            size = points.max(axis=0) - points.min(axis=0)
            return size[0], size[1], size[2]

        # PCA
        centered = points - np.mean(points, axis=0)
        cov = np.cov(centered.T)
        eigenvalues, eigenvectors = np.linalg.eigh(cov)

        # sort eigenvalues descending
        idx = np.argsort(eigenvalues)[::-1]
        eigenvalues = eigenvalues[idx]
        eigenvectors = eigenvectors[:, idx]

        # project onto principal axes and measure extents
        projected = centered @ eigenvectors
        dims = projected.max(axis=0) - projected.min(axis=0)

        # thickness = smallest, height = largest, width = the remaining one
        thickness = dims.min()
        height = dims.max()
        width = dims.sum() - thickness - height

        return width, thickness, height

    def _bbox_8corners(self, bbox_min: np.ndarray, bbox_max: np.ndarray) -> np.ndarray:
        """Return the 8 corner points of an axis-aligned box."""
        cx, cy, cz = bbox_min
        ex, ey, ez = bbox_max
        return np.array([
            [cx, cy, cz], [ex, cy, cz], [ex, ey, cz], [cx, ey, cz],
            [cx, cy, ez], [ex, cy, ez], [ex, ey, ez], [cx, ey, ez],
        ])

    def _bbox_iou_3d(self, b1, b2) -> float:
        """Intersection-over-union of two axis-aligned 3D boxes (min, max)."""
        lo = np.maximum(b1[0], b2[0])
        hi = np.minimum(b1[1], b2[1])
        inter = np.prod(np.maximum(hi - lo, 0))
        vol1 = np.prod(b1[1] - b1[0])
        vol2 = np.prod(b2[1] - b2[0])
        union = vol1 + vol2 - inter
        return inter / union if union > 0 else 0.0

    def _merge_3d_doors(self, door_candidates: List[Dict]) -> List[Door3D]:
        """Merge 3D door candidates with a union-find.

        Merge strategy:
        1. detections from different sweeps merge more eagerly (same sweep may
           genuinely contain multiple doors);
        2. close centers (same physical door seen from different sweeps);
        3. overlapping Z extents (same door has a similar height range);
        4. IoU OR distance is enough for cross-sweep merging (relaxed).

        Floor-Z lowest-wins rule: for overlapping / adjacent doors from
        different sweeps the merged floor Z is the lowest one (e.g. sweeps with
        floor z = -1.45 and -1.5 merge to -1.5).
        """
        if not door_candidates:
            return []

        n = len(door_candidates)
        if n == 1:
            d = door_candidates[0]
            return [Door3D(
                id=0,
                center=(d['bbox_min'] + d['bbox_max']) / 2,
                bbox_8points=self._bbox_8corners(d['bbox_min'], d['bbox_max']),
                source_detections=[d['source']],
                source_uuid=d.get('source_uuid')
            )]

        # union-find with path compression
        parent = list(range(n))

        def find(x):
            if parent[x] != x:
                parent[x] = find(parent[x])
            return parent[x]

        def union(x, y):
            px, py = find(x), find(y)
            if px != py:
                parent[px] = py

        # build connectivity
        for i in range(n):
            for j in range(i + 1, n):
                ci = (door_candidates[i]['bbox_min'] + door_candidates[i]['bbox_max']) / 2
                cj = (door_candidates[j]['bbox_min'] + door_candidates[j]['bbox_max']) / 2
                dist = np.linalg.norm(ci - cj)

                # same source image?
                same_image = door_candidates[i]['source']['image'] == door_candidates[j]['source']['image']

                # Z-extent overlap ratio
                z_min_i, z_max_i = door_candidates[i]['bbox_min'][2], door_candidates[i]['bbox_max'][2]
                z_min_j, z_max_j = door_candidates[j]['bbox_min'][2], door_candidates[j]['bbox_max'][2]
                z_overlap_min = max(z_min_i, z_min_j)
                z_overlap_max = min(z_max_i, z_max_j)
                z_intersection = max(0, z_overlap_max - z_overlap_min)
                z_union = max(z_max_i, z_max_j) - min(z_min_i, z_min_j)
                z_overlap_ratio = z_intersection / z_union if z_union > 0 else 0

                # 3D IoU
                iou = self._bbox_iou_3d(
                    (door_candidates[i]['bbox_min'], door_candidates[i]['bbox_max']),
                    (door_candidates[j]['bbox_min'], door_candidates[j]['bbox_max'])
                )

                # merge conditions:
                # 1. different images: close OR high IoU, plus Z overlap
                # 2. same image: high IoU plus Z overlap (duplicate detection)
                if same_image:
                    should_merge = (iou > 0.05 and z_overlap_ratio > 0.5)
                else:
                    should_merge = (
                        (dist < self.merge_dist_thresh or iou > self.merge_iou_thresh)
                        and z_overlap_ratio > self.merge_z_overlap_thresh
                    )

                if should_merge:
                    union(i, j)

        # group by connected component
        from collections import defaultdict
        groups = defaultdict(list)
        for i in range(n):
            groups[find(i)].append(door_candidates[i])

        # merge each group
        doors = []
        for door_id, members in enumerate(groups.values()):
            if not members:
                continue

            # X/Y: plain min/max union; Z: lowest floor wins, highest top wins
            all_bbox_mins = [m['bbox_min'] for m in members]
            all_bbox_maxs = [m['bbox_max'] for m in members]

            merged_min_xy = np.min(all_bbox_mins, axis=0)[:2]  # [x_min, y_min]
            merged_max_xy = np.max(all_bbox_maxs, axis=0)[:2]  # [x_max, y_max]

            z_mins = [b[2] for b in all_bbox_mins]  # door bottoms
            z_maxs = [b[2] for b in all_bbox_maxs]  # door tops

            merged_z_min = np.min(z_mins)  # lowest floor Z of the group
            merged_z_max = np.max(z_maxs)

            bbox_min = np.array([merged_min_xy[0], merged_min_xy[1], merged_z_min])
            bbox_max = np.array([merged_max_xy[0], merged_max_xy[1], merged_z_max])

            sources = [m['source'] for m in members]
            # first member's sweep UUID represents the merged door
            source_uuid = members[0].get('source_uuid')

            doors.append(Door3D(
                id=door_id,
                center=(bbox_min + bbox_max) / 2,
                bbox_8points=self._bbox_8corners(bbox_min, bbox_max),
                source_detections=sources,
                source_uuid=source_uuid
            ))

        return doors

    def _fit_ground_plane_ransac(self, pc: o3d.geometry.PointCloud) -> Tuple[np.ndarray, float]:
        """Fit the floor plane.

        Strategy:
        1. seed with the lowest 5% of points by Z;
        2. fit a plane to the seed points (PCA, or global RANSAC fallback);
        3. flip the normal so it points up (+Z).

        Args:
            pc: point cloud.

        Returns:
            (unit plane normal, plane offset d with ax + by + cz + d = 0).
        """
        print("\n=== RANSAC 地面拟合 ===")
        points = np.asarray(pc.points)

        # seed: lowest 5% of Z values
        z_coords = points[:, 2]
        z_threshold = np.percentile(z_coords, 5)
        ground_candidates = points[z_coords <= z_threshold]
        print(f"地面候选点数量:{len(ground_candidates)} (Z <= {z_threshold:.3f}m)")

        if len(ground_candidates) < 100:
            print("⚠️ 地面候选点过少,使用全局 RANSAC")
            # fall back to global RANSAC on the whole cloud
            plane_model, inliers = pc.segment_plane(
                distance_threshold=self.ground_ransac_dist,
                ransac_n=3,
                num_iterations=1000
            )
        else:
            # PCA plane fit on the floor candidates
            centered = ground_candidates - np.mean(ground_candidates, axis=0)
            cov = np.cov(centered.T)
            eigenvalues, eigenvectors = np.linalg.eigh(cov)
            # eigh returns ascending eigenvalues; column 0 is the plane normal
            normal = eigenvectors[:, 0]
            mean_point = np.mean(ground_candidates, axis=0)
            # plane a(x-x0)+b(y-y0)+c(z-z0)=0  ->  ax+by+cz+d=0
            a, b, c = normal
            d = -np.dot(normal, mean_point)
            plane_model = [a, b, c, d]
            inliers = list(range(len(ground_candidates)))

        a, b, c, d = plane_model
        normal = np.array([a, b, c])

        # force the normal to point up (+Z)
        if normal[2] < 0:
            normal = -normal
            d = -d

        inlier_ratio = len(inliers) / len(pc.points)
        print(f"地面平面方程:{normal[0]:.4f}x + {normal[1]:.4f}y + {normal[2]:.4f}z + {d:.4f} = 0")
        print(f"地面法向量:[{normal[0]:.4f}, {normal[1]:.4f}, {normal[2]:.4f}]")
        print(f"内点数量:{len(inliers)} / {len(pc.points)} ({inlier_ratio:.2%})")

        return normal, d

    def _fit_ceiling_plane(self, pc: o3d.geometry.PointCloud, ground_normal: np.ndarray,
                           ground_d: float) -> Tuple[np.ndarray, float, float]:
        """Fit the ceiling plane (assumed parallel to the floor).

        Args:
            pc: point cloud.
            ground_normal: floor unit normal.
            ground_d: floor plane offset d.

        Returns:
            (ceiling normal, ceiling plane offset d, floor-to-ceiling distance).
        """
        print("\n=== 天花板平面拟合 ===")
        points = np.asarray(pc.points)

        # signed height of each point above the floor: p·n + d
        distances_to_ground = np.dot(points, ground_normal) + ground_d

        # discard implausible heights (> 5 m is likely noise; rooms are 2.5–4 m)
        valid_mask = distances_to_ground < 5.0
        valid_points = points[valid_mask]
        valid_distances = distances_to_ground[valid_mask]

        if len(valid_points) < 1000:
            print(f"⚠️ 有效点过少 ({len(valid_points)} 个),使用所有点")
            valid_points = points
            valid_distances = distances_to_ground

        # fit the ceiling to the highest points (85–95th percentile band)
        height_percentiles = np.percentile(valid_distances, [85, 90, 95])
        print(f"高度分位数:85%={height_percentiles[0]:.3f}m, 90%={height_percentiles[1]:.3f}m, 95%={height_percentiles[2]:.3f}m")

        # start from the 90th percentile
        ceiling_threshold = height_percentiles[1]
        ceiling_mask = valid_distances >= ceiling_threshold
        ceiling_points = valid_points[ceiling_mask]

        if len(ceiling_points) < 100:
            print(f"⚠️ 天花板点过少 ({len(ceiling_points)} 个),降低阈值到 85%")
            ceiling_threshold = height_percentiles[0]
            ceiling_mask = valid_distances >= ceiling_threshold
            ceiling_points = valid_points[ceiling_mask]

        # ceiling is parallel to the floor
        ceiling_normal = ground_normal.copy()

        # d_ceiling = -mean(p·n) over the ceiling points
        d_ceiling = -np.mean(np.dot(ceiling_points, ceiling_normal))

        # floor-to-ceiling distance = mean height of the ceiling points
        floor_ceiling_dist = np.mean(valid_distances[ceiling_mask])

        print(f"天花板点数量:{len(ceiling_points)}")
        print(f"天花板平面方程:{ceiling_normal[0]:.4f}x + {ceiling_normal[1]:.4f}y + {ceiling_normal[2]:.4f}z + {d_ceiling:.4f} = 0")
        print(f"地面到天花板距离:{floor_ceiling_dist:.3f} 米")

        return ceiling_normal, d_ceiling, floor_ceiling_dist

    def _point_to_plane_distance(self, point: np.ndarray, normal: np.ndarray, d: float) -> float:
        """Unsigned distance from *point* to plane n·p + d = 0 (unit normal)."""
        return abs(np.dot(point, normal) + d)

    def _denoise_xy_projection(
        self,
        points: np.ndarray,
        grid_size: float = 0.15,
        min_points_per_cell: int = 5
    ) -> np.ndarray:
        """Grid-filter noise after projecting the cloud onto the XY plane.

        Principle: rasterize XY into cells, count points per cell, and keep
        only points in sufficiently populated cells — removing isolated noise
        while preserving the building outline.

        Args:
            points: 3D points (N, 3).
            grid_size: cell size in meters (default 0.15 m).
            min_points_per_cell: minimum points per cell; fewer counts as noise.

        Returns:
            Denoised points (M, 3).
        """
        if len(points) < 10:
            return points

        xy = points[:, :2]
        z = points[:, 2]

        # rasterize
        grid_coords = np.floor(xy / grid_size).astype(int)

        # count points per cell
        from collections import Counter
        cell_counts = Counter(map(tuple, grid_coords))

        # keep well-populated cells
        valid_cells = {
            cell for cell, count in cell_counts.items()
            if count >= min_points_per_cell
        }

        # build the keep-mask
        mask = np.array([tuple(gc) in valid_cells for gc in grid_coords])

        if np.sum(mask) < len(points) * 0.3:
            # filtered more than 70% — relax the threshold and retry
            print(f"⚠️ 去噪过滤过多 ({np.sum(mask)}/{len(points)}),降低阈值重试")
            valid_cells = {
                cell for cell, count in cell_counts.items()
                if count >= max(1, min_points_per_cell - 2)
            }
            mask = np.array([tuple(gc) in valid_cells for gc in grid_coords])

        filtered = points[mask]
        print(f"XY 投影去噪:{len(points)} → {len(filtered)} 点 ({len(filtered)/len(points)*100:.1f}%)")
        return filtered

    def _compute_xy_contour(self, points: np.ndarray) -> Tuple[np.ndarray, Optional[Delaunay]]:
        """Compute the XY-projection contour (alpha shape / concave hull).

        Args:
            points: 3D points (N, 3), ideally already denoised.

        Returns:
            (contour vertices (K, 2), Delaunay triangulation used for
            point-in-polygon tests).
        """
        if len(points) < 4:
            return points[:, :2], None

        xy = points[:, :2]

        # alpha-shape radius; larger values approach the convex hull
        alpha = 2.0

        # Delaunay triangulation of the projected points
        tri = Delaunay(xy)

        # alpha-shape boundary: keep triangles whose circumradius < alpha.
        # BUGFIX: the original iterated range(tri.npoints) while indexing
        # tri.simplices — wrong count (IndexError or skipped triangles);
        # iterate the simplices themselves.
        centers = []
        for simplex in tri.simplices:
            p0, p1, p2 = xy[simplex[0]], xy[simplex[1]], xy[simplex[2]]
            # circumradius via R = abc / (4 * area)
            a = np.linalg.norm(p1 - p2)
            b = np.linalg.norm(p0 - p2)
            c = np.linalg.norm(p0 - p1)
            s = (a + b + c) / 2
            area = np.sqrt(max(0, s * (s - a) * (s - b) * (s - c)))
            if area < 1e-10:
                continue
            R = (a * b * c) / (4 * area)
            if R < alpha:
                center = (p0 + p1 + p2) / 3
                centers.append(center)

        if len(centers) < 3:
            # alpha shape failed; fall back to the convex hull
            print("⚠️ Alpha Shape 失败,使用凸包")
            from scipy.spatial import ConvexHull
            hull = ConvexHull(xy)
            return xy[hull.vertices], tri

        # extract the boundary of the kept triangle centers
        centers = np.array(centers)
        from scipy.spatial import ConvexHull
        hull = ConvexHull(centers)
        contour = centers[hull.vertices]

        return contour, tri

    def _point_in_contour(self, point: np.ndarray, contour: np.ndarray,
                          tri: Optional[Delaunay]) -> bool:
        """Test whether a 2D point lies inside the contour.

        Args:
            point: 2D point [x, y].
            contour: contour vertices (K, 2).
            tri: Delaunay triangulation (used when available).

        Returns:
            True when the point is inside.
        """
        if tri is None:
            # no triangulation — ray-casting via matplotlib Path
            from matplotlib.path import Path
            path = Path(contour)
            return path.contains_point(point)

        # with a triangulation, find_simplex >= 0 means inside the hull
        simplex = tri.find_simplex(point)
        return simplex >= 0

    def _compute_door_boundary_score(
        self,
        door: Door3D,
        contour: np.ndarray,
        tri
    ) -> float:
        """Score a door by its distance to the contour boundary.

        Entrance doors sit on the building outline: the closer to the
        boundary, the higher the score.

        Args:
            door: 3D door.
            contour: contour vertices (K, 2).
            tri: Delaunay triangulation (unused here, kept for interface).

        Returns:
            Boundary score in [0, 30].
        """
        door_xy = door.center[:2]

        # shortest distance from the door center to any contour edge
        min_dist = float('inf')
        n = len(contour)
        for i in range(n):
            p1 = contour[i]
            p2 = contour[(i + 1) % n]
            dist = self._point_to_line_segment_distance(door_xy, p1, p2)
            min_dist = min(min_dist, dist)

        if min_dist == float('inf'):
            return 0.0

        # distance -> score (closer is better): 0 m = 30 pts, 2 m = 0 pts
        score = max(0, 30 - min_dist * 15)
        return score

    def _point_to_line_segment_distance(
        self,
        point: np.ndarray,
        line_start: np.ndarray,
        line_end: np.ndarray
    ) -> float:
        """Shortest distance from a 2D point to a line segment.

        Args:
            point: 2D point [x, y].
            line_start: segment start [x1, y1].
            line_end: segment end [x2, y2].

        Returns:
            Shortest distance.
        """
        p = np.array(point)
        a = np.array(line_start)
        b = np.array(line_end)

        ab = b - a
        ap = p - a

        # clamped projection parameter onto the segment
        t = np.dot(ap, ab) / np.dot(ab, ab)
        t = np.clip(t, 0, 1)

        proj = a + t * ab
        return np.linalg.norm(p - proj)

    def _filter_door_by_ground(self, door: Door3D, ground_normal: np.ndarray,
                               ground_d: float) -> Tuple[bool, str]:
        """Reject doors whose bottom is too far above the fitted floor.

        Args:
            door: 3D door.
            ground_normal: floor unit normal.
            ground_d: floor plane offset d.

        Returns:
            (passed, rejection reason).
        """
        # door bottom = bbox corner closest to the floor plane
        points = door.bbox_8points
        distances = np.array([self._point_to_plane_distance(p, ground_normal, ground_d)
                              for p in points])
        door_bottom_dist = distances.min()

        if door_bottom_dist > self.door_ground_dist:
            return False, f"门底部距地面 {door_bottom_dist:.3f}m > {self.door_ground_dist}m"

        return True, ""

    def _filter_door_by_ground_puck(self, door: Door3D) -> Tuple[bool, str]:
        """Check door-to-floor distance against the sweep's puck height.

        Uses the puck_z of the door's source sweep; falls back to the global
        median; skips the check when no puck data exists.

        Args:
            door: 3D door.

        Returns:
            (passed, rejection reason).
        """
        # lowest Z of the door bbox
        door_bottom_z = door.bbox_8points[:, 2].min()

        # puck_z of the source sweep, or the global median fallback
        puck_z = None
        if door.source_uuid and door.source_uuid in self.puck_z_dict:
            puck_z = self.puck_z_dict[door.source_uuid]
        elif self.ground_z_from_puck is not None:
            puck_z = self.ground_z_from_puck

        if puck_z is None:
            return True, ""  # no puck data — skip the check

        # signed distance between the door bottom and the puck floor
        dist = door_bottom_z - puck_z

        # allow 0.2 m slack (thresholds, slightly raised doors)
        if abs(dist) > 0.2:
            return False, f"门底部距地面 {dist:.3f}m (puck_z={puck_z:.3f})"

        return True, ""

    def _filter_door_by_properties(self, door: Door3D) -> Tuple[bool, List[str]]:
        """Sanity-check door dimensions.

        Z is the up axis. Currently only verifies the extents are non-empty;
        no numeric range limits are enforced.
        """
        reasons = []
        size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0)
        height = size[2]                 # Z extent = height
        width = max(size[0], size[1])    # larger of X/Y = door-plane direction
        thickness = min(size[0], size[1])  # smaller of X/Y = depth direction

        # only validity, no numeric limits
        if height <= 0:
            reasons.append(f"高度无效 ({height:.2f}m)")
        if width <= 0:
            reasons.append(f"宽度无效 ({width:.2f}m)")

        return len(reasons) == 0, reasons

    def _estimate_entrance_from_poses(
        self,
        pc: o3d.geometry.PointCloud,
        contour: Optional[np.ndarray] = None,
        tri=None
    ) -> Optional[np.ndarray]:
        """Estimate the entrance position from sweep data when no door was found.

        Core assumptions:
        1. the entrance is on the building edge — sweeps near the contour
           boundary are more likely entrance sweeps;
        2. indoor sweeps are occluded by walls — visibility-inconsistent
           sweeps are likely indoors and get filtered out;
        3. the entrance tends to sit at one end of the building — sweeps far
           from the centroid are more likely on the entrance side.

        Scoring: boundary-distance score (70%) + distance-from-centroid
        score (30%); visibility consistency is used as a filter.

        Args:
            pc: scene point cloud.
            contour: optional contour vertices (computed internally when None).
            tri: optional Delaunay triangulation.

        Returns:
            Estimated entrance position (x, y, z), or None when unavailable.
        """
        if not self.poses:
            print("⚠️ 没有点位数据,无法估计入户门")
            return None

        print("\n=== 基于点位信息估计入户门 ===")

        # ========== step 1: gather data ==========
        # re-read vision.txt for the full sweep records (incl. visibles)
        vision_file = self.scene_folder / "vision.txt"
        if not vision_file.exists():
            print("⚠️ vision.txt 不存在,无法获取可见性信息")
            return None

        with open(vision_file, 'r') as f:
            vision_data = json.load(f)

        # sweep lookup table
        pose_lookup = {}
        for loc in vision_data.get('sweepLocations', []):
            uuid = str(loc['uuid'])
            pose_lookup[uuid] = {
                'id': loc['id'],
                'pose': loc['pose'],
                'puck': loc.get('puck', {}),
                'visibles': loc.get('visibles', []),
                'position': np.array([
                    loc['pose']['translation']['x'],
                    loc['pose']['translation']['y'],
                    loc['pose']['translation']['z']
                ])
            }

        # flatten into a scoring-friendly list
        pose_info = []
        for uuid, data in pose_lookup.items():
            pose_info.append({
                'uuid': uuid,
                'id': data['id'],
                'position': data['position'],
                'position_xy': data['position'][:2],
                'visibles': set(data['visibles']),
                'puck_z': data['puck'].get('z', 0)
            })

        # ========== step 2: contour (for the boundary-distance score) ==========
        if contour is None:
            points_xy_denoised = self._denoise_xy_projection(
                np.asarray(pc.points),
                grid_size=0.15,
                min_points_per_cell=5
            )
            contour, tri = self._compute_xy_contour(points_xy_denoised)
            print(f"轮廓点数量:{len(contour)}")

        # ========== step 3: visibility-consistency filter ==========
        # Rule: if A sees B but not C while AC < AB, A is occluded by a wall
        # and therefore likely indoors — filter it out.
        filtered_poses = []
        filtered_out = []
        for i, pose_a in enumerate(pose_info):
            is_indoor = False
            for j, pose_b in enumerate(pose_info):
                if i == j:
                    continue
                b_id = pose_b['id']
                a_visible_to_b = b_id in pose_a['visibles']
                dist_ab = np.linalg.norm(pose_a['position_xy'] - pose_b['position_xy'])

                # compare against every other sweep C
                for k, pose_c in enumerate(pose_info):
                    if k == i or k == j:
                        continue
                    c_id = pose_c['id']
                    a_visible_to_c = c_id in pose_a['visibles']
                    dist_ac = np.linalg.norm(pose_a['position_xy'] - pose_c['position_xy'])

                    # A sees B, not C, yet C is closer than B -> A is indoors
                    if a_visible_to_b and not a_visible_to_c and dist_ac < dist_ab:
                        is_indoor = True
                        filtered_out.append((pose_a, f"遮挡不一致:可见 B({b_id}) 不可见 C({c_id}), 但 AC({dist_ac:.2f}) < AB({dist_ab:.2f})"))
                        break
                if is_indoor:
                    break
            if not is_indoor:
                filtered_poses.append(pose_a)

        print(f"\n可见性过滤:{len(pose_info)} → {len(filtered_poses)} 个点位")
        if filtered_out:
            print("被过滤点位 (可能在室内):")
            for p, reason in filtered_out[:5]:  # show at most 5
                print(f" 点位 {p['uuid']}: {reason}")

        # fall back to all sweeps when everything was filtered out
        if not filtered_poses:
            print("⚠️ 所有点位都被过滤,使用全部点位")
            filtered_poses = pose_info

        # ========== step 4: centroid ==========
        all_positions_xy = np.array([p['position_xy'] for p in filtered_poses])
        centroid = np.mean(all_positions_xy, axis=0)

        # ========== step 5: per-sweep scoring ==========
        # boundary-distance score (70%) + distance-from-centroid score (30%)
        boundary_distances = []
        for p in filtered_poses:
            dist = self._compute_distance_to_contour_boundary(p['position_xy'], contour)
            boundary_distances.append(dist)

        center_distances = np.linalg.norm(all_positions_xy - centroid, axis=1)

        # normalization denominators
        max_boundary_dist = max(boundary_distances) if boundary_distances else 1.0
        max_center_dist = center_distances.max() if center_distances.max() > 0 else 1.0

        print("\n点位评分详情:")
        best_score = -float('inf')
        best_pose = None
        for i, p in enumerate(filtered_poses):
            # boundary score (70%): closer to the boundary is better
            if max_boundary_dist > 0:
                boundary_score = (1 - boundary_distances[i] / max_boundary_dist) * 70
            else:
                boundary_score = 35  # all sweeps on the boundary

            # centroid-distance score (30%): farther from the centroid is better
            if max_center_dist > 0:
                center_score = (center_distances[i] / max_center_dist) * 30
            else:
                center_score = 15

            total_score = boundary_score + center_score
            print(f" 点位 {p['uuid']} (ID={p['id']}): 边界={boundary_score:.1f} + 中心距={center_score:.1f} = {total_score:.1f} "
                  f"(边界距离={boundary_distances[i]:.2f}m, 中心距={center_distances[i]:.2f}m)")

            if total_score > best_score:
                best_score = total_score
                best_pose = p

        print(f"\n选择点位 {best_pose['uuid']} (综合评分={best_score:.1f})")
        return best_pose['position']

    def _compute_distance_to_contour_boundary(
        self,
        point_xy: np.ndarray,
        contour: np.ndarray
    ) -> float:
        """Shortest distance from a 2D point to the contour boundary.

        Args:
            point_xy: 2D point [x, y].
            contour: contour vertices (K, 2).

        Returns:
            Shortest distance over all contour edges.
        """
        min_dist = float('inf')
        n = len(contour)
        for i in range(n):
            p1 = contour[i]
            p2 = contour[(i + 1) % n]
            dist = self._point_to_line_segment_distance(point_xy, p1, p2)
            min_dist = min(min_dist, dist)
        return min_dist

    def _compute_distance_to_pointcloud_boundary(
        self,
        point: np.ndarray,
        pc: o3d.geometry.PointCloud
    ) -> float:
        """Distance from a 3D point to the cloud's XY convex-hull boundary.

        Args:
            point: 3D point [x, y, z].
            pc: point cloud.

        Returns:
            Distance to the boundary.
        """
        # project to XY
        points_xy = np.asarray(pc.points)[:, :2]
        point_xy = point[:2]

        # convex hull of the projected cloud
        from scipy.spatial import ConvexHull
        hull = ConvexHull(points_xy)
        hull_indices = hull.vertices
        hull_points = points_xy[hull_indices]

        # shortest distance to any hull edge
        min_dist = float('inf')
        n = len(hull_points)
        for i in range(n):
            p1 = hull_points[i]
            p2 = hull_points[(i + 1) % n]
            dist = self._point_to_line_segment_distance(point_xy, p1, p2)
            min_dist = min(min_dist, dist)

        return min_dist

    def _identify_entrance_door(
        self,
        doors: List[Door3D],
        pc: o3d.geometry.PointCloud,
        contour: Optional[np.ndarray] = None,
        tri=None
    ) -> int:
        """Pick the entrance door.

        Args:
            doors: candidate doors.
            pc: scene point cloud.
            contour: optional contour (for boundary scoring).
            tri: optional Delaunay triangulation.

        Returns:
            Index of the entrance door in *doors*, or -1 when undecidable.
        """
        if self.entrance_method == "json":
            # read the entrance door id from a JSON file
            if self.entrance_json_path is None:
                print("⚠️ entrance_json_path 未指定,无法使用 json 方法")
                return -1
            with open(self.entrance_json_path, 'r') as f:
                data = json.load(f)
            entrance_door_id = data.get('entrance_door_id')
            if entrance_door_id is not None:
                self.entrance_door_id = entrance_door_id
                print(f"\n从 JSON 读取入户门 ID: {entrance_door_id}")
            # match by door_id carried in the source detections
            if entrance_door_id is not None:
                for i, door in enumerate(doors):
                    for src in door.source_detections:
                        if src.get('door_id') == entrance_door_id:
                            print(f"匹配到门 {i}")
                            return i
            # fallback: closest door center to world_position
            world_position = data.get('world_position')
            if world_position is not None:
                wp = np.array(world_position)
                min_dist = float('inf')
                best_idx = -1
                for i, door in enumerate(doors):
                    dist = np.linalg.norm(door.center - wp)
                    if dist < min_dist:
                        min_dist = dist
                        best_idx = i
                if best_idx >= 0:
                    print(f"通过 world_position 匹配到门 {best_idx} (距离={min_dist:.3f}m)")
                    return best_idx
            return -1
        else:
            # scoring method
            if len(doors) == 0:
                # ========== fallback when no door was detected ==========
                print("\n⚠️ 未检测到任何门,使用点位信息估计入户门位置")
                estimated_entrance = self._estimate_entrance_from_poses(
                    pc=pc, contour=contour, tri=tri
                )
                if estimated_entrance is not None:
                    print(f"估计入户门位置:{estimated_entrance}")
                    # virtual entrance marker — not added to doors, viz only
                    self.estimated_entrance_position = estimated_entrance
                return -1

            if len(doors) == 1:
                return 0

            # compute the contour when not provided (for boundary scoring)
            use_contour = False
            if contour is None:
                points_xy_denoised = self._denoise_xy_projection(
                    np.asarray(pc.points),
                    grid_size=0.15,
                    min_points_per_cell=5
                )
                contour, tri = self._compute_xy_contour(points_xy_denoised)
                print(f"轮廓点数量:{len(contour)}")
                use_contour = True

            # score each door
            all_centers = np.array([d.center for d in doors])
            best_score = -1
            best_idx = 0
            print("\n入户门评分详情:")
            for i, door in enumerate(doors):
                size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0)
                height = size[2]               # Z extent = height
                width = max(size[0], size[1])  # larger of X/Y = width

                # size score (ideal: 2.1 m tall, 1.0 m wide)
                height_score = max(0, 15 - abs(height - 2.1) * 10)
                width_score = max(0, 15 - abs(width - 1.0) * 12)
                size_score = height_score + width_score

                # edge score: mean distance to the other doors
                dists_to_others = np.linalg.norm(all_centers - door.center, axis=1)
                avg_dist = dists_to_others.mean()
                edge_score = min(25, avg_dist * 10)

                # multi-view support
                source_count = len(door.source_detections)
                view_score = min(10, source_count * 3)

                # boundary score: entrance doors sit on the building outline
                boundary_score = 0.0
                if use_contour and contour is not None:
                    boundary_score = self._compute_door_boundary_score(door, contour, tri)

                total = size_score + edge_score + view_score + boundary_score

                print(f" 门{i}: 尺寸={size_score:.1f} + 边缘={edge_score:.1f} + 视角={view_score:.1f} + 边界={boundary_score:.1f} = {total:.1f}")

                if total > best_score:
                    best_score = total
                    best_idx = i

            print(f"\n入户门选择:门 {best_idx} (得分={best_score:.1f})")
            return best_idx

    def _rgb_depth_to_pointcloud(
        self,
        rgb: np.ndarray,
        depth: np.ndarray,
        pose_matrix: np.ndarray
    ) -> o3d.geometry.PointCloud:
        """Convert an RGB-D panorama to a world-frame colored point cloud."""
        H, W = depth.shape
        sph = Intrinsic_Spherical_NP(W, H)

        px, py = np.meshgrid(np.arange(W), np.arange(H))
        px_flat = px.flatten().astype(np.float64)
        py_flat = py.flatten().astype(np.float64)

        bx, by, bz = sph.bearing([px_flat, py_flat])
        bx, by, bz = np.array(bx), np.array(by), np.array(bz)

        mask = depth.flatten() > self.depth_min
        d = depth.flatten()[mask]
        if len(d) == 0:
            return o3d.geometry.PointCloud()

        pts_cam = np.stack([bx[mask] * d, by[mask] * d, bz[mask] * d], axis=1)

        # 180-degree rotation about Z (same convention as _mask_to_3d_points)
        R_z180 = np.diag([-1.0, -1.0, 1.0])
        pts_cam = pts_cam @ R_z180.T
        pts_w = (pose_matrix[:3, :3] @ pts_cam.T).T + pose_matrix[:3, 3]

        # resample RGB to the depth resolution when they differ
        if rgb.shape[:2] != depth.shape:
            rgb_d = cv2.resize(rgb, (W, H), interpolation=cv2.INTER_LINEAR)
        else:
            rgb_d = rgb
        colors = rgb_d.reshape(-1, 3)[mask].astype(np.float64) / 255.0

        pc = o3d.geometry.PointCloud()
        pc.points = o3d.utility.Vector3dVector(pts_w)
        pc.colors = o3d.utility.Vector3dVector(colors)
        return pc

    def _translate_point_to_ceiling(
        self,
        point: np.ndarray,
        floor_ceiling_dist: float,
        ground_normal: np.ndarray,
        ground_d: float
    ) -> np.ndarray:
        """Translate a point up by the floor-to-ceiling distance.

        Args:
            point: original point.
            floor_ceiling_dist: floor-to-ceiling distance.
            ground_normal: floor unit normal.
            ground_d: floor plane offset d (unused; kept for interface).

        Returns:
            Translated point.
        """
        # Shift the whole door up by one room height: its bottom moves to
        # ceiling level while its own height is preserved above the ceiling.
        translation = floor_ceiling_dist * ground_normal
        return point + translation

    def _create_door_visualization(
        self,
        door: Door3D,
        ground_normal: np.ndarray,
        ground_d: float,
        floor_ceiling_dist: float,
        is_entrance: bool = False,
        translate_to_ceiling: bool = True
    ) -> o3d.geometry.PointCloud:
        """Create a colored outline point cloud for a door.

        Args:
            door: 3D door.
            ground_normal: floor unit normal.
            ground_d: floor plane offset d.
            floor_ceiling_dist: floor-to-ceiling distance.
            is_entrance: whether this is the entrance door.
            translate_to_ceiling: whether to lift the outline to the ceiling.

        Returns:
            Door outline point cloud (red for entrance, green otherwise).
        """
        # bbox_8points layout: indices 0-3 = bottom face (lowest Z),
        # indices 4-7 = top face. The ceiling projection only needs one
        # rectangle, so the bottom face suffices.
        rect_points = door.bbox_8points[:4]

        # sample points along the 4 rectangle edges
        edge_points = []
        edges = [(0, 1), (1, 2), (2, 3), (3, 0)]
        for i, j in edges:
            p_start = rect_points[i]
            p_end = rect_points[j]
            # 15 samples per edge for a dense outline
            for t in np.linspace(0, 1, 15):
                edge_points.append(p_start + t * (p_end - p_start))

        points = np.array(edge_points)

        if translate_to_ceiling:
            # lift every outline point to ceiling height
            points = np.array([
                self._translate_point_to_ceiling(p, floor_ceiling_dist, ground_normal, ground_d)
                for p in points
            ])

        # build the point cloud
        pc = o3d.geometry.PointCloud()
        pc.points = o3d.utility.Vector3dVector(points)

        # colors: entrance door red, other doors green
        if is_entrance:
            colors = np.array([[1.0, 0.0, 0.0]] * len(points))
        else:
            colors = np.array([[0.0, 1.0, 0.0]] * len(points))
        pc.colors = o3d.utility.Vector3dVector(colors)

        return pc

    def _create_door_visualization_with_color(
        self,
        door: Door3D,
        ground_normal: np.ndarray,
        ground_d: float,
        floor_ceiling_dist: float,
        color: np.ndarray,
        translate_to_ceiling: bool = True
    ) -> o3d.geometry.PointCloud:
        """Create a door outline point cloud with an explicit color.

        Args:
            door: 3D door.
            ground_normal: floor unit normal.
            ground_d: floor plane offset d.
            floor_ceiling_dist: floor-to-ceiling distance.
            color: RGB color [R, G, B].
            translate_to_ceiling: whether to lift the outline to the ceiling.

        Returns:
            Door outline point cloud.
        """
        # bbox_8points layout: indices 0-3 = bottom face, 4-7 = top face;
        # the bottom rectangle is enough for the ceiling projection.
        rect_points = door.bbox_8points[:4]

        # sample points along the 4 rectangle edges
        edge_points = []
        edges = [(0, 1), (1, 2), (2, 3), (3, 0)]
        for i, j in edges:
            p_start = rect_points[i]
            p_end = rect_points[j]
            #
在边上生成 15 个点(更密集的轮廓) for t in np.linspace(0, 1, 15): edge_points.append(p_start + t * (p_end - p_start)) points = np.array(edge_points) if translate_to_ceiling: # 将每个点平移到天花板高度 points = np.array([ self._translate_point_to_ceiling(p, floor_ceiling_dist, ground_normal, ground_d) for p in points ]) # 创建点云 pc = o3d.geometry.PointCloud() pc.points = o3d.utility.Vector3dVector(points) # 设置颜色 colors = np.array([color] * len(points)) pc.colors = o3d.utility.Vector3dVector(colors) return pc def _create_pose_visualization( self, pose: PoseData, ground_normal: np.ndarray, ground_d: float, floor_ceiling_dist: float, translate_to_ceiling: bool = True ) -> o3d.geometry.PointCloud: """ 创建拍摄点位的可视化 Args: pose: 位姿数据 ground_normal: 地面法向量 ground_d: 地面平面方程的 d floor_ceiling_dist: 地面到天花板的距离 translate_to_ceiling: 是否平移到天花板 Returns: 点位点云(蓝色) """ point = np.array([ pose.translation['x'], pose.translation['y'], pose.translation['z'] ]) if translate_to_ceiling: point = self._translate_point_to_ceiling(point, floor_ceiling_dist, ground_normal, ground_d) # 创建点云(单个点) pc = o3d.geometry.PointCloud() pc.points = o3d.utility.Vector3dVector([point]) pc.colors = o3d.utility.Vector3dVector([[0.0, 0.0, 1.0]]) # 蓝色 return pc def process_and_visualize( self, output_ply_path: str, translate_to_ceiling: bool = True ): """ 处理场景并生成可视化 PLY 文件 Args: output_ply_path: 输出 PLY 文件路径 translate_to_ceiling: 是否将门和点位平移到天花板 """ # 创建输出目录 self.output_folder.mkdir(parents=True, exist_ok=True) rgb_files = sorted( self.rgb_folder.glob("*.jpg"), key=lambda x: int(x.stem) ) if not rgb_files: raise FileNotFoundError(f"在 {self.rgb_folder} 中未找到 RGB 图像") print(f"找到 {len(rgb_files)} 张全景图,开始处理...") # ========== 第一步:收集所有点云和门检测 ========== combined_pc = o3d.geometry.PointCloud() door_candidates = [] for rgb_file in tqdm(rgb_files, desc="检测门"): idx = rgb_file.stem pose = self.poses.get(idx) if pose is None: continue depth_path = self.depth_folder / f"{idx}.png" if not depth_path.exists(): continue rgb = 
cv2.cvtColor(cv2.imread(str(rgb_file)), cv2.COLOR_BGR2RGB) depth = cv2.imread(str(depth_path), cv2.IMREAD_UNCHANGED).astype(np.float32) / self.depth_scale pose_matrix = self._build_pose_matrix(pose) # YOLOE 检测 mask_3d_points, scores = self.detect_single_image( str(rgb_file), depth, pose_matrix ) # 收集 3D 门候选 for i, pts_3d in enumerate(mask_3d_points): if len(pts_3d) > 10: # 先过滤离群点 pts_3d_filtered = self._filter_outliers(pts_3d, std_thresh=2.0) # 再根据深度一致性过滤 pts_3d_filtered = self._filter_door_points_by_depth(pts_3d_filtered) bbox_min, bbox_max = self._axis_aligned_bbox(pts_3d_filtered) door_candidates.append({ 'bbox_min': bbox_min, 'bbox_max': bbox_max, 'points_3d': pts_3d_filtered, 'source': { 'image': rgb_file.name, 'score': scores[i] if i < len(scores) else 0.0 }, 'source_uuid': idx # 保存来源点位 UUID }) # RGB-D 转点云 pc = self._rgb_depth_to_pointcloud(rgb, depth, pose_matrix) combined_pc += pc # 下采样 print(f"\n融合前点数:{len(combined_pc.points)}") combined_pc = combined_pc.voxel_down_sample(self.voxel_size) print(f"融合后点数:{len(combined_pc.points)}") # ========== 第二步:地面和天花板拟合 ========== ground_normal, ground_d = self._fit_ground_plane_ransac(combined_pc) self.ground_d = ground_d # 保存用于后续计算 ceiling_normal, ceiling_d, floor_ceiling_dist = self._fit_ceiling_plane( combined_pc, ground_normal, ground_d ) # ========== 第三步:3D 门合并和过滤 ========== print(f"\n3D 门候选:{len(door_candidates)}") # 打印每个候选的尺寸 for i, c in enumerate(door_candidates): size = c['bbox_max'] - c['bbox_min'] print(f" 候选 {i}: 尺寸=[{size[0]:.3f}, {size[1]:.3f}, {size[2]:.3f}], 中心={((c['bbox_min']+c['bbox_max'])/2).round(3)}") doors_3d = self._merge_3d_doors(door_candidates) print(f"合并后门数量:{len(doors_3d)}") # 过滤 - 只检查物理特性,不检查地面距离 # 地面距离只用于入户门候选筛选 print("\n过滤 3D 门 (只检查物理特性)...") valid_doors = [] # 通过物理特性过滤的门 filtered_doors = [] # 未通过物理特性过滤的门 for door in doors_3d: # 物理特性过滤(高度、宽度、厚度) passed_prop, prop_reasons = self._filter_door_by_properties(door) if not passed_prop: filtered_doors.append((door, prop_reasons)) continue 
valid_doors.append(door) print(f"通过物理特性过滤:{len(valid_doors)} 个门") if len(filtered_doors) > 0: print(f"被过滤(物理特性):{len(filtered_doors)} 个门") for door, reasons in filtered_doors: print(f" 门{door.id} (中心={door.center.round(2)}):") for reason in reasons: print(f" - {reason}") # 检查地面距离,用于入户门候选筛选 print("\n检查地面距离 (用于入户门候选筛选)...") ground_valid_doors = [] # 通过地面距离过滤的门(入户门候选) # 优先使用 puck 参数进行地面距离检查 use_puck = (self.ground_z_from_puck is not None) for door in valid_doors: if use_puck: passed_ground, ground_reason = self._filter_door_by_ground_puck(door) else: passed_ground, ground_reason = self._filter_door_by_ground( door, ground_normal, ground_d ) if passed_ground: ground_valid_doors.append(door) else: print(f" 门{door.id} (中心={door.center.round(2)}): {ground_reason}") if use_puck: print(f"通过地面距离过滤(puck_z={self.ground_z_from_puck:.3f}):{len(ground_valid_doors)} 个门") # 调试输出:显示每个门使用的 puck_z for door in valid_doors: puck_z_used = None if door.source_uuid and door.source_uuid in self.puck_z_dict: puck_z_used = self.puck_z_dict[door.source_uuid] print(f" 门{door.id}: source_uuid={door.source_uuid}, puck_z={puck_z_used:.3f}") else: print(f" 门{door.id}: 使用中位数 puck_z={self.ground_z_from_puck:.3f}") else: print(f"通过地面距离过滤(RANSAC):{len(ground_valid_doors)} 个门") # ========== 第四步:识别入户门 ========== print("\n" + "=" * 40) print("入户门识别") print("=" * 40) # 入户门候选只从通过地面距离过滤的门中选择 entrance_idx = self._identify_entrance_door( ground_valid_doors, combined_pc, contour=None, # None 表示在方法内部计算 tri=None ) # ========== 第五步:创建可视化 ========== print("\n创建可视化点云...") # 1. 原始场景点云(降采样用于可视化) vis_pc = combined_pc.voxel_down_sample(0.05) # 2. 
门的可视化点云 - 显示所有门(包括被过滤的) door_vis_points = [] door_vis_colors = [] # 颜色定义 COLOR_ENTRANCE = np.array([1.0, 0.0, 0.0]) # 红色 - 入户门 COLOR_VALID = np.array([0.0, 1.0, 0.0]) # 绿色 - 有效门(通过物理特性过滤) COLOR_FILTERED = np.array([1.0, 1.0, 0.0]) # 黄色 - 被过滤的门 print(f"\n可视化门:") # 找到入户门在 valid_doors 中的索引 entrance_door_in_valid = None if entrance_idx >= 0 and entrance_idx < len(ground_valid_doors): entrance_door = ground_valid_doors[entrance_idx] # 找到这个门在 valid_doors 中的索引 for i, door in enumerate(valid_doors): if door.id == entrance_door.id: entrance_door_in_valid = i break # 显示有效门 for i, door in enumerate(valid_doors): is_entrance = (i == entrance_door_in_valid) if is_entrance: color = COLOR_ENTRANCE label = f"入户门 (红色)" else: color = COLOR_VALID label = f"有效门 (绿色)" door_pc = self._create_door_visualization_with_color( door, ground_normal, ground_d, floor_ceiling_dist, color, translate_to_ceiling=translate_to_ceiling ) pts = np.asarray(door_pc.points) door_vis_points.append(pts) door_vis_colors.append(np.asarray(door_pc.colors)) print(f" {label}: 中心={door.center.round(3)}, 点数={len(pts)}") # 显示被过滤的门(黄色) for door, reasons in filtered_doors: door_pc = self._create_door_visualization_with_color( door, ground_normal, ground_d, floor_ceiling_dist, COLOR_FILTERED, translate_to_ceiling=translate_to_ceiling ) pts = np.asarray(door_pc.points) door_vis_points.append(pts) door_vis_colors.append(np.asarray(door_pc.colors)) print(f" 被过滤的门 (黄色): 中心={door.center.round(3)}, 点数={len(pts)}") for reason in reasons: print(f" - {reason}") if door_vis_points: door_vis_pc = o3d.geometry.PointCloud() door_vis_pc.points = o3d.utility.Vector3dVector(np.vstack(door_vis_points)) door_vis_pc.colors = o3d.utility.Vector3dVector(np.vstack(door_vis_colors)) # 3. 
拍摄点位可视化 pose_vis_points = [] pose_vis_colors = [] for pose in self.poses.values(): pose_pc = self._create_pose_visualization( pose, ground_normal, ground_d, floor_ceiling_dist, translate_to_ceiling=translate_to_ceiling ) pose_vis_points.append(np.asarray(pose_pc.points)) pose_vis_colors.append(np.asarray(pose_pc.colors)) if pose_vis_points: pose_vis_pc = o3d.geometry.PointCloud() pose_vis_pc.points = o3d.utility.Vector3dVector(np.vstack(pose_vis_points)) pose_vis_pc.colors = o3d.utility.Vector3dVector(np.vstack(pose_vis_colors)) # 4. 估计的入户门位置可视化(当没有检测到门时) entrance_vis_pc = None if self.estimated_entrance_position is not None: print("\n可视化估计的入户门位置...") entrance_point = self.estimated_entrance_position.copy() # 平移到天花板高度 if translate_to_ceiling: entrance_point = self._translate_point_to_ceiling( entrance_point, floor_ceiling_dist, ground_normal, ground_d ) # 创建一个较大的红色球体标记 from scipy.spatial import ConvexHull # 生成球面点 phi = np.linspace(0, np.pi, 10) theta = np.linspace(0, 2 * np.pi, 20) phi, theta = np.meshgrid(phi, theta) r = 0.15 # 球体半径 x = entrance_point[0] + r * np.sin(phi) * np.cos(theta) y = entrance_point[1] + r * np.sin(phi) * np.sin(theta) z = entrance_point[2] + r * np.cos(phi) sphere_points = np.column_stack([x.flatten(), y.flatten(), z.flatten()]) entrance_vis_pc = o3d.geometry.PointCloud() entrance_vis_pc.points = o3d.utility.Vector3dVector(sphere_points) entrance_colors = np.array([[1.0, 0.0, 0.0]] * len(sphere_points)) # 红色 entrance_vis_pc.colors = o3d.utility.Vector3dVector(entrance_colors) print(f" 估计入户门位置 (红色球体): {entrance_point.round(3)}") # ========== 第六步:合并并保存 ========== # 创建一个完整的场景点云 full_scene = o3d.geometry.PointCloud() # 添加原始点云(灰色) full_scene += vis_pc # 添加门点云 if door_vis_points: full_scene += door_vis_pc # 添加拍摄点位点云 if pose_vis_points: full_scene += pose_vis_pc # 添加估计的入户门位置可视化 if entrance_vis_pc is not None: full_scene += entrance_vis_pc # 保存 PLY 文件 # 创建输出目录 os.makedirs(os.path.dirname(output_ply_path), exist_ok=True) 
o3d.io.write_point_cloud(output_ply_path, full_scene) print(f"\n可视化结果已保存:{output_ply_path}") # ========== 打印汇总 ========== print("\n" + "=" * 50) print("可视化汇总") print("=" * 50) print(f"原始点云点数:{len(vis_pc.points)}") print(f"有效门数量:{len(valid_doors)}") if entrance_idx >= 0: print(f"入户门索引:{entrance_idx}") elif self.estimated_entrance_position is not None: print(f"估计入户门位置:{self.estimated_entrance_position.round(3)} (红色球体标记)") else: print("入户门:未确定") print(f"拍摄点位数量:{len(self.poses)}") print(f"门点云点数:{sum(len(p) for p in door_vis_points) if door_vis_points else 0}") print(f"点位点云点数:{sum(len(p) for p in pose_vis_points) if pose_vis_points else 0}") if entrance_vis_pc is not None: print(f"估计入户门标记点数:{len(entrance_vis_pc.points)}") print(f"总点数:{len(full_scene.points)}") if translate_to_ceiling: print(f"\n门和点位已平移到天花板高度 (平移距离={floor_ceiling_dist:.3f}m)") else: print("\n门和点位保持原始位置") print("\n颜色说明:") print(" - 原始点云:RGB 颜色") print(" - 入户门:红色(门边框 / 估计位置球体)") print(" - 有效门:绿色(通过物理特性过滤)") print(" - 被过滤的门:黄色(物理特性不符合)") print(" - 拍摄点位:蓝色") print("=" * 50) return full_scene, valid_doors, entrance_idx # ============================================================================ # 主函数 # ============================================================================ def main(): parser = argparse.ArgumentParser( description="3D 场景可视化 - RANSAC 地面拟合 + 门平移到天花板", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 处理单个场景 python visualize_scene_3d.py -s scene0001 --vis-ply # 指定入户门 JSON 文件 python visualize_scene_3d.py -s scene0001 --vis-ply \\ --entrance-method json \\ --entrance-json /path/to/entrance.json # 调整地面拟合参数 python visualize_scene_3d.py -s scene0001 --vis-ply \\ --ground-ransac-dist 0.03 \\ --ceiling-percentile 98 # 不平移到天花板(保持原始位置) python visualize_scene_3d.py -s scene0001 --vis-ply --no-translate # 调整门过滤阈值 python visualize_scene_3d.py -s scene0001 --vis-ply \\ --door-ground-dist 0.15 \\ --door-height-min 1.2 # ========== 批处理模式 ========== # 处理 ten_pano_demo 目录下所有场景 python 
visualize_scene_3d.py --batch-base /path/to/ten_pano_demo --vis-ply # 指定批处理输出目录 python visualize_scene_3d.py --batch-base /path/to/ten_pano_demo \\ --batch-output /path/to/output --vis-ply # 批处理 + 不平移 python visualize_scene_3d.py --batch-base /path/to/ten_pano_demo \\ --vis-ply --no-translate """ ) parser.add_argument("--scene", "-s", type=str, default="scene0001", help="场景文件夹") parser.add_argument("--model", "-m", type=str, default="yoloe-26x-seg.pt", help="YOLOE 模型路径") parser.add_argument("--conf", type=float, default=0.35, help="置信度阈值") parser.add_argument("--iou", type=float, default=0.45, help="NMS IoU 阈值") parser.add_argument("--voxel-size", type=float, default=0.03, help="点云体素大小") parser.add_argument("--depth-scale", type=float, default=256.0, help="深度图缩放因子") # 地面/天花板参数 parser.add_argument("--ground-ransac-dist", type=float, default=0.05, help="RANSAC 地面拟合距离阈值") parser.add_argument("--ceiling-percentile", type=float, default=95.0, help="天花板点分位数") # 门过滤参数 parser.add_argument("--door-ground-dist", type=float, default=0.1, help="门底部距地面最大距离") parser.add_argument("--door-height-min", type=float, default=1.0, help="门最小高度") parser.add_argument("--door-height-max", type=float, default=3.0, help="门最大高度") parser.add_argument("--door-width-min", type=float, default=0.3, help="门最小宽度") parser.add_argument("--door-width-max", type=float, default=3.0, help="门最大宽度") parser.add_argument("--door-thickness-max", type=float, default=0.5, help="门最大厚度") # 3D 门合并参数 parser.add_argument("--merge-iou-thresh", type=float, default=0.1, help="门合并 IoU 阈值(默认 0.1,降低敏感度)") parser.add_argument("--merge-dist-thresh", type=float, default=0.3, help="门合并中心距离阈值(默认 0.3 米)") parser.add_argument("--merge-z-overlap-thresh", type=float, default=0.5, help="门合并 Z 方向重叠度阈值(默认 0.5,0-1 范围)") # 入户门参数 parser.add_argument("--entrance-method", type=str, default="score", choices=["score", "json"], help="入户门判定方法") parser.add_argument("--entrance-json", type=str, default=None, help="入户门 JSON 文件路径") # 可视化参数 
parser.add_argument("--vis-ply", action="store_true", help="保存可视化 PLY 文件") parser.add_argument("--vis-output", type=str, default=None, help="输出 PLY 文件路径") parser.add_argument("--no-translate", action="store_true", help="不平移到天花板,保持原始位置") # 批处理参数 parser.add_argument("--batch-base", type=str, default=None, help="批处理基础目录(如 ten_pano_demo),自动处理所有子场景") parser.add_argument("--batch-output", type=str, default=None, help="批处理输出基础目录,默认在基础目录下创建 output 文件夹") args = parser.parse_args() # ========== 批处理模式 ========== if args.batch_base: process_all_scenes(args) return # ========== 单场景模式 ========== if not Path(args.scene).exists(): print(f"❌ 场景文件夹不存在:{args.scene}") return # 确定输出路径 if args.vis_output is None: output_ply = Path(args.scene) / "output" / "visualization.ply" else: output_ply = Path(args.vis_output) processor = SceneVisualizer( scene_folder=args.scene, model_path=args.model, conf=args.conf, iou=args.iou, voxel_size=args.voxel_size, depth_scale=args.depth_scale, ground_ransac_dist=args.ground_ransac_dist, ceiling_percentile=args.ceiling_percentile, door_ground_dist=args.door_ground_dist, door_height_min=args.door_height_min, door_height_max=args.door_height_max, door_width_min=args.door_width_min, door_width_max=args.door_width_max, door_thickness_max=args.door_thickness_max, merge_iou_thresh=args.merge_iou_thresh, merge_dist_thresh=args.merge_dist_thresh, merge_z_overlap_thresh=args.merge_z_overlap_thresh, entrance_method=args.entrance_method, entrance_json_path=args.entrance_json, ) if args.vis_ply: processor.process_and_visualize( output_ply_path=str(output_ply), translate_to_ceiling=not args.no_translate ) else: print("⚠️ 未指定 --vis-ply,不进行可视化") print("请使用 --vis-ply 参数来生成可视化 PLY 文件") def process_all_scenes(args): """ 批处理模式:处理基础目录下的所有场景 目录结构假设: base_dir/ ├── depth/ │ ├── scene1/ │ │ └── depthmap/ │ └── scene2/ ├── rgb/ │ ├── scene1/ │ └── scene2/ └── vision/ ├── scene1/ └── scene2/ """ from pathlib import Path from tqdm import tqdm base_dir = Path(args.batch_base) if 
not base_dir.exists(): print(f"❌ 批处理基础目录不存在:{base_dir}") return # 确定输出目录 if args.batch_output: output_base = Path(args.batch_output) else: output_base = base_dir / "output" # 获取所有场景(从 depth 目录获取) depth_dir = base_dir / "depth" if not depth_dir.exists(): print(f"❌ depth 目录不存在:{depth_dir}") return scene_names = sorted([d.name for d in depth_dir.iterdir() if d.is_dir()]) if not scene_names: print(f"⚠️ 未找到任何场景") return print(f"找到 {len(scene_names)} 个场景:{scene_names}") print(f"输出目录:{output_base}") print("=" * 60) # 创建输出基础目录 output_base.mkdir(parents=True, exist_ok=True) for i, scene_name in enumerate(tqdm(scene_names, desc="总体进度")): print(f"\n[{i+1}/{len(scene_names)}] 处理场景:{scene_name}") print("-" * 40) # 构建各路径 scene_rgb_dir = base_dir / "rgb" / scene_name scene_depth_dir = base_dir / "depth" / scene_name / "depthmap" scene_vision_dir = base_dir / "vision" / scene_name scene_vision_file = scene_vision_dir / "vision.txt" # 检查路径是否存在 if not scene_rgb_dir.exists(): print(f"⚠️ 跳过 {scene_name}: rgb 目录不存在") continue if not scene_depth_dir.exists(): print(f"⚠️ 跳过 {scene_name}: depth 目录不存在") continue if not scene_vision_file.exists(): print(f"⚠️ 跳过 {scene_name}: vision.txt 不存在") continue # 输出路径 scene_output_dir = output_base / scene_name output_ply = scene_output_dir / "visualization.ply" # SceneVisualizer 期望的目录结构: # scene_folder/ # pano_img/*.jpg # depth_img/depthmap/*.png # vision.txt # 所以需要创建一个临时结构或直接传递 scene_depth_dir 的父目录的父目录 # 但实际上我们应该使用 scene_depth_dir 作为 depth 目录 # 修改策略:传递 depth/scene_name 目录,因为它包含 depthmap # 而 rgb 和 vision 需要单独指定 - 但 SceneVisualizer 不支持 # 解决方案:使用符号链接或者修改 SceneVisualizer 支持自定义子目录名 # 简单方案:创建一个临时目录结构 # 创建临时目录结构 import tempfile import shutil temp_dir = tempfile.mkdtemp(prefix=f"scene_{scene_name}_") temp_scene = Path(temp_dir) / scene_name temp_scene.mkdir(parents=True, exist_ok=True) # 创建符号链接 # SceneVisualizer 期望的结构: # scene_folder/pano_img/*.jpg # scene_folder/depth_img/{idx}.png # scene_folder/vision.txt (temp_scene / 
"pano_img").symlink_to(scene_rgb_dir) (temp_scene / "depth_img").symlink_to(scene_depth_dir) # 直接指向 depthmap 目录 (temp_scene / "vision.txt").symlink_to(scene_vision_file) try: # 创建处理器 processor = SceneVisualizer( scene_folder=str(temp_scene), model_path=args.model, conf=args.conf, iou=args.iou, voxel_size=args.voxel_size, depth_scale=args.depth_scale, ground_ransac_dist=args.ground_ransac_dist, ceiling_percentile=args.ceiling_percentile, door_ground_dist=args.door_ground_dist, door_height_min=args.door_height_min, door_height_max=args.door_height_max, door_width_min=args.door_width_min, door_width_max=args.door_width_max, door_thickness_max=args.door_thickness_max, merge_iou_thresh=args.merge_iou_thresh, merge_dist_thresh=args.merge_dist_thresh, merge_z_overlap_thresh=args.merge_z_overlap_thresh, entrance_method=args.entrance_method, entrance_json_path=args.entrance_json, ) # 处理并可视化 if args.vis_ply: try: processor.process_and_visualize( output_ply_path=str(output_ply), translate_to_ceiling=not args.no_translate ) print(f"✓ 保存:{output_ply}") except Exception as e: print(f"❌ 处理失败:{e}") else: print("⚠️ 未指定 --vis-ply,跳过可视化") finally: # 清理临时目录 import shutil shutil.rmtree(temp_dir, ignore_errors=True) if __name__ == "__main__": main()