| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 入户门位置导出脚本
- 功能:
- 1. 检测场景中的门并识别入户门
- 2. 输出入户门在世界坐标系中的位置
- 3. 如果未检测到门,基于点位信息估计入户门位置
- 4. 输出 JSON 格式结果
- """
- import os
- import sys
- import json
- import argparse
- from pathlib import Path
- from dataclasses import dataclass
- from typing import Dict, List, Optional, Tuple, Any
- import cv2
- import numpy as np
- import open3d as o3d
- from tqdm import tqdm
- from ultralytics import YOLOE
- from camera_spherical import Intrinsic_Spherical_NP
- from scipy.spatial import Delaunay
- # ============================================================================
- # 数据类
- # ============================================================================
@dataclass
class PoseData:
    """Pose of one capture location, as read from a ``sweepLocations`` entry."""

    # Identifier of the capture location (kept as a string; used as dict key).
    uuid: str
    # Orientation quaternion stored as a mapping with keys "w", "x", "y", "z".
    rotation: Dict[str, float]
    # Position stored as a mapping with keys "x", "y", "z".
    translation: Dict[str, float]
    # Numeric ID of the capture location.
    pose_id: int
@dataclass
class Door3D:
    """One door instance located in world coordinates."""

    # Index of the door within the list it was produced in.
    id: int
    # Centre of the bounding box, [x, y, z].
    center: np.ndarray
    # The eight corners of the axis-aligned bounding box, shape (8, 3).
    bbox_8points: np.ndarray
    # Per-detection records (image name, score, pose uuid) that were merged
    # into this door.
    source_detections: List[Dict]
    # UUID of the capture location the door was first seen from, if known.
    source_uuid: Optional[str] = None
- # ============================================================================
- # 入户门检测器
- # ============================================================================
- class EntranceDoorDetector:
- """
- 入户门检测器:
- - 检测场景中的门
- - 识别入户门
- - 输出入户门位置 JSON
- """
- # 门检测类别
- DOOR_CLASSES = [
- "door", "indoor door", "exterior door",
- "wooden door", "metal door", "glass door", "double door",
- "single door", "open door", "closed door"
- ]
def __init__(
    self,
    scene_folder: str,
    model_path: str = "yoloe-26x-seg.pt",
    conf: float = 0.35,
    iou: float = 0.45,
    voxel_size: float = 0.03,
    depth_scale: float = 256.0,
    depth_min: float = 0.02,
    # Ground / ceiling fitting parameters
    ground_ransac_dist: float = 0.05,
    ground_ransac_prob: float = 0.99,
    ceiling_percentile: float = 95.0,
    # Door filtering parameters
    door_ground_dist: float = 0.1,
    door_height_min: float = 1.0,
    door_height_max: float = 3.0,
    door_width_min: float = 0.3,
    door_width_max: float = 3.0,
    door_thickness_max: float = 0.5,
    # 3D door merging parameters
    merge_iou_thresh: float = 0.1,
    merge_dist_thresh: float = 0.3,
    merge_z_overlap_thresh: float = 0.5,
    # YOLOE input size
    imgsz: Tuple[int, int] = (1024, 2048),  # (height, width)
    # Visualisation
    vis_ply: bool = False,
):
    """Set up the detector: store parameters, load poses, load the model.

    Args:
        scene_folder: Path of the scene folder. It is expected to contain
            ``pano_img``, ``depth_img`` and ``vision.txt``.
        model_path: Path of the YOLOE model weights.
        conf: Detection confidence threshold.
        iou: NMS IoU threshold.
        voxel_size: Voxel size used to down-sample the fused point cloud.
        depth_scale: Divisor applied to raw depth-image values.
        depth_min: Smallest depth regarded as valid.
        ground_ransac_dist: Distance threshold for RANSAC ground fitting.
        ground_ransac_prob: RANSAC confidence.
        ceiling_percentile: Percentile used for ceiling points.
        door_ground_dist: Maximum distance from door bottom to the ground.
        door_height_min: Lower bound of the door height range.
        door_height_max: Upper bound of the door height range.
        door_width_min: Lower bound of the door width range.
        door_width_max: Upper bound of the door width range.
        door_thickness_max: Maximum door thickness.
        merge_iou_thresh: 3D IoU threshold for merging door candidates.
        merge_dist_thresh: Centre-distance threshold for merging.
        merge_z_overlap_thresh: Z-overlap ratio threshold for merging.
        imgsz: YOLOE input size as (height, width); the default
            (1024, 2048) is meant for panoramas.
        vis_ply: Whether to export a visualisation PLY file.

    Raises:
        FileNotFoundError: If the pose file is missing (raised by
            ``_load_poses``).
    """
    # Scene layout on disk.
    self.scene_folder = Path(scene_folder)
    self.rgb_folder = self.scene_folder / "pano_img"
    self.depth_folder = self.scene_folder / "depth_img"
    self.pose_file = self.scene_folder / "vision.txt"
    self.output_folder = self.scene_folder / "output"

    # Detection and depth parameters.
    self.conf, self.iou = conf, iou
    self.voxel_size = voxel_size
    self.depth_scale, self.depth_min = depth_scale, depth_min
    self.imgsz = imgsz  # (height, width)
    self.vis_ply = vis_ply

    # Ground / ceiling fitting parameters.
    self.ground_ransac_dist = ground_ransac_dist
    self.ground_ransac_prob = ground_ransac_prob
    self.ceiling_percentile = ceiling_percentile

    # Door filtering parameters.
    self.door_ground_dist = door_ground_dist
    self.door_height_min, self.door_height_max = door_height_min, door_height_max
    self.door_width_min, self.door_width_max = door_width_min, door_width_max
    self.door_thickness_max = door_thickness_max

    # 3D door merging parameters.
    self.merge_iou_thresh = merge_iou_thresh
    self.merge_dist_thresh = merge_dist_thresh
    self.merge_z_overlap_thresh = merge_z_overlap_thresh

    # Ground estimates, filled in later.
    self.ground_d = None
    self.ground_z_from_puck = None

    # Mutable state produced while processing.
    self.poses: Dict[str, PoseData] = {}
    self.puck_z_dict: Dict[str, float] = {}
    self.entrance_door: Optional[Door3D] = None
    self.estimated_entrance_position: Optional[np.ndarray] = None
    self.all_doors: List[Door3D] = []
    self.processing_info: Dict[str, Any] = {}

    # Poses are loaded before the model so a missing pose file fails early.
    self._load_poses()

    print(f"加载 YOLOE 模型:{model_path}")
    self.model = YOLOE(model_path)
    self.model.set_classes(self.DOOR_CLASSES)
- def _load_poses(self):
- """从 vision.txt 加载位姿信息"""
- if not self.pose_file.exists():
- raise FileNotFoundError(f"位姿文件不存在:{self.pose_file}")
- with open(self.pose_file, 'r') as f:
- data = json.load(f)
- for loc in data.get('sweepLocations', []):
- uuid = str(loc['uuid'])
- self.poses[uuid] = PoseData(
- uuid=uuid,
- rotation=loc['pose']['rotation'],
- translation=loc['pose']['translation'],
- pose_id=loc.get('id', int(uuid))
- )
- if 'puck' in loc and 'z' in loc['puck']:
- self.puck_z_dict[uuid] = loc['puck']['z']
- print(f"加载 {len(self.poses)} 个拍摄点位")
- # 计算整体地面 Z
- if self.puck_z_dict:
- puck_z_values = list(self.puck_z_dict.values())
- self.ground_z_from_puck = np.median(puck_z_values)
- print(f"从 puck 参数估计地面 Z (中位数): {self.ground_z_from_puck:.4f}m")
def _build_pose_matrix(self, pose: PoseData) -> np.ndarray:
    """Return the 4x4 homogeneous transform described by ``pose``.

    The rotation block comes from the pose quaternion, passed to Open3D in
    (w, x, y, z) order; the last column holds the translation.
    """
    quat_wxyz = np.array([pose.rotation[axis] for axis in ('w', 'x', 'y', 'z')])
    offset = np.array([pose.translation[axis] for axis in ('x', 'y', 'z')])

    transform = np.eye(4)
    transform[:3, :3] = o3d.geometry.get_rotation_matrix_from_quaternion(quat_wxyz)
    transform[:3, 3] = offset
    return transform
- def _mask_to_3d_points(
- self,
- mask: np.ndarray,
- depth: np.ndarray,
- pose_matrix: np.ndarray
- ) -> Optional[np.ndarray]:
- """将 2D mask 映射到世界坐标系 3D 点"""
- H, W = depth.shape
- sph = Intrinsic_Spherical_NP(W, H)
- ys, xs = np.where(mask > 0)
- if len(xs) == 0:
- return None
- valid = depth[ys, xs] > self.depth_min
- if not np.any(valid):
- return None
- xs, ys = xs[valid], ys[valid]
- depths = depth[ys, xs]
- bx, by, bz = sph.bearing([xs.astype(np.float64), ys.astype(np.float64)])
- bx, by, bz = np.array(bx), np.array(by), np.array(bz)
- pts_cam = np.stack([bx * depths, by * depths, bz * depths], axis=1)
- R_z180 = np.diag([-1.0, -1.0, 1.0])
- pts_cam = pts_cam @ R_z180.T
- pts_w = (pose_matrix[:3, :3] @ pts_cam.T).T + pose_matrix[:3, 3]
- return pts_w
- def _filter_outliers(self, points: np.ndarray, std_thresh: float = 2.0) -> np.ndarray:
- """过滤 3D 点云中的离群点"""
- if len(points) < 10:
- return points
- mean = np.mean(points, axis=0)
- std = np.std(points, axis=0)
- mask = np.all(np.abs(points - mean) < std_thresh * std, axis=1)
- filtered = points[mask]
- if len(filtered) < len(points) * 0.5:
- return points
- return filtered
- def _filter_door_points_by_depth(self, points: np.ndarray) -> np.ndarray:
- """根据深度一致性过滤门的 3D 点"""
- if len(points) < 50:
- return points
- centered = points - np.mean(points, axis=0)
- cov = np.cov(centered.T)
- eigenvalues, eigenvectors = np.linalg.eigh(cov)
- idx = np.argsort(eigenvalues)[::-1]
- eigenvalues = eigenvalues[idx]
- eigenvectors = eigenvectors[:, idx]
- normal = eigenvectors[:, 2]
- projected = np.dot(points, normal)
- mean_proj = np.mean(projected)
- std_proj = np.std(projected)
- mask = np.abs(projected - mean_proj) < 2.0 * std_proj
- filtered = points[mask]
- if len(filtered) < len(points) * 0.5:
- return points
- return filtered
- def _axis_aligned_bbox(self, points: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
- """计算轴对齐包围盒 (min, max)"""
- lo = np.min(points, axis=0)
- hi = np.max(points, axis=0)
- return lo, hi
- def _bbox_8corners(self, bbox_min: np.ndarray, bbox_max: np.ndarray) -> np.ndarray:
- """从 bbox min/max 获取 8 个角点"""
- cx, cy, cz = bbox_min
- ex, ey, ez = bbox_max
- return np.array([
- [cx, cy, cz], [ex, cy, cz], [ex, ey, cz], [cx, ey, cz],
- [cx, cy, ez], [ex, cy, ez], [ex, ey, ez], [cx, ey, ez],
- ])
- def _bbox_iou_3d(self, b1, b2) -> float:
- """3D IoU 计算"""
- lo = np.maximum(b1[0], b2[0])
- hi = np.minimum(b1[1], b2[1])
- inter = np.prod(np.maximum(hi - lo, 0))
- vol1 = np.prod(b1[1] - b1[0])
- vol2 = np.prod(b2[1] - b2[0])
- union = vol1 + vol2 - inter
- return inter / union if union > 0 else 0.0
def _merge_3d_doors(self, door_candidates: List[Dict]) -> List[Door3D]:
    """Merge door candidates that describe the same door (union-find).

    Each candidate is a dict with ``bbox_min``, ``bbox_max``, ``source``
    (which has an ``image`` key) and optionally ``source_uuid``.

    Two candidates are linked when:
      * same image: 3D IoU > 0.05 and Z-overlap ratio > 0.5;
      * different images: (centre distance < ``merge_dist_thresh`` or
        IoU > ``merge_iou_thresh``) and Z-overlap ratio >
        ``merge_z_overlap_thresh``.

    Returns:
        One ``Door3D`` per connected group. Its box is the union of the
        member boxes and its ``source_uuid`` is the first member's.
    """
    if not door_candidates:
        return []

    count = len(door_candidates)
    if count == 1:
        only = door_candidates[0]
        return [Door3D(
            id=0,
            center=(only['bbox_min'] + only['bbox_max']) / 2,
            bbox_8points=self._bbox_8corners(only['bbox_min'], only['bbox_max']),
            source_detections=[only['source']],
            source_uuid=only.get('source_uuid')
        )]

    roots = list(range(count))

    def find(node):
        # Locate the representative, then compress the path to it.
        root = node
        while roots[root] != root:
            root = roots[root]
        while roots[node] != root:
            nxt = roots[node]
            roots[node] = root
            node = nxt
        return root

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            roots[ra] = rb

    for i in range(count):
        first = door_candidates[i]
        lo_i, hi_i = first['bbox_min'], first['bbox_max']
        for j in range(i + 1, count):
            second = door_candidates[j]
            lo_j, hi_j = second['bbox_min'], second['bbox_max']

            centre_gap = np.linalg.norm((lo_i + hi_i) / 2 - (lo_j + hi_j) / 2)
            same_image = first['source']['image'] == second['source']['image']

            # Vertical overlap as intersection over union of the Z ranges.
            z_inter = max(0, min(hi_i[2], hi_j[2]) - max(lo_i[2], lo_j[2]))
            z_span = max(hi_i[2], hi_j[2]) - min(lo_i[2], lo_j[2])
            z_overlap_ratio = z_inter / z_span if z_span > 0 else 0

            iou = self._bbox_iou_3d((lo_i, hi_i), (lo_j, hi_j))

            if same_image:
                should_merge = (iou > 0.05 and z_overlap_ratio > 0.5)
            else:
                should_merge = (
                    (centre_gap < self.merge_dist_thresh or iou > self.merge_iou_thresh) and
                    z_overlap_ratio > self.merge_z_overlap_thresh
                )

            if should_merge:
                union(i, j)

    # Group members by representative, in order of first appearance.
    groups = {}
    for i in range(count):
        groups.setdefault(find(i), []).append(door_candidates[i])

    doors = []
    for door_id, members in enumerate(groups.values()):
        # The merged box is the union of all member boxes.
        bbox_min = np.min([m['bbox_min'] for m in members], axis=0)
        bbox_max = np.max([m['bbox_max'] for m in members], axis=0)
        doors.append(Door3D(
            id=door_id,
            center=(bbox_min + bbox_max) / 2,
            bbox_8points=self._bbox_8corners(bbox_min, bbox_max),
            source_detections=[m['source'] for m in members],
            source_uuid=members[0].get('source_uuid')
        ))
    return doors
def _fit_ground_plane_ransac(self, pc: o3d.geometry.PointCloud) -> Tuple[np.ndarray, float]:
    """Fit the ground plane ``normal . p + d = 0`` to the point cloud.

    The lowest 5% of the points (by Z) are taken as ground candidates.
    With at least 100 candidates the plane comes from a PCA of those
    candidates (normal = axis of least variance, passing through their
    mean). Otherwise Open3D's RANSAC ``segment_plane`` is run on the whole
    cloud with ``self.ground_ransac_dist`` as distance threshold.

    Returns:
        ``(normal, d)`` with the normal flipped, if needed, so that its Z
        component is non-negative.
    """
    cloud = np.asarray(pc.points)
    heights = cloud[:, 2]
    low_points = cloud[heights <= np.percentile(heights, 5)]

    if len(low_points) >= 100:
        anchor = np.mean(low_points, axis=0)
        # eigh returns eigenvalues in ascending order, so column 0 is the
        # direction of least variance.
        _, axes = np.linalg.eigh(np.cov((low_points - anchor).T))
        direction = axes[:, 0]
        coeffs = [direction[0], direction[1], direction[2], -np.dot(direction, anchor)]
    else:
        coeffs, _ = pc.segment_plane(
            distance_threshold=self.ground_ransac_dist,
            ransac_n=3,
            num_iterations=1000
        )

    a, b, c, d = coeffs
    normal = np.array([a, b, c])
    # Orient the normal upwards (+Z).
    if normal[2] < 0:
        normal, d = -normal, -d
    return normal, d
def _fit_ceiling_plane(self, pc: o3d.geometry.PointCloud, ground_normal: np.ndarray, ground_d: float) -> float:
    """Estimate the floor-to-ceiling distance.

    Signed distances of all points to the ground plane are computed.
    Points 5.0 or more above the ground are ignored, unless fewer than
    1000 points would remain, in which case all points are used. The
    ceiling is the set of points at or above the 90th percentile of
    height; if that set has fewer than 100 points the 85th percentile is
    used instead.

    Returns:
        Mean height above the ground of the selected ceiling points.
    """
    cloud = np.asarray(pc.points)
    heights = np.dot(cloud, ground_normal) + ground_d

    below_cap = heights < 5.0
    if np.count_nonzero(below_cap) >= 1000:
        heights_used = heights[below_cap]
    else:
        heights_used = heights

    p85, p90, _ = np.percentile(heights_used, [85, 90, 95])
    top = heights_used >= p90
    if np.count_nonzero(top) < 100:
        # Too few points: widen the band.
        top = heights_used >= p85

    return np.mean(heights_used[top])
def _filter_door_by_properties(self, door: Door3D) -> Tuple[bool, List[str]]:
    """Check that a door's bounding box has a usable size.

    Height is the Z extent of the box and width is the larger of the X and
    Y extents. Only non-positive values are rejected here; the configured
    ``door_*`` size limits are not applied by this method.

    Returns:
        ``(passed, reasons)`` where ``reasons`` lists each failed check.
    """
    corners = door.bbox_8points
    extent = corners.max(axis=0) - corners.min(axis=0)
    height = extent[2]
    width = max(extent[0], extent[1])

    reasons = []
    if height <= 0:
        reasons.append(f"高度无效 ({height:.2f}m)")
    if width <= 0:
        reasons.append(f"宽度无效 ({width:.2f}m)")
    return not reasons, reasons
def _filter_door_by_ground_puck(self, door: Door3D) -> Tuple[bool, str]:
    """Check that the door bottom is near the ground given by puck heights.

    The reference height is the puck Z of the door's source capture
    location when known, otherwise the overall ``ground_z_from_puck``.
    A door is rejected when its lowest Z differs from the reference by
    more than 0.2.

    Returns:
        ``(passed, reason)``; ``reason`` is empty when the door passes or
        when no reference height is available.
    """
    bottom_z = door.bbox_8points[:, 2].min()

    uuid = door.source_uuid
    if uuid and uuid in self.puck_z_dict:
        puck_z = self.puck_z_dict[uuid]
    else:
        puck_z = self.ground_z_from_puck

    # Without a reference height nothing can be checked.
    if puck_z is None:
        return True, ""

    dist = bottom_z - puck_z
    if abs(dist) > 0.2:
        return False, f"门底部距地面 {dist:.3f}m (puck_z={puck_z:.3f})"
    return True, ""
- def _denoise_xy_projection(
- self,
- points: np.ndarray,
- grid_size: float = 0.15,
- min_points_per_cell: int = 5
- ) -> np.ndarray:
- """投影到 XY 平面后进行网格滤波去噪"""
- if len(points) < 10:
- return points
- xy = points[:, :2]
- grid_coords = np.floor(xy / grid_size).astype(int)
- from collections import Counter
- cell_counts = Counter(map(tuple, grid_coords))
- valid_cells = {
- cell for cell, count in cell_counts.items()
- if count >= min_points_per_cell
- }
- mask = np.array([tuple(gc) in valid_cells for gc in grid_coords])
- if np.sum(mask) < len(points) * 0.3:
- valid_cells = {
- cell for cell, count in cell_counts.items()
- if count >= max(1, min_points_per_cell - 2)
- }
- mask = np.array([tuple(gc) in valid_cells for gc in grid_coords])
- return points[mask]
- def _compute_xy_contour(self, points: np.ndarray) -> Tuple[np.ndarray, Optional[Delaunay]]:
- """计算点云 XY 投影的轮廓(Alpha Shape)"""
- from scipy.spatial import Delaunay, ConvexHull
- if len(points) < 4:
- return points[:, :2], None
- xy = points[:, :2]
- alpha = 2.0
- tri = Delaunay(xy)
- centers = []
- for i in range(tri.npoints):
- idx = tri.simplices[i]
- p0, p1, p2 = xy[idx[0]], xy[idx[1]], xy[idx[2]]
- a = np.linalg.norm(p1 - p2)
- b = np.linalg.norm(p0 - p2)
- c = np.linalg.norm(p0 - p1)
- s = (a + b + c) / 2
- area = np.sqrt(max(0, s * (s - a) * (s - b) * (s - c)))
- if area < 1e-10:
- continue
- R = (a * b * c) / (4 * area)
- if R < alpha:
- center = (p0 + p1 + p2) / 3
- centers.append(center)
- if len(centers) < 3:
- print("⚠️ Alpha Shape 失败,使用凸包")
- hull = ConvexHull(xy)
- return xy[hull.vertices], tri
- centers = np.array(centers)
- hull = ConvexHull(centers)
- contour = centers[hull.vertices]
- return contour, tri
- def _point_to_line_segment_distance(
- self,
- point: np.ndarray,
- line_start: np.ndarray,
- line_end: np.ndarray
- ) -> float:
- """计算点到线段的最短距离"""
- p = np.array(point)
- a = np.array(line_start)
- b = np.array(line_end)
- ab = b - a
- ap = p - a
- t = np.dot(ap, ab) / np.dot(ab, ab)
- t = np.clip(t, 0, 1)
- proj = a + t * ab
- return np.linalg.norm(p - proj)
- def _compute_distance_to_contour_boundary(
- self,
- point_xy: np.ndarray,
- contour: np.ndarray
- ) -> float:
- """计算点到轮廓边界的最短距离"""
- min_dist = float('inf')
- n = len(contour)
- for i in range(n):
- p1 = contour[i]
- p2 = contour[(i + 1) % n]
- dist = self._point_to_line_segment_distance(point_xy, p1, p2)
- min_dist = min(min_dist, dist)
- return min_dist
def _estimate_entrance_from_poses(
    self,
    pc: o3d.geometry.PointCloud,
    contour: Optional[np.ndarray] = None,
    tri = None
) -> Optional[np.ndarray]:
    """Estimate the entrance position from capture locations alone.

    Steps:
      1. Re-read ``vision.txt`` to obtain each location's position, id and
         ``visibles`` list.
      2. If no ``contour`` is given, derive one from the XY projection of
         the point cloud.
      3. Visibility filter: a location A is discarded when it sees some
         location B but does not see a location C that is closer to A than
         B is. If this discards every location, all of them are kept.
      4. Score the remaining locations: up to 70 points for being close to
         the contour boundary plus up to 30 points for being far from the
         centroid of the remaining locations. The best score wins (the
         first one on ties).

    The outcome is recorded in ``self.processing_info['pose_estimation']``.

    Args:
        pc: Fused point cloud; used only when ``contour`` is ``None``.
        contour: Optional precomputed (M, 2) outline.
        tri: Unused; accepted for interface compatibility.

    Returns:
        The 3D position of the selected capture location, or ``None`` when
        no pose data is available.
    """
    if not self.poses:
        print("⚠️ 没有点位数据,无法估计入户门")
        return None

    print("\n=== 基于点位信息估计入户门 ===")

    # Re-read vision.txt: PoseData does not carry the visibility lists.
    vision_file = self.scene_folder / "vision.txt"
    if not vision_file.exists():
        print("⚠️ vision.txt 不存在,无法获取可见性信息")
        return None
    with open(vision_file, 'r', encoding='utf-8') as f:
        vision_data = json.load(f)

    # Keyed by uuid so that duplicated entries collapse to the last one.
    pose_lookup = {}
    for loc in vision_data.get('sweepLocations', []):
        uuid = str(loc['uuid'])
        translation = loc['pose']['translation']
        position = np.array([translation['x'], translation['y'], translation['z']])
        pose_lookup[uuid] = {
            'uuid': uuid,
            # Same fallback as _load_poses when 'id' is absent.
            'id': loc['id'] if 'id' in loc else int(uuid),
            'position': position,
            'position_xy': position[:2],
            'visibles': set(loc.get('visibles', [])),
            'puck_z': (loc.get('puck') or {}).get('z', 0),
        }
    pose_info = list(pose_lookup.values())
    if not pose_info:
        print("⚠️ 没有点位数据,无法估计入户门")
        return None

    # Outline of the scene footprint.
    if contour is None:
        points_xy_denoised = self._denoise_xy_projection(
            np.asarray(pc.points),
            grid_size=0.15,
            min_points_per_cell=5
        )
        contour, _ = self._compute_xy_contour(points_xy_denoised)
    print(f"轮廓点数量:{len(contour)}")

    # Visibility consistency filter.
    filtered_poses = []
    filtered_out = []
    for i, pose_a in enumerate(pose_info):
        reason = None
        for j, pose_b in enumerate(pose_info):
            # Only locations that A actually sees can expose an
            # inconsistency.
            if i == j or pose_b['id'] not in pose_a['visibles']:
                continue
            dist_ab = np.linalg.norm(pose_a['position_xy'] - pose_b['position_xy'])
            for k, pose_c in enumerate(pose_info):
                if k == i or k == j or pose_c['id'] in pose_a['visibles']:
                    continue
                dist_ac = np.linalg.norm(pose_a['position_xy'] - pose_c['position_xy'])
                if dist_ac < dist_ab:
                    reason = f"遮挡不一致:可见 B({pose_b['id']}) 不可见 C({pose_c['id']})"
                    break
            if reason is not None:
                break
        if reason is None:
            filtered_poses.append(pose_a)
        else:
            filtered_out.append((pose_a, reason))

    print(f"可见性过滤:{len(pose_info)} → {len(filtered_poses)} 个点位")
    if not filtered_poses:
        print("⚠️ 所有点位都被过滤,使用全部点位")
        filtered_poses = pose_info

    # Distances used for scoring.
    all_positions_xy = np.array([p['position_xy'] for p in filtered_poses])
    centroid = np.mean(all_positions_xy, axis=0)
    boundary_distances = [
        self._compute_distance_to_contour_boundary(p['position_xy'], contour)
        for p in filtered_poses
    ]
    center_distances = np.linalg.norm(all_positions_xy - centroid, axis=1)

    # Normalisers. A non-finite boundary maximum (empty contour) is treated
    # like the degenerate zero case so that scores never become NaN.
    max_boundary_dist = max(boundary_distances)
    boundary_usable = np.isfinite(max_boundary_dist) and max_boundary_dist > 0
    max_center_dist = center_distances.max() if center_distances.max() > 0 else 1.0

    print("\n点位评分详情:")
    best_score = -float('inf')
    best_pose = None
    best_entry = None
    pose_scores = []
    for i, p in enumerate(filtered_poses):
        # Closer to the boundary -> higher score (max 70).
        if boundary_usable:
            boundary_score = (1 - boundary_distances[i] / max_boundary_dist) * 70
        else:
            boundary_score = 35
        # Farther from the centroid -> higher score (max 30).
        center_score = (center_distances[i] / max_center_dist) * 30
        total_score = boundary_score + center_score

        entry = {
            'uuid': p['uuid'],
            'id': p['id'],
            'boundary_score': boundary_score,
            'center_score': center_score,
            'total_score': total_score,
            'boundary_distance': boundary_distances[i],
            'center_distance': center_distances[i]
        }
        pose_scores.append(entry)
        print(f" 点位 {p['uuid']} (ID={p['id']}): 边界={boundary_score:.1f} + 中心距={center_score:.1f} = {total_score:.1f}")

        if total_score > best_score:
            best_score = total_score
            best_pose = p
            best_entry = entry

    print(f"\n选择点位 {best_pose['uuid']} (综合评分={best_score:.1f})")

    # Record the component scores of the *selected* pose (not the total
    # under 'boundary_score', and not the first pose's centre score).
    self.processing_info['pose_estimation'] = {
        'selected_pose_uuid': best_pose['uuid'],
        'selected_pose_id': best_pose['id'],
        'boundary_score': float(best_entry['boundary_score']),
        'center_distance_score': float(best_entry['center_score']),
        'total_score': float(best_score),
        'all_poses_count': len(pose_info),
        'valid_poses_count': len(filtered_poses),
        'filtered_poses': [{'uuid': p['uuid'], 'reason': r} for p, r in filtered_out]
    }
    return best_pose['position']
def _identify_entrance_door(
    self,
    doors: List[Door3D],
    pc: o3d.geometry.PointCloud
) -> int:
    """Pick the entrance door among the detected doors.

    With no doors, the entrance position is estimated from the capture
    locations and stored in ``self.estimated_entrance_position``; -1 is
    returned. With one door, index 0 is returned. Otherwise every door is
    scored and the best index is returned (the first one on ties):

      * size: up to 15 for height near 2.1 plus up to 15 for width near
        1.0;
      * edge: up to 25, growing with the mean distance to all door
        centres;
      * view: up to 10, 3 per source detection;
      * boundary: up to 30, higher when closer to the scene contour.

    The per-door scores are stored in
    ``self.processing_info['door_scores']``.

    Args:
        doors: Candidate doors.
        pc: Fused point cloud used to derive the scene contour.

    Returns:
        Index into ``doors`` of the entrance door, or -1 if there is none.
    """
    if len(doors) == 0:
        print("\n⚠️ 未检测到任何门,使用点位信息估计入户门位置")
        estimated_entrance = self._estimate_entrance_from_poses(pc)
        if estimated_entrance is not None:
            print(f"估计入户门位置:{estimated_entrance}")
            self.estimated_entrance_position = estimated_entrance
        return -1

    if len(doors) == 1:
        return 0

    # Outline of the scene footprint.
    points_xy_denoised = self._denoise_xy_projection(
        np.asarray(pc.points),
        grid_size=0.15,
        min_points_per_cell=5
    )
    contour, _ = self._compute_xy_contour(points_xy_denoised)
    print(f"轮廓点数量:{len(contour)}")

    all_centers = np.array([d.center for d in doors])

    # Boundary distances are computed once for all doors; the maximum is
    # the normaliser and does not depend on the door being scored.
    boundary_dists = [
        self._compute_distance_to_contour_boundary(d.center[:2], contour)
        for d in doors
    ]
    max_boundary_dist = max(boundary_dists)

    best_score = -float('inf')
    best_idx = 0
    door_scores = []
    for i, door in enumerate(doors):
        size = door.bbox_8points.max(axis=0) - door.bbox_8points.min(axis=0)
        height = size[2]
        width = max(size[0], size[1])

        # Closeness to a typical door size.
        height_score = max(0, 15 - abs(height - 2.1) * 10)
        width_score = max(0, 15 - abs(width - 1.0) * 12)
        size_score = height_score + width_score

        # Mean distance to all door centres (this door included).
        avg_dist = np.linalg.norm(all_centers - door.center, axis=1).mean()
        edge_score = min(25, avg_dist * 10)

        view_score = min(10, len(door.source_detections) * 3)

        if max_boundary_dist > 0:
            boundary_score = (1 - boundary_dists[i] / max_boundary_dist) * 30
        else:
            boundary_score = 15

        total = size_score + edge_score + view_score + boundary_score
        door_scores.append({
            'door_id': i,
            'size_score': size_score,
            'edge_score': edge_score,
            'view_score': view_score,
            'boundary_score': boundary_score,
            'total_score': total
        })
        print(f" 门{i}: 尺寸={size_score:.1f} + 边缘={edge_score:.1f} + 视角={view_score:.1f} + 边界={boundary_score:.1f} = {total:.1f}")

        if total > best_score:
            best_score = total
            best_idx = i

    print(f"\n入户门选择:门 {best_idx} (得分={best_score:.1f})")
    self.processing_info['door_scores'] = door_scores
    return best_idx
def detect_and_identify(self):
    """Detect doors in every panorama and identify the entrance door.

    For each ``<stem>.jpg`` in the RGB folder that has a pose and a
    matching ``<stem>.png`` depth map:
      * run YOLOE and lift each detected mask to 3D to get a door
        candidate box;
      * convert the whole RGB-D frame to a coloured world-space point
        cloud and add it to the fused cloud.

    Afterwards the fused cloud is voxel-down-sampled, the ground plane is
    fitted, candidates are merged and filtered, and the entrance door is
    selected. Results are stored in ``self.entrance_door`` /
    ``self.estimated_entrance_position``, ``self.all_doors``,
    ``self.combined_pc`` and ``self.processing_info``.

    Returns:
        ``True`` if an entrance door was detected or an entrance position
        could be estimated, ``False`` otherwise.

    Raises:
        FileNotFoundError: If the RGB folder contains no ``*.jpg`` files.
        RuntimeError: If no frame produced any 3D point.
    """
    print("\n" + "=" * 50)
    print("开始检测门")
    print("=" * 50)

    # Start from a clean result so a second run cannot report stale data.
    self.entrance_door = None
    self.estimated_entrance_position = None

    rgb_files = sorted(
        self.rgb_folder.glob("*.jpg"),
        key=lambda x: int(x.stem)
    )
    if not rgb_files:
        raise FileNotFoundError(f"在 {self.rgb_folder} 中未找到 RGB 图像")
    print(f"找到 {len(rgb_files)} 张全景图")

    combined_pc = o3d.geometry.PointCloud()
    door_candidates = []

    for rgb_file in tqdm(rgb_files, desc="检测门"):
        idx = rgb_file.stem
        pose = self.poses.get(idx)
        if pose is None:
            continue
        depth_path = self.depth_folder / f"{idx}.png"
        if not depth_path.exists():
            continue

        # cv2.imread returns None for unreadable files; skip the frame
        # instead of failing later with an obscure error.
        bgr = cv2.imread(str(rgb_file))
        depth_raw = cv2.imread(str(depth_path), cv2.IMREAD_UNCHANGED)
        if bgr is None or depth_raw is None:
            print(f"⚠️ 无法读取图像或深度图,跳过:{rgb_file.name}")
            continue
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        depth = depth_raw.astype(np.float32) / self.depth_scale
        H, W = depth.shape
        pose_matrix = self._build_pose_matrix(pose)

        # YOLOE detection.
        results = self.model.predict(
            str(rgb_file),
            imgsz=self.imgsz,
            conf=self.conf,
            iou=self.iou,
            max_det=50,
            augment=True,
            retina_masks=True,
            half=False,
            verbose=False,
        )
        result = results[0]

        if result.masks is not None:
            masks = result.masks.data.cpu().numpy()
            scores = result.boxes.conf.cpu().numpy().tolist()
            for i, mask_bin in enumerate(masks):
                # Bring the mask to the depth-map resolution.
                mask_resized = cv2.resize(
                    (mask_bin > 0.5).astype(np.uint8),
                    (W, H),
                    interpolation=cv2.INTER_NEAREST
                )
                pts_3d = self._mask_to_3d_points(mask_resized, depth, pose_matrix)
                if pts_3d is None or len(pts_3d) <= 10:
                    continue
                pts_3d_filtered = self._filter_outliers(pts_3d, std_thresh=2.0)
                pts_3d_filtered = self._filter_door_points_by_depth(pts_3d_filtered)
                bbox_min, bbox_max = self._axis_aligned_bbox(pts_3d_filtered)
                door_candidates.append({
                    'bbox_min': bbox_min,
                    'bbox_max': bbox_max,
                    'points_3d': pts_3d_filtered,
                    'source': {
                        'image': rgb_file.name,
                        'score': scores[i] if i < len(scores) else 0.0,
                        'pose_uuid': idx
                    },
                    'source_uuid': idx
                })

        # RGB-D frame -> world-space coloured point cloud.
        sph = Intrinsic_Spherical_NP(W, H)
        px, py = np.meshgrid(np.arange(W), np.arange(H))
        px_flat = px.flatten().astype(np.float64)
        py_flat = py.flatten().astype(np.float64)
        bx, by, bz = sph.bearing([px_flat, py_flat])

        depth_flat = depth.flatten()
        mask = depth_flat > self.depth_min
        d = depth_flat[mask]
        if len(d) > 0:
            pts_cam = np.stack([bx[mask] * d, by[mask] * d, bz[mask] * d], axis=1)
            # Same X/Y flip as in _mask_to_3d_points.
            R_z180 = np.diag([-1.0, -1.0, 1.0])
            pts_cam = pts_cam @ R_z180.T
            pts_w = (pose_matrix[:3, :3] @ pts_cam.T).T + pose_matrix[:3, 3]

            # Colours: the RGB image is resized to the depth-map size first.
            rgb_resized = cv2.resize(rgb, (W, H), interpolation=cv2.INTER_LINEAR)
            rgb_flat = rgb_resized.reshape(-1, 3) / 255.0
            colors = rgb_flat[mask]

            pc = o3d.geometry.PointCloud()
            pc.points = o3d.utility.Vector3dVector(pts_w)
            pc.colors = o3d.utility.Vector3dVector(colors)
            combined_pc += pc

    print(f"\n融合前点数:{len(combined_pc.points)}")
    if len(combined_pc.points) == 0:
        # Nothing to fit a ground plane or a contour to.
        raise RuntimeError(f"在 {self.scene_folder} 中没有可用的 RGB-D 数据,无法构建点云")
    combined_pc = combined_pc.voxel_down_sample(self.voxel_size)
    print(f"融合后点数:{len(combined_pc.points)}")

    # Ground and ceiling fitting. The floor-to-ceiling distance is
    # computed here but not consumed by the rest of this method.
    ground_normal, ground_d = self._fit_ground_plane_ransac(combined_pc)
    self.ground_d = ground_d
    floor_ceiling_dist = self._fit_ceiling_plane(combined_pc, ground_normal, ground_d)

    # Merge candidates into 3D doors.
    print(f"\n3D 门候选:{len(door_candidates)}")
    doors_3d = self._merge_3d_doors(door_candidates)
    print(f"合并后门数量:{len(doors_3d)}")

    # Filter by physical properties.
    valid_doors = [
        door for door in doors_3d
        if self._filter_door_by_properties(door)[0]
    ]
    print(f"通过物理特性过滤:{len(valid_doors)} 个门")

    # Filter by distance to the ground, only when puck heights exist
    # (the fitted ground plane is not used for filtering).
    if self.ground_z_from_puck is not None:
        ground_valid_doors = [
            door for door in valid_doors
            if self._filter_door_by_ground_puck(door)[0]
        ]
    else:
        ground_valid_doors = list(valid_doors)
    print(f"通过地面距离过滤:{len(ground_valid_doors)} 个门")

    # Identify the entrance door.
    print("\n" + "=" * 40)
    print("入户门识别")
    print("=" * 40)
    entrance_idx = self._identify_entrance_door(ground_valid_doors, combined_pc)

    self.processing_info['total_candidates'] = len(door_candidates)
    self.processing_info['merged_doors'] = len(doors_3d)
    self.processing_info['valid_doors'] = len(valid_doors)
    self.processing_info['ground_valid_doors'] = len(ground_valid_doors)

    if entrance_idx >= 0:
        self.entrance_door = ground_valid_doors[entrance_idx]
        print(f"入户门:门 {entrance_idx}")
    else:
        print("未检测到入户门,使用点位估计")

    self.all_doors = ground_valid_doors
    self.combined_pc = combined_pc  # kept for visualisation
    return self.entrance_door is not None or self.estimated_entrance_position is not None
def export_json(self, output_path: Optional[str] = None) -> str:
    """Export the entrance-position result to a JSON file.

    The payload always carries ``scene_name``, ``entrance_position``,
    ``source``, ``is_estimated`` and ``metadata``. Depending on how the
    entrance was determined it additionally carries:

    * ``door_detection``: ``door_info``, ``source_detections`` and
      ``used_poses_count`` taken from ``self.entrance_door``;
    * ``pose_estimation``: ``pose_info`` / ``filtered_poses`` (when
      ``self.processing_info`` has a ``'pose_estimation'`` entry),
      ``all_poses_count`` and ``valid_poses_count``;
    * ``unknown``: an ``error`` message and a ``None`` position.

    Args:
        output_path: Destination file. When ``None`` the result is written
            to ``self.output_folder / "entrance_position.json"``. Missing
            parent directories are created.

    Returns:
        The path of the written file, as a string.

    Raises:
        TypeError: If the result contains a value that is neither
            JSON-native nor a NumPy scalar/array or ``Path``.
    """

    def _xyz(point) -> Dict[str, float]:
        # Cast to built-in floats so NumPy scalars serialize cleanly.
        return {
            "x": float(point[0]),
            "y": float(point[1]),
            "z": float(point[2]),
        }

    def _to_jsonable(value: Any) -> Any:
        # ``json`` calls this only for objects it cannot encode itself.
        # ``processing_info`` is filled elsewhere and may hold NumPy values,
        # which would otherwise abort the export with a TypeError.
        if isinstance(value, np.generic):
            return value.item()
        if isinstance(value, np.ndarray):
            return value.tolist()
        if isinstance(value, Path):
            return str(value)
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    if output_path is None:
        target = self.output_folder / "entrance_position.json"
    else:
        target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)

    result: Dict[str, Any] = {
        "scene_name": self.scene_folder.name,
    }

    if self.entrance_door is not None:
        # Entrance determined from a detected door.
        door = self.entrance_door
        detections = door.source_detections

        # Overall confidence is the mean score of all contributing detections.
        scores = [src.get('score', 0.0) for src in detections]
        avg_confidence = float(np.mean(scores)) if scores else 0.0

        bbox_min = door.bbox_8points.min(axis=0)
        bbox_max = door.bbox_8points.max(axis=0)
        size = bbox_max - bbox_min

        result["entrance_position"] = _xyz(door.center)
        result["source"] = "door_detection"
        result["is_estimated"] = False
        result["door_info"] = {
            "door_id": door.id,
            "confidence": avg_confidence,
            "dimensions": {
                # The larger horizontal extent is reported as the width,
                # the smaller one as the thickness.
                "width": float(max(size[0], size[1])),
                "height": float(size[2]),
                "thickness": float(min(size[0], size[1])),
            },
            "center": _xyz(door.center),
            "bbox_min": _xyz(bbox_min),
            "bbox_max": _xyz(bbox_max),
        }
        result["source_detections"] = [
            {
                "pose_uuid": src.get('pose_uuid', 'unknown'),
                "detection_confidence": float(src.get('score', 0.0)),
                "image": src.get('image', 'unknown'),
            }
            for src in detections
        ]
        result["used_poses_count"] = len(detections)
    elif self.estimated_entrance_position is not None:
        # Entrance estimated from the capture poses.
        result["entrance_position"] = _xyz(self.estimated_entrance_position)
        result["source"] = "pose_estimation"
        result["is_estimated"] = True

        pe = self.processing_info.get('pose_estimation')
        if pe is not None:
            result["pose_info"] = {
                "selected_pose_uuid": pe.get('selected_pose_uuid'),
                "selected_pose_id": pe.get('selected_pose_id'),
                "boundary_score": pe.get('boundary_score', 0),
                "center_distance_score": pe.get('center_distance_score', 0),
                "total_score": pe.get('total_score', 0),
            }
            result["filtered_poses"] = pe.get('filtered_poses', [])

        result["all_poses_count"] = len(self.poses)
        result["valid_poses_count"] = (pe or {}).get(
            'valid_poses_count', len(self.poses)
        )
    else:
        # Neither a door nor a pose-based estimate is available.
        result["entrance_position"] = None
        result["source"] = "unknown"
        result["is_estimated"] = False
        result["error"] = "未检测到门且无法从点位估计"

    result["metadata"] = {
        "all_poses_count": len(self.poses),
        "processing_info": self.processing_info,
    }

    # Serialize first so a serialization error cannot leave a truncated file.
    payload = json.dumps(
        result, indent=2, ensure_ascii=False, default=_to_jsonable
    )
    with open(target, 'w', encoding='utf-8') as f:
        f.write(payload)

    print(f"\n结果已导出:{target}")
    return str(target)
def export_vis_ply(self, pc: Optional["o3d.geometry.PointCloud"], output_path: Optional[str] = None):
    """Export a PLY file visualizing the entrance position.

    The scene point cloud is voxel-downsampled and merged with a red sphere
    sampled around the entrance position. The position is the center of
    ``self.entrance_door`` when a door was identified, otherwise
    ``self.estimated_entrance_position``.

    Args:
        pc: Scene point cloud. Its colors are kept when present; a cloud
            without colors is drawn in gray. ``None`` skips the export.
        output_path: Destination file; defaults to
            ``self.output_folder / "vis.ply"``. Missing parent directories
            are created.

    Returns:
        The written file path as a string, or ``None`` when the export was
        skipped (no position / no point cloud) or the write failed.
    """
    voxel_size = 0.05       # downsampling keeps the exported file small
    sphere_radius = 0.2
    sphere_samples = 2000

    # Resolve the entrance position: detected door first, estimate second.
    position = None
    if self.entrance_door is not None:
        position = self.entrance_door.center
    elif self.estimated_entrance_position is not None:
        position = self.estimated_entrance_position
    if position is None:
        print("⚠️ 没有入户门位置,跳过可视化")
        return None

    # Without a scene cloud there is nothing to draw the marker into.
    if pc is None:
        print("⚠️ 没有场景点云,跳过可视化")
        return None

    pc_vis = pc.voxel_down_sample(voxel_size)
    scene_points = np.asarray(pc_vis.points)
    if len(pc_vis.colors) == 0:
        # Fall back to uniform gray when the cloud carries no colors.
        scene_colors = np.tile([0.5, 0.5, 0.5], (len(scene_points), 1))
    else:
        scene_colors = np.asarray(pc_vis.colors)

    # Red marker sphere, converted to points so everything fits in one cloud.
    sphere = o3d.geometry.TriangleMesh.create_sphere(radius=sphere_radius, resolution=30)
    # Open3D expects a float64 3-vector for the translation.
    sphere.translate(np.asarray(position, dtype=np.float64))
    sphere_pc = sphere.sample_points_uniformly(number_of_points=sphere_samples)
    sphere_points = np.asarray(sphere_pc.points)
    sphere_colors = np.tile([1.0, 0.0, 0.0], (len(sphere_points), 1))

    output_pc = o3d.geometry.PointCloud()
    output_pc.points = o3d.utility.Vector3dVector(np.vstack([scene_points, sphere_points]))
    output_pc.colors = o3d.utility.Vector3dVector(np.vstack([scene_colors, sphere_colors]))

    if output_path is None:
        output_path = self.output_folder / "vis.ply"
    else:
        output_path = Path(output_path)
    # Create the directory for the default location as well.
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # ASCII PLY is written for MeshLab compatibility.
    written = o3d.io.write_point_cloud(
        str(output_path), output_pc, write_ascii=True, print_progress=False
    )
    if not written:
        # write_point_cloud reports failure through its return value.
        print(f"⚠️ 可视化导出失败:{output_path}")
        return None

    print(f"可视化已导出:{output_path}")
    print(f" - 入户门位置:{position}")
    print(f" - 红色球体半径:{sphere_radius}m")
    print(f" - 总点数:{len(output_pc.points)} (场景:{len(pc_vis.points)}, 球体:{len(sphere_pc.points)})")
    return str(output_path)
- # ============================================================================
- # 主函数
- # ============================================================================
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser of the entrance-position export script."""
    parser = argparse.ArgumentParser(
        description="入户门位置导出脚本",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
# 处理单个场景
python export_entrance_position.py -s scene0001
# 指定输出路径
python export_entrance_position.py -s scene0001 -o output/entrance.json
# 调整检测参数
python export_entrance_position.py -s scene0001 --conf 0.4 --iou 0.5
# 调整图像尺寸(高度 x 宽度)
python export_entrance_position.py -s scene0001 --imgsz 1024 2048
# 使用更小的图像尺寸(加快处理速度)
python export_entrance_position.py -s scene0001 --imgsz 640 1280
# 导出入户门位置可视化 PLY 文件(红色球体标记)
python export_entrance_position.py -s scene0001 --vis_ply
""",
    )
    parser.add_argument("--scene", "-s", type=str, required=True,
                        help="场景文件夹")
    parser.add_argument("--output", "-o", type=str, default=None,
                        help="输出 JSON 文件路径")
    parser.add_argument("--model", "-m", type=str, default="yoloe-26x-seg.pt",
                        help="YOLOE 模型路径")
    parser.add_argument("--conf", type=float, default=0.35,
                        help="置信度阈值")
    parser.add_argument("--iou", type=float, default=0.45,
                        help="NMS IoU 阈值")
    parser.add_argument("--voxel-size", type=float, default=0.03,
                        help="点云体素大小")
    parser.add_argument("--imgsz", type=int, nargs=2, default=[1024, 2048],
                        metavar=("HEIGHT", "WIDTH"),
                        help="YOLOE 输入图像尺寸 (高度 宽度),默认 1024 2048")
    parser.add_argument("--vis_ply", action="store_true", default=False,
                        help="是否导出入户门位置可视化 PLY 文件(红色球体)")
    return parser


def main():
    """Command-line entry point: detect the entrance door and export results.

    Parses the arguments, runs ``EntranceDoorDetector.detect_and_identify``
    on the scene folder, writes the JSON result and, with ``--vis_ply``, the
    visualization PLY. Exits with status 1 when the scene path is not an
    existing directory or when no entrance position could be determined.
    """
    args = _build_arg_parser().parse_args()

    scene_path = Path(args.scene)
    # The scene must be a folder; a regular file at that path is rejected too.
    if not scene_path.is_dir():
        print(f"❌ 场景文件夹不存在:{scene_path}")
        sys.exit(1)

    detector = EntranceDoorDetector(
        scene_folder=args.scene,
        model_path=args.model,
        conf=args.conf,
        iou=args.iou,
        voxel_size=args.voxel_size,
        imgsz=(args.imgsz[0], args.imgsz[1]),
        vis_ply=args.vis_ply,
    )

    if not detector.detect_and_identify():
        print("\n⚠️ 处理失败:无法确定入户门位置")
        sys.exit(1)

    detector.export_json(args.output)
    if args.vis_ply:
        # NOTE(review): combined_pc is assigned at the end of
        # detect_and_identify; guard in case an earlier success path
        # leaves it unset — confirm against the full method.
        scene_pc = getattr(detector, "combined_pc", None)
        if scene_pc is None:
            print("⚠️ 没有场景点云,跳过可视化")
        else:
            detector.export_vis_ply(scene_pc)
    print("\n✓ 处理完成")


# Run the CLI only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|