Source code for topo.misc.util

"""
General utility functions and classes.
"""

import re
import random
import numpy
import functools



[docs]def NxN(tuple): """ Converts a tuple (X1,X2,...,Xn) to a string 'X1xX2x...xXn' """ return 'x'.join([`N` for N in tuple])
enum = enumerate
[docs]class Struct: """ A simple structure class, taking keyword args and assigning them to attributes. For instance: >>> s = Struct(foo='a',bar=1) >>> s.foo 'a' >>> s.bar 1 From http://www.norvig.com/python-iaq.html """ def __init__(self, **entries): self.__dict__.update(entries) def __repr__(self): # args = ['%s=%s' % (k, repr(v)) for (k,v) in vars(self).items()] return 'Struct(%s)' % ', '.join(args)
[docs]def flat_indices(shape): """ Returns a list of the indices needed to address or loop over all the elements of a 1D or 2D matrix with the given shape. E.g.: flat_indices((3,)) == [0,1,2] """ if len(shape) == 1: return [(x,) for x in range(shape[0])] else: rows,cols = shape return [(r,c) for r in range(rows) for c in range(cols)]
[docs]def flatten(l): """ Flattens a list. Written by Bearophile as published on the www.python.org newsgroups. Pulled into Topographica 3/5/2005. """ if type(l) != list: return l else: result = [] stack = [] stack.append((l,0)) while len(stack) != 0: sequence, j = stack.pop(-1) while j < len(sequence): if type(sequence[j]) != list: k, j = j, j+1 while j < len(sequence) and \ (type(sequence[j]) != list): j += 1 result.extend(sequence[k:j]) else: stack.append((sequence, j+1)) sequence, j = sequence[j], 0 return result
""" Return the cross-product of a variable number of lists (e.g. of a list of lists). Use to obtain permutations, e.g. l1=[a,b] l2=[c,d] cross_product([l1,l2]) = [[a,c], [a,d], [b,c], [b,d]] From: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/159975 """ # Need to re-write so someone other than Python knows what might happen when this runs cross_product=lambda ss,row=[],level=0: len(ss)>1 \ and reduce(lambda x,y:x+y,[cross_product(ss[1:],row+[i],level+1) for i in ss[0]]) \ or [row+[i] for i in ss[0]] # JABALERT: Should frange be replaced with numpy.arange or numpy.linspace?
[docs]def frange(start, end=None, inc=1.0, inclusive=False): """ A range function that accepts float increments. Otherwise, works just as the inbuilt range() function. If inclusive is False, as in the default, the range is exclusive (not including the end value), as in the inbuilt range(). If inclusive is true, the range may include the end value. 'All theoretic restrictions apply, but in practice this is more useful than in theory.' From: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66472 """ if end == None: end = start + 0.0 start = 0.0 # Increments of zero would lead to an infinite loop, which can happen if # this is mistakenly called with a integer-based rational expression like 1/2. assert ((inc>0 and start<=end) or (inc<0 and start>=end)) L = [] while 1: next = start + len(L) * inc if inclusive: if inc > 0 and next > end: break elif inc < 0 and next < end: break else: if inc > 0 and next >= end: break elif inc < 0 and next <= end: break L.append(next) return L
[docs]def shortclassname(x): """ Returns the class name of x as a string with the leading package information removed. E.g. if x is of type "<class 'topo.base.sheet.Sheet'>", returns "Sheet" """ return re.sub("'>","",re.sub(".*[.]","",repr(type(x))))
[docs]def profile(command,n=50,sorting=('cumulative','time'),strip_dirs=False): """ Profile the given command (supplied as a string), printing statistics about the top n functions when ordered according to sorting. sorting defaults to ordering by cumulative time and then internal time; see http://docs.python.org/lib/profile-stats.html for other sorting options. By default, the complete paths of files are not shown. If there are multiple files with the same name, you might wish to set strip_dirs=False to make it easier to follow the output. Examples: - profile loading a simulation: profile('execfile("examples/hierarchical.ty")') - profile running an already loaded simulation: profile('topo.sim.run(10)') - profile running a whole simulation: profile('execfile("examples/lissom_oo_or.ty");topo.sim.run(20000)') - profile running a simulation, but from the commandline: ./topographica examples/hierarchical.ty -c "from topo.misc.util import profile; profile('topo.sim.run(10)')" """ # This function simply wraps some functions from the cProfile # module, making profiling easier. import cProfile, pstats # CB: leaves around "filename": should give this a proper name and maybe # put in /tmp/ and maybe allow someone to choose where to save it cProfile.run(command,'filename') prof_stats = pstats.Stats('filename') if strip_dirs:prof_stats.strip_dirs() prof_stats.sort_stats(*sorting).print_callees(n) ### the above lets us see which times are due to which calls ### unambiguously, while the version below only reports total time ### spent in each object, not the time due to that particular ### call. prof_stats.sort_stats(*sorting).print_stats(n)
[docs]def weighted_sample(seq,weights=[]): """ Select randomly from the given sequence. The weights, if given, should be a sequence the same length as seq, as would be passed to weighted_sample_idx(). """ if not weights: return seq[random.randrange(len(seq))] else: assert len(weights) == len(seq) return seq[weighted_sample_idx(weights)]
[docs]def weighted_sample_idx(weights): """ Return an integer generated by sampling the discrete distribution represented by the sequence of weights. The integer will be in the range [0,len(weights)). The weights need not sum to unity, but can contain any non-negative values (e.g., [1 1 1 100] is a valid set of weights). To use weights from a 2D numpy array w, specify w.ravel() (not the w.flat iterator). """ total = sum(weights) if total == 0: return random.randrange(len(weights)) index = random.random() * total accum = 0 for i,x in enumerate(weights): accum += x if index < accum: return i
[docs]def idx2rowcol(idx,shape): """ Given a flat matrix index and a 2D matrix shape, return the (row,col) coordinates of the index. """ assert len(shape) == 2 rows,cols = shape return idx/cols,idx%cols
[docs]def rowcol2idx(r,c,shape): """ Given a row, column, and matrix shape, return the corresponding index into the flattened (raveled) matrix. """ assert len(shape) == 2 rows,cols = shape return r * cols + c
[docs]def centroid(pts,weights): """ Return the centroid of a weighted set of points as an array. The pts argument should be an array of points, one per row, and weights should be a vector of weights. """ # CEBALERT: use numpy.sum? Worthwhile if weights is a numpy.array. return numpy.dot(numpy.transpose(pts),weights)/sum(weights)
[docs]def signabs(x): """ Split x into its sign and absolute value. Returns a tuple (sign(x),abs(x)). Note: sign(0) = 1, unlike numpy.sign. """ if x < 0: sgn = -1 else: sgn = 1 return sgn,abs(x)
[docs]def linearly_interpolate(table,value): """ Interpolate an appropriate value from the given list of values, by number. Assumes the table is a list of items to be returned for integer values, and interpolates between those items for non-integer values. """ lower_index=int(value) upper_index=lower_index+1 # Intermediate value; interpolate or return exact value as appropriate if lower_index+1<len(table): lookup=table[lower_index]+(value%1.0)*(table[upper_index]-table[lower_index]) # Upper bound -- return largest value elif lower_index+1==len(table): lookup=table[len(table)-1] # Over upper bound -- return largest value and print warning # JABALERT: Printing a warning message is not necessarily the most # useful behavior. Should at least provide some identification of # where the warning is coming from. Should add bounds_error and/or # bounds_warn options. (Could turn into a ParameterizedFunction so # that any message includes an identification, and so that # warnings-as-errors will work.) Would be nice if we could use the # equivalent function from scipy, but we can't yet depend on scipy # being available. else: lookup=table[len(table)-1] print "Warning -- value %f out of range; returning maximum of %f" % (value,lookup) return lookup # CB: note that this has only really been tested for output; # I've never tried using it to e.g. read multiple files.
[docs]class MultiFile(object): """ For all file_like_objs passed on initialization, provides a convenient way to call any of file's methods (on all of them). E.g. The following would cause 'test' to be written into two files, as well as to stdout: import sys f1 = open('file1','w') f2 = open('file2','w') m = MultiFile(f1,f2,sys.stdout) m.write('test') """ def __init__(self,*file_like_objs): self.file_like_objs=file_like_objs self.__provide_file_methods() def __provide_file_methods(self): # Provide a version of all the methods of the type file that # don't start with '_'. In each case, the provided version # loops over all the file_like_objs, calling the file method # on all of them. file_methods = [attr for attr in file.__dict__ if not attr.startswith('_') and callable(file.__dict__[attr])] for method in file_methods: def looped_method(method_,*args,**kw): all_out = [] for output in self.file_like_objs: out = getattr(output,method_)(*args,**kw) all_out.append(out) return all_out setattr(self,method,functools.partial(looped_method,method)) # CEBALERT: should be moved to legacy, once that file is reorganized # so that classes can be imported from it relatively independently, # without requiring all the legacy functions to be loaded. # (Right now, topo.__init__ can't import gmpyFaker from # topo.misc.legacy because importing topo.misc.legacy causes various # pieces of code to run that depend on topo.sim existing.) ############################################################ # Alternative module faking using import hooks (see # http://www.python.org/dev/peps/pep-0302/). # Based on http://orestis.gr/blog/2008/12/20/python-import-hooks/.
import sys,imp class ModuleFaker(object): def load_module(self,name): if name not in sys.modules: module = self.create_module(name) module.__file__ = self.path sys.modules[name] = module if '.' in name: parent_name, child_name = name.rsplit('.', 1) setattr(sys.modules[parent_name], child_name, module) return sys.modules[name] def create_module(self,name): raise NotImplementedError class ModuleImporter(object): def find_module(self,fullname,path=None): raise NotImplementedError class gmpyFaker(ModuleFaker): def create_module(self,name): module = imp.new_module(name) # CEBALERT: not sure what precision should be used for FixedPoint to # replace rational. Should we set the precision really high? code = \ """ from __future__ import division import topo.misc.fixedpoint as fixedpoint import param class mpq(object): def __new__(self,*args,**kw): n = fixedpoint.FixedPoint(eval(str(args[0])),precision=4) param.Parameterized().warning("gmpy.mpq('%s') replaced by fixedpoint.FixedPoint('%s')"%(args[0],n)) return n """ exec code in module.__dict__ return module class gmpyImporter(ModuleImporter): def find_module(self, fullname, path=None): if fullname == 'gmpy' or fullname.startswith('gmpy.'): import param param.Parameterized().warning('Module "gmpy" is not available. gmpy.mpq is provided by using fixedpoint.FixedPoint.') g = gmpyFaker() g.path = path return g return None def unit_value(str): m = re.match(r'([^\d]*)(\d*\.?\d+)([^\d]*)', str) if m: g = m.groups() return ' '.join((g[0], g[2])).strip(), float(g[1]) else: return int(str)