# Copyright (c) 2025 Martin Pflaum
# This file is part of the diffinytrace project, licensed under the MIT License.
__all__ = ["Integrator", "Cube","Disc","IntegrationMethod"]
import torch
import numpy as np
import math
from scipy.stats import qmc
from enum import Enum
mersenne_twister = np.random.Generator(np.random.MT19937(seed=12345))
[docs]
class IntegrationMethod(Enum):
SIMPSON = "simpson"
MIDPOINT = "midpoint"
MONTE_CARLO = "monte_carlo"
SOBOL = "sobol"
SOBOL_POW2 = "sobol_pow2"
def check_2val(num_points):
num_points = np.array(num_points)
if len(num_points.shape)>0:
return num_points[0]*num_points[1]
return num_points
[docs]
class Integrator():
def __init__(self):
pass
[docs]
def sample(self, num_points: int | list[int], method: IntegrationMethod) -> tuple[torch.Tensor, torch.Tensor]:
"""
Sample points and weights using the specified method.
Args:
num_points (int or list): Number of points in each dimension.
method (str): The integration method to use. Options are 'simpson', 'midpoint', 'monte_carlo', 'sobol', and 'sobol_pow2'.
Returns:
tuple: A tuple containing the sampled points and their corresponding weights.
"""
raise NotImplementedError("sample() not implemented")
[docs]
def in_bounds(self, x: torch.Tensor) -> torch.Tensor:
raise NotImplementedError("in_bounds() not implemented")
[docs]
def get_volume(self) -> float:
raise NotImplementedError("get_volume() not implemented")
[docs]
class Cube(Integrator):
"""
Integrator for a multi-dimensional cube (hyperrectangle).
Args:
bounds (array-like): The bounds for each dimension of the cube. Should be a list or array of shape (n_dim, 2),
where each row specifies [lower_bound, upper_bound] for a dimension.
Example:
>>> cube = dit.integrators.Cube([[0, 1], [0, 1]])
>>> points, weights = cube.sample([10, 10], method=IntegrationMethod.MIDPOINT)
>>> volume = cube.get_volume()
>>> all_in_bounds = cube.in_bounds(points)
>>> print("Sampled points:", points)
>>> print("Integration weights:", weights)
>>> print("Cube volume:", volume)
>>> print("All points in bounds:", all_in_bounds)
"""
def __init__(self,bounds):
super().__init__()
bounds = np.array(bounds)
if len(bounds.shape)==1:
bounds = np.array([bounds])
self.bounds = torch.tensor(bounds)
if len(self.bounds.shape)!=2:
raise ValueError("len(self.bounds.shape)==2 must hold true!")
[docs]
def sample(self, num_points: int | list[int], method: IntegrationMethod = IntegrationMethod.MIDPOINT) -> tuple[torch.Tensor, torch.Tensor]:
r"""
Sample points and weights using the specified method.
Args:
num_points (int or list): Number of points in each dimension.
method (str): The integration method to use. Options are 'simpson', 'midpoint', 'monte_carlo', 'sobol', and 'sobol_pow2'.
Returns:
tuple: A tuple containing the sampled points and their corresponding weights.
"""
if not isinstance(method, str):
method = str(method.value)
if method == 'simpson':
return self._sample_simpson(num_points)
elif method == 'midpoint':
return self._sample_midpoint(num_points)
elif method == 'monte_carlo':
return self._sample_monte_carlo(num_points)
elif method == 'sobol':
return self._sample_sobol(num_points,False)
elif method == 'sobol_pow2':
return self._sample_sobol(num_points,True)
else:
raise ValueError(f"Unknown integration method: {method}")
[docs]
def in_bounds(self, x:torch.Tensor) -> torch.Tensor:
out = torch.ones(x.shape[0],device=x.device,dtype=torch.bool).float()
for k in range(self.bounds.shape[0]):
out = out*((self.bounds[k,0]<=x[:,k]).float())*((x[:,k]<=self.bounds[k,1]).float())
out = out==1.0
return out
[docs]
def get_volume(self) -> float:
"""
Returns:
float: Volume of the Cube.
"""
volume = torch.prod(self.bounds[:,1]-self.bounds[:,0])
return volume
def _sample_midpoint(self, num_points):
"""
Sample points and weights using the midpoint rule.
Args:
num_points (list or array): Number of points in each dimension.
Returns:
sampled_points (torch.Tensor): Tensor of sampled points.
weights (torch.Tensor): Tensor of weights associated with each point.
"""
# Ensure num_points matches the number of dimensions in bounds
num_points = np.array(num_points)
if len(num_points) != self.bounds.shape[0]:
raise ValueError("Using midpoint sampling expected num_points to match the number of dimensions.")
midpoints = []
# Calculate the midpoints for each dimension
for i in range(self.bounds.shape[0]):
lower_bound, upper_bound = self.bounds[i]
# Calculate the size of each interval (dx) for the dimension
dx = (upper_bound - lower_bound) / num_points[i]
# Compute the midpoints in this dimension
points = torch.linspace(lower_bound + dx / 2.0, upper_bound - dx / 2.0, num_points[i])
midpoints.append(points)
# Create a meshgrid of midpoints for all dimensions
grid = torch.meshgrid(*midpoints, indexing='ij')
sampled_points = torch.stack(grid, dim=-1).reshape(-1, self.bounds.shape[0])
# Compute the weights based on the volume of each subregion
weights = torch.ones(sampled_points.shape[0], dtype=torch.float32)
for i in range(self.bounds.shape[0]):
lower_bound, upper_bound = self.bounds[i]
dx = (upper_bound - lower_bound) / num_points[i]
# Each dimension contributes its own weight
weights *= dx # Multiply the weights by the width of the intervals
return sampled_points, weights
def _sample_simpson(self, num_points):
# Ensure num_points matches the expected number of dimensions
num_points = np.array(num_points)
if len(num_points) != self.bounds.shape[0]:
raise ValueError("Using Simpson's rule expected num_points to have the same number of entries as dimensions.")
for elem in num_points:
if elem % 2 == 0:
raise ValueError("Simpson's rule only takes an odd number of points!")
# Create sample points and weights
sample_points = []
weights = []
for i in range(self.bounds.shape[0]):
lower_bound, upper_bound = self.bounds[i]
dx = (upper_bound - lower_bound) / (num_points[i] - 1)
x = torch.linspace(lower_bound, upper_bound, num_points[i]) # Include endpoints
sample_points.append(x)
# Weights for Simpson's rule
w = torch.ones(num_points[i]) # Initialize weights
w[1:-1:2] *= 4 # Odd indices
w[2:-1:2] *= 2 # Even indices
weights.append(w * dx / 3) # Multiply by dx/3
# Create a meshgrid of sample points
grid = torch.meshgrid(*sample_points)
sampled_points = torch.stack(grid, dim=-1).reshape(-1, self.bounds.shape[0])
# Total weight for each point
total_weights = weights[0]
for w in weights[1:]:
total_weights = torch.ger(total_weights, w).reshape(-1) # Use outer product and flatten
points, weights = sampled_points, total_weights.reshape(-1) # Ensure weights are a flat array
points = points.to(torch.get_default_dtype())
weights = weights.to(torch.get_default_dtype())
return points, weights
def _sample_trapezoidal(self, num_points):
raise RuntimeError("DO NOT USE THIS method. It's more or less the same as midpoint rule....")
num_points = np.array(num_points)
if len(num_points) != self.bounds.shape[0]:
raise ValueError("Using trapezoidal sampling expected num_points to match the number of dimensions.")
sample_points = []
weights = []
for i in range(self.bounds.shape[0]):
lower_bound, upper_bound = self.bounds[i]
dx = (upper_bound - lower_bound) / (num_points[i] - 1)
x = torch.linspace(lower_bound, upper_bound, num_points[i])
sample_points.append(x)
# Weights for the trapezoidal rule
w = torch.ones(num_points[i])
w[0] /= 2 # First point weight
w[-1] /= 2 # Last point weight
weights.append(w * dx) # Multiply by dx to get the correct area contribution
# Create a meshgrid of sample points
grid = torch.meshgrid(*sample_points, indexing='ij')
sampled_points = torch.stack(grid, dim=-1).reshape(-1, self.bounds.shape[0])
# Calculate total weights
total_weights = weights[0]
for w in weights[1:]:
total_weights = total_weights.unsqueeze(-1) * w.unsqueeze(0) # Use broadcasting to multiply correctly
points, weights = sampled_points, total_weights.reshape(-1)
points = points.to(torch.get_default_dtype())
weights = weights.to(torch.get_default_dtype())
return points, weights
def _sample_monte_carlo(self, num_points):
num_points = check_2val(num_points)
if len(num_points.shape)!=0:
raise ValueError("num_points for monte_carlo needs to be a scalar")
"""Sample points uniformly using the Monte Carlo method."""
points = torch.empty((num_points, self.bounds.shape[0]))
for i in range(self.bounds.shape[0]):
# Generate random points uniformly within the bounds for each dimension
rand_points = mersenne_twister.uniform(0,1,size=num_points)
rand_points = torch.tensor(rand_points, dtype=torch.float32)
points[:, i] = rand_points * (self.bounds[i, 1] - self.bounds[i, 0]) + self.bounds[i, 0]
# Calculate the volume of the cube
volume = torch.prod(self.bounds[:,1]-self.bounds[:,0])
constant_multi = volume/float(num_points)
weights = torch.full((int(num_points),),fill_value=constant_multi)
points = points.to(torch.get_default_dtype())
weights = weights.to(torch.get_default_dtype())
return points, weights
def _sample_sobol(self, num_points,is_pow2):
num_points = check_2val(num_points)
if len(num_points.shape)!=0:
raise ValueError("num_points for sobol needs to be a scalar")
"""Sample points using the Sobol sequence method."""
points = None
num_points_log2 = np.log2(num_points)
if round(num_points_log2,0) != num_points_log2:
if is_pow2:
raise RuntimeError("round(num_points_log2,0) != num_points_log2"+ f",num_points_log2=={num_points_log2}")
sobol = torch.quasirandom.SobolEngine(dimension=self.bounds.shape[0], scramble=True)
points = sobol.draw(num_points,dtype=torch.float32)
else:
sampler = qmc.Sobol(d=self.bounds.shape[0], scramble=True)
points = sampler.random_base2(m=int(num_points_log2))
points = torch.tensor(points)
# Scale points according to the cube bounds
scaled_points = points * (self.bounds[:, 1] - self.bounds[:, 0]) + self.bounds[:, 0]
# Calculate the volume of the cube
volume = torch.prod(self.bounds[:,1]-self.bounds[:,0])
constant_multi = volume/float(num_points)
weights = torch.full((int(num_points),),fill_value=constant_multi)
points = points.to(torch.get_default_dtype())
weights = weights.to(torch.get_default_dtype())
return scaled_points, weights
[docs]
class Disc(Integrator):
"""
Integrator for a 2D disc (circle).
Args:
radius (float): The radius of the disc.
Example:
>>> disc = dit.integrators.Disc(1.0)
>>> points, weights = disc.sample(2**4, method="sobol_pow2")
>>> volume = disc.get_volume()
>>> all_in_bounds = disc.in_bounds(points)
>>> print("Sampled points:", points)
>>> print("Integration weights:", weights)
>>> print("Disc area:", volume)
>>> print("All points in bounds:", all_in_bounds)
"""
def __init__(self,radius):
self.radius = float(radius)
[docs]
def sample(self, num_points: int | list[int], method: IntegrationMethod = IntegrationMethod.SOBOL) -> tuple[torch.Tensor, torch.Tensor]:
"""
Sample points and weights using the specified method.
Args:
num_points (int or list): Number of points in each dimension.
method (str): The integration method to use. Options are 'simpson', 'midpoint', 'monte_carlo', 'sobol', and 'sobol_pow2'.
Returns:
tuple: A tuple containing the sampled points and their corresponding weights.
"""
if not isinstance(method, str):
method = str(method.value)
if method == 'simpson':
return self._sample_simpson(num_points)
elif method == 'monte_carlo':
return self._sample_monte_carlo(num_points)
elif method == 'sobol':
return self._sample_sobol(num_points,False)
elif method == 'sobol_pow2':
return self._sample_sobol(num_points,True)
elif method == 'midpoint':
return self._sample_midpoint(num_points)
else:
raise ValueError(f"Unknown integration method: {method}")
def _sample_midpoint(self, num_points):
#raise RuntimeError("midpoint rule not implemted for disc")
num_points = np.array(num_points)
midpoints = []
# Calculate the midpoints for each dimension
lower_bound, upper_bound = [-self.radius,self.radius]
for i in range(2):
# Calculate the size of each interval (dx) for the dimension
dx = (upper_bound - lower_bound) / num_points[i]
# Compute the midpoints in this dimension
points = torch.linspace(lower_bound + dx / 2.0, upper_bound - dx / 2.0, num_points[i])
midpoints.append(points)
# Create a meshgrid of midpoints for all dimensions
grid = torch.meshgrid(*midpoints, indexing='ij')
sampled_points = torch.stack(grid, dim=-1).reshape(-1, 2)
# Compute the weights based on the volume of each subregion
weights = torch.ones(sampled_points.shape[0], dtype=torch.float32)
lower_bound, upper_bound = [-self.radius,self.radius]
for i in range(2):
dx = (upper_bound - lower_bound) / num_points[i]
# Each dimension contributes its own weight
weights *= dx # Multiply the weights by the width of the intervals
in_bounds = self.in_bounds(sampled_points)
sampled_points = sampled_points[in_bounds]
weights = weights[in_bounds]
return sampled_points, weights
def _sample_simpson(self,num_points):
raise RuntimeError("simpson's rule not implemted for disc")
if len(num_points) != self.bounds.shape[0]:
raise ValueError("using simpson rule expected num_points to have the same number of entries as dimensions (4,3,2)")
def _sample_weights_from_unif(self,points):
num_points = points.shape[0]
volume = (torch.pi*(self.radius**2.0))
constant_multi = volume/float(num_points)
weights = torch.full((int(num_points),),fill_value=constant_multi)
weights = weights.to(torch.get_default_dtype())
return weights
def _sample_points_from_unif(self,points):
# Scale points to the disc
num_points = points.shape[0]
r_points = self.radius * torch.sqrt(points[:, 0]) # Use sqrt to ensure uniform distribution
theta = 2 * torch.pi * points[:, 1]
# Convert polar to Cartesian coordinates
x = r_points * torch.cos(theta)
y = r_points * torch.sin(theta)
# Stack x and y to get the final points
points = torch.stack((x, y), dim=1)
points = points.to(torch.get_default_dtype())
return points
def _sample_monte_carlo(self, num_points):
num_points = check_2val(num_points)
#points = torch.rand(num_points,2)
rand_points = mersenne_twister.uniform(0,1,size=num_points*2)
rand_points = torch.tensor(rand_points, dtype=torch.float32)
rand_points = rand_points.reshape(num_points,2)
out_points = self._sample_points_from_unif(rand_points)
out_weights = self._sample_weights_from_unif(rand_points)
return out_points,out_weights
def _sample_sobol(self, num_points,is_pow2):
num_points = check_2val(num_points)
points = None
num_points_log2 = np.log2(num_points)
if round(num_points_log2,0) != num_points_log2:
if is_pow2:
raise RuntimeError("round(num_points_log2,0) != num_points_log2"+ f",num_points_log2=={num_points_log2}")
sobol = torch.quasirandom.SobolEngine(dimension=2, scramble=True)
points = sobol.draw(num_points,dtype=torch.float32)
else:
sampler = qmc.Sobol(d=2, scramble=True)
points = sampler.random_base2(m=int(num_points_log2))
points = torch.tensor(points)
out_points = self._sample_points_from_unif(points)
out_weights = self._sample_weights_from_unif(points)
return out_points,out_weights
[docs]
def in_bounds(self,x):
"""Check if points are within the disc.
Args:
x (torch.Tensor): Points to check.
Returns:
torch.Tensor: Boolean tensor indicating if points are within the disc.
"""
device = x.device
dtype = x.dtype
return torch.linalg.norm(x,dim=1)<self.radius
[docs]
def get_volume(self):
"""Calculate the volume of the disc.
Returns:
float: Volume of the disc.
"""
return math.pi*self.radius**2.