Source code for pymanopt.optimizers.particle_swarm

import time

import numpy as np

from pymanopt import tools
from pymanopt.optimizers.optimizer import Optimizer, OptimizerResult
from pymanopt.tools import printer


[docs]class ParticleSwarm(Optimizer): """Particle swarm optimization (PSO) method. Perform optimization using the derivative-free particle swarm optimization algorithm. Args: max_cost_evaluations: Maximum number of allowed cost evaluations. max_iterations: Maximum number of allowed iterations. population_size: Size of the considered swarm population. nostalgia: Quantifies performance relative to past performances. social: Quantifies performance relative to neighbors. """ def __init__( self, max_cost_evaluations=None, max_iterations=None, population_size=None, nostalgia=1.4, social=1.4, *args, **kwargs, ): super().__init__(*args, **kwargs) self._max_cost_evaluations = max_cost_evaluations self._max_iterations = max_iterations self._population_size = population_size self._nostalgia = nostalgia self._social = social
[docs] def run(self, problem, *, initial_point=None) -> OptimizerResult: """Run PSO algorithm. Args: problem: Pymanopt problem class instance exposing the cost function and the manifold to optimize over. initial_point: Initial point on the manifold. If no value is provided then a starting point will be randomly generated. Returns: Local minimum of the cost function, or the most recent iterate if algorithm terminated before convergence. """ manifold = problem.manifold objective = problem.cost # Choose proper default algorithm parameters. We need to know about the # dimension of the manifold to limit the parameter range, so we have to # defer proper initialization until this point. dim = manifold.dim if self._max_cost_evaluations is None: self._max_cost_evaluations = max(5000, 2 * dim) if self._max_iterations is None: self._max_iterations = max(500, 4 * dim) if self._population_size is None: self._population_size = min(40, 10 * dim) # If no initial population x is given by the user, generate one at # random. if initial_point is None: x = [ manifold.random_point() for i in range(int(self._population_size)) ] elif tools.is_sequence(initial_point): if len(initial_point) != self._population_size: print( "The population size was forced to the size of " "the given initial population" ) self._population_size = len(initial_point) x = initial_point else: raise ValueError("The initial population must be iterable") # Initialize personal best positions to the initial population. y = list(x) # Save a copy of the swarm at the previous iteration. xprev = list(x) # Initialize velocities for each particle. v = [manifold.random_tangent_vector(xi) for xi in x] # Compute cost for each particle xi. costs = np.array([objective(xi) for xi in x]) fy = list(costs) cost_evaluations = self._population_size # Identify the best particle and store its cost/position. imin = costs.argmin() fbest = costs[imin] xbest = x[imin] if self._verbosity >= 2: iteration_format_length = int(np.log10(self._max_iterations)) + 1 column_printer = printer.ColumnPrinter( columns=[ ("Iteration", f"{iteration_format_length}d"), ("Cost evaluations", "7d"), ("Best cost", "+.8e"), ] ) else: column_printer = printer.VoidPrinter() column_printer.print_header() self._initialize_log() # Iteration counter (at any point, iteration is the number of fully # executed iterations so far). iteration = 0 start_time = time.time() while True: iteration += 1 column_printer.print_row([iteration, cost_evaluations, fbest]) # FIXME(nkoep): This only makes sense once we provide a custom # callback mechanism that actually checks 'xi'. # Right now this loop is pointless since our default # stopping criteria do not involve 'xi'. # Stop if any particle triggers a stopping criterion. for i, xi in enumerate(x): # noqa stopping_criterion = self._check_stopping_criterion( start_time=start_time, iteration=iteration, cost_evaluations=cost_evaluations, ) if stopping_criterion is not None: break if stopping_criterion: if self._verbosity >= 1: print(stopping_criterion) print("") break # Compute the inertia factor which we linearly decrease from 0.9 to # 0.4 from iteration = 0 to iteration = max_iterations. w = 0.4 + 0.5 * (1 - iteration / self._max_iterations) # Compute the velocities. for i, xi in enumerate(x): # Get the position and past best position of particle i. yi = y[i] # Get the previous position and velocity of particle i. xiprev = xprev[i] vi = v[i] # Compute the new velocity of particle i, composed of three # contributions. inertia = w * manifold.transport(xiprev, xi, vi) nostalgia = ( np.random.uniform() * self._nostalgia * manifold.log(xi, yi) ) social = ( np.random.uniform() * self._social * manifold.log(xi, xbest) ) v[i] = inertia + nostalgia + social # Backup the current swarm positions. xprev = list(x) # Update positions, personal bests and global best. for i, xi in enumerate(x): # Compute new position of particle i. x[i] = manifold.retraction(xi, v[i]) # Compute new cost of particle i. fxi = objective(xi) # Update costs of the swarm. costs[i] = fxi # Update self-best if necessary. if fxi < fy[i]: fy[i] = fxi y[i] = xi # Update global best if necessary. if fy[i] < fbest: fbest = fy[i] xbest = xi cost_evaluations += self._population_size return self._return_result( start_time=start_time, point=xbest, cost=fbest, iterations=iteration, stopping_criterion=stopping_criterion, cost_evaluations=cost_evaluations, )