diff --git a/.github/workflows/test-inference.yml b/.github/workflows/test-inference.yml index 88b879cc9..ca581dae7 100644 --- a/.github/workflows/test-inference.yml +++ b/.github/workflows/test-inference.yml @@ -10,7 +10,7 @@ jobs: test: name: "Test inference" # This action is designed only to run on the Stability research cluster at this time, so many assumptions are made about the environment - if: github.repository == 'stability-ai/generative-models' + if: github.repository == 'Stability-AI/generative-models' runs-on: [self-hosted, slurm, g40] steps: - uses: actions/checkout@v3 diff --git a/README.md b/README.md index 81758e969..4aff00094 100755 --- a/README.md +++ b/README.md @@ -219,6 +219,35 @@ pip3 install -e git+https://github.com/Stability-AI/datapipelines.git@main#egg=s ## Packaging +## Robotics Utility: Geometric Scene + Ground Truth + +A lightweight synthetic scene generator is available for robotics/perception +debugging and dataset bootstrapping: + +`scripts/util/generate_robotics_geometric_scene.py` + +It produces: +- Rendered RGB frames (checkerboard + cube wireframe). +- Per-frame JSON labels with camera intrinsics/extrinsics and exact 3D->2D + projections. + +Example: + +```shell +python scripts/util/generate_robotics_geometric_scene.py \ + --output-dir outputs/robotics_geometry \ + --frames 32 \ + --seed 42 +``` + +Evaluate reprojection consistency: + +```shell +python scripts/util/evaluate_robotics_geometry.py \ + --dataset-dir outputs/robotics_geometry \ + --max-mean-error-px 0.25 +``` + This repository uses PEP 517 compliant packaging using [Hatch](https://hatch.pypa.io/latest/). 
# Points at or behind the camera plane (depth <= this value) are not projected.
_MIN_DEPTH = 1e-8


def _as_points(values, width: int) -> np.ndarray:
    """Coerce *values* to a float64 array of shape ``(N, width)``.

    ``np.array([])`` has shape ``(0,)``; reshaping keeps empty inputs
    two-dimensional so matrix products and ``axis=1`` reductions still work.
    """
    return np.asarray(values, dtype=np.float64).reshape(-1, width)


def project(points_world: np.ndarray, r_cw: np.ndarray, t_cw: np.ndarray, k: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Project world points into the image with a pinhole model.

    Args:
        points_world: World points, shape ``(N, 3)`` (an empty input is accepted).
        r_cw: 3x3 rotation applied to world points.
        t_cw: Translation (3 values) added after the rotation.
        k: 3x3 intrinsic matrix.

    Returns:
        ``(uv, valid)`` where ``uv`` is ``(N, 2)`` with NaN rows for points
        that were not projected, and ``valid`` is the ``(N,)`` boolean mask of
        points whose camera-frame depth exceeds ``_MIN_DEPTH``.
    """
    points_world = _as_points(points_world, 3)
    # X_cam = R * X_world + t
    points_cam = (r_cw @ points_world.T).T + np.asarray(t_cw, dtype=np.float64).reshape(1, 3)
    valid = points_cam[:, 2] > _MIN_DEPTH
    uv = np.full((len(points_world), 2), np.nan, dtype=np.float64)
    if np.any(valid):
        pix = (k @ points_cam[valid].T).T
        # Perspective divide by the homogeneous coordinate.
        uv[valid] = pix[:, :2] / pix[:, 2:3]
    return uv, valid


def reprojection_error(pred_uv: np.ndarray, gt_uv: np.ndarray, valid_mask: np.ndarray) -> np.ndarray:
    """Return per-point Euclidean pixel errors for the usable points.

    A point is used only if ``valid_mask`` is true for it and neither its
    predicted nor its ground-truth coordinates contain NaN. The result is an
    empty array when no point qualifies.
    """
    pred_uv = _as_points(pred_uv, 2)
    gt_uv = _as_points(gt_uv, 2)
    usable = np.asarray(valid_mask, dtype=bool).reshape(-1)
    usable = usable & ~np.isnan(pred_uv).any(axis=1) & ~np.isnan(gt_uv).any(axis=1)
    if not np.any(usable):
        return np.array([], dtype=np.float64)
    return np.linalg.norm(pred_uv[usable] - gt_uv[usable], axis=1)


def _stats(errors: np.ndarray) -> Dict[str, float]:
    """Summarise an error vector as count / mean / median / max (NaN if empty)."""
    if len(errors) == 0:
        return {"count": 0, "mean": float("nan"), "median": float("nan"), "max": float("nan")}
    return {
        "count": int(len(errors)),
        "mean": float(np.mean(errors)),
        "median": float(np.median(errors)),
        "max": float(np.max(errors)),
    }


def _object_errors(
    entry: Dict, world_key: str, uv_key: str, r_cw: np.ndarray, t_cw: np.ndarray, k: np.ndarray
) -> np.ndarray:
    """Reproject one labelled object and return its per-point pixel errors."""
    uv_gt = _as_points(entry[uv_key], 2)
    valid_gt = np.array(entry["valid_mask"], dtype=bool)
    uv_pred, valid_pred = project(_as_points(entry[world_key], 3), r_cw, t_cw, k)
    # Only compare points that both the label and the recomputation consider visible.
    return reprojection_error(uv_pred, uv_gt, valid_gt & valid_pred)


def evaluate_label(label_path: Path) -> Dict:
    """Recompute projections for one frame label and report error statistics.

    Reads ``camera`` (``K``, ``R_cw``, ``t_cw``) and ``objects``
    (``checkerboard``, ``cube``) from the JSON file at *label_path*.

    Returns:
        A dict with ``frame_index`` and ``checkerboard`` / ``cube`` / ``all``
        statistics (``count``, ``mean``, ``median``, ``max``).

    Raises:
        KeyError: If an expected key is missing from the label.
    """
    data = json.loads(label_path.read_text(encoding="utf-8"))
    cam = data["camera"]
    obj = data["objects"]

    k = np.array(cam["K"], dtype=np.float64)
    r_cw = np.array(cam["R_cw"], dtype=np.float64)
    t_cw = np.array(cam["t_cw"], dtype=np.float64)

    cb_err = _object_errors(obj["checkerboard"], "points_world", "points_uv", r_cw, t_cw, k)
    cube_err = _object_errors(obj["cube"], "corners_world", "corners_uv", r_cw, t_cw, k)
    all_err = np.concatenate([cb_err, cube_err])

    return {
        "frame_index": int(data["frame_index"]),
        "checkerboard": _stats(cb_err),
        "cube": _stats(cube_err),
        "all": _stats(all_err),
    }


def _aggregate_means(means: List[float]) -> Dict[str, float]:
    """Summarise per-frame means as frames / mean / median / max (NaN if empty)."""
    if not means:
        return {"frames": 0, "mean_of_means": float("nan"), "median_of_means": float("nan"), "max_of_means": float("nan")}
    a = np.array(means, dtype=np.float64)
    return {
        "frames": int(len(a)),
        "mean_of_means": float(np.mean(a)),
        "median_of_means": float(np.median(a)),
        "max_of_means": float(np.max(a)),
    }


def aggregate(frame_results: List[Dict]) -> Dict:
    """Combine per-frame results into dataset-level statistics.

    For each of ``all`` / ``checkerboard`` / ``cube``, frames with a zero
    count or a NaN mean are skipped; the remaining per-frame means are
    summarised. Every frame therefore carries equal weight regardless of how
    many points it contributed.
    """
    summary = {}
    for key in ("all", "checkerboard", "cube"):
        means = [
            r[key]["mean"]
            for r in frame_results
            if r[key]["count"] > 0 and not np.isnan(r[key]["mean"])
        ]
        summary[key] = _aggregate_means(means)
    return summary


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the evaluation script."""
    p = argparse.ArgumentParser(description="Evaluate reprojection consistency for generated robotics scene labels.")
    p.add_argument("--dataset-dir", type=Path, required=True, help="Path like outputs/robotics_geometry")
    p.add_argument("--max-mean-error-px", type=float, default=0.25, help="Fail if global mean_of_means exceeds this threshold")
    return p.parse_args()


def main() -> None:
    """Evaluate every ``frame_*.json`` label and exit non-zero on failure.

    Prints a JSON report, then fails when the global mean-of-means exceeds
    the threshold, or when no point could be measured at all (a check that
    measured nothing must not count as a pass).
    """
    args = parse_args()
    labels_dir = args.dataset_dir / "labels"
    if not labels_dir.is_dir():
        raise SystemExit(f"labels directory not found: {labels_dir}")

    label_files = sorted(labels_dir.glob("frame_*.json"))
    if not label_files:
        raise SystemExit(f"no frame_*.json labels found in: {labels_dir}")

    frame_results = [evaluate_label(p) for p in label_files]
    summary = aggregate(frame_results)

    out = {
        "dataset_dir": str(args.dataset_dir),
        "frames_evaluated": len(frame_results),
        "threshold_max_mean_error_px": args.max_mean_error_px,
        "summary": summary,
    }
    # NOTE: NaN statistics are emitted as the non-standard JSON token ``NaN``.
    print(json.dumps(out, indent=2))

    global_mean = summary["all"]["mean_of_means"]
    if np.isnan(global_mean):
        raise SystemExit(f"FAILED: no valid reprojection measurements found in: {labels_dir}")
    if global_mean > args.max_mean_error_px:
        raise SystemExit(
            f"FAILED: global mean reprojection error {global_mean:.6f}px > {args.max_mean_error_px:.6f}px"
        )


if __name__ == "__main__":
    main()
# Vertex index pairs forming the 12 edges of the cube built by build_cube_points:
# bottom face (0-3), top face (4-7), then the four vertical edges.
_CUBE_EDGES = (
    (0, 1), (1, 2), (2, 3), (3, 0),
    (4, 5), (5, 6), (6, 7), (7, 4),
    (0, 4), (1, 5), (2, 6), (3, 7),
)


def build_intrinsics(width: int, height: int, fov_deg: float) -> np.ndarray:
    """Build a 3x3 pinhole intrinsic matrix from a horizontal field of view.

    The focal length is derived from *width* and *fov_deg* and used for both
    axes; the principal point is the image centre.

    Raises:
        ValueError: If *fov_deg* is not strictly between 0 and 180 degrees.
    """
    if not 0.0 < fov_deg < 180.0:
        raise ValueError(f"fov_deg must be in (0, 180), got {fov_deg}")
    fx = (width / 2.0) / math.tan(math.radians(fov_deg) / 2.0)
    fy = fx
    cx = width / 2.0
    cy = height / 2.0
    return np.array([[fx, 0.0, cx], [0.0, fy, cy], [0.0, 0.0, 1.0]], dtype=np.float64)


def look_at_rotation(camera_pos: np.ndarray, target: np.ndarray) -> np.ndarray:
    """Return the rotation whose rows are the camera axes in world coordinates.

    Camera coordinates are x-right, y-down, z-forward, with z pointing from
    *camera_pos* towards *target*.

    Raises:
        ValueError: If *camera_pos* and *target* coincide, which leaves the
            viewing direction undefined.
    """
    forward = target - camera_pos
    forward_norm = np.linalg.norm(forward)
    if forward_norm < 1e-12:
        raise ValueError("camera_pos and target coincide; viewing direction is undefined")
    forward = forward / forward_norm

    world_up = np.array([0.0, 0.0, 1.0], dtype=np.float64)
    right = np.cross(forward, world_up)
    right_norm = np.linalg.norm(right)
    if right_norm < 1e-9:
        # Looking along the z axis: fall back to y as the up reference.
        world_up = np.array([0.0, 1.0, 0.0], dtype=np.float64)
        right = np.cross(forward, world_up)
        right_norm = np.linalg.norm(right)
    right = right / right_norm
    up = np.cross(right, forward)
    up = up / np.linalg.norm(up)

    # Keep y-down by negating the up basis vector.
    return np.stack([right, -up, forward], axis=0)


def world_to_camera(
    points_world: np.ndarray, r_cw: np.ndarray, camera_pos: np.ndarray
) -> np.ndarray:
    """Transform ``(N, 3)`` world points to the camera frame: R * (X_world - C)."""
    return (r_cw @ (points_world - camera_pos).T).T


def project_points(points_cam: np.ndarray, k: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Project ``(N, 3)`` camera-frame points with intrinsics *k*.

    Returns:
        ``(points_2d, valid)``: ``points_2d`` is ``(N, 2)`` with NaN rows for
        points whose depth is not greater than 1e-6; ``valid`` is the
        matching ``(N,)`` boolean mask.
    """
    valid = points_cam[:, 2] > 1e-6
    points_2d = np.full((len(points_cam), 2), np.nan, dtype=np.float64)
    if np.any(valid):
        uv = (k @ points_cam[valid].T).T
        points_2d[valid] = uv[:, :2] / uv[:, 2:3]
    return points_2d, valid


def build_checkerboard_points(rows: int, cols: int, square_size: float) -> np.ndarray:
    """Return the ``rows * cols`` grid points of a checkerboard on the z=0 plane.

    Points are ordered row by row (column index varies fastest). The result
    always has shape ``(rows * cols, 3)``, including for an empty grid.
    """
    row_idx, col_idx = np.meshgrid(np.arange(rows), np.arange(cols), indexing="ij")
    xs = col_idx.ravel() * float(square_size)
    ys = row_idx.ravel() * float(square_size)
    return np.stack([xs, ys, np.zeros_like(xs)], axis=1).astype(np.float64)


def build_cube_points(center: np.ndarray, size: float, yaw_deg: float) -> np.ndarray:
    """Return the 8 corners of a cube rotated about z by *yaw_deg* and moved to *center*.

    Corners 0-3 are the bottom face and 4-7 the top face, in matching order.
    """
    h = size / 2.0
    base = np.array(
        [
            [-h, -h, -h],
            [h, -h, -h],
            [h, h, -h],
            [-h, h, -h],
            [-h, -h, h],
            [h, -h, h],
            [h, h, h],
            [-h, h, h],
        ],
        dtype=np.float64,
    )
    yaw = math.radians(yaw_deg)
    cz, sz = math.cos(yaw), math.sin(yaw)
    r_z = np.array([[cz, -sz, 0.0], [sz, cz, 0.0], [0.0, 0.0, 1.0]], dtype=np.float64)
    return (r_z @ base.T).T + center


def draw_checkerboard(
    img: np.ndarray,
    corners_uv: np.ndarray,
    rows: int,
    cols: int,
    color_a: Tuple[int, int, int],
    color_b: Tuple[int, int, int],
) -> None:
    """Fill each checker cell whose four projected corners are all finite.

    *corners_uv* is the row-major ``(rows * cols, 2)`` grid of projected
    points; cells alternate between *color_a* and *color_b*. Draws into
    *img* in place.
    """
    for r in range(rows - 1):
        for c in range(cols - 1):
            i0 = r * cols + c
            # Corner order walks around the cell so the polygon is convex.
            quad = corners_uv[[i0, i0 + 1, i0 + cols + 1, i0 + cols]]
            if not np.isfinite(quad).all():
                continue
            poly = np.round(quad).astype(np.int32)
            color = color_a if (r + c) % 2 == 0 else color_b
            cv2.fillConvexPoly(img, poly, color, lineType=cv2.LINE_AA)


def _to_pixel(uv: np.ndarray) -> Tuple[int, int]:
    """Round a 2D point to a tuple of plain Python ints for OpenCV drawing calls."""
    rounded = np.round(uv)
    return int(rounded[0]), int(rounded[1])


def draw_wire_cube(img: np.ndarray, cube_uv: np.ndarray, cube_valid: np.ndarray) -> None:
    """Draw the cube edges and corner dots for corners flagged valid, in place."""
    for a, b in _CUBE_EDGES:
        if not (cube_valid[a] and cube_valid[b]):
            continue
        cv2.line(img, _to_pixel(cube_uv[a]), _to_pixel(cube_uv[b]), (30, 220, 250), 2, cv2.LINE_AA)

    for i in range(8):
        if cube_valid[i]:
            cv2.circle(img, _to_pixel(cube_uv[i]), 3, (255, 255, 255), -1, cv2.LINE_AA)


def as_list(a: np.ndarray) -> List[Any]:
    """Convert an array to nested Python lists for JSON serialisation."""
    return a.tolist()


def generate_frame(
    frame_idx: int,
    width: int,
    height: int,
    rng: np.random.Generator,
    rows: int,
    cols: int,
    square_size: float,
    cube_size: float,
    fov_deg: float,
) -> Tuple[np.ndarray, Dict[str, Any]]:
    """Render one frame and build its ground-truth label.

    Returns:
        ``(img, label)``: the ``(height, width, 3)`` uint8 image and a
        JSON-serialisable dict with camera parameters and per-object 3D
        points, camera-frame points, 2D projections and validity masks.

    The random draws below happen in a fixed order; changing that order
    changes the output produced for a given seed.
    """
    img = np.full((height, width, 3), (15, 20, 28), dtype=np.uint8)

    # Scene geometry in world frame
    checker_pts_world = build_checkerboard_points(rows, cols, square_size)
    board_center = np.array(
        [((cols - 1) * square_size) * 0.5, ((rows - 1) * square_size) * 0.5, 0.0],
        dtype=np.float64,
    )

    # Slight random object pose for variation, deterministic via seed.
    cube_center = board_center + np.array(
        [
            rng.uniform(-0.08, 0.08),
            rng.uniform(-0.08, 0.08),
            cube_size * 0.55 + rng.uniform(0.00, 0.02),
        ],
        dtype=np.float64,
    )
    cube_yaw = rng.uniform(-45.0, 45.0)
    cube_pts_world = build_cube_points(cube_center, cube_size, cube_yaw)

    # Camera orbit around board center.
    radius = rng.uniform(0.9, 1.4)
    azimuth = rng.uniform(0.0, 2.0 * math.pi)
    elevation = rng.uniform(0.35, 0.85)
    camera_pos = board_center + np.array(
        [radius * math.cos(azimuth), radius * math.sin(azimuth), elevation], dtype=np.float64
    )
    target = board_center + np.array([0.0, 0.0, 0.12], dtype=np.float64)

    r_cw = look_at_rotation(camera_pos, target)
    # Translation such that X_cam = R * X_world + t matches R * (X_world - C).
    t_cw = -(r_cw @ camera_pos)
    k = build_intrinsics(width, height, fov_deg)

    checker_cam = world_to_camera(checker_pts_world, r_cw, camera_pos)
    cube_cam = world_to_camera(cube_pts_world, r_cw, camera_pos)
    checker_uv, checker_valid = project_points(checker_cam, k)
    cube_uv, cube_valid = project_points(cube_cam, k)

    draw_checkerboard(
        img,
        checker_uv,
        rows,
        cols,
        color_a=(180, 180, 180),
        color_b=(70, 70, 70),
    )
    draw_wire_cube(img, cube_uv, cube_valid)

    cv2.putText(
        img,
        f"frame {frame_idx:04d}",
        (18, 32),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.75,
        (220, 220, 220),
        2,
        cv2.LINE_AA,
    )

    label: Dict[str, Any] = {
        "frame_index": frame_idx,
        "camera": {
            "width": width,
            "height": height,
            "fov_deg": fov_deg,
            "K": as_list(k),
            "R_cw": as_list(r_cw),
            "t_cw": as_list(t_cw),
            "camera_position_world": as_list(camera_pos),
            "target_world": as_list(target),
        },
        "objects": {
            "checkerboard": {
                "rows": rows,
                "cols": cols,
                "square_size_m": square_size,
                "points_world": as_list(checker_pts_world),
                "points_cam": as_list(checker_cam),
                "points_uv": as_list(checker_uv),
                "valid_mask": checker_valid.astype(bool).tolist(),
            },
            "cube": {
                "size_m": cube_size,
                "yaw_deg": cube_yaw,
                "center_world": as_list(cube_center),
                "corners_world": as_list(cube_pts_world),
                "corners_cam": as_list(cube_cam),
                "corners_uv": as_list(cube_uv),
                "valid_mask": cube_valid.astype(bool).tolist(),
            },
        },
    }
    return img, label


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the scene generator."""
    parser = argparse.ArgumentParser(
        description="Generate synthetic geometric scenes + GT labels for robotics."
    )
    parser.add_argument("--output-dir", type=Path, default=Path("outputs/robotics_geometry"))
    parser.add_argument("--frames", type=int, default=16)
    parser.add_argument("--width", type=int, default=960)
    parser.add_argument("--height", type=int, default=540)
    parser.add_argument("--fov-deg", type=float, default=60.0)
    parser.add_argument("--seed", type=int, default=7)
    parser.add_argument("--checker-rows", type=int, default=7)
    parser.add_argument("--checker-cols", type=int, default=10)
    parser.add_argument("--square-size", type=float, default=0.04)
    parser.add_argument("--cube-size", type=float, default=0.12)
    return parser.parse_args()


def main() -> None:
    """Generate all frames, write images, labels and a dataset summary.

    Writes ``images/frame_NNNN.png`` and ``labels/frame_NNNN.json`` under the
    output directory, plus ``dataset_summary.json``, and prints the summary.
    Exits with an error if an image cannot be written.
    """
    args = parse_args()
    rng = np.random.default_rng(args.seed)

    out_img = args.output_dir / "images"
    out_lbl = args.output_dir / "labels"
    out_img.mkdir(parents=True, exist_ok=True)
    out_lbl.mkdir(parents=True, exist_ok=True)

    for i in range(args.frames):
        img, label = generate_frame(
            frame_idx=i,
            width=args.width,
            height=args.height,
            rng=rng,
            rows=args.checker_rows,
            cols=args.checker_cols,
            square_size=args.square_size,
            cube_size=args.cube_size,
            fov_deg=args.fov_deg,
        )
        img_path = out_img / f"frame_{i:04d}.png"
        lbl_path = out_lbl / f"frame_{i:04d}.json"
        # cv2.imwrite reports failure through its return value, not an exception.
        if not cv2.imwrite(str(img_path), img):
            raise SystemExit(f"failed to write image: {img_path}")
        # NOTE: NaN coordinates of invalid points are emitted as the
        # non-standard JSON token ``NaN``.
        lbl_path.write_text(json.dumps(label, indent=2), encoding="utf-8")

    summary = {
        "frames": args.frames,
        "image_size": [args.width, args.height],
        "fov_deg": args.fov_deg,
        "seed": args.seed,
        "output_dir": str(args.output_dir),
    }
    (args.output_dir / "dataset_summary.json").write_text(
        json.dumps(summary, indent=2), encoding="utf-8"
    )
    print(json.dumps(summary, indent=2))


if __name__ == "__main__":
    main()