Source code for sequana.mixture

# source
# http://nbviewer.ipython.org/github/tritemio/notebooks/blob/master/Mixture_Model_Fitting.ipynb

import warnings

from easydev import AttrDict, DevTools

devtools = DevTools()

from sequana.lazy import numpy as np
from sequana.lazy import pylab

from . import criteria

half_log_two_pi = 0.5 * np.log(2 * np.pi)


__all__ = ["Fitting", "GaussianMixtureModel", "GaussianMixtureFitting", "EM"]


[docs]class GaussianMixtureModel(object): """Gaussian Mixture Model .. plot:: from sequana import mixture from pylab import plot, linspace m = mixture.GaussianMixtureModel(k=2) X = linspace(0,10,100) plot(X, [m.pdf(x, params=[1, 0.5, 0.2, 4, 0.5, 0.8]) for x in X]) """ def __init__(self, k=2): self.k = k
[docs] def pdf(self, x, params, normalise=True): """Expected parameters are params is a list of gaussian distribution ordered as mu, sigma, pi, mu2, sigma2, pi2, ... """ assert divmod(len(params), 3)[1] == 0 assert len(params) >= 3 * self.k k = len(params) / 3 self.k = k pis = np.array(params[2::3]) if any(np.array(pis) < 0): return 0 if normalise is True: pis /= pis.sum() # !!! sum pi must equal 1 otherwise may diverge badly import scipy.stats as ss data = 0 for i in range(0, int(k)): mu, sigma, pi_ = params[i * 3 : (i + 1) * 3] pi_ = pis[i] if sigma != 0: data += pi_ * ss.norm.pdf(x, mu, sigma) return data
[docs] def log_likelihood(self, params, sample): with warnings.catch_warnings(): warnings.simplefilter("ignore") res = -1 * pylab.log(self.pdf(sample, params)).sum() return res
[docs]class Fitting(object): """Base class for :class:`EM` and :class:`GaussianMixtureFitting`""" def __init__(self, data, k=2, method="Nelder-Mead"): """.. rubric:: constructor :param list data: :param int k: number of GMM to use :param str method: minimization method to be used (one of scipy optimise module) """ self.data = np.array(data) self.size = float(len(self.data)) self._k = k self._model = None # initialise the model self.k = k self.verbose = True def _get_k(self): return self._k def _set_k(self, k): assert k > 0 gmm = GaussianMixtureModel(k=k) self._k = k self._model = gmm k = property(_get_k, _set_k) def _get_model(self): return self._model model = property(_get_model)
[docs] def get_guess(self): """Random guess to initialise optimisation""" params = {} m = self.data.min() M = self.data.max() range_ = M - m mus = [m + range_ / (self.k + 1.0) * i for i in range(1, self.k + 1)] params["mus"] = mus sigma = range_ / float(self.k + 1) / 2.0 params["sigmas"] = [sigma] * self.k params["pis"] = [1.0 / self.k] * self.k params = [[mu, sigma, pi] for mu, sigma, pi in zip(params["mus"], params["sigmas"], params["pis"])] params = list(pylab.flatten(params)) return params
[docs] def plot( self, normed=True, N=1000, Xmin=None, Xmax=None, bins=50, color="red", lw=2, hist_kw={"color": "#5F9EA0", "edgecolor": "k"}, ax=None, ): if ax: ax.hist(self.data, normed=normed, bins=bins, **hist_kw) else: pylab.hist(self.data, density=normed, bins=bins, **hist_kw) if Xmin is None: Xmin = self.data.min() if Xmax is None: Xmax = self.data.max() X = pylab.linspace(Xmin, Xmax, N) if ax: ax.plot(X, [self.model.pdf(x, self.results.x) for x in X], color=color, lw=lw) else: pylab.plot(X, [self.model.pdf(x, self.results.x) for x in X], color=color, lw=lw) K = len(self.results.x) # The PIs must be normalised import scipy.stats as ss for i in range(self.k): mu, sigma, pi_ = ( self.results.mus[i], self.results.sigmas[i], self.results.pis[i], ) if ax: ax.plot( X, [pi_ * ss.norm.pdf(x, mu, sigma) for x in X], "k--", alpha=0.7, lw=2, ) else: pylab.plot( X, [pi_ * ss.norm.pdf(x, mu, sigma) for x in X], "k--", alpha=0.7, lw=2, )
[docs]class GaussianMixtureFitting(Fitting): """GaussianMixtureFitting using scipy minization .. plot:: :width: 80% :include-source: from sequana import mixture from pylab import normal data = [normal(0,1) for x in range(700)] + [normal(3,1) for x in range(300)] mf = mixture.GaussianMixtureFitting(data) mf.estimate(k=2) mf.plot() """ def __init__(self, data, k=2, method="Nelder-Mead"): """ Here we use the function minimize() from scipy.optimization. The list of (currently) available minimization methods is 'Nelder-Mead' (simplex), 'Powell', 'CG', 'BFGS', 'Newton-CG',> 'Anneal', 'L-BFGS-B' (like BFGS but bounded), 'TNC', 'COBYLA', 'SLSQPG'. """ super(GaussianMixtureFitting, self).__init__(data, k=k, method=method) self._method = method def _get_method(self): return self._method def _set_method(self, method): devtools.check_param_in_list( method, ["Nelder-Mead", "Powell", "CG", "BFGS", "Newton-CG", "Anneal", "L-BFGS-B"], ) self._method = method method = property(_get_method, _set_method)
[docs] def estimate(self, guess=None, k=None, maxfev=2e4, maxiter=1e3, bounds=None): """guess is a list of parameters as expected by the model guess = {'mus':[1,2], 'sigmas': [0.5, 0.5], 'pis': [0.3, 0.7] } """ if k is not None: self.k = k if guess is None: # estimate the mu/sigma/pis from the data guess = self.get_guess() from scipy.optimize import minimize res = minimize( self.model.log_likelihood, x0=guess, args=(self.data,), method=self.method, options=dict(maxiter=maxiter, maxfev=maxfev), bounds=bounds, ) self.results = res pis = np.array(self.results.x[2::3]) self.results.pis_raw = pis.copy() # The ratio may be negative, in which case we need to normalise. # An example would be to have -0.35, -0.15, which normalise would five 0.7, 0.3 as expected. """if sum(pis<0) > 0: unstable = True pis /= pis.sum() if self.verbose: print("Unstable... found negative pis (k=%s)" % self.k) else: unstable = False pis /= pis.sum() """ unstable = False k = len(self.results.x) / 3 params = [] for i in range(0, int(k)): params.append(self.results.x[i * 3]) params.append(self.results.x[(i * 3 + 1)]) params.append(pis[i]) self.results.x = params # FIXME shall we multiply by -1 ?? self.results.log_likelihood = self.model.log_likelihood(params, self.data) if self.results.log_likelihood and unstable is False: self.results.AIC = criteria.AIC(self.results.log_likelihood, self.k, logL=True) self.results.AICc = criteria.AICc(self.results.log_likelihood, self.k, self.data.size, logL=True) self.results.BIC = criteria.BIC(self.results.log_likelihood, self.k, self.data.size, logL=True) else: self.results.AIC = 1000 self.results.AICc = 1000 self.results.BIC = 1000 pis = np.array(self.results.x[2::3]) self.results.pis = list(pis / pis.sum()) self.results.sigmas = self.results.x[1::3] self.results.mus = self.results.x[0::3] return res
[docs]class EM(Fitting): """Expectation minimization class to estimate parameters of GMM .. plot:: :width: 50% :include-source: from sequana import mixture from pylab import normal data = [normal(0,1) for x in range(7000)] + [normal(3,1) for x in range(3000)] em = mixture.EM(data) em.estimate(k=2) em.plot() """ def __init__(self, data, model=None, max_iter=100): """.. rubric:: constructor :param data: :param model: not used. Model is the :class:`GaussianMixtureModel` but could be other model. :param int max_iter: max iteration for the minization """ super(EM, self).__init__(data, k=2) # default is k=2 self.max_iter = max_iter # @do_profile()
[docs] def estimate(self, guess=None, k=2): """ :param list guess: a list to provide the initial guess. Order is mu1, sigma1, pi1, mu2, ... :param int k: number of models to be used. """ # print("EM estimation") self.k = k # Initial guess of parameters and initializations if guess is None: # estimate the mu/sigma/pis from the data guess = self.get_guess() mu = np.array(guess[0::3]) sig = np.array(guess[1::3]) pi_ = np.array(guess[2::3]) N_ = len(pi_) gamma = np.zeros((N_, int(self.size))) N_ = np.zeros(N_) p_new = guess # EM loop counter = 0 converged = False self.mus = [] import scipy.stats as ss while not converged: # Compute the responsibility func. and new parameters for k in range(0, self.k): # unstable if eslf.model.pdf is made of zeros # self.model.pdf(self.data, p_new,normalise=False).sum()!=0: gamma[k, :] = pi_[k] * ss.norm.pdf(self.data, mu[k], sig[k]) with warnings.catch_warnings(): warnings.simplefilter("ignore") gamma[k, :] /= self.model.pdf(self.data, p_new, normalise=False) """else: gamma[k, :] = pi_[k]*pylab.normpdf(self.data, mu[k], sig[k])/(self.model.pdf(self.data, p_new, normalise=False)+1e-6) """ N_[k] = gamma[k].sum() mu[k] = np.sum(gamma[k] * self.data) / N_[k] sig[k] = pylab.sqrt(np.sum(gamma[k] * (self.data - mu[k]) ** 2) / N_[k]) pi_[k] = N_[k] / self.size self.results = {"x": p_new, "nfev": counter, "success": converged} p_new = [] for this in range(self.k): p_new.extend([mu[this], sig[this], pi_[this]]) # p_new = [(mu[x], sig[x], pi_[x]) for x in range(0, self.k)] # p_new = list(pylab.flatten(p_new)) self.status = True try: assert abs(N_.sum() - self.size) / self.size < 1e-6 assert abs(pi_.sum() - 1) < 1e-6 except: print("issue arised at iteration %s" % counter) self.debug = {"N": N_, "pis": pi_} self.status = False break self.mus.append(mu) # Convergence check counter += 1 converged = counter >= self.max_iter self.gamma = gamma if self.status is True: self.results = {"x": p_new, "nfev": counter, "success": converged} self.results = AttrDict(**self.results) self.results.mus = self.results.x[0::3] self.results.sigmas = self.results.x[1::3] self.results.pis = self.results.x[2::3] log_likelihood = self.model.log_likelihood(self.results.x, self.data) self.results.AIC = criteria.AIC(log_likelihood, k, logL=True) self.results.log_likelihood = log_likelihood self.results.AIC = criteria.AIC(log_likelihood, self.k, logL=True) self.results.AICc = criteria.AICc(log_likelihood, self.k, self.data.size, logL=True) self.results.BIC = criteria.BIC(log_likelihood, self.k, self.data.size, logL=True)
[docs] def plot(self, model_parameters=None, **kwargs): """Take a list of dictionnaries with models parameters to plot predicted models. If user doesn't provide parameters, the standard plot function from fitting is used. Example: model_parameters=[{"mu": 5, "sigma": 0.5, "pi": 1}] """ if not model_parameters: return super(EM, self).plot(**kwargs) # Set parameters with the dictionnary self.k = len(model_parameters) self.results = AttrDict() self.results.mus = [model["mu"] for model in model_parameters] self.results.sigmas = [model["sigma"] for model in model_parameters] self.results.pis = [model["pi"] for model in model_parameters] parms_keys = ("mu", "sigma", "pi") self.results.x = [model[key] for model in model_parameters for key in parms_keys] return super(EM, self).plot(**kwargs)