# Source code for polygen.polygen2d.errorBasedLloyd

import numpy as np
from shapely.geometry import Polygon, Point
from scipy.spatial import Voronoi
import time
import warnings
from shapely import prepared
from collections import deque

class PointHistoryBuffer:
    """Fixed-capacity history of point configurations for Lloyd's algorithm.

    Keeps at most ``max_size`` recent point arrays in a circular buffer
    (a ``collections.deque`` with ``maxlen``) and independently tracks the
    configuration with the smallest gradient norm seen so far.

    Parameters
    ----------
    max_size : int
        Maximum number of point configurations to retain.
    point_shape : tuple
        Shape of each configuration array, e.g. ``(n_points, 2)``.
    dtype : numpy.dtype, optional
        Data type of the stored coordinates (default ``np.float64``).

    Attributes
    ----------
    buffer : collections.deque
        The circular buffer of stored configurations.
    optimal_config : ndarray or None
        Configuration with the lowest gradient norm encountered.
    optimal_norm : float
        Lowest gradient norm encountered so far.
    optimal_iteration : int
        Iteration at which the optimal configuration was recorded (-1 if none).

    Notes
    -----
    Once full, appending automatically evicts the oldest configuration.
    """

    def __init__(self, max_size, point_shape, dtype=np.float64):
        self.max_size = max_size
        self.buffer = deque(maxlen=max_size)
        self.point_shape = point_shape
        self.dtype = dtype
        # Best-so-far tracking (independent of the circular buffer).
        self.optimal_config = None
        self.optimal_norm = float('inf')
        self.optimal_iteration = -1

    def append(self, points, iteration, grad_norm):
        """Store a copy of ``points``; update the optimum if ``grad_norm`` improves.

        Parameters
        ----------
        points : ndarray
            Point coordinates to store (copied, not referenced).
        iteration : int
            Current iteration number.
        grad_norm : float
            Gradient norm associated with this configuration.
        """
        self.buffer.append(points.copy())
        if grad_norm < self.optimal_norm:
            self.optimal_norm = grad_norm
            self.optimal_config = points.copy()
            self.optimal_iteration = iteration

    def get_config(self, index):
        """Return the stored configuration at ``index``, or ``None`` if out of range."""
        if not 0 <= index < len(self.buffer):
            return None
        return self.buffer[index]

    def get_optimal_config(self):
        """Return a copy of the lowest-gradient-norm configuration, or ``None``."""
        if self.optimal_config is None:
            return None
        return self.optimal_config.copy()

    def clear(self):
        """Drop every stored configuration (optimum tracking is not reset)."""
        self.buffer.clear()

    def __len__(self):
        return len(self.buffer)
def compute_weighted_centroid(X, Y, density_vals, mask, dx, dy):
    """
    Compute the density-weighted centroid of a region using numerical integration.

    Calculates the centroid of the masked region weighted by a density
    function, via discrete integration over the grid.

    Parameters
    ----------
    X, Y : ndarray
        Meshgrid arrays of x and y coordinates.
    density_vals : ndarray
        Density values at each grid point.
    mask : ndarray
        Mask (0/1 or boolean) selecting grid points inside the region.
    dx, dy : float
        Grid spacing in x and y directions.

    Returns
    -------
    tuple
        (x_centroid, y_centroid) coordinates of the weighted centroid.
    """
    masked_density = density_vals * mask
    total_mass = np.sum(masked_density) * dx * dy

    if total_mass < 1e-10:
        mask_area = np.sum(mask)
        if mask_area == 0:
            # BUGFIX: with an all-zero mask the old fallback divided 0/0 and
            # returned NaN (with a RuntimeWarning). Fall back to the grid
            # centre instead, which is always finite.
            return float(np.mean(X)), float(np.mean(Y))
        # Near-zero mass but non-empty mask: use the unweighted (geometric)
        # centroid of the masked area.
        x_centroid = np.sum(X * mask) / mask_area
        y_centroid = np.sum(Y * mask) / mask_area
        return x_centroid, y_centroid

    x_centroid = np.sum(X * masked_density) * dx * dy / total_mass
    y_centroid = np.sum(Y * masked_density) * dx * dy / total_mass
    return x_centroid, y_centroid
def create_density_grid(bounds, samples, density_fn):
    """
    Build a regular sampling grid over a bounding box and evaluate the density on it.

    Parameters
    ----------
    bounds : tuple
        (x_min, y_min, x_max, y_max) of the bounding box.
    samples : int
        Number of sample points along each axis.
    density_fn : callable
        Function taking (x, y) arrays and returning density values.

    Returns
    -------
    X : ndarray
        2D array of x-coordinates.
    Y : ndarray
        2D array of y-coordinates.
    density_vals : ndarray
        2D array of density values at the grid points.
    dx : float
        Grid spacing along x.
    dy : float
        Grid spacing along y.
    """
    x_min, y_min, x_max, y_max = bounds
    xs = np.linspace(x_min, x_max, samples)
    ys = np.linspace(y_min, y_max, samples)
    grid_x, grid_y = np.meshgrid(xs, ys)
    # Evaluate the density over the whole grid in one vectorized call.
    values = density_fn(grid_x, grid_y)
    return grid_x, grid_y, values, xs[1] - xs[0], ys[1] - ys[0]
def compute_adaptive_samples(polygon_region, reference_area, min_samples=20, max_samples=100):
    """
    Determine grid resolution for a region based on its size.

    The sample count grows linearly with the region's area relative to
    ``reference_area`` and is clamped to [min_samples, max_samples].

    Parameters
    ----------
    polygon_region : shapely.geometry.Polygon
        Region to compute samples for (only its ``area`` is used).
    reference_area : float
        Area used to determine the relative size; for individual cells this
        should be the expected cell area.
    min_samples, max_samples : int
        Inclusive bounds on the returned sample count.

    Returns
    -------
    int
        Number of samples per axis to use for the grid.
    """
    fraction = polygon_region.area / reference_area
    raw = int(min_samples + (max_samples - min_samples) * fraction)
    # Clamp: huge regions saturate at max_samples, tiny ones at min_samples.
    return np.clip(raw, min_samples, max_samples)
def calculate_density_weighted_centroid(polygon_region, density_fn, is_uniform_density, total_area):
    """
    Calculate the density-weighted centroid of a polygon region.

    Computes the centroid of a polygon region weighted by a density function
    using adaptive numerical integration; for uniform density the geometric
    centroid is returned directly.

    Parameters
    ----------
    polygon_region : shapely.geometry.Polygon
        Region to calculate the centroid for.
    density_fn : callable
        Function taking (x, y) arrays and returning density values.
    is_uniform_density : bool
        If True, return the geometric centroid without integration.
    total_area : float
        Total area of the boundary polygon (used for adaptive resolution).

    Returns
    -------
    tuple
        (x, y) coordinates of the weighted centroid.
    """
    if is_uniform_density:
        return polygon_region.centroid.coords[0]

    # Adaptive grid resolution based on the region's relative area.
    samples = compute_adaptive_samples(polygon_region, total_area)

    # Create density grid over the region's bounding box.
    X, Y, density_vals, dx, dy = create_density_grid(polygon_region.bounds, samples, density_fn)

    # Vectorized point-in-polygon test using a prepared geometry.
    points = np.column_stack((X.flatten(), Y.flatten()))
    prepared_region = prepared.prep(polygon_region)
    mask = np.array([prepared_region.contains(Point(p)) for p in points], dtype=float)
    mask = mask.reshape(X.shape)

    x_centroid, y_centroid = compute_weighted_centroid(X, Y, density_vals, mask, dx, dy)

    # BUGFIX: the previous failure sentinel compared the centroid against
    # exactly (0.0, 0.0), which misfires for regions legitimately centred at
    # the origin and fails to catch the NaN produced when no grid sample
    # lands inside the region. Detect a non-finite result instead.
    if not (np.isfinite(x_centroid) and np.isfinite(y_centroid)):
        return polygon_region.centroid.coords[0]

    return (x_centroid, y_centroid)
def calculate_energy(points, voronoi, polygon, density_fn, is_uniform_density):
    """
    Calculate Lloyd's energy for the current point configuration.

    The energy is the sum over Voronoi cells of the second moment of the cell
    about its generator, weighted by the density function and evaluated by
    numerical integration on a fixed 20x20 grid per cell.

    Parameters
    ----------
    points : ndarray
        Generator points, shape (n_points, 2).
    voronoi : scipy.spatial.Voronoi
        Voronoi diagram of the current points.
    polygon : shapely.geometry.Polygon
        Boundary polygon used to clip each cell.
    density_fn : callable
        Function taking (x, y) arrays and returning density values.
    is_uniform_density : bool
        If True, integrate with uniform (unit) density.

    Returns
    -------
    float
        Total energy of the configuration.
    """
    energy = 0.0
    for point_idx, region_idx in enumerate(voronoi.point_region):
        vertices = voronoi.regions[region_idx]
        # BUGFIX: skip empty vertex lists as well -- ``all()`` over an empty
        # list is True, so an empty region previously reached Polygon([])
        # and raised. Also skip unbounded cells (vertex index -1).
        if not vertices or not all(v >= 0 for v in vertices):
            continue

        # Build the Voronoi cell polygon, repairing invalid geometry.
        region = [voronoi.vertices[i] for i in vertices]
        cell = Polygon(region)
        if not cell.is_valid:
            cell = cell.buffer(0)

        # Clip against the domain boundary.
        cell = cell.intersection(polygon)
        if cell.is_empty:
            continue

        # Integrate the squared distance to the generator over the cell.
        x_min, y_min, x_max, y_max = cell.bounds
        x = np.linspace(x_min, x_max, 20)
        y = np.linspace(y_min, y_max, 20)
        X, Y = np.meshgrid(x, y)
        dx = x[1] - x[0]
        dy = y[1] - y[0]

        # Containment mask for the integration grid.
        points_grid = np.column_stack((X.flatten(), Y.flatten()))
        mask = np.array([cell.contains(Point(p)) for p in points_grid]).reshape(X.shape)

        # Squared distances from the generator point.
        dx_squared = (X - points[point_idx, 0])**2
        dy_squared = (Y - points[point_idx, 1])**2
        squared_distances = dx_squared + dy_squared

        if is_uniform_density:
            energy += np.sum(squared_distances * mask) * dx * dy
        else:
            density_vals = density_fn(X, Y)
            energy += np.sum(squared_distances * mask * density_vals) * dx * dy

    return energy
def compute_error_measure(polygon, points, centroids, cell_masses, density_fn, is_uniform_density):
    """
    Compute the non-dimensionalized error measure with a cached density integral.

    The error is ``n * ||m_i (x_i - c_i)|| / (sqrt(A) * integral(rho))`` where
    ``m_i`` are the cell masses, ``x_i`` the generators, ``c_i`` the centroids,
    and ``A`` the domain area. The density integral is computed once per
    domain/density combination and cached as the function attribute
    ``_cached_integral`` (callers may delete the attribute to invalidate).

    Parameters
    ----------
    polygon : shapely.geometry.Polygon
        Boundary polygon of the domain.
    points : ndarray
        Generator points, shape (n_points, 2).
    centroids : ndarray
        Density-weighted centroids of the Voronoi cells, same shape as points.
    cell_masses : ndarray
        Mass (integrated density) of each Voronoi cell.
    density_fn : callable
        Function taking (x, y) arrays and returning density values.
    is_uniform_density : bool
        If True, the density integral reduces to the domain area.

    Returns
    -------
    float
        Non-dimensionalized error measure.
    """
    n_points = len(points)
    total_area = polygon.area

    # Gradient norm: sqrt( sum_i m_i^2 * |x_i - c_i|^2 ).
    point_diffs = points - centroids
    squared_diffs = np.sum(point_diffs * point_diffs, axis=1)
    gradient_norm = np.sqrt(np.sum(cell_masses * cell_masses * squared_diffs))

    if is_uniform_density:
        density_integral = total_area
    else:
        # Compute the domain density integral once and cache it.
        if not hasattr(compute_error_measure, '_cached_integral'):
            samples = compute_adaptive_samples(polygon, total_area, min_samples=50, max_samples=100)
            X, Y, density_vals, dx, dy = create_density_grid(
                polygon.bounds, samples, density_fn
            )
            # BUGFIX(readability): this local was previously named ``points``,
            # shadowing the generator-point parameter above.
            grid_points = np.column_stack((X.flatten(), Y.flatten()))
            prepared_domain = prepared.prep(polygon)
            mask = np.array([prepared_domain.contains(Point(p)) for p in grid_points], dtype=float)
            mask = mask.reshape(X.shape)
            masked_density = density_vals * mask
            compute_error_measure._cached_integral = np.sum(masked_density) * dx * dy
        density_integral = compute_error_measure._cached_integral

    # Non-dimensionalize by domain size and total mass.
    error = (n_points * gradient_norm) / (np.sqrt(total_area) * density_integral)
    return error
def lloyd_with_density(polygon, seed_points, density_function=None, max_iterations=10000000, tol=1e-5, use_decay=True, decay_start=2.0, decay_mechanism="exponential", grad_increase_tol=1.2, history_buffer_size=10):
    """
    Run Lloyd's algorithm (centroidal Voronoi tessellation) on a polygon domain.

    Iteratively moves each seed point toward the (optionally density-weighted)
    centroid of its clipped Voronoi cell until an error measure converges,
    diverges, or the iteration limit is reached.

    Parameters
    ----------
    polygon : shapely.geometry.Polygon
        The domain in which to compute the centroidal Voronoi tessellation.
    seed_points : array_like
        Initial seed points (n_points x 2) inside the polygon.
    density_function : callable, optional
        A function f(x, y) -> float defining pointwise density. If None,
        a uniform density of 1 is assumed. Example:

        .. code-block:: python

            def hexagonDensity(x, y):
                return np.exp(-20 * (x**2 + y**2)) + 0.05 * np.sin(np.pi * x)**2 * np.sin(np.pi * y)**2

    max_iterations : int, optional
        Maximum number of Lloyd iterations (default=1e7).
    tol : float, optional
        Convergence tolerance for the error measure (default=1e-5).
    use_decay : bool, optional
        Whether to apply a decay factor to point movements (default=True).
    decay_start : float, optional
        Starting value for decay (default=2.0). Decay moves from
        decay_start -> 1.
    decay_mechanism : {'exponential', 'linear'}, optional
        Decay mode for point adjustments (default='exponential').
    grad_increase_tol : float, optional
        Factor of the minimum error used to detect divergence (default=1.2).
    history_buffer_size : int, optional
        Maximum size of the point configuration buffer (default=10).

    Returns
    -------
    tuple
        (points, metrics) where ``points`` is the configuration with the
        lowest error seen (from the history buffer) and ``metrics`` is a dict
        with convergence status, iteration count, timing and error history.

    Notes
    -----
    The algorithm stops if:
      - the error measure falls below ``tol``;
      - the error grows beyond ``grad_increase_tol`` times its observed
        minimum (at least 5 iterations after that minimum); or
      - the maximum iteration limit is reached.
    """
    # Invalidate the density-integral cache so a previous run's domain or
    # density function cannot leak into this one.
    if hasattr(compute_error_measure, '_cached_integral'):
        delattr(compute_error_measure, '_cached_integral')

    # Uniform density is represented by a constant-1 density function so the
    # rest of the code has a single code path for density evaluation.
    is_uniform_density = density_function is None
    density_fn = (lambda x, y: np.ones_like(x)) if is_uniform_density else density_function
    seed_points = np.array(seed_points)
    total_area = polygon.area
    # Calculate expected average cell area
    # expected_cell_area = total_area / len(seed_points)

    # History buffer tracks recent configurations and the best one seen.
    point_buffer = PointHistoryBuffer(
        max_size=history_buffer_size,
        point_shape=seed_points.shape,
        dtype=seed_points.dtype
    )
    metrics = {
        'error_values': [],
        'convergence_status': None,
        'iterations': 0,
        'time_taken': 0,
        'min_error': float('inf'),
        'min_error_iteration': 0,
    }
    # Seed configuration is stored with infinite error so any computed error
    # will replace it as the optimum.
    point_buffer.append(seed_points, 0, float('inf'))
    prepared_polygon = prepared.prep(polygon)

    # Precompute the per-iteration decay schedule.
    # NOTE(review): with the default max_iterations=1e7 each schedule is an
    # ~80 MB float64 array allocated up front — confirm this is intended.
    if use_decay:
        if decay_mechanism.lower() == "linear":
            # Linear schedule: decay_start at iteration 0, 1 at the last one.
            decay_values = decay_start + (1 - decay_start) * (np.arange(max_iterations) / (max_iterations - 1))
        else:
            # NOTE(review): exp(-t * log(0.1)) for t in [0, 1] equals 10**t,
            # which INCREASES from 1 to 10, so this schedule grows from
            # decay_start to 10*(decay_start-1)+1 rather than decaying toward
            # 1 as documented — looks like a sign error; verify intent.
            decay_values = np.exp(-np.linspace(0, max_iterations, max_iterations) / max_iterations * np.log(0.1)) * (decay_start - 1) + 1

    start_time = time.time()
    try:
        for iteration in range(max_iterations):
            vor = Voronoi(seed_points)
            new_points = np.empty_like(seed_points)
            centroids = np.empty_like(seed_points)
            cell_masses = np.zeros(len(seed_points))

            # Process each generator's Voronoi cell: clip it to the domain,
            # find its (weighted) centroid and mass, then move the generator.
            for point_idx, region_idx in enumerate(vor.point_region):
                vertices = vor.regions[region_idx]
                # Unbounded cells (containing vertex index -1) are left
                # untouched this iteration.
                if not all(v >= 0 for v in vertices):
                    new_points[point_idx] = seed_points[point_idx]
                    centroids[point_idx] = seed_points[point_idx]
                    continue

                # Build the cell polygon; buffer(0) repairs invalid geometry.
                region = [vor.vertices[i] for i in vertices]
                polygon_voronoi = Polygon(region)
                if not polygon_voronoi.is_valid:
                    polygon_voronoi = polygon_voronoi.buffer(0)
                clipped_polygon = polygon_voronoi.intersection(polygon)
                if clipped_polygon.is_empty:
                    new_points[point_idx] = seed_points[point_idx]
                    centroids[point_idx] = seed_points[point_idx]
                    continue

                try:
                    if is_uniform_density:
                        # Uniform density: geometric centroid, mass = area.
                        centroids[point_idx] = clipped_polygon.centroid.coords[0]
                        cell_masses[point_idx] = clipped_polygon.area
                    else:
                        # Non-uniform density: integrate the density over the
                        # cell on an adaptively sized grid to get both the
                        # cell mass and the weighted centroid.
                        samples = compute_adaptive_samples(clipped_polygon, total_area)
                        X, Y, density_vals, dx, dy = create_density_grid(
                            clipped_polygon.bounds, samples, density_fn
                        )
                        points = np.column_stack((X.flatten(), Y.flatten()))
                        prepared_cell = prepared.prep(clipped_polygon)
                        mask = np.array([prepared_cell.contains(Point(p)) for p in points], dtype=float)
                        mask = mask.reshape(X.shape)
                        masked_density = density_vals * mask
                        cell_masses[point_idx] = np.sum(masked_density) * dx * dy
                        centroid = compute_weighted_centroid(X, Y, density_vals, mask, dx, dy)
                        centroids[point_idx] = centroid
                except Exception as e:
                    # Best-effort fallback: keep iterating with the geometric
                    # centroid rather than aborting the whole run.
                    warnings.warn(f"Error in cell calculations: {str(e)}. Using geometric centroid.")
                    centroids[point_idx] = clipped_polygon.centroid.coords[0]
                    cell_masses[point_idx] = clipped_polygon.area

                # Move toward the centroid (scaled by the decay factor), but
                # only if the target stays inside the domain.
                if prepared_polygon.contains(Point(centroids[point_idx])):
                    if use_decay:
                        decay = decay_values[iteration]
                        new_points[point_idx] = seed_points[point_idx] + \
                            (centroids[point_idx] - seed_points[point_idx]) * decay
                    else:
                        new_points[point_idx] = centroids[point_idx]
                else:
                    new_points[point_idx] = seed_points[point_idx]

            # Non-dimensionalized error for this iteration.
            error = compute_error_measure(
                polygon, seed_points, centroids, cell_masses,
                density_fn, is_uniform_density
            )
            metrics['error_values'].append(error)

            # Record the new configuration and track the running minimum.
            point_buffer.append(new_points, iteration + 1, error)
            if error < metrics['min_error']:
                metrics['min_error'] = error
                metrics['min_error_iteration'] = iteration

            # Convergence: error below tolerance.
            if error < tol:
                metrics['convergence_status'] = 'Converged to tolerance'
                metrics['iterations'] = iteration + 1
                metrics['time_taken'] = time.time() - start_time
                print(f"Converged after {iteration+1} iterations with error: {error:.5e}")
                print(f"Total time: {metrics['time_taken']:.5f} seconds")
                return point_buffer.get_optimal_config(), metrics
            # Divergence: error grew well past its minimum, and we are at
            # least 5 iterations beyond where the minimum occurred.
            elif (error > grad_increase_tol * metrics['min_error'] and
                    iteration > metrics['min_error_iteration'] + 5):
                metrics['convergence_status'] = 'Error increased significantly'
                metrics['iterations'] = iteration + 1
                metrics['time_taken'] = time.time() - start_time
                print(f"Minimum error {metrics['min_error']:.5e} found at iteration {metrics['min_error_iteration']+1}")
                print(f"Total time: {metrics['time_taken']:.5f} seconds")
                return point_buffer.get_optimal_config(), metrics

            seed_points = new_points

        # Iteration budget exhausted without convergence or divergence.
        metrics['convergence_status'] = 'Maximum iterations reached'
        metrics['iterations'] = max_iterations
        metrics['time_taken'] = time.time() - start_time
        print(f"Reached maximum iterations ({max_iterations}) with norm: {error:.5e}")
        print(f"Total time: {metrics['time_taken']:.5f} seconds")
        return point_buffer.get_optimal_config(), metrics

    except Exception as e:
        # Record the failure and timing, then re-raise for the caller.
        metrics['convergence_status'] = f'Error: {str(e)}'
        metrics['time_taken'] = time.time() - start_time
        raise