import numpy as np
from .._common import lhs, messages, optimizer, selection_sync
from .._helpers import OptimizeResult, register
__all__ = [
"minimize",
]
def minimize(
fun,
bounds,
x0=None,
args=(),
maxiter=100,
popsize=10,
nrperc=0.5,
seed=None,
xtol=1.0e-8,
ftol=1.0e-8,
workers=1,
backend=None,
return_all=False,
verbosity=1.0,
callback=True,
):
"""
Minimize an objective function using Neighborhood Algorithm (NA).
Parameters
----------
fun : callable
The objective function to be minimized. Must be in the form ``f(x, *args)``, where ``x`` is the argument in the form of a 1-D array and args is a tuple of any additional fixed parameters needed to completely specify the function.
bounds : array_like
Bounds for variables. ``(min, max)`` pairs for each element in ``x``, defining the finite lower and upper bounds for the optimizing argument of ``fun``. It is required to have ``len(bounds) == len(x)``. ``len(bounds)`` is used to determine the number of parameters in ``x``.
x0 : array_like or None, optional, default None
Initial population. Array of real elements with shape (``popsize``, ``ndim``), where ``ndim`` is the number of independent variables. If ``x0`` is not specified, the population is initialized using Latin Hypercube sampling.
args : tuple, optional, default None
Extra arguments passed to the objective function.
maxiter : int, optional, default 100
The maximum number of generations over which the entire population is evolved.
popsize : int, optional, default 10
Total population size.
nrperc : scalar, optional, default 0.5
Number of resamplings (as a fraction of total population size).
seed : int or None, optional, default None
Seed for random number generator.
xtol : scalar, optional, default 1.0e-8
Solution tolerance for termination.
ftol : scalar, optional, default 1.0e-8
Objective function value tolerance for termination.
workers : int, optional, default 1
The population is subdivided into workers sections and evaluated in parallel (uses :class:`joblib.Parallel`). Supply -1 to use all available CPU cores.
backend : str {'loky', 'threading', 'mpi'}, optional, default 'threading'
Parallel backend to use when ``workers`` is not ``0`` or ``1``:
- 'loky': disable threading
- 'threading': enable threading
- 'mpi': use MPI (uses :mod:`mpi4py`)
return_all : bool, optional, default False
Set to True to return an array with shape (``nit``, ``verbosity`` * ``popsize``, ``ndim``) of all the solutions at each iteration.
verbosity : float, optional, default 1.0
Fraction of population to consider in `return_all`. If 0.0, returns the best solution at each iteration.
callback : callable or None, optional, default None
Called after each iteration. It is a callable with the signature ``callback(X, OptimizeResult state)``, where ``X`` is the current population and ``state`` is a partial :class:`stochopy.optimize.OptimizeResult` object with the same fields as the ones from the return (except ``"success"``, ``"status"`` and ``"message"``).
Returns
-------
:class:`stochopy.optimize.OptimizeResult`
The optimization result represented as a :class:`stochopy.optimize.OptimizeResult`. Important attributes are:
- ``x``: the solution array
- ``fun``: the solution function value
- ``success``: a Boolean flag indicating if the optimizer exited successfully
- ``message``: a string which describes the cause of the termination
References
----------
.. [1] M. Sambridge, *Geophysical inversion with a neighbourhood algorithm - I. Searching a parameter space*, Geophysical Journal International, 1999, 138(2): 479–494
"""
# Cost function
if not hasattr(fun, "__call__"):
raise TypeError()
# Dimensionality and search space
if np.ndim(bounds) != 2:
raise ValueError()
# Initial guess x0
if x0 is not None:
if np.ndim(x0) != 2 or np.shape(x0)[1] != len(bounds):
raise ValueError()
# Population size
if popsize < 2:
raise ValueError()
if x0 is not None and len(x0) != popsize:
raise ValueError()
# NA parameters
if not 0.0 < nrperc <= 1.0:
raise ValueError()
# Seed
if seed is not None:
np.random.seed(seed)
# Callback
if callback is not None and not hasattr(callback, "__call__"):
raise ValueError()
# Run in serial or parallel
optargs = (
bounds,
x0,
maxiter,
popsize,
nrperc,
xtol,
ftol,
return_all,
verbosity,
callback,
)
res = na(fun, args, True, workers, backend, *optargs)
return res
[docs]@optimizer
def na(
funnorm,
args,
sync,
workers,
backend,
bounds,
x0,
maxiter,
popsize,
nrperc,
xtol,
ftol,
return_all,
verbosity,
callback,
):
"""Optimize with Neighborhood Algorithm."""
ndim = len(bounds)
lower, upper = np.transpose(bounds)
# Normalize and unnormalize
span = upper - lower
span_mask = span > 0.0
span[~span_mask] = 1.0 # Avoid zero division in normalize
normalize = lambda x: np.where(span_mask, (x - lower) / span, upper)
unnormalize = lambda x: np.where(span_mask, x * span + lower, upper)
fun = lambda x: funnorm(unnormalize(x))
# Number of resampling
nr = max(1, int(nrperc * popsize))
# Initial population
X = x0 if x0 is not None else lhs(popsize, ndim, bounds)
X = normalize(X)
pbest = X.copy()
# Evaluate initial population
pfit = fun(X)
pbestfit = pfit.copy()
# Initial best solution
gbidx = np.argmin(pbestfit)
gfit = pbestfit[gbidx]
gbest = X[gbidx].copy()
# Store all models sampled
Xall = X.copy()
Xallfit = pfit.copy()
# Initialize arrays
if return_all:
nout = int(np.ceil(verbosity * popsize))
if nout > 0:
xall = np.empty((maxiter, nout, ndim))
funall = np.empty((maxiter, nout))
xall[0] = X[:nout].copy()
funall[0] = pfit[:nout].copy()
else:
xall = np.empty((maxiter, 1, ndim))
funall = np.empty((maxiter, 1))
xall[0] = gbest
funall[0] = gfit
# First iteration for callback
if callback is not None:
res = OptimizeResult(x=unnormalize(gbest), fun=gfit, nfev=popsize, nit=1)
if return_all:
res.update({"xall": xall[:1], "funall": funall[:1]})
callback(unnormalize(X), res)
# Iterate until one of the termination criterion is satisfied
it = 1
converged = False
while not converged:
it += 1
# Mutation
X = mutation(Xall, Xallfit, popsize, ndim, nr, span_mask)
# Selection
gbest, gfit, pfit, status = selection_sync(
it, X, gbest, pbest, pbestfit, maxiter, xtol, ftol, fun
)
Xall = np.vstack((X, Xall))
Xallfit = np.concatenate((pfit, Xallfit))
if return_all:
if nout > 0:
xall[it - 1] = unnormalize(X[:nout])
funall[it - 1] = pfit[:nout].copy()
else:
idx = pfit.argmin()
xall[it - 1] = unnormalize(X[idx])
funall[it - 1] = pfit[idx].copy()
converged = status is not None
if callback is not None:
res = OptimizeResult(
x=unnormalize(gbest),
fun=gfit,
nfev=it * popsize,
nit=it,
)
if return_all:
res.update({"xall": xall[:it], "funall": funall[:it]})
callback(unnormalize(X), res)
res = OptimizeResult(
x=unnormalize(gbest),
success=status >= 0,
status=status,
message=messages[status],
fun=gfit,
nfev=it * popsize,
nit=it,
)
if return_all:
res.update({"xall": xall[:it], "funall": funall[:it]})
return res
def mutation(Xall, Xallfit, popsize, ndim, nr, span_mask):
"""
Update population.
Note
----
Code adapted from <https://github.com/keithfma/neighborhood/blob/master/neighborhood/search.py>
"""
X = np.empty((popsize, ndim))
ix = Xallfit.argsort()[:nr]
for i in range(popsize):
k = ix[i % nr]
X[i] = Xall[k].copy()
U = np.delete(Xall, k, axis=0)
d1 = 0.0
d2 = ((U[:, 1:] - X[i, 1:]) ** 2).sum(axis=1)
for j in range(ndim):
if not span_mask[j]:
# Value does not matter as it will be fixed by unnormalize
X[i, j] = 0.0
continue
lim = 0.5 * (Xall[k, j] + U[:, j] + (d1 - d2) / (Xall[k, j] - U[:, j]))
idx = lim <= X[i, j]
low = max(lim[idx].max(), 0.0) if idx.sum() else 0.0
idx = lim >= X[i, j]
high = min(lim[idx].min(), 1.0) if idx.sum() else 1.0
X[i, j] = np.random.uniform(low, high)
if j < ndim - 1:
d1 += (Xall[k, j] - X[i, j]) ** 2 - (Xall[k, j + 1] - X[i, j + 1]) ** 2
d2 += (U[:, j] - X[i, j]) ** 2 - (U[:, j + 1] - X[i, j + 1]) ** 2
return X
register("na", minimize)