
"""
Distribution class
"""
# To do:
#
# - wrap bins for cyclic histograms
# - check use of float() in count_mag() etc
# - clarify comment about negative selectivity
#
# - function to return value in a range (like a real histogram)
# - cache values
# - assumes cyclic axes start at 0: include a shift based on range
#
# - is there a way to make this work for arrays without mentioning
#   "array" anywhere in here?
# - should this be two classes: one for the core (which would be
#   small though) and another for statistics?

import numpy as np

import param

unavailable_scipy_optimize = False
try:
    from scipy import optimize
except ImportError:
    param.Parameterized().debug("scipy.optimize not available, dummy von Mises fit")
    unavailable_scipy_optimize = True

def arg(z):
    """
    Return the complex argument (phase) of z, in radians.
    """
    z = z + complex(0, 0)  # so that arg(z) also works for real z
    return np.arctan2(z.imag, z.real)


def wrap(lower, upper, x):
    """
    Circularly alias the numeric value x into the range [lower,upper).

    Valid for cyclic quantities like orientations or hues.
    """
    # I have no idea how I came up with this algorithm; it should be simplified.
    #
    # Note that Python's % operator works on floats and arrays;
    # usually one can simply use that instead. E.g. to wrap array or
    # scalar x into 0,2*pi, just use "x % (2*pi)".
    range_ = upper - lower
    return lower + np.fmod(x - lower + 2*range_*(1 - np.floor(x/(2*range_))), range_)


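# Illustrative usage sketch (not part of the original module; kept under a
# __main__ guard so that importing this file is unaffected). It shows wrap()
# aliasing scalar and array values into a cyclic range.
if __name__ == "__main__":
    # 3.5*pi is half a turn past the upper bound, so it wraps back to 1.5*pi
    print(wrap(0.0, 2 * np.pi, 3.5 * np.pi))        # ~4.712 (= 1.5*pi)
    # wrap() also works elementwise on arrays of orientations in [0, pi)
    print(wrap(0.0, np.pi, np.array([-0.1, 3.3])))  # ~[3.042, 0.158]

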
class Distribution(object):
    """
    Holds a distribution of the values f(x) associated with a variable x.

    A Distribution is a histogram-like object that is a dictionary of
    samples. Each sample is an x:f(x) pair, where x is called the bin
    and f(x) is called the value(). Each bin's value is typically
    maintained as the sum of all the values that have been placed into it.

    The bin axis is continuous, and can represent a continuous
    quantity without discretization. Alternatively, this class can be
    used as a traditional histogram by either discretizing the bin
    number before adding each sample, or by binning the values in the
    final Distribution.

    Distributions are bounded by the specified axis_bounds, and can
    either be cyclic (like directions or hues) or non-cyclic. For
    cyclic distributions, samples provided outside the axis_bounds
    will be wrapped back into the bound range, as is appropriate for
    quantities like directions. For non-cyclic distributions,
    providing samples outside the axis_bounds will result in a
    ValueError.

    In addition to the values, can also return the counts, i.e., the
    number of times that a sample has been added with the given bin.

    Not all instances of this class will be a true distribution in the
    mathematical sense; e.g. the values will have to be normalized
    before they can be considered a probability distribution.

    If keep_peak=True, the value stored in each bin will be the
    maximum of all values ever added, instead of the sum. The
    distribution will thus be a record of the maximum value seen at
    each bin, also known as an envelope.
    """

    # Holds the number of times that undefined values have been
    # returned from calculations for any instance of this class,
    # e.g. calls to vector_direction() or vector_selectivity() when no
    # value is non-zero. Useful for warning users when the values are
    # not meaningful.
    undefined_vals = 0

    def __init__(self, axis_bounds=(0.0, 2*np.pi), cyclic=False, keep_peak=False):
        self._data = {}
        self._counts = {}

        # total_count and total_value hold the total number and sum
        # (respectively) of values that have ever been provided for
        # each bin. For a simple distribution these will be the same as
        # sum_counts() and sum_values().
        self.total_count = 0
        self.total_value = 0.0

        self.axis_bounds = axis_bounds
        self.axis_range = axis_bounds[1] - axis_bounds[0]
        self.cyclic = cyclic
        self.keep_peak = keep_peak

    ### JABHACKALERT! The semantics of this operation are incorrect, because
    ### an expression like x+y should not modify x, while this does. It could
    ### be renamed __iadd__, to implement += (which has the correct semantics),
    ### but it's not yet clear how to do that.
    def __add__(self, a):
        """
        Allows the add() method to be used via the '+' operator; i.e.,
        Distribution + dictionary does Distribution.add(dictionary).
        """
        self.add(a)
        return None

    def get_value(self, bin):
        """
        Return the value of the specified bin.
        (Return None if there is no such bin.)
        """
        return self._data.get(bin)

    def get_count(self, bin):
        """
        Return the count from the specified bin.
        (Return None if there is no such bin.)
        """
        return self._counts.get(bin)

    def values(self):
        """
        Return a list of values.

        Various statistics can then be calculated if desired:

            sum(vals)  (total of all values)
            max(vals)  (highest value in any bin)

        Note that the bin-order of values returned does not necessarily
        match that returned by counts().
        """
        # list() so that the result is indexable and can be passed to
        # numpy functions under Python 3 as well
        return list(self._data.values())

    def counts(self):
        """
        Return a list of counts.

        Various statistics can then be calculated if desired:

            sum(counts)  (total of all counts)
            max(counts)  (highest count in any bin)

        Note that the bin-order of counts returned does not necessarily
        match that returned by values().
        """
        return list(self._counts.values())

    def bins(self):
        """
        Return a list of bins that have been populated.
        """
        return list(self._data.keys())

    def add(self, new_data):
        """
        Add a set of new data in the form of a dictionary of (bin,
        value) pairs. If the bin already exists, the value is added to
        the current value. If the bin doesn't exist, one is created
        with that value.

        Bin numbers outside axis_bounds are allowed for cyclic=True,
        but otherwise a ValueError is raised.

        If keep_peak=True, the value of the bin is the maximum of the
        current value and the supplied value. That is, the bin stores
        the peak value seen so far. Note that each call will increase
        the total_value and total_count (and thus decrease the
        value_mag() and count_mag()) even if the value doesn't happen
        to be the maximum seen so far, since each data point still
        helps improve the sampling and thus the confidence.
        """
        for bin in new_data.keys():

            if self.cyclic == False:
                if not (self.axis_bounds[0] <= bin <= self.axis_bounds[1]):
                    raise ValueError("Bin outside bounds.")
            # CEBALERT: Need to support wrapping of bin values
            # else: new_bin = wrap(self.axis_bounds[0], self.axis_bounds[1], bin)
            new_bin = bin

            if new_bin not in self._data:
                self._data[new_bin] = 0.0
                self._counts[new_bin] = 0

            new_value = new_data[bin]
            self.total_value += new_value
            self._counts[new_bin] += 1
            self.total_count += 1

            if self.keep_peak == True:
                if new_value > self._data[new_bin]:
                    self._data[new_bin] = new_value
            else:
                self._data[new_bin] += new_value

    def sub_distr(self, distr):
        """
        Subtract the given distribution from the current one.
        Only existing bins are modified; new bins in the given
        distribution are discarded without raising errors.

        Note that total_value and total_count are not affected, and
        keep_peak is ignored, so analyses relying on these values
        should not call this method.
        """
        for b in distr.bins():
            if b in self.bins():
                v = distr._data.get(b)
                if v is not None:
                    self._data[b] -= v

    def max_value_bin(self):
        """Return the bin with the largest value."""
        # list() so that the dictionary views can be indexed
        return list(self._data.keys())[np.argmax(list(self._data.values()))]

    def weighted_sum(self):
        """Return the sum of each value times its bin."""
        return np.inner(list(self._data.keys()), list(self._data.values()))

    def value_mag(self, bin):
        """Return the value of a single bin as a proportion of total_value."""
        return self._safe_divide(self._data.get(bin), self.total_value)

    def count_mag(self, bin):
        """Return the count of a single bin as a proportion of total_count."""
        return self._safe_divide(float(self._counts.get(bin)), float(self.total_count))  # use of float()

    def _bins_to_radians(self, bin):
        """
        Convert a bin number to a direction in radians.

        Works for NumPy arrays of bin numbers, returning an
        array of directions.
        """
        return (2*np.pi)*bin/self.axis_range

    def _radians_to_bins(self, direction):
        """
        Convert a direction in radians into a bin number.

        Works for NumPy arrays of direction, returning an array of
        bin numbers.
        """
        return direction*self.axis_range / (2*np.pi)

    def _safe_divide(self, numerator, denominator):
        """
        Division routine that avoids division-by-zero errors
        (returning zero in such cases) but keeps track of them for
        undefined_values().
        """
        if denominator == 0:
            self.undefined_vals += 1
            return 0
        else:
            return numerator / denominator


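# Illustrative usage sketch (not part of the original module; kept under a
# __main__ guard so that importing this file is unaffected). It builds a small
# cyclic Distribution over orientations in [0, pi) and queries it.
if __name__ == "__main__":
    d = Distribution(axis_bounds=(0.0, np.pi), cyclic=True)
    d.add({0.0: 0.2, np.pi / 4: 1.0, np.pi / 2: 0.3})
    d.add({np.pi / 4: 0.5})           # accumulates into the existing bin
    print(d.get_value(np.pi / 4))     # 1.5
    print(d.get_count(np.pi / 4))     # 2
    print(d.max_value_bin())          # ~0.785 (pi/4)
    print(d.value_mag(np.pi / 4))     # 1.5 / 2.0 = 0.75

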
class Pref(dict):
    """
    This class simply collects named arguments into a dictionary; its
    main purpose is to make the output of DistributionStatisticFn
    functions readable. In addition, it traps missing keys, returning
    None instead of raising KeyError.
    """
    def __init__(self, **args):
        dict.__init__(self, **args)

    def __getitem__(self, key):
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            return None


class DistributionStatisticFn(param.Parameterized):
    """
    Base class for various functions performing statistics on a
    distribution.
    """

    value_scale = param.NumericTuple((0.0, 1.0), doc="""
        Scaling of the resulting value of the distribution statistics,
        typically the preference of a unit to feature values. The tuple
        specifies (offset, multiplier) of the output scaling""")

    # APNOTE: previously selectivity_scale[1] used to be 17, a value suitable
    # for combining preference and selectivity in HSV plots. Users wishing to
    # keep this value should now set it when creating SheetViews, in commands
    # like those in command/analysis.py
    selectivity_scale = param.NumericTuple((0.0, 1.0), doc="""
        Scaling of the resulting measure of the distribution peakedness,
        typically the selectivity of a unit to its preferred feature value.
        The tuple specifies (offset, multiplier) of the output scaling""")

    __abstract = True

    def __call__(self, distribution):
        """
        Apply the distribution statistic function; must be implemented
        by subclasses.

        Subclasses should be called with a Distribution as argument;
        the return value is a dictionary with Pref objects as values.
        """
        raise NotImplementedError


class DescriptiveStatisticFn(DistributionStatisticFn):
    """
    Abstract class for basic descriptive statistics
    """

    def vector_sum(self, d):
        """
        Return the vector sum of the distribution as a tuple
        (magnitude, avgbinnum).

        Each bin contributes a vector of length equal to its value, at
        a direction corresponding to the bin number. Specifically, the
        total bin number range is mapped into a direction range
        [0, 2pi].

        For a cyclic distribution, the avgbinnum will be a continuous
        measure analogous to the max_value_bin() of the distribution.
        But this quantity has more precision than max_value_bin()
        because it is computed from the entire distribution instead of
        just the peak bin. However, it is likely to be useful only for
        uniform or very dense sampling; with sparse, non-uniform
        sampling the estimates will be biased significantly by the
        particular samples chosen.

        The avgbinnum is not meaningful when the magnitude is 0,
        because a zero-length vector has no direction. To find out
        whether such cases occurred, you can compare the value of
        undefined_vals before and after a series of calls to this
        function.
        """
        # vectors are represented in polar form as complex numbers
        h = d._data
        r = list(h.values())
        theta = d._bins_to_radians(np.array(list(h.keys())))
        v_sum = np.inner(r, np.exp(theta * 1j))

        magnitude = abs(v_sum)
        direction = arg(v_sum)

        if v_sum == 0:
            d.undefined_vals += 1

        direction_radians = d._radians_to_bins(direction)

        # wrap the direction because arctan2 returns principal values
        wrapped_direction = wrap(d.axis_bounds[0], d.axis_bounds[1], direction_radians)

        return (magnitude, wrapped_direction)

    def _weighted_average(self, d):
        """
        Return the weighted_sum divided by the sum of the values
        """
        return d._safe_divide(d.weighted_sum(), sum(d._data.values()))

    def selectivity(self, d):
        """
        Return a measure of the peakedness of the distribution. The
        calculation differs depending on whether this is a cyclic
        variable.

        For a cyclic variable, returns the magnitude of the
        vector_sum() divided by the sum_value() (see
        _vector_selectivity for more details).

        For a non-cyclic variable, returns the max_value_bin() as a
        proportion of the sum_value() (see _relative_selectivity for
        more details).
        """
        if d.cyclic == True:
            return self._vector_selectivity(d)
        else:
            return self._relative_selectivity(d)

    # CEBHACKALERT: the definition of selectivity for non-cyclic
    # quantities probably needs some more thought.
    # Additionally, this fails the test in testfeaturemap
    # (see the comment there).

    def _relative_selectivity(self, d):
        """
        Return max_value_bin() as a proportion of the sum_value().

        This quantity is a measure of how strongly the distribution is
        biased towards the max_value_bin(). For a smooth, single-lobed
        distribution with an inclusive, non-cyclic range, this quantity
        is an analog to vector_selectivity. To be a precise analog for
        arbitrary distributions, it would need to compute some measure
        of the selectivity that works like the weighted_average()
        instead of the max_value_bin().

        The result is scaled such that if all bins are identical, the
        selectivity is 0.0, and if all bins but one are zero, the
        selectivity is 1.0.
        """
        # A single bin is considered fully selective (but could also
        # arguably be considered fully unselective)
        if len(d._data) <= 1:
            return 1.0

        proportion = d._safe_divide(max(d._data.values()), sum(d._data.values()))
        offset = 1.0 / len(d._data)
        scaled = (proportion - offset) / (1.0 - offset)

        # negative scaled is possible
        # e.g. 2 bins, with values that sum to less than 0.5
        # this probably isn't what should be done in those cases
        if scaled >= 0.0:
            return scaled
        else:
            return 0.0

    def _vector_selectivity(self, d):
        """
        Return the magnitude of the vector_sum() divided by the sum_value().

        This quantity is a vector-based measure of the peakedness of
        the distribution. If only a single bin has a non-zero value(),
        the selectivity will be 1.0, and if all bins have the same
        value() then the selectivity will be 0.0. Other distributions
        will result in intermediate values.

        For a distribution with a sum_value() of zero (i.e. all bins
        empty), the selectivity is undefined. Assuming that one will
        usually be looking for high selectivity, we return zero in such
        a case so that high selectivity will not mistakenly be claimed.
        To find out whether such cases occurred, you can compare the
        value of undefined_values() before and after a series of calls
        to this function.
        """
        return d._safe_divide(self.vector_sum(d)[0], sum(d._data.values()))

    __abstract = True


class DescriptiveBimodalStatisticFn(DescriptiveStatisticFn):
    """
    Abstract class for descriptive statistics of two-mode distributions
    """

    def second_max_value_bin(self, d):
        """
        Return the bin with the second largest value.

        If there is only one bin, return it. This is not a correct
        result, but it is practical for plotting compatibility, and it
        will not mistakenly be claimed as a secondary maximum, since
        its selectivity is forced to 0.0.
        """
        h = d._data
        if len(h) <= 1:
            return list(h.keys())[0]

        # temporarily remove the largest bin, find the maximum of the
        # remainder, then restore the removed bin
        k = d.max_value_bin()
        v = h.pop(k)
        m = d.max_value_bin()
        h[k] = v

        return m

    def second_selectivity(self, d):
        """
        Return the selectivity of the second largest value in the
        distribution.

        If there is only one bin, the selectivity is 0, since there is
        no second peak at all; this value is also used to discriminate
        the validity of second_max_value_bin().

        Selectivity is computed in two ways depending on whether the
        variable is cyclic, as in selectivity().
        """
        if len(d._data) <= 1:
            return 0.0
        if d.cyclic == True:
            return self._vector_second_selectivity(d)
        else:
            return self._relative_second_selectivity(d)

    def _relative_second_selectivity(self, d):
        """
        Return the value of the second maximum as a proportion of the
        sum_value(); see _relative_selectivity() for further details.
        """
        h = d._data
        # temporarily remove the largest bin to find the second maximum
        k = d.max_value_bin()
        v = h.pop(k)
        m = max(h.values())
        h[k] = v

        proportion = d._safe_divide(m, sum(h.values()))
        offset = 1.0 / len(h)
        scaled = (proportion - offset) / (1.0 - offset)

        return max(scaled, 0.0)

    def _vector_second_selectivity(self, d):
        """
        Return the magnitude of the vector_sum() of all bins excluding
        the maximum one, divided by the sum_value(); see
        _vector_selectivity() for further details.
        """
        h = d._data
        # temporarily remove the largest bin before taking the vector sum
        k = d.max_value_bin()
        v = h.pop(k)
        s = self.vector_sum(d)[0]
        h[k] = v

        return d._safe_divide(s, sum(h.values()))

    def second_peak_bin(self, d):
        """
        Return the bin with the second peak in the distribution.

        Unlike second_max_value_bin(), it does not return a bin holding
        the second largest value if that bin lies on a flank of the
        first peak; the second peak is returned only if the
        distribution is truly multimodal. If it isn't, the first peak
        is returned (for compatibility with numpy array types and with
        plotting), but the corresponding selectivity will be forced
        to 0.0.
        """
        h = d._data
        l = len(h)
        if l <= 1:
            return list(h.keys())[0]

        ks = sorted(h.keys())
        ik0 = ks.index(list(h.keys())[np.argmax(list(h.values()))])
        k0 = ks[ik0]
        v0 = h[k0]

        # walk from the first peak towards larger bins (cyclically)
        # while values are non-increasing; the first rise marks the
        # flank of another peak
        v = v0
        k = k0
        ik = ik0
        while h[k] <= v:
            ik += 1
            if ik >= l:
                ik = 0
            if ik == ik0:
                return k0
            v = h[k]
            k = ks[ik]
        ik1 = ik

        # walk from the first peak towards smaller bins (cyclically)
        # while values are non-increasing
        v = v0
        k = k0
        ik = ik0
        while h[k] <= v:
            ik -= 1
            if ik < 0:
                ik = l - 1
            if ik == ik0:
                return k0
            v = h[k]
            k = ks[ik]
        ik2 = ik

        if ik1 == ik2:
            return ks[ik1]

        # the second peak is the largest value in the region between
        # the two flanks, scanning cyclically from ik1 to ik2
        ik = ik1
        m = 0
        while ik != ik2:
            k = ks[ik]
            if h[k] > m:
                m = h[k]
                im = ik
            ik += 1
            if ik >= l:
                ik = 0

        return ks[im]

    def second_peak_selectivity(self, d):
        """
        Return the selectivity of the second peak in the distribution.

        If the distribution has only one peak, return 0.0; this value
        is also useful to discriminate the validity of
        second_peak_bin().
        """
        h = d._data
        if len(h) <= 1:
            return 0.0

        p1 = d.max_value_bin()
        p2 = self.second_peak_bin(d)
        if p1 == p2:
            return 0.0

        m = h[p2]
        proportion = d._safe_divide(m, sum(h.values()))
        offset = 1.0 / len(h)
        scaled = (proportion - offset) / (1.0 - offset)

        return max(scaled, 0.0)

    def second_peak(self, d):
        """
        Return the preference and selectivity of the second peak in the
        distribution.

        This is just the combination of second_peak_bin() and
        second_peak_selectivity(), with the advantage of avoiding a
        duplicate call to second_peak_bin() when the user is interested
        in both preference and selectivity, as is often the case.
        """
        h = d._data
        if len(h) <= 1:
            return (list(h.keys())[0], 0.0)

        p1 = d.max_value_bin()
        p2 = self.second_peak_bin(d)
        if p1 == p2:
            return (p1, 0.0)

        m = h[p2]
        proportion = d._safe_divide(m, sum(h.values()))
        offset = 1.0 / len(h)
        scaled = (proportion - offset) / (1.0 - offset)

        return (p2, max(scaled, 0.0))

    __abstract = True


class DSF_MaxValue(DescriptiveStatisticFn):
    """
    Return the peak value of the given distribution
    """

    def __call__(self, d):
        p = self.value_scale[1] * (d.max_value_bin() + self.value_scale[0])
        s = self.selectivity_scale[1] * (self.selectivity(d) + self.selectivity_scale[0])

        return {"": Pref(preference=p, selectivity=s)}


class DSF_WeightedAverage(DescriptiveStatisticFn):
    """
    Return the main mode of the given distribution.

    The preference value is a continuous, interpolated equivalent of
    the max_value_bin(). For a cyclic distribution, this is the
    direction of the vector sum (see vector_sum()). For a non-cyclic
    distribution, this is the arithmetic average of the data on the
    bin_axis, where each bin is weighted by its value. Such a
    computation will generally produce much more precise maps using
    fewer test stimuli than the discrete method. However,
    weighted_average methods generally require uniform and full-range
    sampling, which is not always feasible.

    For measurements at evenly-spaced intervals over the full range of
    possible parameter values, weighted_averages are a good measure of
    the underlying continuous-valued parameter preference, assuming
    that neurons are tuned broadly enough (and/or sampled finely
    enough) that they respond to at least two of the tested parameter
    values. This method will not usually give good results when those
    criteria are not met, i.e. if the sampling is too sparse, not at
    evenly-spaced intervals, or does not cover the full range of
    possible values. In such cases max_value_bin should be used, and
    the number of test patterns will usually need to be increased
    instead.
    """

    def __call__(self, d):
        p = self.vector_sum(d)[1] if d.cyclic else self._weighted_average(d)
        p = self.value_scale[1] * (p + self.value_scale[0])
        s = self.selectivity_scale[1] * (self.selectivity(d) + self.selectivity_scale[0])

        return {"": Pref(preference=p, selectivity=s)}


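# Illustrative usage sketch (not part of the original module; kept under a
# __main__ guard so that importing this file is unaffected). It compares the
# discrete DSF_MaxValue preference with the interpolated DSF_WeightedAverage
# preference on a small cyclic orientation distribution.
if __name__ == "__main__":
    d = Distribution(axis_bounds=(0.0, np.pi), cyclic=True)
    d.add({0.0: 0.1, np.pi / 4: 1.0, np.pi / 2: 0.4, 3 * np.pi / 4: 0.1})
    print(DSF_MaxValue()(d))         # preference is the peak bin itself (pi/4)
    print(DSF_WeightedAverage()(d))  # preference interpolated from the vector sum

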
class DSF_TopTwoValues(DescriptiveBimodalStatisticFn):
    """
    Return the two max values of distributions in the given matrix
    """

    def __call__(self, d):
        r = {}
        p = self.value_scale[1] * (d.max_value_bin() + self.value_scale[0])
        s = self.selectivity_scale[1] * (self.selectivity(d) + self.selectivity_scale[0])
        r[""] = Pref(preference=p, selectivity=s)

        p = self.second_max_value_bin(d)
        s = self.second_selectivity(d)
        p = self.value_scale[1] * (p + self.value_scale[0])
        s = self.selectivity_scale[1] * (s + self.selectivity_scale[0])
        r["Mode2"] = Pref(preference=p, selectivity=s)

        return r


class DSF_BimodalPeaks(DescriptiveBimodalStatisticFn):
    """
    Return the two peak values of distributions in the given matrix
    """

    def __call__(self, d):
        r = {}
        p = self.value_scale[1] * (d.max_value_bin() + self.value_scale[0])
        s = self.selectivity_scale[1] * (self.selectivity(d) + self.selectivity_scale[0])
        r[""] = Pref(preference=p, selectivity=s)

        p, s = self.second_peak(d)
        p = self.value_scale[1] * (p + self.value_scale[0])
        s = self.selectivity_scale[1] * (s + self.selectivity_scale[0])
        r["Mode2"] = Pref(preference=p, selectivity=s)

        return r


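# Illustrative usage sketch (not part of the original module; kept under a
# __main__ guard so that importing this file is unaffected). It shows the
# difference between second_max_value_bin(), which may return a bin on the
# flank of the main peak, and second_peak_bin(), which requires a true
# secondary mode; both methods are inherited by DSF_BimodalPeaks.
if __name__ == "__main__":
    d = Distribution(axis_bounds=(0.0, 10.0))
    # main peak at bin 2 with a flank at bin 3; smaller secondary peak at bin 6
    d.add({1: 0.2, 2: 1.0, 3: 0.8, 4: 0.3, 5: 0.2, 6: 0.6, 7: 0.3})
    fn = DSF_BimodalPeaks()
    print(fn.second_max_value_bin(d))  # 3  (flank of the main peak)
    print(fn.second_peak_bin(d))       # 6  (the true secondary peak)

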
class VonMisesStatisticFn(DistributionStatisticFn):
    """
    Base class for von Mises statistics
    """

    # values to fit the maximum value of the k parameter in the von Mises
    # distribution, as a function of the number of bins in the distribution.
    # Useful for keeping selectivity in range 0..1. Values derived offline from
    # distributions with a single active bin, and total bins from 8 to 32
    vm_kappa_fit = (0.206, 0.614)

    # level of activity in units confoundable with noise. Used in the von Mises
    # fit for two purposes: if the standard deviation of a distribution is below
    # this value, the distribution is assumed to lack any mode; and it is the
    # maximum level of random noise added to a distribution before the fit
    # optimization, for stability reasons
    noise_level = 0.001

    # exit code of the distribution fit function. Codes are function-specific and
    # each fit function, if it provides exit codes, should have a corresponding
    # string translation
    fit_exit_code = 0

    user_warned_if_unavailable = False

    __abstract = True

    def _orth(self, t):
        """
        Return the orthogonal orientation
        """
        if t < 0.5 * np.pi:
            return t + 0.5 * np.pi
        return t - 0.5 * np.pi

    def _in_pi(self, t):
        """
        Reduce orientation from -pi..2pi to 0..pi
        """
        if t > np.pi:
            return t - np.pi
        if t < 0:
            return t + np.pi
        return t

    def von_mises(self, pars, x):
        """
        Compute a simplified von Mises function.

        Original formulation in Richard von Mises, "Wahrscheinlichkeitsrechnung
        und ihre Anwendungen in der Statistik und theoretischen Physik", 1931,
        Deuticke, Leipzig; see also Mardia, K.V. and Jupp, P.E., "Directional
        Statistics", 1999, J. Wiley, p.36;
        http://en.wikipedia.org/wiki/Von_Mises_distribution

        The two differences are that this function is a continuous probability
        distribution on a semi-circle, while von Mises is on the full circle,
        and that the normalization factor, which is the inverse of the modified
        Bessel function of first kind and 0 degree in the original, is here a
        fit parameter.
        """
        a, k, t = pars
        return a * np.exp(k * (np.cos(2 * (x - t)) - 1))

    def von2_mises(self, pars, x):
        """
        Compute a simplified bimodal von Mises function.

        Two superposed von Mises functions, with different peak and
        bandwidth values.
        """
        p1 = pars[:3]
        p2 = pars[3:]
        return self.von_mises(p1, x) + self.von_mises(p2, x)

    def von_mises_res(self, pars, x, y):
        return y - self.von_mises(pars, x)

    def von2_mises_res(self, pars, x, y):
        return y - self.von2_mises(pars, x)

    def norm_sel(self, k, n):
        m = (self.vm_kappa_fit[0] + n * self.vm_kappa_fit[1]) ** 2
        return np.log(1 + k) / np.log(1 + m)

    def fit_vm(self, distribution):
        """
        Compute the best fit of the monovariate von Mises function in
        the semi-circle.

        Return a tuple with the orientation preference (in the same
        range as axis_bounds), the orientation selectivity, and an
        estimate of the goodness-of-fit, given as the variance of the
        predicted orientation preference. The selectivity is given by
        the bandwidth parameter of the von Mises function, modified for
        compatibility with other selectivity computations in this
        class. The bandwidth parameter is mapped onto a logarithmic
        scale and normalized by the maximum value for the number of
        bins in the distribution, in order to give roughly 1.0 for a
        distribution with one bin at 1.0 and all the others at 0.0, and
        0.0 for uniform distributions. The normalizing factor of the
        selectivity is fit for the total number of bins, using fit
        parameters computed offline.

        There are conditions that prevent, a priori, the possibility of
        fitting the distribution:

            * not enough bins: fewer than 5 bins prevents the fit
            * the distribution is too flat, being below the noise level

        and conditions of a posteriori failure:

            * the "ier" flag returned by leastsq is outside (1, 2, 3, 4)
            * no estimated Jacobian around the solution
            * negative bandwidth (the peak of the distribution is convex)

        Note that these are the minimal conditions; their fulfillment
        does not guarantee unimodality, and it is up to the user to
        check the goodness-of-fit value for an accurate acceptance of
        the fit.
        """
        if unavailable_scipy_optimize:
            if not VonMisesStatisticFn.user_warned_if_unavailable:
                param.Parameterized().warning("scipy.optimize not available, dummy von Mises fit")
                VonMisesStatisticFn.user_warned_if_unavailable = True
            self.fit_exit_code = 3
            return 0, 0, 0

        to_pi = np.pi / distribution.axis_range
        x = to_pi * np.array(distribution.bins())
        n = len(x)
        if n < 5:
            param.Parameterized().warning("No von Mises fit possible with fewer than 5 bins")
            self.fit_exit_code = -1
            return 0, 0, 0

        y = np.array(distribution.values())
        if y.std() < self.noise_level:
            self.fit_exit_code = 1
            return 0, 0, 0

        rn = self.noise_level * np.random.random_sample(y.shape)
        p0 = (1.0, 1.0, distribution.max_value_bin())
        r = optimize.leastsq(self.von_mises_res, p0, args=(x, y + rn),
                             full_output=True)

        if r[-1] not in (1, 2, 3, 4):
            self.fit_exit_code = 100 + r[-1]
            return 0, 0, 0
        residuals = r[2]['fvec']
        jacobian = r[1]
        bandwidth = r[0][1]
        tuning = r[0][2]

        if bandwidth < 0:
            self.fit_exit_code = 1
            return 0, 0, 0

        if jacobian is None:
            self.fit_exit_code = 2
            return 0, 0, 0

        error = (residuals ** 2).sum() / (n - len(p0))
        covariance = jacobian * error
        g = covariance[2, 2]
        p = self._in_pi(tuning) / to_pi
        s = self.norm_sel(bandwidth, n)
        self.fit_exit_code = 0
        return p, s, g

    def vm_fit_exit_codes(self):
        if self.fit_exit_code == 0:
            return "successful exit"
        if self.fit_exit_code == -1:
            return "not enough bins for this fit"
        if self.fit_exit_code == 1:
            return "flat distribution"
        if self.fit_exit_code == 2:
            return "no estimated Jacobian around the solution"
        if self.fit_exit_code == 3:
            return "missing scipy.optimize import"
        if self.fit_exit_code > 110:
            return "unknown exit code"
        if self.fit_exit_code > 100:
            return "error " + str(self.fit_exit_code - 100) + " in scipy.optimize.leastsq"
        return "unknown exit code"

    def fit_v2m(self, distribution):
        """
        Compute the best fit of the bimodal von Mises function in the
        semi-circle.

        Return the tuple:
        (
            orientation1_preference, orientation1_selectivity, goodness_of_fit1,
            orientation2_preference, orientation2_selectivity, goodness_of_fit2
        )

        See fit_vm() for considerations about selectivity and
        goodness_of_fit.
        """
        null = 0, 0, 0, 0, 0, 0
        if unavailable_scipy_optimize:
            if not VonMisesStatisticFn.user_warned_if_unavailable:
                param.Parameterized().warning("scipy.optimize not available, dummy von Mises fit")
                VonMisesStatisticFn.user_warned_if_unavailable = True
            self.fit_exit_code = 3
            return null

        to_pi = np.pi / distribution.axis_range
        x = to_pi * np.array(distribution.bins())
        n = len(x)
        if n < 9:
            param.Parameterized().warning("No bimodal von Mises fit possible with fewer than 9 bins")
            self.fit_exit_code = -1
            return null

        y = np.array(distribution.values())
        if y.std() < self.noise_level:
            self.fit_exit_code = 1
            return null

        rn = self.noise_level * np.random.random_sample(y.shape)
        t0 = distribution.max_value_bin()
        p0 = (1.0, 1.0, t0, 1.0, 1.0, self._orth(t0))
        r = optimize.leastsq(self.von2_mises_res, p0, args=(x, y + rn),
                             full_output=True)
        if r[-1] not in (1, 2, 3, 4):
            self.fit_exit_code = 100 + r[-1]
            return null

        residuals = r[2]['fvec']
        jacobian = r[1]
        bandwidth_1 = r[0][1]
        tuning_1 = r[0][2]
        bandwidth_2 = r[0][4]
        tuning_2 = r[0][5]

        if jacobian is None:
            self.fit_exit_code = 2
            return null
        if bandwidth_1 < 0:
            self.fit_exit_code = 1
            return null
        if bandwidth_2 < 0:
            self.fit_exit_code = 1
            return null

        error = (residuals ** 2).sum() / (n - len(p0))
        covariance = jacobian * error
        g1 = covariance[2, 2]
        g2 = covariance[5, 5]
        p1 = self._in_pi(tuning_1) / to_pi
        p2 = self._in_pi(tuning_2) / to_pi
        s1 = self.norm_sel(bandwidth_1, n)
        s2 = self.norm_sel(bandwidth_2, n)
        self.fit_exit_code = 0
        return p1, s1, g1, p2, s2, g2

    def __call__(self, distribution):
        """
        Apply the distribution statistic function; must be implemented
        by subclasses.
        """
        raise NotImplementedError


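# Illustrative sketch (not part of the original module; kept under a __main__
# guard so that importing this file is unaffected). It evaluates the simplified
# von Mises curve a*exp(k*(cos(2*(x - t)) - 1)) used above: the value equals the
# amplitude a at the preferred orientation x == t, and decays to a*exp(-2*k) at
# the orthogonal orientation x == t + pi/2.
if __name__ == "__main__":
    a, k, t = 1.0, 2.0, np.pi / 4
    xs = np.array([t, t + np.pi / 4, t + np.pi / 2])
    print(a * np.exp(k * (np.cos(2 * (xs - t)) - 1)))
    # -> [1.0, exp(-2) ~ 0.135, exp(-4) ~ 0.018]

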
class DSF_VonMisesFit(VonMisesStatisticFn):
    """
    Return the main mode of the distribution in the given matrix, by
    fitting a von Mises function.
    """

    worst_fit = param.Number(default=0.5, bounds=(0.0, None), softbounds=(0.0, 1.0), doc="""
        Worst goodness-of-fit value for accepting the distribution as monomodal""")

    # default result in case of failure of the fit
    null_result = {"": Pref(preference=0, selectivity=0, goodness_of_fit=0),
                   "Modes": Pref(number=0)}

    def __call__(self, distribution):
        f = self.fit_vm(distribution)
        if self.fit_exit_code != 0 or f[-1] > self.worst_fit:
            return self.null_result
        results = {}
        p, s, g = f
        p = self.value_scale[1] * (p + self.value_scale[0])
        s = self.selectivity_scale[1] * (s + self.selectivity_scale[0])
        results[""] = Pref(preference=p, selectivity=s, goodness_of_fit=g)
        results["Modes"] = Pref(number=1)

        return results


class DSF_BimodalVonMisesFit(VonMisesStatisticFn):
    """
    Return the two modes of the distributions in the given matrix, by
    fitting von Mises functions.

    The results of the main mode are available in
    self.{preference,selectivity,good_of_fit}, while the second mode
    results are in the first element of the self.more_modes list, as a
    dictionary with keys preference, selectivity, good_of_fit.
    """

    worst_fit = param.Number(default=0.5, bounds=(0.0, None), softbounds=(0.0, 1.0), doc="""
        Worst goodness-of-fit value for accepting the distribution as mono- or bi-modal""")

    # default result in case of failure of the fit
    null_result = {
        "": Pref(preference=0, selectivity=0, goodness_of_fit=0),
        "Mode2": Pref(preference=0, selectivity=0, goodness_of_fit=0),
        "Modes": Pref(number=0)
    }

    def _analyze_distr(self, d):
        """
        Analyze the given distribution with von Mises bimodal fit.

        The distribution is analyzed with both unimodal and bimodal
        fits, and a decision about the number of modes is made by
        comparing the goodness of fit. It is a quick but inaccurate way
        of estimating the number of modes.

        Return preference, selectivity, and goodness of fit for both
        modes, plus the estimated number of modes; return None if even
        the unimodal fit failed. If the distribution is unimodal, the
        values of the second mode are set to 0. The main mode is always
        the one with the largest selectivity (von Mises bandwidth).
        """
        no1 = False
        f = self.fit_vm(d)
        if self.fit_exit_code != 0:
            no1 = True
        p, s, g = f

        f2 = self.fit_v2m(d)
        if self.fit_exit_code != 0 or f2[2] > self.worst_fit:
            if no1 or f[-1] > self.worst_fit:
                return None
            return p, s, g, 0, 0, 0, 1
        p1, s1, g1, p2, s2, g2 = f2

        if g1 > g:
            return p, s, g, 0, 0, 0, 1
        if s2 > s1:
            return p2, s2, g2, p1, s1, g1, 2

        return p1, s1, g1, p2, s2, g2, 2

    def __call__(self, distribution):
        f = self._analyze_distr(distribution)
        if f is None:
            return self.null_result

        results = {}
        p, s, g = f[:3]
        p = self.value_scale[1] * (p + self.value_scale[0])
        s = self.selectivity_scale[1] * (s + self.selectivity_scale[0])
        results[""] = Pref(preference=p, selectivity=s, goodness_of_fit=g)

        p, s, g, n = f[3:]
        p = self.value_scale[1] * (p + self.value_scale[0])
        s = self.selectivity_scale[1] * (s + self.selectivity_scale[0])
        results["Mode2"] = Pref(preference=p, selectivity=s, goodness_of_fit=g)
        results["Modes"] = Pref(number=n)

        return results

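
# Illustrative usage sketch (not part of the original module; kept under a
# __main__ guard so that importing this file is unaffected, and requiring
# scipy.optimize for a real fit). It fits a unimodal orientation tuning curve
# sampled at 8 bins over [0, pi); assuming the fit converges, the reported
# preference should be close to pi/4.
if __name__ == "__main__":
    bins = np.arange(8) * np.pi / 8
    vals = np.exp(2.0 * (np.cos(2 * (bins - np.pi / 4)) - 1))  # peaked near pi/4
    d = Distribution(axis_bounds=(0.0, np.pi), cyclic=True)
    d.add(dict(zip(bins, vals)))
    print(DSF_VonMisesFit()(d))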
