| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 入户门检测服务客户端
- 用于调用 FastAPI 服务器进行入户门位置检测
- """
- import os
- import sys
- import time
- import json
- import argparse
- from pathlib import Path
- from typing import Optional, Dict, Any, List
- import requests
- from tqdm import tqdm
- class EntranceDoorClient:
- """入户门检测客户端"""
- def __init__(self, base_url: str = "http://localhost:8000"):
- self.base_url = base_url.rstrip("/")
- self.session = requests.Session()
- def health_check(self) -> Dict[str, Any]:
- """健康检查"""
- response = self.session.get(f"{self.base_url}/health")
- response.raise_for_status()
- return response.json()
- def detect(
- self,
- scene_folder: str,
- model_path: Optional[str] = None,
- conf: float = 0.35,
- iou: float = 0.45,
- voxel_size: float = 0.03,
- imgsz: Optional[List[int]] = None,
- vis_ply: bool = False,
- poll_interval: float = 2.0,
- timeout: int = 600
- ) -> Dict[str, Any]:
- """
- 提交检测任务并等待结果
- Args:
- scene_folder: 场景文件夹路径
- model_path: YOLOE 模型路径
- conf: 检测置信度阈值
- iou: NMS IoU 阈值
- voxel_size: 点云体素尺寸
- imgsz: YOLOE 输入图像尺寸 [高,宽]
- vis_ply: 是否生成可视化 PLY
- poll_interval: 查询状态间隔(秒)
- timeout: 超时时间(秒)
- Returns:
- 检测结果
- """
- # 提交任务
- request_data = {
- "scene_folder": scene_folder,
- "conf": conf,
- "iou": iou,
- "voxel_size": voxel_size,
- "vis_ply": vis_ply
- }
- if model_path:
- request_data["model_path"] = model_path
- if imgsz:
- request_data["imgsz"] = imgsz
- response = self.session.post(f"{self.base_url}/detect", json=request_data)
- response.raise_for_status()
- submit_result = response.json()
- task_id = submit_result["task_id"]
- print(f"任务已提交:{task_id}")
- print(f" 场景:{scene_folder}")
- print(f" 可视化:{'是' if vis_ply else '否'}")
- # 轮询任务状态
- start_time = time.time()
- print("\n等待检测结果...")
- with tqdm(total=100, desc="处理进度") as pbar:
- while True:
- # 检查超时
- if time.time() - start_time > timeout:
- raise TimeoutError(f"任务 {task_id} 超时")
- # 查询状态
- status_response = self.session.get(f"{self.base_url}/status/{task_id}")
- status_response.raise_for_status()
- status = status_response.json()
- # 更新进度条
- progress = status.get("progress", 0) or 0
- pbar.n = int(progress * 100)
- pbar.set_description(f"状态:{status['status']}")
- pbar.refresh()
- # 检查任务状态
- if status["status"] == "completed":
- print(f"\n✓ 检测完成!")
- return status
- elif status["status"] == "failed":
- print(f"\n✗ 检测失败:{status.get('error', '未知错误')}")
- raise RuntimeError(f"任务失败:{status.get('error', '未知错误')}")
- # 等待后重试
- time.sleep(poll_interval)
- def detect_and_save(
- self,
- scene_folder: str,
- output_dir: Optional[str] = None, # 此参数已废弃
- **kwargs
- ) -> Dict[str, Any]:
- """
- 检测并保存结果到本地
- Args:
- scene_folder: 场景文件夹路径
- output_dir: 已废弃,结果始终保存到 scene_folder/output
- **kwargs: 传递给 detect() 的参数
- Returns:
- 检测结果
- """
- # 执行检测
- result = self.detect(scene_folder, **kwargs)
- # 结果已保存在场景文件夹的 output 目录下(由服务器端生成)
- output_path = Path(scene_folder) / "output"
- print(f"\n结果已保存在:{output_path}/")
- if kwargs.get("vis_ply", False):
- print(f" - entrance_position.json (JSON 结果)")
- print(f" - vis.ply (可视化文件)")
- else:
- print(f" - entrance_position.json (JSON 结果)")
- return result["result"]
- def get_status(self, task_id: str) -> Dict[str, Any]:
- """查询任务状态"""
- response = self.session.get(f"{self.base_url}/status/{task_id}")
- response.raise_for_status()
- return response.json()
- def get_result(self, task_id: str) -> Dict[str, Any]:
- """获取检测结果"""
- response = self.session.get(f"{self.base_url}/result/{task_id}")
- response.raise_for_status()
- return response.json()
- def download_ply(self, task_id: str, output_path: str) -> str:
- """下载可视化 PLY 文件"""
- response = self.session.get(f"{self.base_url}/result/{task_id}/ply")
- response.raise_for_status()
- with open(output_path, "wb") as f:
- f.write(response.content)
- return output_path
- def delete_task(self, task_id: str) -> bool:
- """删除任务"""
- response = self.session.delete(f"{self.base_url}/task/{task_id}")
- response.raise_for_status()
- return True
- def main():
- parser = argparse.ArgumentParser(
- description="入户门检测客户端",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- 示例:
- # 健康检查
- python client.py --health
- # 检测单个场景
- python client.py -s scene0001
- # 带可视化输出
- python client.py -s scene0001 --vis_ply
- # 指定服务器地址
- python client.py -s scene0001 --server http://192.168.1.100:8000
- # 调整检测参数
- python client.py -s scene0001 --conf 0.4 --iou 0.5
- """
- )
- parser.add_argument("--server", type=str, default="http://localhost:8000",
- help="服务器地址")
- parser.add_argument("--scene", "-s", type=str, required=True,
- help="场景文件夹")
- parser.add_argument("--output", "-o", type=str, default="client_results",
- help="输出目录")
- parser.add_argument("--model", "-m", type=str, default="yoloe-26x-seg.pt",
- help="YOLOE 模型路径")
- parser.add_argument("--conf", type=float, default=0.35,
- help="置信度阈值")
- parser.add_argument("--iou", type=float, default=0.45,
- help="NMS IoU 阈值")
- parser.add_argument("--voxel-size", type=float, default=0.03,
- help="点云体素大小")
- parser.add_argument("--imgsz", type=int, nargs=2, default=[1024, 2048],
- metavar=("HEIGHT", "WIDTH"),
- help="YOLOE 输入图像尺寸 (高度 宽度)")
- parser.add_argument("--vis_ply", action="store_true",
- help="是否生成可视化 PLY")
- parser.add_argument("--health", action="store_true",
- help="执行健康检查并退出")
- parser.add_argument("--timeout", type=int, default=600,
- help="超时时间(秒)")
- args = parser.parse_args()
- # 创建客户端
- client = EntranceDoorClient(args.server)
- # 健康检查
- if args.health:
- try:
- health = client.health_check()
- print("服务器健康状态:")
- print(f" 状态:{health['status']}")
- print(f" 模型已加载:{health['model_loaded']}")
- print(f" GPU 可用:{health['gpu_available']}")
- print(f" 时间:{health['timestamp']}")
- except requests.exceptions.ConnectionError:
- print(f"无法连接到服务器:{args.server}")
- sys.exit(1)
- return
- # 检测场景
- try:
- result = client.detect_and_save(
- scene_folder=args.scene,
- output_dir=args.output,
- model_path=args.model,
- conf=args.conf,
- iou=args.iou,
- voxel_size=args.voxel_size,
- imgsz=args.imgsz,
- vis_ply=args.vis_ply,
- timeout=args.timeout
- )
- print("\n" + "=" * 50)
- print("检测结果")
- print("=" * 50)
- if result.get("entrance_position"):
- pos = result["entrance_position"]
- print(f"入户门位置:({pos['x']:.3f}, {pos['y']:.3f}, {pos['z']:.3f})")
- print(f"来源:{result.get('source', 'unknown')}")
- print(f"是否估计:{result.get('is_estimated', False)}")
- if result.get("door_info"):
- door = result["door_info"]
- print(f"\n门信息:")
- print(f" 置信度:{door['confidence']:.3f}")
- dim = door['dimensions']
- print(f" 尺寸:{dim['width']:.2f}m × {dim['height']:.2f}m × {dim['thickness']:.3f}m")
- print("\n✓ 处理完成")
- except Exception as e:
- print(f"\n✗ 错误:{e}")
- sys.exit(1)
- if __name__ == "__main__":
- main()
|