# Work in progress. Only SimpleOptimization is currently available.

import os, json, fnmatch
import param
from lancet.core import Arguments, Concatenate, CartesianProduct

[docs]class DynamicArgs(Arguments): """ DynamicArgs are declarative specifications that specify a parameter space via a dynamic algorithmic process instead of using precomputed arguments. Unlike the static Args objects, new arguments can only be generated in response to some feedback from the processes that are executed. This type of dynamic feedback is a common feature for many algorithms such as hill climbing optimization, genetic algorithms, bisection search and other sophisticated optimization and search procedures. Like the Args objects, a DynamicArgs object is an iterator. On each iteration, one or more argument sets defining a collection of independent jobs are returned (these jobs should be possible to execute concurrently). Between iterations, the output_extractor function is used to extract the necessary information from the standard output streams of the previously executed jobs. This information is used to update the internal state of the DynamicArgs object can then generate more arguments to explore or terminate. All DynamicArgs classes need to declare the expected return format of the output_extractor function. """ output_extractor = param.Callable(default=json.loads, doc=""" The function that returns the relevant data from the standard output stream dumped to file in the streams subdirectory. This information must be retyrned in a format suitable for updating the specifier.. By default uses json.loads but pickle.loads could also be a valid option. The callable must take a string and return a Python object suitable for updating the specifier.""") def __init__(self, **params): super(DynamicArgs, self).__init__(**params) # Returned on next iteration and updated by _updated_state method self._next_val = self._initial_state(**params) # Trace of inputs from output_extractor and returned arguments self.trace = [(None, self._next_val)] def _update_state(self, data): """ Determines the next desired point in the parameter space using the parsed data returned from the public update method. Returns the value that will next emitted on the next iteration using the list of dictionaries format for arguments. If the update fails or data is None, StopIteration should be supplied as the return value. """ raise NotImplementedError def _initial_state(self, **kwargs): """ Reset the the DynamicArgs object to its initial state and used to reset the object adter the iterator is exhausted. The return value is the initial argument to be returned by next(). """ raise NotImplementedError def __next__(self): if self._next_val is StopIteration: self._initial_state() raise StopIteration current_val = self._next_val self._next_val = StopIteration return current_val next = __next__
[docs] def update(self, tids, info): """ Called to update the state of the iterator. This methods receives the set of task ids from the previous set of tasks together with the launch information to allow the output values to be parsed using the output_extractor. This data is then used to determine the next desired point in the parameter space by calling the _update_state method. """ outputs_dir = os.path.join(info['root_directory'], 'streams') pattern = '%s_*_tid_*{tid}.o.{tid}*' % info['batch_name'] flist = os.listdir(outputs_dir) try: outputs = [] for tid in tids: matches = fnmatch.filter(flist, pattern.format(tid=tid)) if len(matches) != 1: self.warning("No unique output file for tid %d" % tid) contents = open(os.path.join(outputs_dir, matches[0]),'r').read() outputs.append(self.output_extractor(contents)) self._next_val = self._update_state(outputs) self.trace.append((outputs, self._next_val)) except: self.warning("Cannot load required output files. Cannot continue.") self._next_val = StopIteration
[docs] def show(self): """ When dynamic, not all argument values may be available. """ copied = self.copy() enumerated = [el for el in enumerate(copied)] for (group_ind, specs) in enumerated: if len(enumerated) > 1: print("Group %d" % group_ind) ordering = self.constant_keys + self.varying_keys # Ordered nicely by varying_keys definition. spec_lines = [', '.join(['%s=%s' % (k, s[k]) for k in ordering]) for s in specs] print('\n'.join(['%d: %s' % (i,l) for (i,l) in enumerate(spec_lines)])) print('Remaining arguments not available for %s' % self.__class__.__name__)
def _trace_summary(self): """ Summarizes the trace of values used to update the DynamicArgs and the arguments subsequently returned. May be used to implement the summary method. """ for (i, (val, args)) in enumerate(self.trace): if args is StopIteration: info = "Terminated" else: pprint = ','.join('{' + ','.join('%s=%r' % (k,v) for (k,v) in arg.items()) + '}' for arg in args) info = ("exploring arguments [%s]" % pprint ) if i == 0: print("Step %d: Initially %s." % (i, info)) else: print("Step %d: %s after receiving input(s) %s." % (i, info.capitalize(), val)) def __add__(self, other): """ Concatenates two argument specifiers. See Concatenate and DynamicConcatenate documentation respectively. """ if not other: return self dynamic = (isinstance(self, DynamicArgs), isinstance(other, DynamicArgs)) if dynamic == (True, True): raise Exception('Cannot concatenate two dynamic specifiers.') elif (True in dynamic): return DynamicConcatenate(self,other) else: return Concatenate(self,other) def __mul__(self, other): """ Takes the cartesian product of two argument specifiers. See CartesianProduct and DynamicCartesianProduct documentation. """ if not other: return [] dynamic = (isinstance(self, DynamicArgs), isinstance(other, DynamicArgs)) if dynamic == (True, True): raise Exception('Cannot take Cartesian product two dynamic specifiers.') elif (True in dynamic): return DynamicCartesianProduct(self, other) else: return CartesianProduct(self, other) def __len__(self): """ Many DynamicArgs won't have a length that can be precomputed. Most DynamicArgs objects will have an iteration limit to guarantee eventual termination. If so, the maximum possible number of arguments that could be generated should be returned. """ raise NotImplementedError
[docs]class SimpleGradientDescent(DynamicArgs): """ Very simple gradient descent optimizer designed to illustrate how Dynamic Args may be implemented. This class has been deliberately kept simple to clearly illustrate how Dynamic Args may be implemented. A more practical example would likely probably make use of mature, third party optimization libraries (such as the routines offered in scipy.optimize). This particular algorithm greedily minimizes an output value via greedy gradient descent. The local parameter space is explored by examining the change in output value when an increment or decrement of 'stepsize' is made in the parameter space, centered around the current position. The initial parameter is initialized with the 'start' value and the optimization process terminates when either a local minima/maxima has been found or when 'max_steps' is reached. The 'output_extractor' function is expected to return a single scalar number to drive the gradient descent algorithm forwards. """ key = param.String(constant=True, doc=""" The name of the argument that will be optimized in a greedy fashion.""") start = param.Number(default=0.0, constant=True, doc=""" The starting argument value for the gradient ascent or descent""") stepsize = param.Number(default=1.0, constant=True, doc=""" The size of the steps taken in parameter space.""") max_steps=param.Integer(default=100, constant=True, doc=""" Once max_steps is reached, the optimization terminates.""") def __init__(self, key, **params): super(SimpleGradientDescent, self).__init__(key=key, **params) self.pprint_args(['key', 'start', 'stepsize'],[]) self._termination_info = None def _initial_state(self, **kwargs): self._steps_complete = 0 self._best_val = float('inf') self._arg = self.start return [{self.key:self.start+1}, {self.key:self.start-1}] def _update_state(self, vals): """ Takes as input a list or tuple of two elements. First the value returned by incrementing by 'stepsize' followed by the value returned after a 'stepsize' decrement. """ self._steps_complete += 1 if self._steps_complete == self.max_steps: self._termination_info = (False, self._best_val, self._arg) return StopIteration arg_inc, arg_dec = vals best_val = min(arg_inc, arg_dec, self._best_val) if best_val == self._best_val: self._termination_info = (True, best_val, self._arg) return StopIteration self._arg += self.stepsize if (arg_dec > arg_inc) else -self.stepsize self._best_val= best_val return [{self.key:self._arg+self.stepsize}, {self.key:self._arg-self.stepsize}] @property def constant_keys(self): return [] @property def constant_items(self): return [] @property def varying_keys(self): return [self.key] def summary(self): print('Varying Keys: %r' % self.key) print('Maximum steps allowed: %d' % self.max_steps) self._trace_summary() (val, arg) = (self.trace[-1]) if self._termination_info: (success, best_val, arg) = self._termination_info condition = 'Successfully converged.' if success else 'Maximum step limit reached.' print("%s Minimum value of %r at %s=%r." % (condition, best_val, self.key, arg)) def __len__(self): return 2*self.max_steps # Each step specifies 2 concurrent jobs #=========================# # Experimental code (WIP) # #=========================#
[docs]class DynamicConcatenate(DynamicArgs): def __init__(self, first, second): self.first = first self.second = second super(Concatenate, self).__init__(dynamic=True) self._exhausted = False self._first_sent = False self._first_cached = None self._second_cached = None if not isinstance(first,DynamicArgs): self._first_cached = next(first.copy()) if not isinstance(second,DynamicArgs): self._second_cached = next(second.copy()) self.pprint_args(['first', 'second'],[], infix_operator='+') def constant_keys(self): return list(set(self.first.constant_keys) | set(self.second.constant_keys)) def varying_keys(self): return list(set(self.first.varying_keys) | set(self.second.varying_keys)) def update(self, tids, info): if (self.isinstance(self.first,DynamicArgs) and not self._exhausted): self.first.update(tids, info) elif (self.isinstance(self.second,DynamicArgs) and self._first_sent): self.second.update(tids, info) def __next__(self): if self._first_cached is None: try: return next(self.first) except StopIteration: self._exhausted = True return self._second_cached else: if not self._first_sent: self._first_sent = True return self._first_cached else: return next(self.second) next = __next__
[docs]class DynamicCartesianProduct(DynamicArgs): def __init__(self, first, second): self.first = first self.second = second overlap = set(self.first.varying_keys) & set(self.second.varying_keys) assert overlap == set(), 'Sets of keys cannot overlap between argument specifiers in cartesian product.' super(CartesianProduct, self).__init__(dynamic=True) self._first_cached = None self._second_cached = None if not isinstance(first,DynamicArgs): self._first_cached = next(first.copy()) if not isinstance(second,DynamicArgs): self._second_cached = next(second.copy()) self.pprint_args(['first', 'second'],[], infix_operator='*') def constant_keys(self): return list(set(self.first.constant_keys) | set(self.second.constant_keys)) def varying_keys(self): return list(set(self.first.varying_keys) | set(self.second.varying_keys)) def update(self, tids, info): if self.isinstance(self.first,DynamicArgs): self.first.update(tids, info) if self.isinstance(self.second,DynamicArgs): self.second.update(tids, info) def __next__(self): if self._first_cached is None: first_spec = next(self.first) return self._cartesian_product(first_spec, self._second_cached) else: second_spec = next(self.second) return self._cartesian_product(self._first_cached, second_spec) next = __next__

