""" 场景点云处理器 整合 YOLOE 门检测 + 全景图点云转换 + 位姿变换 + 多视角融合 + 3D 门合并 支持 scene0001 格式的场景数据 """ import os import sys import json import argparse from pathlib import Path from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple # 添加当前目录到路径,以便导入 camera_spherical sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) import cv2 import numpy as np import open3d as o3d from tqdm import tqdm from ultralytics import YOLOE from camera_spherical import Intrinsic_Spherical_NP # ============================================================================ # 数据类 # ============================================================================ @dataclass class PoseData: """位姿数据""" uuid: str rotation: Dict[str, float] # w, x, y, z translation: Dict[str, float] # x, y, z @dataclass class MaskDetection: """单张图像中的 mask 检测结果""" image_name: str mask_contours: List[List[List[float]]] # 2D 轮廓像素坐标 [[x, y], ...] scores: List[float] mask_3d_points: List[np.ndarray] # 每个 mask 对应的世界坐标系 3D 点 @dataclass class Door3D: """3D 门实例""" id: int center: np.ndarray # 中心坐标 [x, y, z] bbox_8points: np.ndarray # 8 个角点 [[x,y,z], ...] (8x3) source_detections: List[Dict] # 来源检测信息 @dataclass class EntranceInfo: """入户门信息""" is_detected: bool # 是否通过检测确定 door: Optional[Door3D] # 入户门对象(检测确定时) center: np.ndarray # 入户门中心/估计位置 [x, y, z] score: float # 置信度评分 method: str # 确定方法:"exterior_class" / "size_score" / "fallback_2d" / "scene_center_estimate" reason: str # 说明文字 # ============================================================================ # 场景处理器 # ============================================================================ class SceneProcessor: """场景处理器:整合检测 + 点云生成 + 位姿变换 + 3D 门融合""" # 门相关类别 - 统一映射为 "door" DOOR_CLASSES = [ "door", "indoor door", "exterior door", "wooden door", "metal door", "glass door", "double door", "single door", "open door", "closed door" ] # 3D 门合并参数 MERGE_IOU_THRESH = 0.3 # 3D IoU 阈值 MERGE_DIST_THRESH = 2.0 # 中心距离阈值 (米) # 3D 门过滤参数 - 有效门的物理特性 DOOR_HEIGHT_MIN = 1.0 # 最小高度 (米) DOOR_HEIGHT_MAX = 3.0 # 最大高度 (米) DOOR_WIDTH_MIN = 0.3 # 最小宽度 (米) DOOR_WIDTH_MAX = 3.0 # 最大宽度 (米) DOOR_THICKNESS_MAX = 0.5 # 最大厚度 (米) - 门的深度方向 GROUND_DIST_THRESH = 0.5 # 门底部距地面最大距离 (米) def __init__( self, scene_folder: str, model_path: str = "yoloe-26x-seg.pt", conf: float = 0.35, iou: float = 0.45, voxel_size: float = 0.03, depth_scale: float = 256.0, depth_min: float = 0.02, ground_y: Optional[float] = None, # 地面 Y 坐标(可选,默认从点云估计) ): """ 初始化场景处理器 Args: scene_folder: 场景文件夹路径 model_path: YOLOE 模型路径 conf: 检测置信度阈值 iou: NMS IoU 阈值 voxel_size: 点云体素下采样尺寸 depth_scale: 深度图缩放因子 depth_min: 最小有效深度 ground_y: 地面 Y 坐标(可选,默认从点云自动估计) """ self.scene_folder = Path(scene_folder) self.conf = conf self.iou = iou self.voxel_size = voxel_size self.depth_scale = depth_scale self.depth_min = depth_min self.ground_y = ground_y # 子目录 self.rgb_folder = self.scene_folder / "pano_img" self.depth_folder = self.scene_folder / "depth_img" self.pose_file = self.scene_folder / "vision.txt" # 输出目录 self.output_folder = self.scene_folder / "output" self.detection_folder = self.output_folder / "detections" # 加载位姿 self.poses = self._load_poses() # 初始化 YOLOE 模型 print(f"加载 YOLOE 模型:{model_path}") self.model = YOLOE(model_path) self.model.set_classes(self.DOOR_CLASSES) print(f"检测类别 (统一为 door): {self.DOOR_CLASSES}") def _load_poses(self) -> Dict[str, PoseData]: """从 vision.txt 加载位姿信息""" if not self.pose_file.exists(): raise FileNotFoundError(f"位姿文件不存在:{self.pose_file}") with open(self.pose_file, 'r') as f: data = json.load(f) poses = {} for loc in data.get('sweepLocations', []): uuid = str(loc['uuid']) poses[uuid] = PoseData( uuid=uuid, rotation=loc['pose']['rotation'], translation=loc['pose']['translation'] ) print(f"加载 {len(poses)} 个位姿") return poses def _build_pose_matrix(self, pose: PoseData) -> np.ndarray: """构建 4x4 位姿变换矩阵""" R = o3d.geometry.get_rotation_matrix_from_quaternion( np.array([pose.rotation['w'], pose.rotation['x'], pose.rotation['y'], pose.rotation['z']]) ) t = np.array([ pose.translation['x'], pose.translation['y'], pose.translation['z'] ]) T = np.eye(4) T[:3, :3] = R T[:3, 3] = t return T def _mask_to_3d_points( self, mask: np.ndarray, depth: np.ndarray, pose_matrix: np.ndarray ) -> Optional[np.ndarray]: """ 将 2D mask 映射到世界坐标系 3D 点 Args: mask: 二值 mask (H, W) depth: 深度图 (H, W) pose_matrix: 4x4 位姿矩阵 Returns: 世界坐标系下的 3D 点 (N, 3) """ H, W = depth.shape sph = Intrinsic_Spherical_NP(W, H) # 获取 mask 内的像素 ys, xs = np.where(mask > 0) if len(xs) == 0: return None # 有效深度掩码 valid = depth[ys, xs] > self.depth_min if not np.any(valid): return None xs, ys = xs[valid], ys[valid] depths = depth[ys, xs] # 计算方向向量 bx, by, bz = sph.bearing([xs.astype(np.float64), ys.astype(np.float64)]) bx, by, bz = np.array(bx), np.array(by), np.array(bz) # 相机坐标系 pts_cam = np.stack([bx * depths, by * depths, bz * depths], axis=1) # Z 轴 180 度翻转 R_z180 = np.diag([-1.0, -1.0, 1.0]) pts_cam = pts_cam @ R_z180.T # 世界坐标系 pts_w = (pose_matrix[:3, :3] @ pts_cam.T).T + pose_matrix[:3, 3] return pts_w def _extract_mask_contours(self, masks) -> Tuple[List[List[List[float]]], List[np.ndarray]]: """ 从 YOLOE mask 结果提取轮廓 Args: masks: YOLOE masks (H, W, N) Returns: (轮廓列表, 对应 mask 数组) 每个 mask 只保留最大轮廓,保证与 scores 一一对应 """ contours = [] mask_arrays = [] if masks is None: return contours, mask_arrays masks_np = masks.cpu().numpy() for i in range(masks_np.shape[0]): mask = masks_np[i] # 二值化 mask_bin = (mask > 0.5).astype(np.uint8) * 255 # 提取轮廓 cnts, _ = cv2.findContours(mask_bin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not cnts: continue # 只保留面积最大的轮廓,确保与 score 一一对应 largest = max(cnts, key=cv2.contourArea) if len(largest) >= 3: # 简化轮廓 epsilon = 0.02 * cv2.arcLength(largest, True) approx = cv2.approxPolyDP(largest, epsilon, True) contour = approx.reshape(-1, 2).astype(float).tolist() contours.append(contour) mask_arrays.append((mask_bin > 0).astype(np.uint8)) else: # 点数不足时回退到完整 mask mask_arrays.append((mask_bin > 0).astype(np.uint8)) return contours, mask_arrays def detect_single_image( self, img_path: str, depth: np.ndarray, pose_matrix: np.ndarray, save_path: Optional[str] = None ) -> MaskDetection: """ 检测单张图像并提取 mask 轮廓和 3D 点 Args: img_path: 图像路径 depth: 深度图 pose_matrix: 位姿矩阵 save_path: 保存路径 Returns: MaskDetection 对象 """ results = self.model.predict( img_path, imgsz=(1024, 2048), conf=self.conf, iou=self.iou, max_det=50, augment=True, retina_masks=True, half=False, verbose=False, ) result = results[0] scores = [] contours = [] mask_3d_points = [] if result.masks is not None: masks = result.masks.data # 提取轮廓(每个 mask 只保留最大轮廓,与 scores 一一对应) contours, mask_arrays = self._extract_mask_contours(masks) # 获取分数 scores = result.boxes.conf.cpu().numpy().tolist() # 每个 mask 转 3D 点 H, W = depth.shape for mask_bin in mask_arrays: mask_resized = cv2.resize(mask_bin, (W, H), interpolation=cv2.INTER_NEAREST) pts_3d = self._mask_to_3d_points(mask_resized, depth, pose_matrix) if pts_3d is not None and len(pts_3d) > 0: mask_3d_points.append(pts_3d) if save_path: os.makedirs(os.path.dirname(save_path), exist_ok=True) result.save(save_path) return MaskDetection( image_name=os.path.basename(img_path), mask_contours=contours, scores=scores, mask_3d_points=mask_3d_points ) def _axis_aligned_bbox(self, points: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """计算轴对齐包围盒 (min, max)""" lo = np.min(points, axis=0) hi = np.max(points, axis=0) return lo, hi def _bbox_8corners(self, bbox_min: np.ndarray, bbox_max: np.ndarray) -> np.ndarray: """从 bbox min/max 获取 8 个角点""" cx, cy, cz = bbox_min ex, ey, ez = bbox_max return np.array([ [cx, cy, cz], [ex, cy, cz], [ex, ey, cz], [cx, ey, cz], [cx, cy, ez], [ex, cy, ez], [ex, ey, ez], [cx, ey, ez], ]) def _bbox_iou_3d(self, b1, b2) -> float: """3D IoU 计算""" lo = np.maximum(b1[0], b2[0]) hi = np.minimum(b1[1], b2[1]) inter = np.prod(np.maximum(hi - lo, 0)) vol1 = np.prod(b1[1] - b1[0]) vol2 = np.prod(b2[1] - b2[0]) union = vol1 + vol2 - inter return inter / union if union > 0 else 0.0 def _merge_3d_doors(self, door_candidates: List[Dict]) -> List[Door3D]: """ 合并重叠或接近的 3D 门 - 使用并查集确保完全合并 合并逻辑: 1. 构建连通图:如果两个门满足合并条件,则它们连通 2. 使用并查集找出所有连通分量 3. 每个连通分量合并为一个门 Args: door_candidates: 候选门列表 Returns: 合并后的 Door3D 列表 """ if not door_candidates: return [] n = len(door_candidates) if n == 1: # 只有一个候选,直接返回 d = door_candidates[0] return [Door3D( id=0, center=(d['bbox_min'] + d['bbox_max']) / 2, bbox_8points=self._bbox_8corners(d['bbox_min'], d['bbox_max']), source_detections=[d['source']] )] # ========== 并查集 ========== parent = list(range(n)) def find(x): if parent[x] != x: parent[x] = find(parent[x]) # 路径压缩 return parent[x] def union(x, y): px, py = find(x), find(y) if px != py: parent[px] = py # ========== 构建连通关系 ========== # 检查所有门对,满足条件的合并 for i in range(n): for j in range(i + 1, n): ci = (door_candidates[i]['bbox_min'] + door_candidates[i]['bbox_max']) / 2 cj = (door_candidates[j]['bbox_min'] + door_candidates[j]['bbox_max']) / 2 dist = np.linalg.norm(ci - cj) iou = self._bbox_iou_3d( (door_candidates[i]['bbox_min'], door_candidates[i]['bbox_max']), (door_candidates[j]['bbox_min'], door_candidates[j]['bbox_max']) ) if dist < self.MERGE_DIST_THRESH and iou > self.MERGE_IOU_THRESH: union(i, j) # ========== 按连通分量分组 ========== from collections import defaultdict groups = defaultdict(list) for i in range(n): groups[find(i)].append(door_candidates[i]) # ========== 合并每个组 ========== doors = [] for door_id, members in enumerate(groups.values()): if not members: continue # 合并所有成员的 bbox bbox_min = np.min([m['bbox_min'] for m in members], axis=0) bbox_max = np.max([m['bbox_max'] for m in members], axis=0) sources = [m['source'] for m in members] doors.append(Door3D( id=door_id, center=(bbox_min + bbox_max) / 2, bbox_8points=self._bbox_8corners(bbox_min, bbox_max), source_detections=sources )) return doors def _estimate_ground_y(self, combined_pc: o3d.geometry.PointCloud) -> float: """ 从点云估计地面 Y 坐标 假设:地面是场景中最低的大面积平面 方法:取所有点中 Y 坐标的下 1% 分位数 Args: combined_pc: 完整场景点云 Returns: 估计的地面 Y 坐标 """ points = np.asarray(combined_pc.points) if len(points) == 0: return 0.0 # 取 Y 坐标(假设 Y 轴向上)的下 1% 分位数,避免家具腿等低矮物体干扰 ground_y = np.percentile(points[:, 1], 1) return ground_y def _filter_door_by_properties(self, door: Door3D, ground_y: float) -> Tuple[bool, List[str]]: """ 根据物理特性过滤门 过滤条件: 1. 高度在合理范围内 (1.0m - 3.0m) 2. 宽度在合理范围内 (0.3m - 3.0m) 3. 厚度不超过阈值 (≤ 0.5m) 4. 门底部接近地面 (≤ 0.5m) Args: door: 3D 门对象 ground_y: 地面 Y 坐标 Returns: (是否通过过滤,拒绝原因列表) """ reasons = [] # 计算 bounding box 尺寸 size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0) height = size[1] # Y 方向是高度 width = max(size[0], size[2]) # 宽度是 X/Z 中较大的 thickness = min(size[0], size[2]) # 厚度是 X/Z 中较小的 # 门底部 Y 坐标 door_bottom_y = door.bbox_8points[:, 1].min() # 条件 1: 高度检查 if height < self.DOOR_HEIGHT_MIN: reasons.append(f"高度过小 ({height:.2f}m < {self.DOOR_HEIGHT_MIN}m)") elif height > self.DOOR_HEIGHT_MAX: reasons.append(f"高度过大 ({height:.2f}m > {self.DOOR_HEIGHT_MAX}m)") # 条件 2: 宽度检查 if width < self.DOOR_WIDTH_MIN: reasons.append(f"宽度过小 ({width:.2f}m < {self.DOOR_WIDTH_MIN}m)") elif width > self.DOOR_WIDTH_MAX: reasons.append(f"宽度过大 ({width:.2f}m > {self.DOOR_WIDTH_MAX}m)") # 条件 3: 厚度检查 if thickness > self.DOOR_THICKNESS_MAX: reasons.append(f"厚度过大 ({thickness:.2f}m > {self.DOOR_THICKNESS_MAX}m)") # 条件 4: 地面贴合检查 dist_to_ground = door_bottom_y - ground_y if dist_to_ground > self.GROUND_DIST_THRESH: reasons.append(f"距地面过远 ({dist_to_ground:.2f}m > {self.GROUND_DIST_THRESH}m)") elif dist_to_ground < -self.GROUND_DIST_THRESH: reasons.append(f"嵌入地面过深 ({abs(dist_to_ground):.2f}m)") passed = len(reasons) == 0 return passed, reasons def _score_entrance_door(self, door: Door3D, ground_y: float, all_centers: np.ndarray) -> Dict: """ 为每个门计算"入户门可能性"评分 评分维度(满分 100): 1. 尺寸评分 (30分): 入户门通常较大,高度约 2.0-2.4m,宽度约 0.9-1.2m 2. 地面贴合 (20分): 入户门底部贴近地面 3. 边缘位置 (25分): 入户门在建筑外围,离场景中心较远 4. 厚度评分 (15分): 入户门通常较厚(实心门) 5. 多视角支持 (10分): 被多个视角检测到的门更可信 Args: door: 3D 门对象 ground_y: 地面 Y 坐标 all_centers: 所有门的中心坐标 Returns: 评分详情 {"total": float, "details": Dict[str, float], "reason": str} """ size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0) height = size[1] width = max(size[0], size[2]) thickness = min(size[0], size[2]) door_bottom_y = door.bbox_8points[:, 1].min() # 1. 尺寸评分 (30分) - 理想入户门尺寸 高2.1m 宽1.0m height_score = max(0, 15 - abs(height - 2.1) * 10) width_score = max(0, 15 - abs(width - 1.0) * 12) size_score = height_score + width_score # 2. 地面贴合 (20分) - 底部越接近地面分越高 dist_to_ground = abs(door_bottom_y - ground_y) ground_score = max(0, 20 - dist_to_ground * 40) # 3. 边缘位置 (25分) - 离所有门的中心越远,越可能是入户门 if len(all_centers) > 1: dists_to_others = np.linalg.norm(all_centers - door.center, axis=1) max_dist = dists_to_others.max() avg_dist = dists_to_others.mean() # 平均距离越大越可能是入户门 edge_score = min(25, avg_dist * 10) else: edge_score = 15 # 只有一个门时给中等分 # 4. 厚度评分 (15分) - 入户门通常更厚 # 理想厚度 0.04-0.1m,但点云中由于包含门框,会显得更厚 if 0.05 <= thickness <= 0.3: thickness_score = 15 elif thickness < 0.05: thickness_score = 5 else: thickness_score = max(0, 15 - (thickness - 0.3) * 30) # 5. 多视角支持 (10分) source_count = len(door.source_detections) view_score = min(10, source_count * 3) total = size_score + ground_score + edge_score + thickness_score + view_score reasons = [ f"尺寸={size_score:.0f}/30 (高{height:.2f}m 宽{width:.2f}m)", f"地面={ground_score:.0f}/20 (距地{dist_to_ground:.2f}m)", f"边缘={edge_score:.0f}/25", f"厚度={thickness_score:.0f}/15 ({thickness:.2f}m)", f"视角={view_score:.0f}/10 ({source_count}个)", ] return { "total": round(total, 1), "details": { "size": round(size_score, 1), "ground": round(ground_score, 1), "edge": round(edge_score, 1), "thickness": round(thickness_score, 1), "view": round(view_score, 1), }, "reason": " | ".join(reasons), } def _identify_entrance_door( self, valid_doors: List[Door3D], ground_y: float, all_detections: List[MaskDetection], combined_pc: o3d.geometry.PointCloud, ) -> EntranceInfo: """ 从多个门中识别入户门,无检测时提供兜底策略 策略优先级: 1. 如果 YOLOE 检测到 "exterior door" 类别 → 直接使用 2. 有多个有效门 → 按评分选择最高分 3. 无有效门但有 2D 检测 → 用置信度最高的 2D 检测的 3D 投影位置 4. 完全无检测 → 用场景几何估计(场景边界中心+地面高度) Args: valid_doors: 通过物理特性过滤的门 ground_y: 地面 Y 坐标 all_detections: 所有 2D 检测结果 combined_pc: 合并后的场景点云 Returns: EntranceInfo 入户门信息 """ # ===== 策略 1: 检查是否有 exterior door 检测结果 ===== for det in all_detections: for src in det.source_detections if hasattr(det, 'source_detections') else []: pass # source_detections 在 Door3D 上,不在 MaskDetection 上 # MaskDetection 只有 scores,没有类别信息 # YOLOE predict 返回的 result.boxes.cls 包含类别 ID pass # ===== 策略 2: 有多个有效门,按评分选择 ===== if len(valid_doors) >= 1: all_centers = np.array([d.center for d in valid_doors]) scored_doors = [] for door in valid_doors: score_info = self._score_entrance_door(door, ground_y, all_centers) scored_doors.append((door, score_info)) # 按总分排序 scored_doors.sort(key=lambda x: x[1]["total"], reverse=True) best_door, best_score = scored_doors[0] method = "size_score" reason = f"多门评分选择(共{len(valid_doors)}个门): {best_score['reason']}" if len(valid_doors) == 1: method = "size_score" reason = f"唯一有效门: {best_score['reason']}" print(f"\n入户门识别 - 选择门{best_door.id}") print(f" 方法: {method}") print(f" 评分: {best_score['total']}/100") print(f" 中心: {best_door.center.round(3)}") print(f" 原因: {reason}") return EntranceInfo( is_detected=True, door=best_door, center=best_door.center.copy(), score=best_score["total"] / 100.0, method=method, reason=reason, ) # ===== 策略 3: 无有效门但有 2D 检测 ===== # 找置信度最高的 2D 检测 best_2d_score = 0.0 best_2d_det = None for det in all_detections: if det.scores and len(det.mask_3d_points) > 0: max_score = max(det.scores) if max_score > best_2d_score: best_2d_score = max_score best_2d_det = det if best_2d_det is not None and len(best_2d_det.mask_3d_points) > 0: # 使用最高分 mask 的 3D 点中心作为入户门估计位置 best_mask_idx = best_2d_det.scores.index(best_2d_score) if best_mask_idx < len(best_2d_det.mask_3d_points): est_center = np.mean(best_2d_det.mask_3d_points[best_mask_idx], axis=0) print(f"\n入户门识别 - 使用最高分 2D 检测兜底") print(f" 方法: fallback_2d") print(f" 图像: {best_2d_det.image_name}") print(f" 置信度: {best_2d_score:.3f}") print(f" 估计中心: {est_center.round(3)}") return EntranceInfo( is_detected=False, door=None, center=est_center, score=best_2d_score, method="fallback_2d", reason=f"最高分 2D 检测 ({best_2d_det.image_name}, conf={best_2d_score:.3f})", ) # ===== 策略 4: 完全无检测,用场景几何估计 ===== print(f"\n入户门识别 - 无任何检测,使用场景几何估计") points = np.asarray(combined_pc.points) if len(points) > 0: # 取点云的水平中心 + 最低点作为入口估计 est_x = np.median(points[:, 0]) est_y = ground_y + 1.0 # 地面以上 1m(典型门把手高度附近) est_z = np.median(points[:, 2]) # 找离中心最远的方向(可能是入口方向) center_2d = np.array([est_x, est_z]) dists = np.linalg.norm(points[:, [0, 2]] - center_2d, axis=1) far_idx = np.argsort(dists)[-len(dists)//10:] # 最远 10% 的点 far_points = points[far_idx] est_x = np.median(far_points[:, 0]) est_z = np.median(far_points[:, 2]) else: est_x, est_y, est_z = 0.0, ground_y + 1.0, 0.0 est_center = np.array([est_x, est_y, est_z]) print(f" 方法: scene_center_estimate") print(f" 估计中心: {est_center.round(3)}") return EntranceInfo( is_detected=False, door=None, center=est_center, score=0.0, method="scene_center_estimate", reason="无检测结果,场景几何估计", ) def _rgb_depth_to_pointcloud( self, rgb: np.ndarray, depth: np.ndarray, pose_matrix: np.ndarray ) -> o3d.geometry.PointCloud: """将 RGB-D 转换为世界坐标系点云""" H, W = depth.shape sph = Intrinsic_Spherical_NP(W, H) px, py = np.meshgrid(np.arange(W), np.arange(H)) px_flat = px.flatten().astype(np.float64) py_flat = py.flatten().astype(np.float64) bx, by, bz = sph.bearing([px_flat, py_flat]) bx, by, bz = np.array(bx), np.array(by), np.array(bz) mask = depth.flatten() > self.depth_min d = depth.flatten()[mask] if len(d) == 0: return o3d.geometry.PointCloud() pts_cam = np.stack([bx[mask] * d, by[mask] * d, bz[mask] * d], axis=1) R_z180 = np.diag([-1.0, -1.0, 1.0]) pts_cam = pts_cam @ R_z180.T pts_w = (pose_matrix[:3, :3] @ pts_cam.T).T + pose_matrix[:3, 3] if rgb.shape[:2] != depth.shape: rgb_d = cv2.resize(rgb, (W, H), interpolation=cv2.INTER_LINEAR) else: rgb_d = rgb colors = rgb_d.reshape(-1, 3)[mask].astype(np.float64) / 255.0 pc = o3d.geometry.PointCloud() pc.points = o3d.utility.Vector3dVector(pts_w) pc.colors = o3d.utility.Vector3dVector(colors) return pc def process_scene(self): """处理整个场景""" # 创建输出目录 self.output_folder.mkdir(parents=True, exist_ok=True) self.detection_folder.mkdir(parents=True, exist_ok=True) rgb_files = sorted( self.rgb_folder.glob("*.jpg"), key=lambda x: int(x.stem) ) if not rgb_files: raise FileNotFoundError(f"在 {self.rgb_folder} 中未找到 RGB 图像") print(f"找到 {len(rgb_files)} 张全景图,开始处理...") combined_pc = o3d.geometry.PointCloud() all_detections: List[MaskDetection] = [] door_candidates = [] # 3D 门候选 for rgb_file in tqdm(rgb_files, desc="处理场景"): idx = rgb_file.stem pose = self.poses.get(idx) if pose is None: print(f" ⚠️ 警告:{rgb_file.name} 无位姿信息,跳过") continue depth_path = self.depth_folder / f"{idx}.png" if not depth_path.exists(): print(f" ⚠️ 警告:深度图不存在 {depth_path},跳过") continue rgb = cv2.cvtColor(cv2.imread(str(rgb_file)), cv2.COLOR_BGR2RGB) depth = cv2.imread(str(depth_path), cv2.IMREAD_UNCHANGED).astype(np.float32) / self.depth_scale pose_matrix = self._build_pose_matrix(pose) # 1. YOLOE 检测 + mask 3D 映射 save_det_path = self.detection_folder / f"{idx}_det.jpg" det_result = self.detect_single_image( str(rgb_file), depth, pose_matrix, str(save_det_path) ) all_detections.append(det_result) det_count = len(det_result.mask_contours) tqdm.write(f" {rgb_file.name}: 检测 {det_count} 个门") # 2. 收集 3D 门候选 for i, pts_3d in enumerate(det_result.mask_3d_points): if len(pts_3d) > 10: # 至少 10 个点 bbox_min, bbox_max = self._axis_aligned_bbox(pts_3d) door_candidates.append({ 'bbox_min': bbox_min, 'bbox_max': bbox_max, 'points_3d': pts_3d, 'source': { 'image': rgb_file.name, 'score': det_result.scores[i] if i < len(det_result.scores) else 0.0 } }) # 3. RGB-D 转完整点云 pc = self._rgb_depth_to_pointcloud(rgb, depth, pose_matrix) combined_pc += pc # 最终下采样 print(f"\n融合前点数:{len(combined_pc.points)}") combined_pc = combined_pc.voxel_down_sample(self.voxel_size) print(f"融合后点数:{len(combined_pc.points)}") # 保存合并点云 merge_ply_path = self.output_folder / "merged.ply" o3d.io.write_point_cloud(str(merge_ply_path), combined_pc) print(f"保存合并点云:{merge_ply_path}") # 合并 3D 门 print(f"\n3D 门候选:{len(door_candidates)}") doors_3d = self._merge_3d_doors(door_candidates) print(f"合并后门数量:{len(doors_3d)}") # 估计地面 Y 坐标 if self.ground_y is not None: ground_y = self.ground_y else: ground_y = self._estimate_ground_y(combined_pc) print(f"估计地面 Y 坐标:{ground_y:.3f}") # 过滤不符合物理特性的门 print("\n过滤 3D 门...") valid_doors = [] filtered_doors = [] for door in doors_3d: passed, reasons = self._filter_door_by_properties(door, ground_y) if passed: valid_doors.append(door) else: filtered_doors.append((door, reasons)) print(f"通过过滤:{len(valid_doors)} 个门") if len(filtered_doors) > 0: print(f"被过滤:{len(filtered_doors)} 个门") for door, reasons in filtered_doors: print(f" 门{door.id} (中心={door.center.round(2)}):") for reason in reasons: print(f" - {reason}") # 识别入户门 print("\n" + "=" * 40) print("入户门识别") print("=" * 40) entrance_info = self._identify_entrance_door( valid_doors, ground_y, all_detections, combined_pc ) # 保存检测结果(包含入户门信息) self._save_detections(all_detections, valid_doors, filtered_doors, entrance_info) return combined_pc, valid_doors, entrance_info def _save_detections(self, detections: List[MaskDetection], doors_3d: List[Door3D], filtered_doors: List[Tuple[Door3D, List[str]]] = None, entrance_info: Optional[EntranceInfo] = None): """保存检测结果""" # 只保存有检测到的图像 detected_results = [] for d in detections: if len(d.mask_contours) > 0: detected_results.append({ "image": d.image_name, "count": len(d.mask_contours), "mask_contours": d.mask_contours, "scores": d.scores }) # 3D 门信息 - 正确处理多来源合并 doors_3d_data = [] for door in doors_3d: # 统计来源信息 source_count = len(door.source_detections) scores = [s['score'] for s in door.source_detections if 'score' in s] avg_score = sum(scores) / len(scores) if scores else 0.0 # 计算门的尺寸 size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0) doors_3d_data.append({ "id": door.id, "center": door.center.tolist(), "bbox_8points": door.bbox_8points.tolist(), "size": np.round(size, 4).tolist(), # [宽,高,厚] "source_count": source_count, # 来源数量 "avg_score": round(avg_score, 4), # 平均置信度 "sources": door.source_detections # 详细来源列表 }) # 被过滤的门信息 filtered_data = [] if filtered_doors and len(filtered_doors) > 0: for door, reasons in filtered_doors: source_count = len(door.source_detections) scores = [s['score'] for s in door.source_detections if 'score' in s] avg_score = sum(scores) / len(scores) if scores else 0.0 size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0) filtered_data.append({ "id": door.id, "center": door.center.tolist(), "size": np.round(size, 4).tolist(), "avg_score": round(avg_score, 4), "source_count": source_count, "reject_reasons": reasons }) # 入户门信息 entrance_data = None if entrance_info: entrance_data = { "is_detected": entrance_info.is_detected, "center": entrance_info.center.tolist(), "score": round(entrance_info.score, 4), "method": entrance_info.method, "reason": entrance_info.reason, } if entrance_info.door is not None: entrance_data["door_id"] = entrance_info.door.id output = { "total_images_processed": len(detections), "images_with_doors": len(detected_results), "total_2d_detections": sum(r["count"] for r in detected_results), "total_3d_doors": len(doors_3d), "filtered_3d_doors": len(filtered_doors) if filtered_doors else 0, "entrance_door": entrance_data, "detected_images": detected_results, "3d_doors": doors_3d_data, "filtered_doors": filtered_data } json_path = self.output_folder / "detections.json" with open(json_path, 'w', encoding='utf-8') as f: json.dump(output, f, indent=2, ensure_ascii=False) print(f"保存检测结果:{json_path}") # 打印汇总 print("\n" + "="*50) print("检测汇总") print("="*50) print(f" 处理图像数:{output['total_images_processed']}") print(f" 检测到门的图像:{output['images_with_doors']}") print(f" 2D 检测总数:{output['total_2d_detections']}") print(f" 3D 门数量:{output['total_3d_doors']}") if len(filtered_doors) > 0 if filtered_doors else False: print(f" 被过滤的门:{output['filtered_3d_doors']}") if entrance_data: det_tag = "已检测" if entrance_data["is_detected"] else "估计" print(f"\n 入户门 [{det_tag}]:") print(f" 方法: {entrance_data['method']}") print(f" 中心: {entrance_data['center']}") print(f" 评分: {entrance_data['score']}") print(f" 原因: {entrance_data['reason']}") if doors_3d: print("\n 有效 3D 门信息:") for door in doors_3d: size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0) sources_info = f"{len(door.source_detections)} 个视角" if len(door.source_detections) > 1: scores_str = ", ".join([f"{s['image']}:{s['score']:.2f}" for s in door.source_detections]) sources_info += f" ({scores_str})" print(f" 门{door.id}: 中心={door.center.round(3)}, 尺寸={size.round(3)}") print(f" 来源:{sources_info}") if filtered_doors and len(filtered_doors) > 0: print("\n 被过滤的门 (不符合物理特性):") for door, reasons in filtered_doors: size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0) print(f" 门{door.id}: 中心={door.center.round(3)}, 尺寸={size.round(3)}") for reason in reasons: print(f" - {reason}") print("="*50) # ============================================================================ # 主函数 # ============================================================================ def main(): parser = argparse.ArgumentParser( description="场景点云处理器 - YOLOE 检测 + 点云融合 + 3D 门合并", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 处理 scene0001 场景 python scene_processor.py -s scene0001 # 指定模型和参数 python scene_processor.py -s scene0001 --model yoloe-26x-seg.pt --conf 0.4 # 调整点云精度 python scene_processor.py -s scene0001 --voxel-size 0.02 # 指定地面 Y 坐标(默认自动估计) python scene_processor.py -s scene0001 --ground-y -1.5 # 调整门尺寸过滤阈值 python scene_processor.py -s scene0001 --door-height-min 1.2 --door-width-min 0.5 """ ) parser.add_argument( "--scene", "-s", type=str, default="scene0001", help="场景文件夹 (默认:scene0001)" ) parser.add_argument( "--model", "-m", type=str, default="yoloe-26x-seg.pt", help="YOLOE 模型路径" ) parser.add_argument( "--conf", type=float, default=0.35, help="置信度阈值 (默认:0.35)" ) parser.add_argument( "--iou", type=float, default=0.45, help="NMS IoU 阈值 (默认:0.45)" ) parser.add_argument( "--voxel-size", type=float, default=0.03, help="点云体素大小 (默认:0.03)" ) parser.add_argument( "--depth-scale", type=float, default=256.0, help="深度图缩放因子 (默认:256.0)" ) parser.add_argument( "--ground-y", type=float, default=None, help="地面 Y 坐标 (默认:从点云自动估计)" ) parser.add_argument( "--door-height-min", type=float, default=1.0, help="门最小高度 (默认:1.0 米)" ) parser.add_argument( "--door-height-max", type=float, default=3.0, help="门最大高度 (默认:3.0 米)" ) parser.add_argument( "--door-width-min", type=float, default=0.3, help="门最小宽度 (默认:0.3 米)" ) parser.add_argument( "--door-width-max", type=float, default=3.0, help="门最大宽度 (默认:3.0 米)" ) parser.add_argument( "--door-thickness-max", type=float, default=0.5, help="门最大厚度 (默认:0.5 米)" ) parser.add_argument( "--ground-dist-thresh", type=float, default=0.5, help="门底部距地面最大距离 (默认:0.5 米)" ) args = parser.parse_args() if not Path(args.scene).exists(): print(f"❌ 场景文件夹不存在:{args.scene}") return processor = SceneProcessor( scene_folder=args.scene, model_path=args.model, conf=args.conf, iou=args.iou, voxel_size=args.voxel_size, depth_scale=args.depth_scale, ground_y=args.ground_y, ) # 更新过滤参数 processor.DOOR_HEIGHT_MIN = args.door_height_min processor.DOOR_HEIGHT_MAX = args.door_height_max processor.DOOR_WIDTH_MIN = args.door_width_min processor.DOOR_WIDTH_MAX = args.door_width_max processor.DOOR_THICKNESS_MAX = args.door_thickness_max processor.GROUND_DIST_THRESH = args.ground_dist_thresh processor.process_scene() if __name__ == "__main__": main()