Python Tutorial#
Overview#
This tutorial walks through four complete pycunls examples, each demonstrating a different optimization pattern. Every example follows the same high-level flow described in the Introduction:
Allocate state data on the GPU (CuPy arrays).
Wrap the GPU memory in one or more state batch objects.
Build one or more factor batch objects from observations.
Add state batches and factor batches to a Problem.
Run a minimizer and inspect MinimizerSummary.
The examples increase in complexity:
Sparse Bundle Adjustment — uses ReprojectionFactorBatch to jointly optimize camera poses and 3D landmarks from multi-view observations.
Pose Graph Optimization — uses SE3BetweenFactorBatch to recover a chain of SE(3) poses from consecutive relative-transform measurements.
Custom Warp Factor — shows how to implement a user-defined factor kernel using NVIDIA Warp and WarpFactorBatch.
Custom Warp State — shows how to implement a custom manifold retraction using NVIDIA Warp and WarpStateBatch.
Full source code for all examples lives in the python/examples/
directory.
Sparse Bundle Adjustment#
Source: python/examples/sparse_bundle_adjustment.py
SBA problem statement#
This is a Python port of the C++ bundle adjustment example (see Sparse Bundle Adjustment for the full mathematical formulation). Given \(M\) cameras with poses \(T_1, \ldots, T_M \in \mathrm{SE}(3)\) and \(N\) 3D landmarks \(\mathbf{p}_1, \ldots, \mathbf{p}_N \in \mathbb{R}^3\), we minimize the sum of squared reprojection errors across all \(K\) observations:
Pose \(T_0\) is held constant as a gauge anchor.
SBA API used#
Class |
Role |
|---|---|
Stores camera poses on the SE(3) manifold. The first pose is marked
constant via |
|
Stores 3D landmark coordinates in \(\mathbb{R}^3\). All points are optimization variables. |
|
Computes normalized reprojection residuals and Jacobians for each (pose, point) pair. |
|
Assembles the factor graph. |
|
Iteratively solves the nonlinear least-squares problem with adaptive damping. |
SBA code walkthrough#
Step 1 — Generate synthetic data. Random camera poses and 3D points are created on the host using NumPy. Each ground-truth point is filtered so that it has positive depth in all cameras. Poses \(T_1, \ldots, T_{M-1}\) and all points are perturbed to create noisy initial estimates; \(T_0\) stays at ground truth.
import numpy as np
import cupy as cp
import pycunls
from se3_utils import twist_to_se3, compose_se3, project_normalized
num_poses = 6
num_points = 800
num_observations = num_poses * num_points
rng = np.random.default_rng(1234)
# Ground-truth SE(3) camera poses from random twists.
gt_poses = [twist_to_se3(twist) for twist in ...]
# 3D points visible from all cameras (positive depth).
gt_points = ... # (num_points, 3) float32
# Perturbed initial estimates.
initial_points = gt_points + rng.uniform(-0.35, 0.35, gt_points.shape)
initial_poses = [...] # T_0 at ground truth, T_1..T_{M-1} perturbed
Step 2 — Create 2D observations. Each camera observes every 3D point. Observations are in normalized image coordinates — the format ReprojectionFactorBatch expects.
observations = np.empty((num_observations, 2), dtype=np.float32)
for pi in range(num_poses):
for qi in range(num_points):
observations[pi * num_points + qi] = project_normalized(
gt_poses[pi], gt_points[qi])
Step 3 — Upload data to the GPU and build state batches. CuPy arrays serve as GPU storage. SE3StateBatch wraps the pose memory with only \(T_0\) marked constant; VectorStateBatch3 wraps the landmark memory.
poses_gpu = cp.asarray(initial_poses_flat, dtype=cp.float32)
points_gpu = cp.asarray(initial_points.reshape(-1), dtype=cp.float32)
obs_gpu = cp.asarray(observations.reshape(-1), dtype=cp.float32)
const_ids = cp.array([0], dtype=cp.int32)
cublas = pycunls.CublasHandle()
pose_states = pycunls.SE3StateBatch(cublas, poses_gpu, num_poses, const_ids, 1)
point_states = pycunls.VectorStateBatch3(points_gpu, num_points)
Step 4 — Build the reprojection factor batch.
reproj_factor = pycunls.ReprojectionFactorBatch(
obs_gpu, num_observations, z_threshold=1e-3)
Step 5 — Create the state-pointer list and assemble the problem.
The state-pointer list is a flat sequence of device pointers, two per
factor: [pose_ptr, point_ptr]. This tells the solver which state
blocks each factor reads.
state_pointers = []
for pi in range(num_poses):
for qi in range(num_points):
state_pointers.append(pose_states.state_block_device_ptr(pi))
state_pointers.append(point_states.state_block_device_ptr(qi))
problem = pycunls.Problem()
problem.add_state_batch(pose_states)
problem.add_state_batch(point_states)
problem.add_factor_batch(reproj_factor, state_pointers)
assert problem.check_consistency()
Step 6 — Configure and run Levenberg-Marquardt .
opts = pycunls.MinimizerOptions()
opts.max_num_iterations = 80
opts.state_tolerance = 1e-8
opts.cost_tolerance = 1e-8
lm_opts = pycunls.LevenbergMarquardtMinimizerOptions()
lm_opts.base_options = opts
lm_opts.initial_lambda = 1e-3
stream = pycunls.CudaStream()
minimizer = pycunls.LevenbergMarquardtMinimizer(lm_opts)
summary = minimizer.minimize(stream, problem)
cp.cuda.runtime.streamSynchronize(stream.get_stream())
Step 7 — Read back results.
After optimization, the CuPy arrays contain the updated state. Copy them
to the host with cp.asnumpy and compare against ground truth.
optimized_points = cp.asnumpy(points_gpu).reshape(-1, 3)
point_mse = float(np.mean((optimized_points - gt_points) ** 2))
print(f"Initial cost: {summary.initial_cost:.6f}")
print(f"Final cost: {summary.final_cost:.6f}")
print(f"Iterations: {summary.num_iterations}")
print(f"Point MSE: {point_mse:.6f}")
Pose Graph Optimization#
Source: python/examples/pose_graph_optimization.py
PGO problem statement#
This is a Python port of the C++ pose graph example (see Pose Graph Optimization for the full mathematical formulation). A chain of \(N\) SE(3) poses is connected by \(N{-}1\) between constraints. The residual for constraint \(i\) is:
Pose \(T_0\) is held constant as a gauge anchor.
PGO API used#
Class |
Role |
|---|---|
A single instance stores the full pose chain. The first pose is marked constant; the rest are optimized. |
|
Computes the relative-transform residual and its Jacobians w.r.t. both pose blocks. |
|
Assembles the factor graph. |
|
Solves the nonlinear system. |
PGO code walkthrough#
Step 1 — Generate synthetic pose chain and measurements. A random anchor and random relative transforms are generated on the host. Ground-truth poses are built by chaining the transforms, then all poses except the anchor are perturbed.
num_poses = 201
num_constraints = num_poses - 1
rng = np.random.default_rng(9012)
anchor = random_se3()
deltas = [random_se3() for _ in range(num_constraints)]
gt_poses = [anchor]
for i in range(num_constraints):
gt_poses.append(compose_se3(gt_poses[-1], se3_inverse(deltas[i])))
initial_poses = [gt_poses[0].copy()]
for i in range(1, num_poses):
perturbation = random_se3(rot_scale=0.05, trans_scale=0.3)
initial_poses.append(compose_se3(perturbation, gt_poses[i]))
Step 2 — Upload data and build the state batch.
poses_gpu = cp.asarray(np.stack(initial_poses).reshape(-1), dtype=cp.float32)
deltas_gpu = cp.asarray(np.stack(deltas).reshape(-1), dtype=cp.float32)
const_ids = cp.array([0], dtype=cp.int32)
cublas = pycunls.CublasHandle()
pose_states = pycunls.SE3StateBatch(
cublas, poses_gpu, num_poses, const_ids, 1)
Step 3 — Build the between-factor batch.
between_factor = pycunls.SE3BetweenFactorBatch(
deltas_gpu, num_constraints)
Step 4 — Wire state pointers and assemble the problem.
Each between factor reads two state blocks: [T_i, T_{i+1}].
state_pointers = []
for i in range(num_constraints):
state_pointers.append(pose_states.state_block_device_ptr(i))
state_pointers.append(pose_states.state_block_device_ptr(i + 1))
problem = pycunls.Problem()
problem.add_state_batch(pose_states)
problem.add_factor_batch(between_factor, state_pointers)
assert problem.check_consistency()
Step 5 — Solve with Levenberg-Marquardt.
opts = pycunls.MinimizerOptions()
opts.max_num_iterations = 60
opts.state_tolerance = 1e-8
opts.cost_tolerance = 1e-8
lm_opts = pycunls.LevenbergMarquardtMinimizerOptions()
lm_opts.base_options = opts
lm_opts.initial_lambda = 1e-3
stream = pycunls.CudaStream()
minimizer = pycunls.LevenbergMarquardtMinimizer(lm_opts)
summary = minimizer.minimize(stream, problem)
cp.cuda.runtime.streamSynchronize(stream.get_stream())
print(f"Initial cost: {summary.initial_cost:.6f}")
print(f"Final cost: {summary.final_cost:.6f}")
print(f"Iterations: {summary.num_iterations}")
Custom Warp Factor#
Source: python/examples/custom_warp_factor.py
Custom Warp factor problem statement#
This is a Python port of the C++ custom factor example (see
Custom Factor). It builds a chain of scalar states
connected by difference constraints implemented as an NVIDIA Warp kernel
through WarpFactorBatch (from
pycunls.warp):
A prior factor anchors \(x_0\) to its observed value:
Warp factor API used#
Class |
Role |
|---|---|
Stores all \(N\) scalar states in \(\mathbb{R}^1\). |
|
Base class for user-defined factors evaluated via Warp kernels.
Provides |
|
Built-in prior factor that anchors \(x_0\). |
|
Assembles the factor graph. |
|
Solves the nonlinear system. |
Warp factor code walkthrough#
Step 1 — Define the Warp kernel.
The kernel computes the scalar difference residual and constant Jacobian
[-1, +1] for each factor. State values are first gathered into
contiguous buffers (since Warp kernels operate on contiguous
wp.array objects and cannot perform the double-pointer indirection
that raw CUDA kernels do).
import warp as wp
from pycunls.warp import WarpFactorBatch
@wp.kernel
def scalar_diff_kernel(
measurements: wp.array(dtype=wp.float32),
left_vals: wp.array(dtype=wp.float32),
right_vals: wp.array(dtype=wp.float32),
residuals: wp.array(dtype=wp.float32),
jacobians: wp.array(dtype=wp.float32),
num_factors: int,
write_jacobians: int,
):
i = wp.tid()
if i >= num_factors:
return
residuals[i] = (right_vals[i] - left_vals[i]) - measurements[i]
if write_jacobians != 0:
jacobians[i * 2] = -1.0
jacobians[i * 2 + 1] = 1.0
Step 2 — Subclass WarpFactorBatch.
The evaluate method gathers scattered state pointers into contiguous
CuPy arrays, wraps them as wp.array objects, and launches the Warp
kernel.
class ScalarDiffFactor(WarpFactorBatch):
def __init__(self, measurements_wp, num_factors):
super().__init__(
residual_size=1,
state_block_sizes=[1, 1],
num_factors=num_factors,
)
self.measurements = measurements_wp
self._num_factors = num_factors
def evaluate(self, residuals_ptr, jacobians_ptr,
state_pointers_ptr, stream_handle):
n = self._num_factors
all_vals = _gather_state_values(state_pointers_ptr, n * 2)
left_vals = all_vals[0::2].copy()
right_vals = all_vals[1::2].copy()
left_wp = wp.array(ptr=int(left_vals.data.ptr), ...)
right_wp = wp.array(ptr=int(right_vals.data.ptr), ...)
res = self.wrap_array(residuals_ptr, wp.float32, n)
jac = self.wrap_array(jacobians_ptr, wp.float32, n * 2)
stream = self.make_warp_stream(stream_handle)
wp.launch(scalar_diff_kernel, dim=n,
inputs=[self.measurements, left_wp, right_wp,
res, jac, n, 1],
stream=stream)
return True
Step 3 — Build the problem and solve. A VectorStateBatch1 holds all scalar states. Two factor batches are added: the custom Warp difference factor and a built-in prior anchor on \(x_0\).
states_gpu = cp.asarray(initial)
measurements_wp = wp.array(measurements_np, dtype=wp.float32, device="cuda:0")
prior_obs_gpu = cp.asarray(gt[:1])
state_batch = pycunls.VectorStateBatch1(states_gpu, num_states)
diff_factor = ScalarDiffFactor(measurements_wp, num_diff)
prior_factor = pycunls.PriorVectorFactorBatch1(prior_obs_gpu, 1)
diff_ptrs = []
for i in range(num_diff):
diff_ptrs.append(state_batch.state_block_device_ptr(i))
diff_ptrs.append(state_batch.state_block_device_ptr(i + 1))
prior_ptrs = [state_batch.state_block_device_ptr(0)]
problem = pycunls.Problem()
problem.add_state_batch(state_batch)
problem.add_factor_batch(diff_factor, diff_ptrs)
problem.add_factor_batch(prior_factor, prior_ptrs)
assert problem.check_consistency()
minimizer = pycunls.LevenbergMarquardtMinimizer(lm_opts)
summary = minimizer.minimize(stream, problem)
Custom Warp State#
Source: python/examples/custom_warp_state.py
Custom Warp state problem statement#
This example demonstrates WarpStateBatch (from pycunls.warp) by
defining a positive-scalar manifold where the Plus (retraction)
operation is multiplicative:
This makes the tangent space the reals (\(\delta \in \mathbb{R}\)), while states stay strictly positive — the natural parameterization for quantities like scales, variances, or rates.
A chain of positive scalars is connected by log-ratio between-factors:
All Jacobians equal \(\pm 1\) because the problem is linear in the tangent (log) space.
Warp state API used#
Class |
Role |
|---|---|
Base class for user-defined state batches with a Warp-based Plus.
Provides |
|
Base class for the custom log-prior and log-ratio factors. |
|
Assembles the factor graph. |
|
Solves the nonlinear system. |
Warp state code walkthrough#
Step 1 — Define the Warp Plus kernel. The kernel computes the multiplicative retraction for each state block.
@wp.kernel
def positive_plus_kernel(
x: wp.array(dtype=wp.float32),
delta: wp.array(dtype=wp.float32),
x_plus_delta: wp.array(dtype=wp.float32),
n: int,
):
i = wp.tid()
if i >= n:
return
x_plus_delta[i] = x[i] * wp.exp(delta[i])
Step 2 — Subclass WarpStateBatch.
The plus method wraps the raw device pointers as wp.array objects
and launches the kernel.
from pycunls.warp import WarpStateBatch
class PositiveScalarStateBatch(WarpStateBatch):
def __init__(self, data, num_blocks, **kwargs):
super().__init__(data, ambient_size=1, tangent_size=1,
num_blocks=num_blocks, **kwargs)
self._num = num_blocks
def plus(self, x_ptr, delta_ptr, x_plus_delta_ptr, stream_handle):
n = self._num
x = self.wrap_array(x_ptr, wp.float32, n)
delta = self.wrap_array(delta_ptr, wp.float32, n)
x_out = self.wrap_array(x_plus_delta_ptr, wp.float32, n)
stream = self.make_warp_stream(stream_handle)
wp.launch(positive_plus_kernel, dim=n,
inputs=[x, delta, x_out, n], stream=stream)
Step 3 — Define custom factors and build the problem. Two custom WarpFactorBatch subclasses compute the log-prior and log-ratio residuals. The problem is assembled the same way as previous examples.
states_gpu = cp.asarray(initial)
state_batch = PositiveScalarStateBatch(states_gpu, num_states)
between_factor = LogRatioBetweenFactor(log_ratios_wp, num_between)
prior_factor = LogPriorFactor(prior_obs_wp, 1)
problem = pycunls.Problem()
problem.add_state_batch(state_batch)
problem.add_factor_batch(between_factor, between_ptrs)
problem.add_factor_batch(prior_factor, prior_ptrs)
assert problem.check_consistency()
minimizer = pycunls.LevenbergMarquardtMinimizer(lm_opts)
summary = minimizer.minimize(stream, problem)
Because the problem is linear in log-space, Gauss-Newton converges in a single iteration.