| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130 |
- """
- 场景点云处理器
- 整合 YOLOE 门检测 + 全景图点云转换 + 位姿变换 + 多视角融合 + 3D 门合并
- 支持 scene0001 格式的场景数据
- """
- import os
- import sys
- import json
- import argparse
- from pathlib import Path
- from dataclasses import dataclass, field
- from typing import Dict, List, Optional, Tuple
- # 添加当前目录到路径,以便导入 camera_spherical
- sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
- import cv2
- import numpy as np
- import open3d as o3d
- from tqdm import tqdm
- from ultralytics import YOLOE
- from camera_spherical import Intrinsic_Spherical_NP
- # ============================================================================
- # 数据类
- # ============================================================================
@dataclass
class PoseData:
    """Pose of one sweep location, kept as the raw dictionaries from the pose file."""

    # Sweep identifier (stored as a string).
    uuid: str
    # Orientation quaternion, keyed by "w", "x", "y", "z".
    rotation: Dict[str, float]
    # Position, keyed by "x", "y", "z".
    translation: Dict[str, float]
@dataclass
class MaskDetection:
    """Segmentation results produced for a single image."""

    # File name of the image the detections come from.
    image_name: str
    # One 2D contour per mask, each a list of [x, y] pixel coordinates.
    mask_contours: List[List[List[float]]]
    # Detection confidences.
    scores: List[float]
    # World-frame 3D points belonging to each mask.
    mask_3d_points: List[np.ndarray]
@dataclass
class Door3D:
    """A door instance in 3D, described by its bounding box."""

    # Identifier of the door within one processing run.
    id: int
    # Box centre [x, y, z].
    center: np.ndarray
    # The eight box corners, shape (8, 3).
    bbox_8points: np.ndarray
    # Records describing the detections this door was built from.
    source_detections: List[Dict]
@dataclass
class EntranceInfo:
    """Outcome of the entrance-door identification step."""

    # True when the entrance was chosen from detected 3D doors.
    is_detected: bool
    # The chosen door object, or None when the position is only an estimate.
    door: Optional[Door3D]
    # Entrance centre or estimated position [x, y, z].
    center: np.ndarray
    # Confidence score attached to the decision.
    score: float
    # How the entrance was determined: "exterior_class" / "size_score" /
    # "fallback_2d" / "scene_center_estimate".
    method: str
    # Human-readable explanation of the decision.
    reason: str
- # ============================================================================
- # 场景处理器
- # ============================================================================
class SceneProcessor:
    """Scene processor: detection, point-cloud generation, pose transform and 3D door fusion."""

    # Door-related class prompts; all of them are treated as a single "door" class.
    DOOR_CLASSES = [
        "door",
        "indoor door",
        "exterior door",
        "wooden door",
        "metal door",
        "glass door",
        "double door",
        "single door",
        "open door",
        "closed door",
    ]

    # --- 3D door merging ---
    MERGE_IOU_THRESH = 0.3   # 3D IoU threshold
    MERGE_DIST_THRESH = 2.0  # centre-distance threshold (metres)

    # --- 3D door filtering: physical properties of a plausible door ---
    DOOR_HEIGHT_MIN = 1.0     # minimum height (metres)
    DOOR_HEIGHT_MAX = 3.0     # maximum height (metres)
    DOOR_WIDTH_MIN = 0.3      # minimum width (metres)
    DOOR_WIDTH_MAX = 3.0      # maximum width (metres)
    DOOR_THICKNESS_MAX = 0.5  # maximum thickness, i.e. the depth direction (metres)
    GROUND_DIST_THRESH = 0.5  # maximum gap between door bottom and ground (metres)
def __init__(
    self,
    scene_folder: str,
    model_path: str = "yoloe-26x-seg.pt",
    conf: float = 0.35,
    iou: float = 0.45,
    voxel_size: float = 0.03,
    depth_scale: float = 256.0,
    depth_min: float = 0.02,
    ground_y: Optional[float] = None,
):
    """
    Set up paths, load the poses and initialise the YOLOE model.

    Args:
        scene_folder: Root folder of the scene.
        model_path: Path of the YOLOE model weights.
        conf: Detection confidence threshold.
        iou: NMS IoU threshold.
        voxel_size: Voxel size used to down-sample the merged point cloud.
        depth_scale: Divisor applied to raw depth-image values.
        depth_min: Depths at or below this value are treated as invalid.
        ground_y: Ground Y coordinate; when None it is estimated from the
            point cloud during processing.
    """
    root = Path(scene_folder)
    self.scene_folder = root

    # Detection and reconstruction settings.
    self.conf, self.iou = conf, iou
    self.voxel_size = voxel_size
    self.depth_scale, self.depth_min = depth_scale, depth_min
    self.ground_y = ground_y

    # Input locations inside the scene folder.
    self.rgb_folder = root / "pano_img"
    self.depth_folder = root / "depth_img"
    self.pose_file = root / "vision.txt"

    # Output locations.
    self.output_folder = root / "output"
    self.detection_folder = self.output_folder / "detections"

    # Poses are read before the model is loaded, so a missing pose file
    # fails before the model load.
    self.poses = self._load_poses()

    print(f"加载 YOLOE 模型:{model_path}")
    detector = YOLOE(model_path)
    detector.set_classes(self.DOOR_CLASSES)
    self.model = detector
    print(f"检测类别 (统一为 door): {self.DOOR_CLASSES}")
def _load_poses(self) -> Dict[str, PoseData]:
    """
    Load the per-sweep poses from the pose file (JSON content).

    Returns:
        Mapping from sweep uuid (as a string) to its PoseData.

    Raises:
        FileNotFoundError: If the pose file does not exist.
    """
    if not self.pose_file.exists():
        raise FileNotFoundError(f"位姿文件不存在:{self.pose_file}")

    # Decode explicitly as UTF-8 so parsing does not depend on the
    # platform's locale encoding.
    with open(self.pose_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    poses = {}
    for loc in data.get('sweepLocations', []):
        uuid = str(loc['uuid'])
        poses[uuid] = PoseData(
            uuid=uuid,
            rotation=loc['pose']['rotation'],
            translation=loc['pose']['translation'],
        )

    print(f"加载 {len(poses)} 个位姿")
    return poses
def _build_pose_matrix(self, pose: PoseData) -> np.ndarray:
    """Assemble the 4x4 homogeneous transform described by *pose*."""
    # Open3D is given the quaternion in (w, x, y, z) order.
    quat_wxyz = np.array([pose.rotation[key] for key in ('w', 'x', 'y', 'z')])
    offset = np.array([pose.translation[key] for key in ('x', 'y', 'z')])

    transform = np.eye(4)
    transform[:3, :3] = o3d.geometry.get_rotation_matrix_from_quaternion(quat_wxyz)
    transform[:3, 3] = offset
    return transform
def _mask_to_3d_points(
    self,
    mask: np.ndarray,
    depth: np.ndarray,
    pose_matrix: np.ndarray
) -> Optional[np.ndarray]:
    """
    Lift the pixels of a 2D mask to 3D points in the world frame.

    Args:
        mask: Mask image (H, W); pixels with value > 0 are used.
        depth: Depth map (H, W).
        pose_matrix: 4x4 pose matrix.

    Returns:
        (N, 3) array of world-frame points, or None when the mask is empty
        or none of its pixels has a depth above ``self.depth_min``.
    """
    height, width = depth.shape
    camera = Intrinsic_Spherical_NP(width, height)

    rows, cols = np.nonzero(mask > 0)
    if len(cols) == 0:
        return None

    # Keep only the mask pixels that carry a usable depth value.
    sampled = depth[rows, cols]
    usable = sampled > self.depth_min
    if not np.any(usable):
        return None
    cols, rows = cols[usable], rows[usable]
    ranges = sampled[usable]

    # Per-pixel direction vectors from the spherical camera model.
    dir_x, dir_y, dir_z = camera.bearing([cols.astype(np.float64), rows.astype(np.float64)])
    dir_x, dir_y, dir_z = np.array(dir_x), np.array(dir_y), np.array(dir_z)

    # Camera-frame points: direction scaled by depth.
    cam_pts = np.stack([dir_x * ranges, dir_y * ranges, dir_z * ranges], axis=1)

    # 180-degree rotation about the Z axis.
    flip_z180 = np.diag([-1.0, -1.0, 1.0])
    cam_pts = cam_pts @ flip_z180.T

    # Camera frame -> world frame.
    rotation, translation = pose_matrix[:3, :3], pose_matrix[:3, 3]
    return (rotation @ cam_pts.T).T + translation
def _extract_mask_contours(self, masks) -> Tuple[List[List[List[float]]], List[np.ndarray]]:
    """
    Extract one outline per predicted mask.

    Args:
        masks: Mask stack exposing ``.cpu().numpy()``; iterated along its
            first axis, i.e. laid out as (N, H, W).

    Returns:
        ``(contours, mask_arrays)`` of equal length: entry *i* of
        ``contours`` is the largest external contour of ``mask_arrays[i]``
        as a list of [x, y] pixel coordinates, and ``mask_arrays[i]`` is the
        binarised mask (uint8, values 0/1). Masks in which no contour is
        found are skipped, so the result can be shorter than the input.
    """
    contours: List[List[List[float]]] = []
    mask_arrays: List[np.ndarray] = []

    if masks is None:
        return contours, mask_arrays

    for mask in masks.cpu().numpy():
        mask_bin = (mask > 0.5).astype(np.uint8) * 255
        cnts, _ = cv2.findContours(mask_bin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not cnts:
            continue

        # Keep only the largest contour so each mask yields one outline.
        largest = max(cnts, key=cv2.contourArea)
        if len(largest) >= 3:
            # Simplify the polygon.
            epsilon = 0.02 * cv2.arcLength(largest, True)
            outline = cv2.approxPolyDP(largest, epsilon, True)
        else:
            # Too few points to simplify: keep the raw contour so the two
            # returned lists stay index-aligned (previously the mask was
            # appended without a matching contour).
            outline = largest

        contours.append(outline.reshape(-1, 2).astype(float).tolist())
        mask_arrays.append((mask_bin > 0).astype(np.uint8))

    return contours, mask_arrays
def detect_single_image(
    self,
    img_path: str,
    depth: np.ndarray,
    pose_matrix: np.ndarray,
    save_path: Optional[str] = None
) -> MaskDetection:
    """
    Run the detector on one image and lift every mask to 3D.

    The returned ``mask_contours``, ``scores`` and ``mask_3d_points`` are
    index-aligned: entry *i* of each list describes the same detection.
    A detection is dropped when its mask yields no contour or when none of
    its pixels has a valid depth, so that alignment always holds.

    Args:
        img_path: Path of the image to run detection on.
        depth: Depth map (H, W) used for the 3D lifting.
        pose_matrix: 4x4 pose matrix of this view.
        save_path: Optional path for the annotated detection image.

    Returns:
        MaskDetection for this image.
    """
    results = self.model.predict(
        img_path,
        imgsz=(1024, 2048),
        conf=self.conf,
        iou=self.iou,
        max_det=50,
        augment=True,
        retina_masks=True,
        half=False,
        verbose=False,
    )
    result = results[0]

    scores: List[float] = []
    contours: List[List[List[float]]] = []
    mask_3d_points: List[np.ndarray] = []

    if result.masks is not None:
        masks = result.masks.data
        confidences = result.boxes.conf.cpu().numpy().tolist()
        H, W = depth.shape

        # Process masks one at a time so each contour / score / point set
        # is known to belong to the same detection.
        for i, score in enumerate(confidences):
            mask_contours, mask_arrays = self._extract_mask_contours(masks[i:i + 1])
            if not mask_contours or not mask_arrays:
                continue

            mask_resized = cv2.resize(mask_arrays[0], (W, H), interpolation=cv2.INTER_NEAREST)
            pts_3d = self._mask_to_3d_points(mask_resized, depth, pose_matrix)
            if pts_3d is None or len(pts_3d) == 0:
                continue

            contours.append(mask_contours[0])
            scores.append(score)
            mask_3d_points.append(pts_3d)

    if save_path:
        # A bare file name has no directory part; os.makedirs("") would raise.
        out_dir = os.path.dirname(save_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        result.save(save_path)

    return MaskDetection(
        image_name=os.path.basename(img_path),
        mask_contours=contours,
        scores=scores,
        mask_3d_points=mask_3d_points
    )
- def _axis_aligned_bbox(self, points: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
- """计算轴对齐包围盒 (min, max)"""
- lo = np.min(points, axis=0)
- hi = np.max(points, axis=0)
- return lo, hi
- def _bbox_8corners(self, bbox_min: np.ndarray, bbox_max: np.ndarray) -> np.ndarray:
- """从 bbox min/max 获取 8 个角点"""
- cx, cy, cz = bbox_min
- ex, ey, ez = bbox_max
- return np.array([
- [cx, cy, cz], [ex, cy, cz], [ex, ey, cz], [cx, ey, cz],
- [cx, cy, ez], [ex, cy, ez], [ex, ey, ez], [cx, ey, ez],
- ])
- def _bbox_iou_3d(self, b1, b2) -> float:
- """3D IoU 计算"""
- lo = np.maximum(b1[0], b2[0])
- hi = np.minimum(b1[1], b2[1])
- inter = np.prod(np.maximum(hi - lo, 0))
- vol1 = np.prod(b1[1] - b1[0])
- vol2 = np.prod(b2[1] - b2[0])
- union = vol1 + vol2 - inter
- return inter / union if union > 0 else 0.0
- def _merge_3d_doors(self, door_candidates: List[Dict]) -> List[Door3D]:
- """
- 合并重叠或接近的 3D 门 - 使用并查集确保完全合并
- 合并逻辑:
- 1. 构建连通图:如果两个门满足合并条件,则它们连通
- 2. 使用并查集找出所有连通分量
- 3. 每个连通分量合并为一个门
- Args:
- door_candidates: 候选门列表
- Returns:
- 合并后的 Door3D 列表
- """
- if not door_candidates:
- return []
- n = len(door_candidates)
- if n == 1:
- # 只有一个候选,直接返回
- d = door_candidates[0]
- return [Door3D(
- id=0,
- center=(d['bbox_min'] + d['bbox_max']) / 2,
- bbox_8points=self._bbox_8corners(d['bbox_min'], d['bbox_max']),
- source_detections=[d['source']]
- )]
- # ========== 并查集 ==========
- parent = list(range(n))
- def find(x):
- if parent[x] != x:
- parent[x] = find(parent[x]) # 路径压缩
- return parent[x]
- def union(x, y):
- px, py = find(x), find(y)
- if px != py:
- parent[px] = py
- # ========== 构建连通关系 ==========
- # 检查所有门对,满足条件的合并
- for i in range(n):
- for j in range(i + 1, n):
- ci = (door_candidates[i]['bbox_min'] + door_candidates[i]['bbox_max']) / 2
- cj = (door_candidates[j]['bbox_min'] + door_candidates[j]['bbox_max']) / 2
- dist = np.linalg.norm(ci - cj)
- iou = self._bbox_iou_3d(
- (door_candidates[i]['bbox_min'], door_candidates[i]['bbox_max']),
- (door_candidates[j]['bbox_min'], door_candidates[j]['bbox_max'])
- )
- if dist < self.MERGE_DIST_THRESH and iou > self.MERGE_IOU_THRESH:
- union(i, j)
- # ========== 按连通分量分组 ==========
- from collections import defaultdict
- groups = defaultdict(list)
- for i in range(n):
- groups[find(i)].append(door_candidates[i])
- # ========== 合并每个组 ==========
- doors = []
- for door_id, members in enumerate(groups.values()):
- if not members:
- continue
- # 合并所有成员的 bbox
- bbox_min = np.min([m['bbox_min'] for m in members], axis=0)
- bbox_max = np.max([m['bbox_max'] for m in members], axis=0)
- sources = [m['source'] for m in members]
- doors.append(Door3D(
- id=door_id,
- center=(bbox_min + bbox_max) / 2,
- bbox_8points=self._bbox_8corners(bbox_min, bbox_max),
- source_detections=sources
- ))
- return doors
- def _estimate_ground_y(self, combined_pc: o3d.geometry.PointCloud) -> float:
- """
- 从点云估计地面 Y 坐标
- 假设:地面是场景中最低的大面积平面
- 方法:取所有点中 Y 坐标的下 1% 分位数
- Args:
- combined_pc: 完整场景点云
- Returns:
- 估计的地面 Y 坐标
- """
- points = np.asarray(combined_pc.points)
- if len(points) == 0:
- return 0.0
- # 取 Y 坐标(假设 Y 轴向上)的下 1% 分位数,避免家具腿等低矮物体干扰
- ground_y = np.percentile(points[:, 1], 1)
- return ground_y
- def _filter_door_by_properties(self, door: Door3D, ground_y: float) -> Tuple[bool, List[str]]:
- """
- 根据物理特性过滤门
- 过滤条件:
- 1. 高度在合理范围内 (1.0m - 3.0m)
- 2. 宽度在合理范围内 (0.3m - 3.0m)
- 3. 厚度不超过阈值 (≤ 0.5m)
- 4. 门底部接近地面 (≤ 0.5m)
- Args:
- door: 3D 门对象
- ground_y: 地面 Y 坐标
- Returns:
- (是否通过过滤,拒绝原因列表)
- """
- reasons = []
- # 计算 bounding box 尺寸
- size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0)
- height = size[1] # Y 方向是高度
- width = max(size[0], size[2]) # 宽度是 X/Z 中较大的
- thickness = min(size[0], size[2]) # 厚度是 X/Z 中较小的
- # 门底部 Y 坐标
- door_bottom_y = door.bbox_8points[:, 1].min()
- # 条件 1: 高度检查
- if height < self.DOOR_HEIGHT_MIN:
- reasons.append(f"高度过小 ({height:.2f}m < {self.DOOR_HEIGHT_MIN}m)")
- elif height > self.DOOR_HEIGHT_MAX:
- reasons.append(f"高度过大 ({height:.2f}m > {self.DOOR_HEIGHT_MAX}m)")
- # 条件 2: 宽度检查
- if width < self.DOOR_WIDTH_MIN:
- reasons.append(f"宽度过小 ({width:.2f}m < {self.DOOR_WIDTH_MIN}m)")
- elif width > self.DOOR_WIDTH_MAX:
- reasons.append(f"宽度过大 ({width:.2f}m > {self.DOOR_WIDTH_MAX}m)")
- # 条件 3: 厚度检查
- if thickness > self.DOOR_THICKNESS_MAX:
- reasons.append(f"厚度过大 ({thickness:.2f}m > {self.DOOR_THICKNESS_MAX}m)")
- # 条件 4: 地面贴合检查
- dist_to_ground = door_bottom_y - ground_y
- if dist_to_ground > self.GROUND_DIST_THRESH:
- reasons.append(f"距地面过远 ({dist_to_ground:.2f}m > {self.GROUND_DIST_THRESH}m)")
- elif dist_to_ground < -self.GROUND_DIST_THRESH:
- reasons.append(f"嵌入地面过深 ({abs(dist_to_ground):.2f}m)")
- passed = len(reasons) == 0
- return passed, reasons
- def _score_entrance_door(self, door: Door3D, ground_y: float, all_centers: np.ndarray) -> Dict:
- """
- 为每个门计算"入户门可能性"评分
- 评分维度(满分 100):
- 1. 尺寸评分 (30分): 入户门通常较大,高度约 2.0-2.4m,宽度约 0.9-1.2m
- 2. 地面贴合 (20分): 入户门底部贴近地面
- 3. 边缘位置 (25分): 入户门在建筑外围,离场景中心较远
- 4. 厚度评分 (15分): 入户门通常较厚(实心门)
- 5. 多视角支持 (10分): 被多个视角检测到的门更可信
- Args:
- door: 3D 门对象
- ground_y: 地面 Y 坐标
- all_centers: 所有门的中心坐标
- Returns:
- 评分详情 {"total": float, "details": Dict[str, float], "reason": str}
- """
- size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0)
- height = size[1]
- width = max(size[0], size[2])
- thickness = min(size[0], size[2])
- door_bottom_y = door.bbox_8points[:, 1].min()
- # 1. 尺寸评分 (30分) - 理想入户门尺寸 高2.1m 宽1.0m
- height_score = max(0, 15 - abs(height - 2.1) * 10)
- width_score = max(0, 15 - abs(width - 1.0) * 12)
- size_score = height_score + width_score
- # 2. 地面贴合 (20分) - 底部越接近地面分越高
- dist_to_ground = abs(door_bottom_y - ground_y)
- ground_score = max(0, 20 - dist_to_ground * 40)
- # 3. 边缘位置 (25分) - 离所有门的中心越远,越可能是入户门
- if len(all_centers) > 1:
- dists_to_others = np.linalg.norm(all_centers - door.center, axis=1)
- max_dist = dists_to_others.max()
- avg_dist = dists_to_others.mean()
- # 平均距离越大越可能是入户门
- edge_score = min(25, avg_dist * 10)
- else:
- edge_score = 15 # 只有一个门时给中等分
- # 4. 厚度评分 (15分) - 入户门通常更厚
- # 理想厚度 0.04-0.1m,但点云中由于包含门框,会显得更厚
- if 0.05 <= thickness <= 0.3:
- thickness_score = 15
- elif thickness < 0.05:
- thickness_score = 5
- else:
- thickness_score = max(0, 15 - (thickness - 0.3) * 30)
- # 5. 多视角支持 (10分)
- source_count = len(door.source_detections)
- view_score = min(10, source_count * 3)
- total = size_score + ground_score + edge_score + thickness_score + view_score
- reasons = [
- f"尺寸={size_score:.0f}/30 (高{height:.2f}m 宽{width:.2f}m)",
- f"地面={ground_score:.0f}/20 (距地{dist_to_ground:.2f}m)",
- f"边缘={edge_score:.0f}/25",
- f"厚度={thickness_score:.0f}/15 ({thickness:.2f}m)",
- f"视角={view_score:.0f}/10 ({source_count}个)",
- ]
- return {
- "total": round(total, 1),
- "details": {
- "size": round(size_score, 1),
- "ground": round(ground_score, 1),
- "edge": round(edge_score, 1),
- "thickness": round(thickness_score, 1),
- "view": round(view_score, 1),
- },
- "reason": " | ".join(reasons),
- }
def _identify_entrance_door(
    self,
    valid_doors: List[Door3D],
    ground_y: float,
    all_detections: List[MaskDetection],
    combined_pc: o3d.geometry.PointCloud,
) -> EntranceInfo:
    """
    Pick the entrance door, with fallbacks when no valid 3D door exists.

    Strategies, in priority order:
        1. At least one valid door: score every door and take the best
           (method "size_score").
        2. No valid door but 2D detections with 3D points: use the centroid
           of the 3D points of the highest-confidence detection
           (method "fallback_2d").
        3. Nothing usable: estimate a position from the scene geometry
           (method "scene_center_estimate").

    Selecting by an "exterior door" class is not implemented, because
    MaskDetection carries scores but no class information.

    Args:
        valid_doors: Doors that passed the physical-property filter.
        ground_y: Ground Y coordinate.
        all_detections: Per-image detection results; ``scores`` and
            ``mask_3d_points`` are paired by index.
        combined_pc: Merged scene point cloud.

    Returns:
        EntranceInfo describing the chosen or estimated entrance.
    """
    # ===== Strategy 1: choose among valid doors by score =====
    if valid_doors:
        all_centers = np.array([d.center for d in valid_doors])
        scored_doors = [
            (door, self._score_entrance_door(door, ground_y, all_centers))
            for door in valid_doors
        ]
        # max() keeps the first door among equal totals.
        best_door, best_score = max(scored_doors, key=lambda item: item[1]["total"])

        method = "size_score"
        if len(valid_doors) == 1:
            reason = f"唯一有效门: {best_score['reason']}"
        else:
            reason = f"多门评分选择(共{len(valid_doors)}个门): {best_score['reason']}"

        print(f"\n入户门识别 - 选择门{best_door.id}")
        print(f" 方法: {method}")
        print(f" 评分: {best_score['total']}/100")
        print(f" 中心: {best_door.center.round(3)}")
        print(f" 原因: {reason}")

        return EntranceInfo(
            is_detected=True,
            door=best_door,
            center=best_door.center.copy(),
            score=best_score["total"] / 100.0,
            method=method,
            reason=reason,
        )

    # ===== Strategy 2: best 2D detection that actually has 3D points =====
    # Scores are paired with point sets by index and only pairs with points
    # are considered, so a top score without points cannot push the result
    # down to the geometric fallback.
    best_2d_score = 0.0
    best_points = None
    best_image = None
    for det in all_detections:
        for score, pts in zip(det.scores, det.mask_3d_points):
            if len(pts) > 0 and score > best_2d_score:
                best_2d_score, best_points, best_image = score, pts, det.image_name

    if best_points is not None:
        est_center = np.mean(best_points, axis=0)

        print(f"\n入户门识别 - 使用最高分 2D 检测兜底")
        print(f" 方法: fallback_2d")
        print(f" 图像: {best_image}")
        print(f" 置信度: {best_2d_score:.3f}")
        print(f" 估计中心: {est_center.round(3)}")

        return EntranceInfo(
            is_detected=False,
            door=None,
            center=est_center,
            score=best_2d_score,
            method="fallback_2d",
            reason=f"最高分 2D 检测 ({best_image}, conf={best_2d_score:.3f})",
        )

    # ===== Strategy 3: no detection at all, estimate from geometry =====
    print(f"\n入户门识别 - 无任何检测,使用场景几何估计")

    points = np.asarray(combined_pc.points)
    est_y = ground_y + 1.0  # one metre above the ground
    if len(points) > 0:
        # Horizontal (X/Z) median of the cloud serves as the scene centre.
        center_2d = np.array([np.median(points[:, 0]), np.median(points[:, 2])])
        dists = np.linalg.norm(points[:, [0, 2]] - center_2d, axis=1)

        # Median of the farthest ~10% of points (rounded up, at least one).
        n_far = -(-len(dists) // 10)
        far_points = points[np.argsort(dists)[-n_far:]]
        est_x = np.median(far_points[:, 0])
        est_z = np.median(far_points[:, 2])
    else:
        est_x, est_z = 0.0, 0.0

    est_center = np.array([est_x, est_y, est_z])

    print(f" 方法: scene_center_estimate")
    print(f" 估计中心: {est_center.round(3)}")

    return EntranceInfo(
        is_detected=False,
        door=None,
        center=est_center,
        score=0.0,
        method="scene_center_estimate",
        reason="无检测结果,场景几何估计",
    )
def _rgb_depth_to_pointcloud(
    self,
    rgb: np.ndarray,
    depth: np.ndarray,
    pose_matrix: np.ndarray
) -> o3d.geometry.PointCloud:
    """
    Convert an RGB image and its depth map into a coloured world-frame cloud.

    Pixels whose depth is not above ``self.depth_min`` are discarded; an
    empty cloud is returned when no pixel qualifies. The RGB image is
    resized to the depth resolution when the two differ.
    """
    height, width = depth.shape
    camera = Intrinsic_Spherical_NP(width, height)

    # Direction vector for every pixel of the depth grid.
    grid_x, grid_y = np.meshgrid(np.arange(width), np.arange(height))
    flat_x = grid_x.flatten().astype(np.float64)
    flat_y = grid_y.flatten().astype(np.float64)
    dir_x, dir_y, dir_z = camera.bearing([flat_x, flat_y])
    dir_x, dir_y, dir_z = np.array(dir_x), np.array(dir_y), np.array(dir_z)

    flat_depth = depth.flatten()
    keep = flat_depth > self.depth_min
    ranges = flat_depth[keep]
    if len(ranges) == 0:
        return o3d.geometry.PointCloud()

    cam_pts = np.stack(
        [dir_x[keep] * ranges, dir_y[keep] * ranges, dir_z[keep] * ranges],
        axis=1,
    )
    # 180-degree rotation about the Z axis.
    flip_z180 = np.diag([-1.0, -1.0, 1.0])
    cam_pts = cam_pts @ flip_z180.T
    # Camera frame -> world frame.
    world_pts = (pose_matrix[:3, :3] @ cam_pts.T).T + pose_matrix[:3, 3]

    # Colours are sampled on the depth grid.
    if rgb.shape[:2] == depth.shape:
        rgb_on_grid = rgb
    else:
        rgb_on_grid = cv2.resize(rgb, (width, height), interpolation=cv2.INTER_LINEAR)
    colors = rgb_on_grid.reshape(-1, 3)[keep].astype(np.float64) / 255.0

    cloud = o3d.geometry.PointCloud()
    cloud.points = o3d.utility.Vector3dVector(world_pts)
    cloud.colors = o3d.utility.Vector3dVector(colors)
    return cloud
def process_scene(self):
    """
    Process the whole scene.

    For every panorama that has both a pose and a depth image: run door
    detection, collect 3D door candidates and add the view's points to the
    merged cloud. Then down-sample and save the cloud, merge and filter the
    door candidates, identify the entrance door and save the results.

    Returns:
        (combined_pc, valid_doors, entrance_info)

    Raises:
        FileNotFoundError: If the RGB folder contains no ``*.jpg`` file.
    """
    self.output_folder.mkdir(parents=True, exist_ok=True)
    self.detection_folder.mkdir(parents=True, exist_ok=True)

    def numeric_first(path):
        # Numeric stems sort by value; any other name sorts after them
        # instead of aborting the run with a ValueError.
        stem = path.stem
        if stem.isdecimal():
            return (0, int(stem), stem)
        return (1, 0, stem)

    rgb_files = sorted(self.rgb_folder.glob("*.jpg"), key=numeric_first)

    if not rgb_files:
        raise FileNotFoundError(f"在 {self.rgb_folder} 中未找到 RGB 图像")

    print(f"找到 {len(rgb_files)} 张全景图,开始处理...")

    combined_pc = o3d.geometry.PointCloud()
    all_detections: List[MaskDetection] = []
    door_candidates = []  # 3D door candidates

    for rgb_file in tqdm(rgb_files, desc="处理场景"):
        idx = rgb_file.stem

        pose = self.poses.get(idx)
        if pose is None:
            print(f" ⚠️ 警告:{rgb_file.name} 无位姿信息,跳过")
            continue

        depth_path = self.depth_folder / f"{idx}.png"
        if not depth_path.exists():
            print(f" ⚠️ 警告:深度图不存在 {depth_path},跳过")
            continue

        # cv2.imread returns None for unreadable files; skip the view
        # rather than failing later with an opaque error.
        bgr = cv2.imread(str(rgb_file))
        if bgr is None:
            print(f" ⚠️ 警告:无法读取图像 {rgb_file},跳过")
            continue
        raw_depth = cv2.imread(str(depth_path), cv2.IMREAD_UNCHANGED)
        if raw_depth is None:
            print(f" ⚠️ 警告:无法读取深度图 {depth_path},跳过")
            continue

        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        depth = raw_depth.astype(np.float32) / self.depth_scale
        pose_matrix = self._build_pose_matrix(pose)

        # 1. Detection + lifting of each mask to 3D
        save_det_path = self.detection_folder / f"{idx}_det.jpg"
        det_result = self.detect_single_image(
            str(rgb_file), depth, pose_matrix, str(save_det_path)
        )
        all_detections.append(det_result)

        det_count = len(det_result.mask_contours)
        tqdm.write(f" {rgb_file.name}: 检测 {det_count} 个门")

        # 2. Collect 3D door candidates (more than 10 points required)
        for i, pts_3d in enumerate(det_result.mask_3d_points):
            if len(pts_3d) > 10:
                bbox_min, bbox_max = self._axis_aligned_bbox(pts_3d)
                door_candidates.append({
                    'bbox_min': bbox_min,
                    'bbox_max': bbox_max,
                    'points_3d': pts_3d,
                    'source': {
                        'image': rgb_file.name,
                        'score': det_result.scores[i] if i < len(det_result.scores) else 0.0
                    }
                })

        # 3. Full RGB-D point cloud of this view
        combined_pc += self._rgb_depth_to_pointcloud(rgb, depth, pose_matrix)

    # Final down-sampling of the merged cloud
    print(f"\n融合前点数:{len(combined_pc.points)}")
    combined_pc = combined_pc.voxel_down_sample(self.voxel_size)
    print(f"融合后点数:{len(combined_pc.points)}")

    merge_ply_path = self.output_folder / "merged.ply"
    o3d.io.write_point_cloud(str(merge_ply_path), combined_pc)
    print(f"保存合并点云:{merge_ply_path}")

    # Merge the 3D door candidates
    print(f"\n3D 门候选:{len(door_candidates)}")
    doors_3d = self._merge_3d_doors(door_candidates)
    print(f"合并后门数量:{len(doors_3d)}")

    # Ground Y: explicit value if configured, otherwise estimated
    if self.ground_y is not None:
        ground_y = self.ground_y
    else:
        ground_y = self._estimate_ground_y(combined_pc)
    print(f"估计地面 Y 坐标:{ground_y:.3f}")

    # Drop doors that violate the physical-property limits
    print("\n过滤 3D 门...")
    valid_doors = []
    filtered_doors = []
    for door in doors_3d:
        passed, reasons = self._filter_door_by_properties(door, ground_y)
        if passed:
            valid_doors.append(door)
        else:
            filtered_doors.append((door, reasons))

    print(f"通过过滤:{len(valid_doors)} 个门")
    if filtered_doors:
        print(f"被过滤:{len(filtered_doors)} 个门")
        for door, reasons in filtered_doors:
            print(f" 门{door.id} (中心={door.center.round(2)}):")
            for reason in reasons:
                print(f" - {reason}")

    # Identify the entrance door
    print("\n" + "=" * 40)
    print("入户门识别")
    print("=" * 40)
    entrance_info = self._identify_entrance_door(
        valid_doors, ground_y, all_detections, combined_pc
    )

    # Save the detection results, including the entrance information
    self._save_detections(all_detections, valid_doors, filtered_doors, entrance_info)

    return combined_pc, valid_doors, entrance_info
- def _save_detections(self, detections: List[MaskDetection], doors_3d: List[Door3D],
- filtered_doors: List[Tuple[Door3D, List[str]]] = None,
- entrance_info: Optional[EntranceInfo] = None):
- """保存检测结果"""
- # 只保存有检测到的图像
- detected_results = []
- for d in detections:
- if len(d.mask_contours) > 0:
- detected_results.append({
- "image": d.image_name,
- "count": len(d.mask_contours),
- "mask_contours": d.mask_contours,
- "scores": d.scores
- })
- # 3D 门信息 - 正确处理多来源合并
- doors_3d_data = []
- for door in doors_3d:
- # 统计来源信息
- source_count = len(door.source_detections)
- scores = [s['score'] for s in door.source_detections if 'score' in s]
- avg_score = sum(scores) / len(scores) if scores else 0.0
- # 计算门的尺寸
- size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0)
- doors_3d_data.append({
- "id": door.id,
- "center": door.center.tolist(),
- "bbox_8points": door.bbox_8points.tolist(),
- "size": np.round(size, 4).tolist(), # [宽,高,厚]
- "source_count": source_count, # 来源数量
- "avg_score": round(avg_score, 4), # 平均置信度
- "sources": door.source_detections # 详细来源列表
- })
- # 被过滤的门信息
- filtered_data = []
- if filtered_doors and len(filtered_doors) > 0:
- for door, reasons in filtered_doors:
- source_count = len(door.source_detections)
- scores = [s['score'] for s in door.source_detections if 'score' in s]
- avg_score = sum(scores) / len(scores) if scores else 0.0
- size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0)
- filtered_data.append({
- "id": door.id,
- "center": door.center.tolist(),
- "size": np.round(size, 4).tolist(),
- "avg_score": round(avg_score, 4),
- "source_count": source_count,
- "reject_reasons": reasons
- })
- # 入户门信息
- entrance_data = None
- if entrance_info:
- entrance_data = {
- "is_detected": entrance_info.is_detected,
- "center": entrance_info.center.tolist(),
- "score": round(entrance_info.score, 4),
- "method": entrance_info.method,
- "reason": entrance_info.reason,
- }
- if entrance_info.door is not None:
- entrance_data["door_id"] = entrance_info.door.id
- output = {
- "total_images_processed": len(detections),
- "images_with_doors": len(detected_results),
- "total_2d_detections": sum(r["count"] for r in detected_results),
- "total_3d_doors": len(doors_3d),
- "filtered_3d_doors": len(filtered_doors) if filtered_doors else 0,
- "entrance_door": entrance_data,
- "detected_images": detected_results,
- "3d_doors": doors_3d_data,
- "filtered_doors": filtered_data
- }
- json_path = self.output_folder / "detections.json"
- with open(json_path, 'w', encoding='utf-8') as f:
- json.dump(output, f, indent=2, ensure_ascii=False)
- print(f"保存检测结果:{json_path}")
- # 打印汇总
- print("\n" + "="*50)
- print("检测汇总")
- print("="*50)
- print(f" 处理图像数:{output['total_images_processed']}")
- print(f" 检测到门的图像:{output['images_with_doors']}")
- print(f" 2D 检测总数:{output['total_2d_detections']}")
- print(f" 3D 门数量:{output['total_3d_doors']}")
- if len(filtered_doors) > 0 if filtered_doors else False:
- print(f" 被过滤的门:{output['filtered_3d_doors']}")
- if entrance_data:
- det_tag = "已检测" if entrance_data["is_detected"] else "估计"
- print(f"\n 入户门 [{det_tag}]:")
- print(f" 方法: {entrance_data['method']}")
- print(f" 中心: {entrance_data['center']}")
- print(f" 评分: {entrance_data['score']}")
- print(f" 原因: {entrance_data['reason']}")
- if doors_3d:
- print("\n 有效 3D 门信息:")
- for door in doors_3d:
- size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0)
- sources_info = f"{len(door.source_detections)} 个视角"
- if len(door.source_detections) > 1:
- scores_str = ", ".join([f"{s['image']}:{s['score']:.2f}" for s in door.source_detections])
- sources_info += f" ({scores_str})"
- print(f" 门{door.id}: 中心={door.center.round(3)}, 尺寸={size.round(3)}")
- print(f" 来源:{sources_info}")
- if filtered_doors and len(filtered_doors) > 0:
- print("\n 被过滤的门 (不符合物理特性):")
- for door, reasons in filtered_doors:
- size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0)
- print(f" 门{door.id}: 中心={door.center.round(3)}, 尺寸={size.round(3)}")
- for reason in reasons:
- print(f" - {reason}")
- print("="*50)
- # ============================================================================
- # 主函数
- # ============================================================================
def main():
    """Command-line entry point: parse the options and process one scene."""
    parser = argparse.ArgumentParser(
        description="场景点云处理器 - YOLOE 检测 + 点云融合 + 3D 门合并",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
# 处理 scene0001 场景
python scene_processor.py -s scene0001
# 指定模型和参数
python scene_processor.py -s scene0001 --model yoloe-26x-seg.pt --conf 0.4
# 调整点云精度
python scene_processor.py -s scene0001 --voxel-size 0.02
# 指定地面 Y 坐标(默认自动估计)
python scene_processor.py -s scene0001 --ground-y -1.5
# 调整门尺寸过滤阈值
python scene_processor.py -s scene0001 --door-height-min 1.2 --door-width-min 0.5
"""
    )

    # (flags, type, default, help) for every option, in help-output order.
    option_specs = [
        (("--scene", "-s"), str, "scene0001", "场景文件夹 (默认:scene0001)"),
        (("--model", "-m"), str, "yoloe-26x-seg.pt", "YOLOE 模型路径"),
        (("--conf",), float, 0.35, "置信度阈值 (默认:0.35)"),
        (("--iou",), float, 0.45, "NMS IoU 阈值 (默认:0.45)"),
        (("--voxel-size",), float, 0.03, "点云体素大小 (默认:0.03)"),
        (("--depth-scale",), float, 256.0, "深度图缩放因子 (默认:256.0)"),
        (("--ground-y",), float, None, "地面 Y 坐标 (默认:从点云自动估计)"),
        (("--door-height-min",), float, 1.0, "门最小高度 (默认:1.0 米)"),
        (("--door-height-max",), float, 3.0, "门最大高度 (默认:3.0 米)"),
        (("--door-width-min",), float, 0.3, "门最小宽度 (默认:0.3 米)"),
        (("--door-width-max",), float, 3.0, "门最大宽度 (默认:3.0 米)"),
        (("--door-thickness-max",), float, 0.5, "门最大厚度 (默认:0.5 米)"),
        (("--ground-dist-thresh",), float, 0.5, "门底部距地面最大距离 (默认:0.5 米)"),
    ]
    for flags, value_type, default, help_text in option_specs:
        parser.add_argument(*flags, type=value_type, default=default, help=help_text)

    args = parser.parse_args()

    if not Path(args.scene).exists():
        print(f"❌ 场景文件夹不存在:{args.scene}")
        return

    processor = SceneProcessor(
        scene_folder=args.scene,
        model_path=args.model,
        conf=args.conf,
        iou=args.iou,
        voxel_size=args.voxel_size,
        depth_scale=args.depth_scale,
        ground_y=args.ground_y,
    )

    # Override the filter thresholds on this instance; each argparse
    # destination is the lower-cased attribute name.
    for threshold in (
        "DOOR_HEIGHT_MIN",
        "DOOR_HEIGHT_MAX",
        "DOOR_WIDTH_MIN",
        "DOOR_WIDTH_MAX",
        "DOOR_THICKNESS_MAX",
        "GROUND_DIST_THRESH",
    ):
        setattr(processor, threshold, getattr(args, threshold.lower()))

    processor.process_scene()


if __name__ == "__main__":
    main()
|