"""
The Topographica Lancet extension allows Topographica simulations to
be easily integrated into a Lancet workflow (see
github.com/ioam/lancet). The TopoCommand is appropriate for simple
runs using the default analysis function, whereas the RunBatchCommand
allows for more sophisticated measurements and analysis to be executed
during a simulation run using a holoviews Collector object.
"""
import os, pickle
from collections import namedtuple, OrderedDict
import numpy.version as np_version
import param
from holoviews import NdMapping, Layout
from holoviews.interface.collector import Collector
from holoviews.core.element import Collator
from holoviews.core.io import Pickler
from lancet import PrettyPrinted, vcs_metadata
from lancet import Command
from lancet import Launcher, review_and_launch
from lancet import Log, FileInfo, FileType
from lancet import List, Args
import topo
try:
from external import sys_paths
submodule_paths = sys_paths()
ordering = ['topographica', 'param', 'paramtk', 'imagen', 'lancet']
summarized = ['topographica', 'param', 'imagen', 'lancet']
submodules = [[p for p in submodule_paths if p.endswith(name)][0] for name in ordering]
except:
submodules = []
from topo.misc.commandline import default_output_path
review_and_launch.output_directory = default_output_path()
Launcher.output_directory = default_output_path()
[docs]class TopoCommand(Command):
"""
TopoCommand is designed to to format Lancet Args objects into
run_batch commands in a general way. Note that Topographica is
always invoked with the -a flag so all of topo.command is imported.
Some of the parameters duplicate those in run_batch to ensure
consistency with previous run_batch usage in Topographica. As a
consequence, this class sets all the necessary options for
run_batch except the 'times' parameter which may vary specified
arbitrarily by the Lancet Args object.
"""
tyfile = param.String(doc="The Topographica model file to run.")
analysis_fn = param.String(default="default_analysis_function", doc="""
The name of the analysis_fn to run. If modified from the
default, the named callable will need to be imported into the
namespace using a '-c' command in topo_flag_options.""")
tag = param.Boolean(default=False, doc="""
Whether to label the run_batch generated directory with the
batch name and batch tag.""")
topo_switches = param.List(default=['-a'], doc = """
Specifies the Topographica qsub switches (flags without
arguments) as a list of strings. Note the that the -a switch
is always used to auto import commands.""")
topo_flag_options = param.Dict(default={}, doc="""
Specifies Topographica flags and their corresponding options
as a dictionary. This parameter is suitable for setting -c
and -p flags for Topographica. This parameter is important
for introducing the callable named by the analysis_fn
parameter into the namespace.
Tuples can be used to indicate groups of options using the
same flag:
{'-p':'retina_density=5'} => -p retina_density=5
{'-p':('retina_density=5', 'scale=2') => -p retina_density=5 -p scale=2
If a plain Python dictionary is used, the keys are
alphanumerically sorted, otherwise the dictionary is assumed
to be an OrderedDict (Python 2.7+, Python3 or
param.external.OrderedDict) and the key ordering will be
preserved. Note that the '-' is prefixed to the key if
missing (to ensure a valid flag). This allows keywords to be
specified with the dict constructor eg.. dict(key1=value1,
key2=value2).""")
param_formatter = param.Callable(param_formatter.instance(),
doc="""Used to specify run_batch formatting.""")
max_name_length= param.Number(default=200,
doc="Matches run_batch parameter of same name.")
snapshot = param.Boolean(default=True,
doc="Matches run_batch parameter of same name.")
vc_info = param.Boolean(default=True,
doc="Matches run_batch parameter of same name.")
save_global_params = param.Boolean(default=True,
doc="Matches run_batch parameter of same name.")
progress_bar = param.String(default='disabled',
doc="Matches run_batch parameter of same name.")
progress_interval = param.Number(default=100,
doc="Matches run_batch parameter of same name.")
def __init__(self, tyfile, executable=None, **params):
auto_executable = os.path.realpath(
os.path.join(topo.__file__, '..', '..', 'topographica'))
executable = executable if executable else auto_executable
super(TopoCommand, self).__init__(tyfile=tyfile, executable=executable, **params)
self.pprint_args(['executable', 'tyfile', 'analysis_fn'],['topo_switches', 'snapshot'])
self._typath = os.path.abspath(self.tyfile)
if not os.path.isfile(self.executable):
raise Exception('Cannot find the topographica script relative to topo/__init__.py.')
if not os.path.exists(self._typath):
raise Exception("Tyfile doesn't exist! Cannot proceed.")
if ((self.analysis_fn.strip() != "default_analysis_function")
and (type(self) == TopoCommand)
and ('-c' not in self.topo_flag_options)):
raise Exception, 'Please use -c option to introduce the appropriate analysis into the namespace.'
def _topo_args(self, switch_override=[]):
"""
Method to generate Popen style argument list for Topographica
using the topo_switches and topo_flag_options
parameters. Switches are returned first, sorted
alphanumerically. The qsub_flag_options follow in the order
given by keys() which may be controlled if an OrderedDict is
used (eg. in Python 2.7+ or using param.external
OrderedDict). Otherwise the keys are sorted alphanumerically.
"""
opt_dict = type(self.topo_flag_options)()
opt_dict.update(self.topo_flag_options)
# Alphanumeric sort if vanilla Python dictionary
if type(self.topo_flag_options) == dict:
ordered_options = [(k, opt_dict[k]) for k in sorted(opt_dict)]
else:
ordered_options = list(opt_dict.items())
# Unpack tuple values so flag:(v1, v2,...)) => ..., flag:v1, flag:v2, ...
unpacked_groups = [[(k,v) for v in val] if type(val)==tuple else [(k,val)]
for (k,val) in ordered_options]
unpacked_kvs = [el for group in unpacked_groups for el in group]
# Adds '-' if missing (eg, keywords in dict constructor) and flattens lists.
ordered_pairs = [(k,v) if (k[0]=='-') else ('-%s' % (k), v)
for (k,v) in unpacked_kvs]
ordered_options = [[k]+([v] if type(v) == str else v)
for (k,v) in ordered_pairs]
flattened_options = [el for kvs in ordered_options for el in kvs]
switches = [s for s in switch_override
if (s not in self.topo_switches)] + self.topo_switches
return sorted(switches) + flattened_options
def _run_batch_kwargs(self, spec, tid, info):
"""
Defines the keywords accepted by run_batch and so specifies
run_batch behaviour. These keywords are those consumed by
run_batch for controlling run_batch behaviour.
"""
# Direct options for controlling run_batch.
options = {'name_time_format': repr(info['timestamp_format']),
'max_name_length': self.max_name_length,
'snapshot': self.snapshot,
'vc_info': self.vc_info,
'save_global_params': self.save_global_params,
'progress_interval': self.progress_interval,
'progress_bar': repr(self.progress_bar),
'metadata_dir': repr('metadata'),
'compress_metadata': repr('zip'),
'save_script_repr': repr('first')}
# Settings inferred using information from launcher ('info')
tag_info = (info['batch_name'], info['batch_tag'])
tag = '[%s]_' % ':'.join(el for el in tag_info if el) if self.tag else ''
derived_options = {'dirname_prefix': repr(''),
'tag': repr('%st%s_' % (tag, tid)),
'output_directory':repr(info['root_directory'])}
# Use fixed timestamp argument to run_batch if available.
if info['timestamp'] is not None:
derived_options['timestamp'] = info['timestamp']
# The analysis_fn is set my self.analysis_fn
derived_options['analysis_fn'] = self.analysis_fn
# Use the specified param_formatter to create the suitably named
# lambda (returning the desired string) in run_batch.
dir_format = self.param_formatter(info['constant_keys'],
info['varying_keys'], spec)
dir_formatter = 'lambda p: %s' % repr(dir_format)
derived_options['dirname_params_filter'] = dir_formatter
return dict(options.items() + derived_options.items())
def __call__(self, spec, tid=None, info={}):
"""
Returns a Popen argument list to invoke Topographica and execute
run_batch with all options appropriately set (in alphabetical
order). Keywords that are not run_batch options are also in
alphabetical order at the end of the keyword list.
"""
kwarg_opts = self._run_batch_kwargs(spec, tid, info)
# Override spec values if mistakenly included.
allopts = dict(spec,**kwarg_opts)
keywords = ', '.join(['%s=%s' % (k,allopts[k]) for k in
sorted(kwarg_opts.keys())+sorted(spec.keys())])
run_batch_list = ["run_batch(%s,%s)" % (repr(self._typath), keywords)]
topo_args = self._topo_args(['-a'])
return [self.executable] + topo_args + ['-c', '; '.join(run_batch_list)]
[docs]class BatchCollector(PrettyPrinted, param.Parameterized):
"""
BatchCollector is a wrapper class used to execute a Collector in a
Topographica run_batch context, saving the HoloViews to disk as
*.hvz files.
"""
metadata = param.List(default=['tid'], doc="""
Spec keys or collector paths to include as metadata in the
output file along with the simulation time.
Layout paths are specified by dotted paths
e.g. 'PinwheelAnalysis.V1' would add the pinwheel analysis on
V1 to the metadata.
""")
time_dimension = param.String(default='time', doc="""
Name of the Topographica simulation time dimension.""")
@classmethod
[docs] def pickle_path(cls, root_directory, batch_name):
"""
Locates the pickle file based on the given launch info
dictionary. Used by load as a classmethod and by save as an
instance method.
"""
return os.path.join(root_directory, '%s.collector' % batch_name)
@classmethod
[docs] def load(cls, tid, specs, root_directory, batch_name, batch_tag):
"""
Classmethod used to load the RunBatchCommand callable into a
Topographica run_batch context. Loads the pickle file based on
the batch_name and root directory in batch_info.
"""
pkl_path = cls.pickle_path(root_directory, batch_name)
with open(pkl_path,'rb') as pkl:
collectorfn = pickle.load(pkl)
info = namedtuple('info',['tid', 'specs', 'batch_name', 'batch_tag'])
collectorfn._info = info(tid, specs, batch_name, batch_tag)
return collectorfn
def __init__(self, collector, **params):
from topo.analysis import Collector
self._pprint_args = ([],[],None,{})
super(BatchCollector, self).__init__(**params)
if not isinstance(collector, Collector):
raise TypeError("Please supply a Collector to BatchCollector")
self.collector = collector
# The _info attribute holds information about the batch.
self._info = ()
def __call__(self):
"""
Calls the collector specified by the user in the run_batch
context. Invoked as an analysis function by RunBatchCommand.
"""
self.collector.interval_hook = topo.sim.run
topo_time = topo.sim.time()
filename = '%s%s_%s' % (self._info.batch_name,
('[%s]' % self._info.batch_tag
if self._info.batch_tag else ''),
topo_time)
viewtree = Layout()
viewtree = self.collector(viewtree, times=[topo_time])
spec_metadata = [(key, self._info.specs[key])
for key in self.metadata if '.' not in key]
path_metadata = [(key, viewtree.items.get(tuple(key.split('.')), float('nan')))
for key in self.metadata if '.' in key]
Pickler.save(viewtree,
param.normalize_path(filename),
key=dict(spec_metadata + path_metadata + [(self.time_dimension, topo_time)]))
[docs] def verify(self, specs, model_params):
"""
Check that a times list has been supplied, call verify_times on
the Collator and if model_params has been supplied, check that a
valid parameter set has been used.
"""
# Note: Parameter types could also be checked...
unknown_params = set()
known_params = (set(model_params if model_params else []) # Model
| set(['times'])) # Extras
for spec in specs:
if 'times' not in spec:
raise Exception("BatchCollector requires a times argument.")
self.collector.verify_times(spec['times'], strict=True)
if not model_params: continue
unknown_params = unknown_params | (set(spec) - known_params)
if not set(self.metadata).issubset(spec.keys()):
raise Exception("Metadata keys not always available: %s"
% ', '.join(self.metadata))
if unknown_params:
raise KeyError("The following keys do not belong to "
"the model parameters or RunBatchCommand: %s"
% ', '.join('%r' % p for p in unknown_params))
def summary(self):
print "Collector definition summary:\n\n%s" % self.collector
def _pprint(self, cycle=False, flat=False, annotate=False,
onlychanged=True, level=1, tab = ' '):
"""Pretty print the collector in a declarative style."""
split = '\n%s' % (tab*(level+1))
spec_strs = []
for path, val in self.collector.items():
key = repr('.'.join(path)) if isinstance(path, tuple) else 'None'
spec_strs.append('(%s,%s%s%r),' % (key, split, tab, val))
return 'Collector([%s%s%s])' % (split, split.join(spec_strs)[:-1], split)
[docs]class RunBatchCommand(TopoCommand):
"""
Runs a custom analysis function specified by a Collector using
run_batch. This command is far more flexible for regular usage than
TopoCommand as it allows you to build a run_batch analysis
incrementally.
"""
metadata = param.List(default=[], doc="""
Keys to include as metadata in the output file along with
'time' (Topographica simulation time).""")
analysis = param.ClassSelector(default=None, class_=(Collector, BatchCollector),
allow_None=True, doc="""
The object used to define the analysis executed in
RunBatch. This object may be a Topographica Collector or a
BatchCollector which is a wrapper of a Collector.""" )
model_params = param.Parameter(default={}, doc="""
A list or dictionary of model parameters to be passed to the
model via run_batch. This is used to validate the parameter names
specified. If set to an empty container, no checking is applied
(default).""")
def __init__(self, tyfile, analysis, **params):
super(RunBatchCommand, self).__init__(tyfile=tyfile,
analysis_fn = 'analysis_fn',
analysis = analysis,
do_format=False,
**params)
self.pprint_args(['executable', 'tyfile', 'analysis'], [])
if isinstance(self.analysis, Collector):
self.analysis = BatchCollector(analysis, metadata=self.metadata)
def __call__(self, spec=None, tid=None, info={}):
"""
Generates the appropriate Topographica run_batch command to make
use of the pickled RunBatchCommand object.
"""
formatted_spec = dict((k, repr(v) if isinstance(v,str) else str(v))
for (k,v) in spec.items())
kwarg_opts = self._run_batch_kwargs(formatted_spec, tid, info)
allopts = dict(formatted_spec,**kwarg_opts) # Override spec values if
# mistakenly included.
# Load and configure the analysis
prelude = ['from topo.misc.lancext import BatchCollector']
prelude += ["analysis_fn=BatchCollector.load(%r, %r, %r, %r, %r)"
% (tid, spec, info['root_directory'],
info['batch_name'], info['batch_tag']) ]
# Create the keyword representation to pass into run_batch
keywords = ', '.join(['%s=%s' % (k,allopts[k]) for k in
sorted(kwarg_opts.keys())
+sorted(formatted_spec.keys())])
run_batch_list = prelude + ["run_batch(%s,%s)"
% (repr(self.tyfile), keywords)]
topo_args = self._topo_args(['-a'])
return [self.executable] + topo_args + ['-c', '; '.join(run_batch_list)]
[docs] def verify(self, args):
"""
Check that the supplied arguments make sense given the specified
analysis.
"""
return self.analysis.verify(args.specs, self.model_params)
[docs] def finalize(self, info):
"""Pickle the analysis before launch."""
pkl_path = self.analysis.pickle_path(info['root_directory'],
info['batch_name'])
with open(pkl_path,'wb') as pkl:
pickle.dump(self.analysis, pkl)
def summary(self):
print("Command executable: %s" % self.executable)
self.analysis.summary()