#
# Lancet core
#
import os, itertools, copy
import re, glob, string
import json
import param
try:
    import numpy as np
    # np.sctypes was removed in NumPy 2.0; fall back to the standard
    # floating point scalar types so numpy support is not silently lost.
    try:
        np_ftypes = np.sctypes['float']
    except (AttributeError, KeyError):
        np_ftypes = [np.float16, np.float32, np.float64, np.longdouble]
except ImportError:
    np, np_ftypes = None, []

try: from pandas import DataFrame
except ImportError: DataFrame = None # pyflakes:ignore (try/except import)

try: from holoviews import Table
except Exception: Table = None # pyflakes:ignore (try/except import)

from collections import defaultdict, OrderedDict

# All float types (Python and NumPy) to which fp_precision rounding applies.
float_types = [float] + np_ftypes

def identityfn(x):
    "The identity function: returns its argument unchanged."
    return x

def fp_repr(x):
    "Representation helper: floats use str() so fp_precision rounding shows."
    return str(x) if (type(x) in float_types) else repr(x)
def set_fp_precision(value):
    """
    Function to set the floating point precision across lancet.

    Sets the default value of the fp_precision parameter on the
    Arguments base class, affecting all subsequently created
    argument specifiers.
    """
    Arguments.set_default('fp_precision', value)
def to_table(args, vdims=None):
    """
    Helper function to convert an Args object to a HoloViews Table.

    The key dimensions are the constant and varying keys of the
    supplied Args object; vdims (if given) selects value dimensions
    via a reindex of the resulting Table.
    """
    if not Table:
        return "HoloViews Table not available"
    # Avoid a mutable default argument for vdims.
    vdims = [] if vdims is None else vdims
    kdims = args.constant_keys + args.varying_keys
    items = ((tuple([spec[k] for k in kdims]), ()) for spec in args.specs)
    return Table(items, kdims=kdims, vdims=[]).reindex(None, vdims)
#=====================#
# Argument Specifiers #
#=====================#
class PrettyPrinted(object):
    """
    A mixin class for generating pretty-printed representations.

    Subclasses register their positional and keyword arguments via
    pprint_args; __repr__ and __str__ then generate flat and
    multi-line representations respectively via _pprint.
    """

    def pprint_args(self, pos_args, keyword_args, infix_operator=None,
                    extra_params=None):
        """
        Method to define the positional arguments and keyword order
        for pretty printing.

        Raises an Exception if an infix operator is requested without
        exactly two positional arguments and no keywords.
        """
        # Avoid a mutable default argument for extra_params.
        extra_params = {} if extra_params is None else extra_params
        if infix_operator and not (len(pos_args)==2 and keyword_args==[]):
            raise Exception('Infix format requires exactly two'
                            ' positional arguments and no keywords')
        (kwargs,_,_,_) = self._pprint_args
        self._pprint_args = (keyword_args + kwargs, pos_args,
                             infix_operator, extra_params)

    def _pprint(self, cycle=False, flat=False, annotate=False,
                onlychanged=True, level=1, tab = ' '):
        """
        Pretty printer that prints only the modified keywords and
        generates flat representations (for repr) and optionally
        annotates the top of the repr with a comment.
        """
        (kwargs, pos_args, infix_operator, extra_params) = self._pprint_args
        (br, indent) = ('' if flat else '\n', '' if flat else tab * level)
        prettify = lambda x: isinstance(x, PrettyPrinted) and not flat
        pretty = lambda x: x._pprint(flat=flat, level=level+1) if prettify(x) else repr(x)

        params = dict(self.get_param_values())
        show_lexsort = getattr(self, '_lexorder', None) is not None
        modified = [k for (k,v) in self.get_param_values(onlychanged=onlychanged)]
        pkwargs = [(k, params[k]) for k in kwargs if (k in modified)] + list(extra_params.items())
        arg_list = [(k,params[k]) for k in pos_args] + pkwargs

        lines = []
        if annotate:  # Optional annotating comment summarizing the items
            len_ckeys, len_vkeys = len(self.constant_keys), len(self.varying_keys)
            info_triple = (len(self),
                           ', %d constant key(s)' % len_ckeys if len_ckeys else '',
                           ', %d varying key(s)' % len_vkeys if len_vkeys else '')
            annotation = '# == %d items%s%s ==\n' % info_triple
            lines = [annotation]

        if show_lexsort: lines.append('(')
        if cycle:
            lines.append('%s(...)' % self.__class__.__name__)
        elif infix_operator:
            level = level - 1
            triple = (pretty(params[pos_args[0]]), infix_operator, pretty(params[pos_args[1]]))
            lines.append('%s %s %s' % triple)
        else:
            lines.append('%s(' % self.__class__.__name__)
            for (k,v) in arg_list:
                lines.append('%s%s=%s' % (br+indent, k, pretty(v)))
                lines.append(',')
            # Remove the trailing comma only when arguments were emitted;
            # the original unconditionally dropped the last line, which
            # would delete the 'ClassName(' line for an empty arg_list.
            if arg_list:
                lines = lines[:-1]
            lines.append(br+(tab*(level-1))+')')

        if show_lexsort:
            lines.append(').lexsort(%s)' % ', '.join(repr(el) for el in self._lexorder))
        return ''.join(lines)

    def __repr__(self):
        return self._pprint(flat=True, onlychanged=False)

    def __str__(self):
        return self._pprint()
class Arguments(PrettyPrinted, param.Parameterized):
    """
    The abstract, base class that defines the core interface and
    methods for all members of the Arguments family of classes,
    including either the simple, static members of Args below, or the
    sophisticated parameter exploration algorithms subclassing from
    DynamicArgs defined in dynamic.py.

    The Args subclass may be used directly and forms the root of one
    family of classes that have statically defined or precomputed
    argument sets (defined below). The second subfamily are the
    DynamicArgs, designed to allow more sophisticated, online
    parameter space exploration techniques such as hill climbing,
    bisection search, genetic algorithms and so on.
    """

    fp_precision = param.Integer(default=4, constant=True, doc='''
       The floating point precision to use for floating point
       values. Unlike other basic Python types, floats need care
       with their representation as you only want to display up to
       the precision actually specified. A floating point precision
       of 0 casts number to integers before representing them.''')

    def __init__(self, **params):
        self._pprint_args = ([],[],None,{})
        self.pprint_args([],['fp_precision', 'dynamic'])
        super(Arguments,self).__init__(**params)
        # Some types cannot be sorted easily (e.g. numpy arrays)
        self.unsortable_keys = []

    def __iter__(self):
        return self

    def __contains__(self, value):
        return value in (self.constant_keys + self.varying_keys)

    # Note: the original source stacked a spurious @classmethod on top of
    # @property here; a plain property matches the sibling properties below.
    @property
    def constant_keys(self):
        """
        Returns the list of parameter names whose values are constant
        as the argument specifier is iterated. Note that the union of
        constant and varying_keys should partition the entire set of
        keys in the case where there are no unsortable keys.
        """
        raise NotImplementedError

    @property
    def constant_items(self):
        """
        Returns the set of constant items as a list of tuples. This
        allows easy conversion to dictionary format. Note, the items
        should be supplied in the same key ordering as for
        constant_keys for consistency.
        """
        raise NotImplementedError

    @property
    def varying_keys(self):
        """
        Returns the list of parameters whose values vary as the
        argument specifier is iterated. Whenever it is possible, keys
        should be sorted from those slowest to faster varying and
        sorted alphanumerically within groups that vary at the same
        rate.
        """
        raise NotImplementedError

    def round_floats(self, specs, fp_precision):
        """
        Generator rounding float values in the given specs to the
        supplied floating point precision. A precision of zero casts
        floats to integers instead.
        """
        _round_float = lambda v, fp: np.round(v, fp) if (type(v) in np_ftypes) else round(v, fp)
        _round = (lambda v, fp: int(v)) if fp_precision==0 else _round_float
        return (dict((k, _round(v, fp_precision) if (type(v) in float_types) else v)
                     for (k,v) in spec.items()) for spec in specs)

    def __next__(self):
        """
        Called to get a list of specifications: dictionaries with
        parameter name keys and string values.
        """
        raise StopIteration

    next = __next__    # Python 2 compatible iterator protocol

    def copy(self):
        """
        Convenience method to avoid using the specifier without
        exhausting it.
        """
        return copy.copy(self)

    def _collect_by_key(self, specs):
        """
        Returns a dictionary like object with the lists of values
        collapsed by their respective key. Useful to find varying vs
        constant keys and to find how fast keys vary.
        """
        # Collect (key, value) tuples as list of lists, flatten with chain
        allkeys = itertools.chain.from_iterable(
            [[(k, run[k]) for k in run] for run in specs])
        collection = defaultdict(list)
        for (k,v) in allkeys: collection[k].append(v)
        return collection

    def _operator(self, operator, other):
        # Implements the Identity absorption rules for + and *.
        identities = [isinstance(el, Identity) for el in [self, other]]
        if not any(identities): return operator(self,other)
        if all(identities):     return Identity()
        elif identities[1]:     return self
        else:                   return other

    def __add__(self, other):
        """
        Concatenates two argument specifiers.
        """
        return self._operator(Concatenate, other)

    def __mul__(self, other):
        """
        Takes the Cartesian product of two argument specifiers.
        """
        return self._operator(CartesianProduct, other)

    def _cartesian_product(self, first_specs, second_specs):
        """
        Takes the Cartesian product of the specifications. Result will
        contain N specifications where N = len(first_specs) *
        len(second_specs) and keys are merged.

        Example: [{'a':1},{'b':2}] * [{'c':3},{'d':4}] =
        [{'a':1,'c':3},{'a':1,'d':4},{'b':2,'c':3},{'b':2,'d':4}]
        """
        return [ dict(zip(
                          list(s1.keys()) + list(s2.keys()),
                          list(s1.values()) + list(s2.values())
                      ))
                 for s1 in first_specs for s2 in second_specs ]

    def summary(self):
        """
        A succinct summary of the argument specifier. Unlike the repr,
        a summary does not have to be complete but must supply the
        most relevant information about the object to the user.
        """
        print("Items: %s" % len(self))
        varying_keys = ', '.join('%r' % k for k in self.varying_keys)
        print("Varying Keys: %s" % varying_keys)
        items = ', '.join(['%s=%r' % (k,v)
                           for (k,v) in self.constant_items])
        if self.constant_items:
            print("Constant Items: %s" % items)
class Identity(Arguments):
    """
    The identity element for any Arguments object 'args' under the *
    operator (CartesianProduct) and + operator (Concatenate). The
    following identities hold:

    args is (Identity() * args)
    args is (args * Identity())
    args is (Identity() + args)
    args is (args + Identity())

    Note that the empty Args() object can also fulfill the role of
    Identity under the addition operator.
    """

    fp_precision = param.Integer(default=None, allow_None=True,
                                 precedence=(-1), constant=True, doc='''
       fp_precision is disabled as Identity() never contains any
       arguments.''')

    def __eq__(self, other):
        return isinstance(other, Identity)

    # All Identity instances compare equal so they share a single hash
    # value; without this, defining __eq__ alone would make the class
    # unhashable in Python 3.
    def __hash__(self):
        return hash(Identity)

    def __repr__(self):
        return "Identity()"

    def __str__(self):
        return repr(self)

    def __nonzero__(self):
        # Identity is neither empty nor non-empty: boolean use is an error.
        raise ValueError("The boolean value of Identity is undefined")

    def __bool__(self):
        raise ValueError("The boolean value of Identity is undefined")
class Args(Arguments):
    """
    An Arguments class that supports statically specified or
    precomputed argument sets. It may be used directly to specify
    argument values but also forms the base class for a family of more
    specific static Argument classes. Each subclass is less flexible
    and general but allows arguments to be easily and succinctly
    specified. For instance, the Range subclass allows parameter
    ranges to be easily declared.

    The constructor of Args accepts argument definitions in two
    different formats. The keyword format allows constant arguments to
    be specified directly and easily. For instance:

    >>> v1 = Args(a=2, b=3)
    >>> v1
    Args(fp_precision=4,a=2,b=3)

    The alternative input format takes an explicit list of the
    argument specifications:

    >>> v2 = Args([{'a':2, 'b':3}]) # Equivalent behaviour to above
    >>> v1.specs == v2.specs
    True

    This latter format is completely flexible and general, allowing
    any arbitrary list of arguments to be specified as desired. This
    is not generally recommended however as the structure of a
    parameter space is often expressed more clearly by composing
    together simpler, more succinct Args objects with the
    CartesianProduct (*) or Concatenation (+) operators.
    """

    specs = param.List(default=[], constant=True, doc='''
          The static list of specifications (ie. dictionaries) to be
          returned by the specifier. Float values are rounded
          according to fp_precision.''')

    def __init__(self, specs=None, fp_precision=None, **params):
        if fp_precision is None: fp_precision = Arguments.fp_precision
        raw_specs, params, explicit = self._build_specs(specs, params, fp_precision)
        super(Args, self).__init__(fp_precision=fp_precision, specs=raw_specs, **params)
        self._lexorder = None
        if explicit:
            self.pprint_args(['specs'],[])
        else: # Present in kwarg format
            self.pprint_args([], self.constant_keys, None,
                             OrderedDict(sorted(self.constant_items)))

    def _build_specs(self, specs, kwargs, fp_precision):
        """
        Returns the specs, the remaining kwargs and whether or not the
        constructor was called with kwarg or explicit specs.
        """
        if specs is None:
            overrides = param.ParamOverrides(self, kwargs,
                                             allow_extra_keywords=True)
            extra_kwargs = overrides.extra_keywords()
            kwargs = dict([(k,v) for (k,v) in kwargs.items()
                           if k not in extra_kwargs])
            rounded_specs = list(self.round_floats([extra_kwargs],
                                                   fp_precision))
            if extra_kwargs=={}: return [], kwargs, True
            else:                return rounded_specs, kwargs, False

        return list(self.round_floats(specs, fp_precision)), kwargs, True

    def __iter__(self):
        # Args yields its full spec list once per iteration pass.
        self._exhausted = False
        return self

    def __next__(self):
        if self._exhausted:
            raise StopIteration
        else:
            self._exhausted=True
            return self.specs

    next = __next__    # Python 2 compatible iterator protocol

    def _unique(self, sequence, idfun=repr):
        """
        Filters out duplicates (as identified by idfun) preserving
        order.

        Note: repr() must be implemented properly on all objects. This
        is implicitly assumed by Lancet when Python objects need to be
        formatted to string representation.
        """
        seen = {}
        return [seen.setdefault(idfun(e),e) for e in sequence
                if idfun(e) not in seen]

    def show(self, exclude=None):
        """
        Convenience method to inspect the available argument values in
        human-readable format. The ordering of keys is determined by
        how quickly they vary.

        The exclude list allows specific keys to be excluded for
        readability (e.g. to hide long, absolute filenames).
        """
        # Avoid a mutable default argument for exclude.
        exclude = [] if exclude is None else exclude
        ordering = self.constant_keys + self.varying_keys
        spec_lines = [', '.join(['%s=%s' % (k, s[k]) for k in ordering
                                 if (k in s) and (k not in exclude)])
                      for s in self.specs]
        print('\n'.join(['%d: %s' % (i,l) for (i,l) in enumerate(spec_lines)]))

    def lexsort(self, *order):
        """
        The lexical sort order is specified by a list of string
        arguments. Each string is a key name prefixed by '+' or '-'
        for ascending and descending sort respectively. If the key is
        not found in the operand's set of varying keys, it is ignored.
        """
        # Bug fix: *order always collects a tuple, so the original test
        # (order == []) could never detect an empty argument list.
        if not order:
            raise Exception("Please specify the keys for sorting, use "
                            "'+' prefix for ascending, "
                            "'-' for descending.)")

        if not set(el[1:] for el in order).issubset(set(self.varying_keys)):
            raise Exception("Key(s) specified not in the set of varying keys.")

        sorted_args = copy.deepcopy(self)
        # Temporarily lift the constant flag to install the sorted specs.
        specs_param = sorted_args.params('specs')
        specs_param.constant = False
        sorted_args.specs = self._lexsorted_specs(order)
        specs_param.constant = True
        sorted_args._lexorder = order
        return sorted_args

    def _lexsorted_specs(self, order):
        """
        A lexsort is specified using normal key string prefixed by '+'
        (for ascending) or '-' for (for descending).

        Note that in Python 2, if a key is missing, None is returned
        (smallest Python value). In Python 3, an Exception will be
        raised regarding comparison of heterogenous types.
        """
        specs = self.specs[:]
        if not all(el[0] in ['+', '-'] for el in order):
            raise Exception("Please specify the keys for sorting, use "
                            "'+' prefix for ascending, "
                            "'-' for descending.)")

        sort_cycles = [(el[1:], True if el[0]=='+' else False)
                       for el in reversed(order)
                       if el[1:] in self.varying_keys]

        # Stable sorts applied from the least to most significant key.
        for (key, ascending) in sort_cycles:
            specs = sorted(specs, key=lambda s: s.get(key, None),
                           reverse=(not ascending))
        return specs

    @property
    def constant_keys(self):
        collection = self._collect_by_key(self.specs)
        return [k for k in sorted(collection) if
                (len(self._unique(collection[k])) == 1)]

    @property
    def constant_items(self):
        collection = self._collect_by_key(self.specs)
        return [(k,collection[k][0]) for k in self.constant_keys]

    @property
    def varying_keys(self):
        collection = self._collect_by_key(self.specs)
        constant_set = set(self.constant_keys)
        unordered_varying = set(collection.keys()).difference(constant_set)
        # Finding out how fast keys are varying
        grouplens = [(len([len(list(y)) for (_,y)
                           in itertools.groupby(collection[k])]),k)
                     for k in collection
                     if (k not in self.unsortable_keys)]
        varying_counts = [(n,k) for (n,k) in sorted(grouplens) if (k in unordered_varying)]
        # Grouping keys with common frequency alphanumerically (desired behaviour).
        ddict = defaultdict(list)
        for (n,k) in varying_counts: ddict[n].append(k)
        alphagroups = [sorted(ddict[k]) for k in sorted(ddict)]
        return [el for group in alphagroups for el in group] + sorted(self.unsortable_keys)

    @property
    def dframe(self):
        # Returns a pandas DataFrame of the specs (or a message if
        # pandas is unavailable).
        return DataFrame(self.specs) if DataFrame else "Pandas not available"

    @property
    def table(self):
        # Returns a HoloViews Table of the specs (see to_table).
        return to_table(self)

    def __len__(self):
        return len(self.specs)
class Concatenate(Args):
    """
    Concatenate is the sequential composition of two specifiers. The
    specifier created by the composition (first + second) generates
    the arguments in first followed by the arguments in second.
    """

    first = param.ClassSelector(default=None, class_=Args, allow_None=True,
       constant=True, doc='''The first specifier in the concatenation.''')

    second = param.ClassSelector(default=None, class_=Args, allow_None=True,
       constant=True, doc='''The second specifier in the concatenation.''')

    def __init__(self, first, second):
        # Preserve the finer of the two floating point precisions.
        max_precision = max(first.fp_precision, second.fp_precision)
        specs = first.specs + second.specs
        super(Concatenate, self).__init__(specs, fp_precision=max_precision,
                                          first=first, second=second)
        self.pprint_args(['first', 'second'],[], infix_operator='+')
class CartesianProduct(Args):
    """
    CartesianProduct is the Cartesian product of two specifiers. The
    specifier created by the composition (first * second) generates
    the cartesian product of the arguments in first followed by the
    arguments in second. Note that len(first * second) =
    len(first)*len(second)
    """

    first = param.ClassSelector(default=None, class_=Args, allow_None=True,
       constant=True, doc='''The first specifier in the Cartesian product.''')

    second = param.ClassSelector(default=None, class_=Args, allow_None=True,
       constant=True, doc='''The second specifier in the Cartesian product.''')

    def __init__(self, first, second):
        # Preserve the finer of the two floating point precisions.
        max_precision = max(first.fp_precision, second.fp_precision)
        specs = self._cartesian_product(first.specs, second.specs)

        overlap = (set(first.varying_keys + first.constant_keys)
                   &  set(second.varying_keys + second.constant_keys))
        # Raise explicitly instead of using assert (asserts are stripped
        # under python -O); the original message was also missing spaces.
        if overlap:
            raise Exception('Sets of keys cannot overlap '
                            'between argument specifiers '
                            'in cartesian product.')
        super(CartesianProduct, self).__init__(specs, fp_precision=max_precision,
                                               first=first, second=second)
        self.pprint_args(['first', 'second'],[], infix_operator='*')
class Range(Args):
    """
    Range generates an argument from a numerically interpolated range
    which is linear by default. An optional function can be specified
    to sample a numeric range with regular intervals.
    """

    key = param.String(default='', constant=True, doc='''
         The key assigned to the values computed over the numeric range.''')

    start_value = param.Number(default=None, allow_None=True, constant=True,
         doc='''The starting numeric value of the range.''')

    end_value = param.Number(default=None, allow_None=True, constant=True,
         doc='''The ending numeric value of the range (inclusive).''')

    steps = param.Integer(default=2, constant=True, bounds=(1,None), doc='''
         The number of steps to interpolate over. Default is 2
         which returns the start and end values without interpolation.''')

    mapfn = param.Callable(default=identityfn, constant=True, doc='''
         The function to be mapped across the linear range. The
         identity function is used by default.''')

    def __init__(self, key, start_value, end_value, steps=2, mapfn=identityfn, **params):
        values = self.linspace(start_value, end_value, steps)
        specs = [{key:mapfn(val)} for val in values]
        super(Range, self).__init__(specs, key=key, start_value=start_value,
                                    end_value=end_value, steps=steps,
                                    mapfn=mapfn, **params)
        self.pprint_args(['key', 'start_value'], ['end_value', 'steps'])

    def linspace(self, start, stop, n):
        """
        Simple replacement for numpy linspace: returns n evenly spaced
        values from start to stop inclusive (just [start] when n == 1).
        """
        if n == 1: return [start]
        L = [0.0] * n
        nm1 = n - 1
        nm1inv = 1.0 / nm1
        for i in range(n):
            L[i] = nm1inv * (start*(nm1 - i) + stop*i)
        return L
class List(Args):
    """
    An argument specifier that takes its values from a given list.
    """

    values = param.List(default=[], constant=True, doc='''
         The list values that are to be returned by the specifier''')

    key = param.String(default='default', constant=True, doc='''
         The key assigned to the elements of the supplied list.''')

    def __init__(self, key, values, **params):
        # Each list element becomes a single-key specification.
        specs = [{key:val} for val in values]
        super(List, self).__init__(specs, key=key, values=values, **params)
        self.pprint_args(['key', 'values'], [])
class Log(Args):
    """
    Specifier that loads arguments from a log file in task id (tid)
    order. This wrapper class allows a concise representation of file
    logs with the option of adding the task id to the loaded
    specifications.

    For full control over the arguments, you can use this class to
    create a fully specified Args object as follows:

    Args(Log.extract_log(<log_file>).values()),
    """

    log_path = param.String(default=None, allow_None=True, constant=True,
         doc='''The relative or absolute path to the log file. If a
              relative path is given, the absolute path is computed
              relative to os.getcwd().''')

    tid_key = param.String(default='tid', constant=True, allow_None=True,
         doc='''If not None, the key given to the tid values included
              in the loaded specifications. If None, the tid number
              is ignored.''')

    @staticmethod
    def extract_log(log_path, dict_type=dict):
        """
        Parses the log file generated by a launcher and returns a
        dictionary (of the supplied dict_type) mapping tids (task
        ids) to the corresponding argument specifications.

        NOTE(review): this method body was missing from the source
        (only a stray duplicated @staticmethod decorator remained)
        although __init__ depends on it; restored from the upstream
        lancet implementation -- confirm against the original file.
        """
        log_path = (log_path if os.path.isfile(log_path)
                    else os.path.join(os.getcwd(), log_path))
        with open(log_path, 'r') as log:
            splits = (line.split() for line in log)
            # Each line is '<tid> <json spec>'.
            uzipped = ((int(split[0]), json.loads(" ".join(split[1:])))
                       for split in splits)
            szipped = [(i, dict((str(k), v) for (k, v) in d.items()))
                       for (i, d) in uzipped]
        log_items = sorted(szipped, key=lambda item: item[0])
        return dict_type(log_items)

    @staticmethod
    def write_log(log_path, data, allow_append=True):
        """
        Writes the supplied specifications to the log path. The data
        may be supplied either as an Args object or as a list of
        dictionaries.

        By default, specifications will be appropriately appended to
        an existing log file. This can be disabled by setting
        allow_append to False.
        """
        append = os.path.isfile(log_path)
        islist = isinstance(data, list)

        if append and not allow_append:
            raise Exception('Appending has been disabled'
                            ' and file %s exists' % log_path)

        if not (islist or isinstance(data, Args)):
            raise Exception('Can only write Args objects or dictionary'
                            ' lists to log file.')

        specs = data if islist else data.specs
        if not all(isinstance(el,dict) for el in specs):
            raise Exception('List elements must be dictionaries.')

        # Context manager guarantees the file is closed even on error.
        with open(log_path, 'r+' if append else 'w') as log_file:
            if append:
                # Continue numbering after the last recorded tid;
                # guard against appending to an empty existing file.
                lines = log_file.readlines()
                start = int(lines[-1].split()[0]) + 1 if lines else 0
            else:
                start = 0
            ascending_indices = range(start, start+len(specs))
            log_str = '\n'.join(['%d %s' % (tid, json.dumps(el))
                                 for (tid, el) in zip(ascending_indices, specs)])
            log_file.write("\n"+log_str if append else log_str)

    def __init__(self, log_path, tid_key='tid', **params):
        log_items = sorted(Log.extract_log(log_path).items())

        if tid_key is None:
            log_specs = [spec for (_, spec) in log_items]
        else:
            # Inject the tid into each loaded specification.
            log_specs = [dict(list(spec.items())+[(tid_key,idx)])
                         for (idx, spec) in log_items]
        super(Log, self).__init__(log_specs,
                                  log_path=log_path,
                                  tid_key=tid_key,
                                  **params)
        self.pprint_args(['log_path'], ['tid_key'])
class FilePattern(Args):
    """
    A FilePattern specifier allows files to be matched and information
    encoded in filenames to be extracted via an extended form of
    globbing. This object may be used to specify filename arguments to
    CommandTemplates when launching jobs but it also very useful for
    collating files for analysis.

    For instance, you can find the absolute filenames of all npz files
    in a 'data' subdirectory (relative to the root) that start with
    'timeseries' using the pattern 'data/timeseries*.npz'.

    In addition to globbing supported by the glob module, patterns can
    extract metadata encoded in filenames using a subset of the Python
    format specification syntax. To illustrate, you can use
    'data/timeseries-{date}.npz' to record the date strings associated
    with matched files. Note that a particular named field can only
    be used in a particular pattern once.

    By default metadata is extracted as strings but format types are
    supported in the usual manner
    eg. 'data/timeseries-{day:d}-{month:d}.npz' will extract the day
    and month from the filename as integer values. Only field names
    and types are recognised with other format specification syntax
    ignored. Type codes supported: 'd', 'b', 'o', 'x', 'e','E','f',
    'F','g', 'G', 'n' (if omitted, result is a string by default).

    Note that ordering is determined via ascending alphanumeric sort
    and that actual filenames should not include any globbing
    characters, namely: '?','*','[' and ']' (general good practice for
    filenames anyway).
    """

    key = param.String(default=None, allow_None=True, constant=True, doc='''
             The key name given to the matched file path strings.''')

    pattern = param.String(default=None, allow_None=True, constant=True,
             doc='''The pattern files are to be searched against.''')

    root = param.String(default=None, allow_None=True, constant=True, doc='''
             The root directory from which patterns are to be loaded.
             The root is set relative to os.getcwd().''')

    @classmethod
    def directory(cls, directory, root=None, extension=None, **kwargs):
        """
        Load all the files in a given directory selecting only files
        with the given extension if specified. The given kwargs are
        passed through to the normal constructor.
        """
        root = os.getcwd() if root is None else root
        suffix = '' if extension is None else '.' + extension.rsplit('.')[-1]
        pattern = directory + os.sep + '*' + suffix
        key = os.path.join(root, directory,'*').rsplit(os.sep)[-2]
        format_parse = list(string.Formatter().parse(key))
        # Bug fix: zip(*format_parse)[1] is invalid in Python 3 (zip
        # objects are not subscriptable); inspect the field names directly.
        if any(field is not None for (_, field, _, _) in format_parse):
            raise Exception('Directory cannot contain format field specifications')
        return cls(key, pattern, root, **kwargs)

    def __init__(self, key, pattern, root=None, **params):
        root = os.getcwd() if root is None else root
        specs = self._load_expansion(key, root, pattern)
        self.files = [s[key] for s in specs]
        super(FilePattern, self).__init__(specs, key=key, pattern=pattern,
                                          root=root, **params)
        self.pprint_args(['key', 'pattern'], ['root'])

    def fields(self):
        """
        Return the fields specified in the pattern using Python's
        formatting mini-language.
        """
        parse = list(string.Formatter().parse(self.pattern))
        # Bug fix: zip(*parse)[1] is invalid in Python 3.
        return [f for (_, f, _, _) in parse if f is not None]

    def _load_expansion(self, key, root, pattern):
        """
        Loads the files that match the given pattern, returning one
        spec per file (absolute path under key, plus extracted tags).
        """
        path_pattern = os.path.join(root, pattern)
        expanded_paths = self._expand_pattern(path_pattern)

        specs=[]
        for (path, tags) in expanded_paths:
            # A matched directory expands to all the files it contains.
            filelist = [os.path.join(path,f) for f in os.listdir(path)] if os.path.isdir(path) else [path]
            for filepath in filelist:
                specs.append(dict(tags,**{key:os.path.abspath(filepath)}))

        return sorted(specs, key=lambda s: s[key])

    def _expand_pattern(self, pattern):
        """
        From the pattern decomposition, finds the absolute paths
        matching the pattern, paired with the extracted metadata tags.
        """
        (globpattern, regexp, fields, types) = self._decompose_pattern(pattern)
        filelist = glob.glob(globpattern)
        expansion = []

        for fname in filelist:
            if fields == []:
                expansion.append((fname, {}))
                continue
            match = re.match(regexp, fname)
            if match is None: continue
            match_items = match.groupdict().items()
            # Apply the declared type constructor (str by default).
            tags = dict((k,types.get(k, str)(v)) for (k,v) in match_items)
            expansion.append((fname, tags))

        return expansion

    def _decompose_pattern(self, pattern):
        """
        Given a path pattern with format declaration, generates a
        four-tuple (glob_pattern, regexp pattern, fields, type map)
        """
        sep = '~lancet~sep~'
        float_codes = ['e','E','f', 'F','g', 'G', 'n']
        typecodes = dict([(k,float) for k in float_codes]
                         + [('b',bin), ('d',int), ('o',oct), ('x',hex)])
        parse = list(string.Formatter().parse(pattern))
        # Renamed from 'text' to avoid shadowing by the loop variables below.
        texts, fields, codes, _ = zip(*parse)

        # Finding the field types from format string
        types = []
        for (field, code) in zip(fields, codes):
            if code in ['', None]: continue
            constructor = typecodes.get(code[-1], None)
            if constructor: types += [(field, constructor)]

        stars = ['' if not f else '*' for f in fields]
        globpat = ''.join(text+star for (text,star) in zip(texts,stars))

        refields = ['' if not f else sep+('(?P<%s>.*?)'% f)+sep for f in fields]
        parts = ''.join(text+group for (text,group) in zip(texts, refields)).split(sep)
        for i in range(0, len(parts), 2): parts[i] = re.escape(parts[i])

        regexp_pattern = ''.join(parts).replace('\\*','.*')
        fields = list(f for f in fields if f)
        return globpat, regexp_pattern , fields, dict(types)

    @property
    def table(self):
        # File paths become the value dimension of the Table.
        return to_table(self, [self.key])
# Importing from filetypes requires PrettyPrinted to be defined first
from lancet.filetypes import FileType
class FileInfo(Args):
    """
    Loads metadata from a set of filenames. For instance, you can load
    metadata associated with a series of image files given by a
    FilePattern. Unlike other explicit instances of Args, this object
    extends the values of an existing Args object. Once you have
    loaded the metadata, FileInfo allows you to load the file data
    into a pandas DataFrame or a HoloViews Table.
    """

    source = param.ClassSelector(class_ = Args, doc='''
       The argument specifier that supplies the file paths.''')

    filetype = param.ClassSelector(constant=True, class_= FileType, doc='''
       A FileType object to be applied to each file path.''')

    key = param.String(constant=True, doc='''
       The key used to find the file paths for inspection.''')

    ignore = param.List(default=[], constant=True, doc='''
       Metadata keys that are to be explicitly ignored. ''')

    def __init__(self, source, key, filetype, ignore=None, **params):
        # Avoid a mutable default argument for ignore.
        ignore = [] if ignore is None else ignore
        specs = self._info(source, key, filetype, ignore)
        super(FileInfo, self).__init__(specs,
                                       source = source,
                                       filetype = filetype,
                                       key = key,
                                       ignore=ignore,
                                       **params)
        self.pprint_args(['source', 'key', 'filetype'], ['ignore'])

    @classmethod
    def from_pattern(cls, pattern, filetype=None, key='filename', root=None, ignore=[]):
        """
        Convenience method to directly chain a pattern processed by
        FilePattern into a FileInfo instance.

        Note that if a default filetype has been set on FileInfo, the
        filetype argument may be omitted.
        """
        filepattern = FilePattern(key, pattern, root=root)
        if FileInfo.filetype and filetype is None:
            filetype = FileInfo.filetype
        elif filetype is None:
            raise Exception("The filetype argument must be supplied unless "
                            "an appropriate default has been specified as "
                            "FileInfo.filetype")
        return FileInfo(filepattern, key, filetype, ignore=ignore)

    @property
    def table(self):
        # File paths become the value dimension of the Table.
        return to_table(self, [self.key])

    def load(self, val, **kwargs):
        """
        Load the file contents into the supplied pandas dataframe or
        HoloViews Table. This allows a selection to be made over the
        metadata before loading the file contents (may be slow).
        """
        if Table and isinstance(val, Table):
            return self.load_table(val, **kwargs)
        elif DataFrame and isinstance(val, DataFrame):
            return self.load_dframe(val, **kwargs)
        else:
            raise Exception("Type %s not a DataFrame or Table." % type(val))

    def load_table(self, table):
        """
        Load the file contents into the supplied Table using the
        specified key and filetype. The input table should have the
        filenames as values which will be replaced by the loaded
        data. If data_key is specified, this key will be used to index
        the loaded data to retrieve the specified item.
        """
        items, data_keys = [], None
        for key, filename in table.items():
            data_dict = self.filetype.data(filename[0])
            current_keys = tuple(sorted(data_dict.keys()))
            values = [data_dict[k] for k in current_keys]
            if data_keys is None:
                data_keys = current_keys
            elif data_keys != current_keys:
                raise Exception("Data keys are inconsistent")
            items.append((key, values))

        return Table(items, kdims=table.kdims, vdims=data_keys)

    def load_dframe(self, dframe):
        """
        Load the file contents into the supplied dataframe using the
        specified key and filetype.
        """
        filename_series = dframe[self.key]
        loaded_data = filename_series.map(self.filetype.data)
        keys = [list(el.keys()) for el in loaded_data.values]
        for key in set().union(*keys):
            key_exists = key in dframe.columns
            if key_exists:
                # Fixed: the original message was missing a space
                # ('avoidoverwriting').
                self.warning("Appending '_data' suffix to data key %r to avoid "
                             "overwriting existing metadata with the same name." % key)
            suffix = '_data' if key_exists else ''
            dframe[key+suffix] = loaded_data.map(lambda x: x.get(key, np.nan))
        return dframe

    def _info(self, source, key, filetype, ignore):
        """
        Generates the union of the source.specs and the metadata
        dictionary loaded by the filetype object.
        """
        specs, mdata = [], {}
        mdata_clashes = set()
        for spec in source.specs:
            if key not in spec:
                raise Exception("Key %r not available in 'source'." % key)

            mdata = dict((k,v) for (k,v) in filetype.metadata(spec[key]).items()
                         if k not in ignore)
            mdata_spec = dict(spec, **mdata)
            specs.append(mdata_spec)
            mdata_clashes = mdata_clashes | (set(spec.keys()) & set(mdata.keys()))
        # Metadata clashes can be avoided by using the ignore list.
        if mdata_clashes:
            self.warning("Loaded metadata keys overriding source keys.")
        return specs