Source code for lancet.launch

# Lancet launchers and launch helpers

from collections import OrderedDict, namedtuple
from glob import glob
import os, sys, platform, time, pipes, subprocess
import json, pickle

import param

import lancet.core as core
from lancet import __version__ as lancet_version
from lancet.dynamic import DynamicArgs

# For Python 2 and 3 compatibility: on Python 2 rebind ``input`` to
# ``raw_input`` so that prompts return the raw string; on Python 3
# ``raw_input`` does not exist and the builtin ``input`` is used as-is.
try:
    input = raw_input
except NameError:
    pass

# Commands Template #

class Command(core.PrettyPrinted, param.Parameterized):
    """
    A command is a way of converting the dictionaries returned by
    argument specifiers into a particular command. When called with an
    argument specifier, a command template returns a list of strings
    corresponding to a subprocess Popen argument list.

    __call__(self, spec, tid=None, info={}):

    All Commands must be callable. The tid argument is the task id and
    info is a dictionary of run-time information supplied by the
    launcher. See the _setup_launch method of Launcher to see details
    about the launch information supplied.
    """

    executable = param.String(default='python', constant=True, doc='''
        The executable that is to be run by this Command. Unless the
        executable is a standard command expected on the system path,
        this should be an absolute path. By default this invokes
        python or the python environment used to invoke the Command
        (Topographica for instance).''')

    do_format = param.Boolean(default=True, doc='''
        Set to True to receive input arguments as formatted strings,
        False for the raw unformatted objects.''')

    def __init__(self, executable=None, **params):
        # When no executable is given, use the interpreter running this
        # process rather than the parameter's declared default.
        if executable is None:
            executable = sys.executable
        self._pprint_args = ([], [], None, {})
        super(Command, self).__init__(executable=executable, **params)
        self.pprint_args([], [])

    def __call__(self, spec, tid=None, info={}):
        """
        Formats a single argument specification supplied as a
        dictionary of argument name/value pairs. The info dictionary
        contains launch information as defined in the _setup_launch
        method of Launcher.

        Subclasses must override this method; the base implementation
        always raises NotImplementedError.
        """
        raise NotImplementedError

    def _formatter(self, spec):
        """
        Return spec formatted via core.Arguments.spec_formatter when
        do_format is True, otherwise return spec unchanged.
        """
        if self.do_format:
            return core.Arguments.spec_formatter(spec)
        return spec

    def show(self, args, file_handle=None, **kwargs):
        "Write to file_handle if supplied, otherwise print output"
        # Placeholder launch information: real values are only known at
        # launch time, so angle-bracketed stand-ins are displayed.
        info = {'root_directory':     '<root_directory>',
                'batch_name':         '<batch_name>',
                'batch_tag':          '<batch_tag>',
                'batch_description':  '<batch_description>',
                'launcher':           '<launcher>',
                'timestamp_format':   '<timestamp_format>',
                'timestamp':          tuple(time.localtime()),
                'varying_keys':       args.varying_keys,
                'constant_keys':      args.constant_keys,
                'constant_items':     args.constant_items}

        quoted_cmds = [subprocess.list2cmdline(
                           list(self(self._formatter(s), '<tid>', info)))
                       for s in args.specs]

        cmd_lines = ['%d: %s\n' % (i, qcmds)
                     for (i, qcmds) in enumerate(quoted_cmds)]
        full_string = ''.join(cmd_lines)

        if file_handle:
            file_handle.write(full_string)
            file_handle.flush()
        else:
            print(full_string)

    def verify(self, args):
        """
        Optional, final check that ensures valid arguments have been
        passed before launch. Allows the constant and varying_keys to
        be checked and can inspect the specs attribute if an instance
        of Args. If invalid, raise an Exception with the appropriate
        error message, otherwise return None.
        """
        return

    def finalize(self, info):
        """
        Optional method that allows a Command to save state before
        launch. The info argument is supplied by the Launcher.
        """
        return

    def summary(self):
        """
        A succinct summary of the Command configuration.  Unlike the
        repr, a summary does not have to be complete but must supply
        key information relevant to the user. Must begin by stating
        the executable.
        """
        raise NotImplementedError
class ShellCommand(Command):
    """
    A generic Command that can be used to invoke shell commands on
    most operating systems where Python can be run. By default,
    follows the GNU coding convention for commandline arguments.
    """

    expansions = param.Dict(default={}, constant=True, doc='''
        Allows extension of the specification that supports functions
        that expand to valid argument values.  If a function is used,
        it must have the signature (spec, info, tid). A typical usage
        for a function value is to build a valid output filename given
        the context.

        Three such subclasses are provided:
        'RootDirectory', 'LongFilename' and 'Expand'.''')

    posargs = param.List(default=[], constant=True, doc='''
        The list of positional argument keys. Positional arguments are
        always supplied at the end of a command in the order given.''')

    short_prefix = param.String(default='-', constant=True, doc='''
        Although the single dash is a GNU coding convention, the
        argument prefix may depend on the applications and/or
        platform.''')

    long_prefix = param.String(default='--', constant=True, doc='''
        Although the double dash is a GNU coding convention, some
        applications use single dashes for long options.''')

    def __init__(self, executable, **params):
        # Formatting is performed inside __call__ (after expansions are
        # applied), so the generic pre-formatting step is disabled.
        super(ShellCommand, self).__init__(executable=executable,
                                           do_format=False,
                                           **params)
        self.pprint_args(['executable', 'posargs'], ['long_prefix'])

    def __call__(self, spec, tid=None, info={}):
        """
        Build the Popen-style argument list for a single spec.

        Returns [executable] + options + positional arguments. Keys
        whose raw value is False are omitted; keys whose raw value is
        True are emitted as bare switches without a value.
        """
        # Function expansions are called here.
        expanded = type(spec)()
        for (k, v) in self.expansions.items():
            if callable(v):
                expanded[k] = v(spec, info, tid)
            else:
                expanded[k] = v

        # Values given explicitly in the spec override the expansions.
        expanded.update(spec.items())
        expanded = core.Arguments.spec_formatter(expanded)

        options = []
        for (k, v) in expanded.items():
            # The raw spec (not the formatted one) is consulted for the
            # boolean switch semantics.
            if k in self.posargs or spec.get(k) is False:
                continue
            prefix = self.long_prefix if len(k) > 1 else self.short_prefix
            options.append('%s%s' % (prefix, k))
            if spec.get(k) is not True:
                options.append(v)

        # A positional entry that is not a key of the expanded spec is
        # called with the (spec, info, tid) signature.
        posargs = [expanded[parg] if (parg in expanded)
                   else parg(spec, info, tid)
                   for parg in self.posargs]
        return [self.executable] + options + posargs

    def summary(self):
        """Print the executable and the long option prefix."""
        print("Command executable: %s" % self.executable)
        print("Long prefix: %r" % self.long_prefix)

    class RootDirectory(object):
        """
        Supplies the root_directory to a command.
        """
        def __call__(self, spec, info, tid):
            return info['root_directory']

        def __repr__(self):
            return "ShellCommand.RootDirectory()"

    class LongFilename(object):
        """
        Generates a long filename based on the input arguments in the
        root directory with the given extension. Ignores constant
        items.
        """
        def __init__(self, extension, excluding=None):
            self.extension = extension
            # A fresh list per instance: a shared mutable default would
            # leak modifications between instances.
            self.excluding = [] if excluding is None else excluding

        def __call__(self, spec, info, tid):
            root_dir = info['root_directory']
            params = [('tid', tid)] + [(k, v) for (k, v) in spec.items()
                                       if k in info['varying_keys']
                                       and k not in self.excluding]
            basename = '_'.join('%s=%s' % (k, v) for (k, v) in sorted(params))
            return os.path.join(root_dir, '%s_%s%s' % (info['batch_name'],
                                                       basename,
                                                       self.extension))

        def __repr__(self):
            items = ([self.extension, self.excluding]
                     if self.excluding else [self.extension])
            return ("ShellCommand.LongFilename(%s)"
                    % ', '.join('%r' % el for el in items))

    class Expand(object):
        """
        Takes a new-style format string template and expands it out
        using the keys in the spec, info and tid.
        """
        def __init__(self, template):
            self.template = template

        def __call__(self, spec, info, tid):
            # Later updates win: info keys take precedence over spec
            # keys, which take precedence over 'tid'.
            all_params = {'tid': tid}
            all_params.update(spec)
            all_params.update(info)
            return self.template.format(**all_params)

        def __repr__(self):
            return "ShellCommand.Expand(%r)" % self.template


#===============#
# Output Helper #
#===============#
class Output(param.Parameterized):
    """A convenience class to help collect the generated outputs from an
    invocation of a `Launcher` (which is called a "launch").

    Given an output directory, the object reads the log file, info file and
    provides easy access to the output streams for each of the launches.

    The Launch information for each launch is stored in a namedtuple called
    `LaunchInfo` which has the following fields::

        'timestamp', 'path', 'tids', 'specs', 'stdout', 'stderr', 'log', 'info'

    If there are any expansions specified (see the ShellCommand's expansions
    parameter), those are also added to the named tuple.

    Here is an example of using the class::

       >> output = Output('output')
       >> len(output)
       2
       >> output[-1]._fields  # the fields of the namedtuple.
       ('timestamp', 'path', 'tids', 'specs', 'stdout', 'stderr', 'log', 'info')
       >> output[-1].path  # the path of the last run.
       u'/tmp/output/2015-06-21_0325-prime_quintuplet'
       >> len(output[-1].specs)  # spec the arguments for each case.
       16
       >> output[-1].specs[-1]
       {'integer': 115}
       >> len(output[1].stdout)  # the path to the stdout for each case.
       16
       >> open(output[1].stdout[-1]).read()
       '109: 109\\n'

    One can iterate over the LaunchInfo for the launches like so::

       >> for li in output:
       ..     print(li.path)
       ..
       /tmp/output/2015-06-21_0315-prime_quintuplet
       /tmp/output/2015-06-21_0325-prime_quintuplet

    """

    output_dir = param.String(default='.', doc='''
        The output directory - the directory that will contain all
        the root directories for the individual launches.''')

    expansions = param.Dict(default={}, constant=True, doc='''
        Perform expansions (analogous to a ShellCommand) given a callable.
        Allows extension of the specification that supports callables that
        expand to valid argument values.  The callable must have the signature
        (spec, info, tid). A typical usage for a function value is to build a
        valid output filename given the context.

        See the `ShellCommand.RootDirectory`, `ShellCommand.LongFilename` and
        `ShellCommand.Expand`.''')

    def __init__(self, output_dir, **params):
        super(Output, self).__init__(output_dir=output_dir, **params)
        self.launches = []
        # Expansion names become extra (sorted) fields of LaunchInfo.
        self.expansion_keys = sorted(self.expansions.keys())
        self.LaunchInfo = namedtuple(
            'LaunchInfo',
            ['timestamp', 'path', 'tids', 'specs', 'stdout', 'stderr',
             'log', 'info'] + self.expansion_keys
        )
        self.update()

    def __getitem__(self, launch):
        return self.launches[launch]

    def __iter__(self):
        return iter(self.launches)

    def __len__(self):
        return len(self.launches)

    def _get_launch_info(self, launch_dir):
        """
        Build the LaunchInfo for a single launch directory from its
        first '*.log' and '*.info' files. Raises IndexError if either
        file is missing from launch_dir.
        """
        log_path = glob(os.path.join(launch_dir, '*.log'))[0]
        log = core.Log.extract_log(log_path, OrderedDict)
        tids = list(log.keys())
        specs = list(log.values())

        info_file = glob(os.path.join(launch_dir, '*.info'))[0]
        with open(info_file) as info_handle:
            info = json.load(info_handle)
        stdout, stderr = self._get_streams(info)

        info_dict = dict(
            timestamp=info['timestamp'],
            path=info['root_directory'],
            tids=tids,
            specs=specs,
            stdout=stdout,
            stderr=stderr,
            log=log,
            info=info
        )
        # Do all the expansions. Each tid is paired with its own spec
        # rather than indexing specs by tid, which is only valid when
        # the tids happen to be exactly 0..n-1.
        for name, func in self.expansions.items():
            info_dict[name] = [func(spec, info, tid)
                               for (tid, spec) in zip(tids, specs)]
        return self.LaunchInfo(**info_dict)

    @staticmethod
    def _stream_sort_key(path):
        """
        Sort key for stream files: numeric suffixes are ordered by
        integer value (so '.2' precedes '.10'); any non-numeric
        suffixes follow, ordered as strings.
        """
        suffix = os.path.splitext(path)[1][1:]
        if suffix.isdigit():
            return (0, int(suffix), suffix)
        return (1, 0, suffix)

    def _get_streams(self, info):
        """
        Return (stdout_paths, stderr_paths) found in the 'streams'
        subdirectory of the launch's root directory, ordered by the
        suffix following the final dot of each filename.
        """
        streams_dir = os.path.join(info['root_directory'], 'streams')

        def _get_paths(pattern):
            files = glob(os.path.join(streams_dir, pattern))
            return sorted(files, key=self._stream_sort_key)

        batch_name = info['batch_name']
        stdout = _get_paths('%s_*.o.*' % batch_name)
        stderr = _get_paths('%s_*.e.*' % batch_name)
        return stdout, stderr

    def update(self):
        """Update the launch information -- use if additional launches were
        made.
        """
        launches = []
        for path in os.listdir(self.output_dir):
            full_path = os.path.join(self.output_dir, path)
            # Every subdirectory of output_dir is treated as a launch.
            if os.path.isdir(full_path):
                launches.append(self._get_launch_info(full_path))
        # namedtuples compare field by field, so this orders the
        # launches by timestamp first and then by path.
        self.launches = sorted(launches)


#===========#
# Launchers #
#===========#
class Launcher(core.PrettyPrinted, param.Parameterized):
    """
    A Launcher is constructed using a name, an argument specifier and
    a command template. It can then launch the corresponding tasks
    appropriately when invoked.

    This default Launcher uses subprocess to launch tasks. It is
    intended to illustrate the basic design and should be used as a
    base class for more complex Launchers. In particular all Launchers
    should retain the same behaviour of writing stdout/stderr to the
    streams directory, writing a log file and recording launch
    information.
    """

    batch_name = param.String(default=None, allow_None=True, constant=True,
       doc='''A unique identifier for the current batch''')

    args = param.ClassSelector(core.Arguments, constant=True, doc='''
       The specifier used to generate the varying parameters for the
       tasks.''')

    command = param.ClassSelector(Command, constant=True, doc='''
       The command template used to generate the commands for the
       current tasks.''')

    output_directory = param.String(default='.', doc='''
       The output directory - the directory that will contain all
       the root directories for the individual launches.''')

    subdir = param.List(default=[], doc='''
       A list of subdirectory names that allows custom organization
       within the output directory before the root directory.''')

    tag = param.String(default='', doc='''
       A very short, identifiable human-readable string that
       meaningfully describes the batch to be executed. Should not
       include spaces as it may be used in filenames.''')

    description = param.String(default='', doc='''
       A short description of the purpose of the current set of
       tasks.''')

    metadata = param.Dict(default={}, doc='''
       Metadata information to add to the info file.''')

    max_concurrency = param.Integer(default=2, allow_None=True, doc='''
       Concurrency limit to impose on the launch. As the current class
       uses subprocess locally, multiple processes are possible at
       once. Set to None for no limit (eg. for clusters)''')

    reduction_fn = param.Callable(default=None, doc='''
       A callable that will be invoked when the Launcher has completed
       all tasks. For example, this could inform the user of completion
       (eg. send an e-mail) among other possibilities.''')

    timestamp = param.NumericTuple(default=(0,)*9, doc='''
       Optional override of timestamp (default timestamp set on launch
       call) in Python struct_time 9-tuple format.  Useful when you
       need to have a known root_directory path (see root_directory
       documentation) before launch. For example, you should store
       state related to analysis (eg. pickles) in the same location as
       everything else.''')

    timestamp_format = param.String(default='%Y-%m-%d_%H%M', allow_None=True,
       doc='''
       The timestamp format for the root directories in python
       datetime format. If None, the timestamp is omitted from root
       directory name.''')

    def __init__(self, batch_name, args, command, **params):
        self._pprint_args = ([], [], None, {})
        if 'name' not in params:
            params['name'] = self.__class__.__name__
        super(Launcher, self).__init__(batch_name=batch_name,
                                       args=args,
                                       command=command,
                                       **params)
        self._spec_log = []
        # The all-zero tuple is the "unset" sentinel: replace it with
        # the time of construction.
        if self.timestamp == (0,)*9:
            self.timestamp = tuple(time.localtime())
        self.pprint_args(['batch_name', 'args', 'command'],
                         ['description', 'tag', 'output_directory',
                          'subdir', 'metadata'])
        self.dynamic = isinstance(args, DynamicArgs)

    def get_root_directory(self, timestamp=None):
        """
        A helper method that supplies the root directory name given a
        timestamp. Uses self.timestamp when timestamp is None and
        returns an absolute path.
        """
        if timestamp is None:
            timestamp = self.timestamp
        if self.timestamp_format is not None:
            root_name = (time.strftime(self.timestamp_format, timestamp)
                         + '-' + self.batch_name)
        else:
            root_name = self.batch_name

        path = os.path.join(self.output_directory,
                            *(self.subdir + [root_name]))
        return os.path.abspath(path)

    def _append_log(self, specs):
        """
        The log contains the tids and corresponding specifications
        used during launch with the specifications in JSON format.

        specs is a list of (tid, spec) pairs.
        """
        self._spec_log += specs  # This should be removed
        log_path = os.path.join(self.root_directory,
                                ("%s.log" % self.batch_name))
        core.Log.write_log(log_path,
                           [spec for (_, spec) in specs],
                           allow_append=True)

    def _record_info(self, setup_info=None):
        """
        All launchers should call this method to write the info file
        at the end of the launch. The .info file is saved given
        setup_info supplied by _setup_launch into the
        root_directory. When called without setup_info, the existing
        info file is updated with the end-time.
        """
        info_path = os.path.join(self.root_directory,
                                 ('%s.info' % self.batch_name))

        if setup_info is None:
            # Best effort: a missing or unparseable info file is
            # replaced by one containing only the end time.
            try:
                with open(info_path, 'r') as info_file:
                    setup_info = json.load(info_file)
            except (IOError, OSError, ValueError):
                setup_info = {}

            setup_info.update({'end_time': tuple(time.localtime())})
        else:
            setup_info.update({
                'end_time': None,
                'metadata': self.metadata
            })

        with open(info_path, 'w') as info_file:
            json.dump(setup_info, info_file, sort_keys=True, indent=4)

    def _setup_launch(self):
        """
        Method to be used by all launchers that prepares the root
        directory and generate basic launch information for command
        templates to use (including a registered timestamp).
        """
        self.root_directory = self.get_root_directory()
        if not os.path.isdir(self.root_directory):
            os.makedirs(self.root_directory)

        platform_dict = {}
        python_version = (platform.python_implementation()
                          + platform.python_version())
        platform_dict['platform'] = platform.platform()
        platform_dict['python_version'] = python_version
        platform_dict['lancet_version'] = str(lancet_version)

        return {'root_directory':    self.root_directory,
                'batch_name':        self.batch_name,
                'batch_tag':         self.tag,
                'batch_description': self.description,
                'launcher':          repr(self),
                'platform':          platform_dict,
                'timestamp':         self.timestamp,
                'timestamp_format':  self.timestamp_format,
                'varying_keys':      self.args.varying_keys,
                'constant_keys':     self.args.constant_keys,
                'constant_items':    self.args.constant_items}

    def _setup_streams_path(self):
        """
        Create the 'streams' subdirectory of the root directory (if it
        does not already exist) and return its path once it is
        visible on the filesystem.
        """
        streams_path = os.path.join(self.root_directory, "streams")

        try:
            os.makedirs(streams_path)
        except OSError:
            # An already existing directory is fine; any other failure
            # would otherwise make the wait below spin forever.
            if not os.path.isdir(streams_path):
                raise
        # Waiting till these directories exist (otherwise potential qstat error)
        while not os.path.isdir(streams_path):
            time.sleep(0.01)
        return streams_path

    def _launch_process_group(self, process_commands, streams_path):
        """
        Launches processes defined by process_commands, but only
        executes max_concurrency processes at a time; if a process
        completes and there are still outstanding processes to be
        executed, the next processes are run until max_concurrency is
        reached again.

        process_commands is an iterable of (cmd, tid) pairs.
        """
        processes = {}

        def check_complete_processes(wait=False):
            """
            Returns True if a process completed, False otherwise.
            Optionally allows waiting for better performance (avoids
            sleep-poll cycle if possible).
            """
            result = False
            # list creates copy of keys, as dict is modified in loop
            for proc in list(processes):
                if wait:
                    proc.wait()
                if proc.poll() is not None:
                    # process is done, free up slot
                    self.debug("Process %d exited with code %d."
                               % (processes[proc]['tid'], proc.poll()))
                    processes[proc]['stdout'].close()
                    processes[proc]['stderr'].close()
                    del processes[proc]
                    result = True
            return result

        for cmd, tid in process_commands:
            self.debug("Starting process %d..." % tid)
            job_timestamp = time.strftime('%H%M%S')
            basename = "%s_%s_tid_%d" % (self.batch_name, job_timestamp, tid)
            stdout_handle = open(os.path.join(streams_path, "%s.o.%d"
                                              % (basename, tid)), "wb")
            stderr_handle = open(os.path.join(streams_path, "%s.e.%d"
                                              % (basename, tid)), "wb")
            try:
                proc = subprocess.Popen(cmd, stdout=stdout_handle,
                                        stderr=stderr_handle)
            except Exception:
                # Do not leak the stream handles if the process could
                # not be started.
                stdout_handle.close()
                stderr_handle.close()
                raise
            processes[proc] = {'tid': tid,
                               'stdout': stdout_handle,
                               'stderr': stderr_handle}

            if self.max_concurrency:
                # max_concurrency reached, wait until more slots available
                while len(processes) >= self.max_concurrency:
                    # Block on wait() only when a single process is
                    # outstanding; otherwise poll and sleep.
                    if not check_complete_processes(len(processes) == 1):
                        time.sleep(0.1)

        # Wait for all processes to complete
        while len(processes) > 0:
            if not check_complete_processes(True):
                time.sleep(0.1)

    def __call__(self):
        """
        Call to start Launcher execution. Typically invoked by
        review_and_launch but may be called directly by the user.
        """
        launchinfo = self._setup_launch()
        streams_path = self._setup_streams_path()
        self.command.finalize(launchinfo)

        self._record_info(launchinfo)

        last_tid = 0
        last_tids = []
        for gid, groupspecs in enumerate(self.args):
            # Task ids are assigned consecutively across groups.
            tids = list(range(last_tid, last_tid + len(groupspecs)))
            last_tid += len(groupspecs)
            allcommands = [self.command(self.command._formatter(spec),
                                        tid, launchinfo)
                           for (spec, tid) in zip(groupspecs, tids)]

            self._append_log(list(zip(tids, groupspecs)))

            self.message("Group %d: executing %d processes..."
                         % (gid, len(allcommands)))
            self._launch_process_group(zip(allcommands, tids), streams_path)

            last_tids = tids[:]

            # Dynamic specifiers are updated with the tids that just
            # completed before the next group is requested.
            if self.dynamic:
                self.args.update(last_tids, launchinfo)

        self._record_info()
        if self.reduction_fn is not None:
            self.reduction_fn(self._spec_log, self.root_directory)

    def summary(self):
        """
        A succinct summary of the Launcher configuration.  Unlike the
        repr, a summary does not have to be complete but must supply
        key information relevant to the user.
        """
        print("Type: %s" % self.__class__.__name__)
        print("Batch Name: %r" % self.batch_name)
        if self.tag:
            print("Tag: %s" % self.tag)
        print("Root directory: %r" % self.get_root_directory())
        print("Maximum concurrency: %s" % self.max_concurrency)
        if self.description:
            print("Description: %s" % self.description)
class QLauncher(Launcher):
    """
    Launcher that operates with Grid Engine using default arguments
    chosen to be suitable for a typical cluster (tested on the
    Edinburgh University Eddie cluster).

    One of the main features of this class is that it is non-blocking
    - it always exits shortly after invoking qsub. This means that the
    script is not left running or waiting for long periods of time.

    By convention the standard output and error streams go to the
    corresponding folders in the 'streams' subfolder of the root
    directory - any -o or -e qsub options will be overridden. The job
    name (the -N flag) is specified automatically and any user value
    will be ignored.
    """

    qsub_switches = param.List(default=['-V', '-cwd'], doc='''
       Specifies the qsub switches (flags without arguments) as a list
       of strings. By default the -V switch is used to export all
       environment variables in the host environment to the batch job.''')

    qsub_flag_options = param.Dict(default={'-b':'y'}, doc='''
       Specifies qsub flags and their corresponding options as a
       dictionary. Valid values may be strings or lists of string.  If
       a plain Python dictionary is used, the keys are alphanumerically
       sorted, otherwise the dictionary is assumed to be an
       OrderedDict (Python 2.7+ or param.external.OrderedDict) and the
       key ordering will be preserved.

       By default the -b (binary) flag is set to 'y' to allow binaries
       to be directly invoked. Note that the '-' is added to the key
       if missing (to make into a valid flag) so you can specify using
       keywords in the dict constructor: ie. using
       qsub_flag_options=dict(key1=value1, key2=value2, ....)''')

    def __init__(self, batch_name, args, command, **params):
        super(QLauncher, self).__init__(batch_name, args, command, **params)

        # State carried between successive collate_and_launch calls
        # (the launcher is pickled and restored between them).
        self._launchinfo = None
        self.last_tids = []
        self._spec_log = []
        self.last_tid = 0
        self.collate_count = 0
        self.spec_iter = iter(self.args)

        self.max_concurrency = None  # Inherited

    def _qsub_args(self, override_options, cmd_args, append_options=[]):
        """
        Method to generate Popen style argument list for qsub using
        the qsub_switches and qsub_flag_options parameters. Switches
        are returned first. The qsub_flag_options follow in keys()
        ordered if not a vanilla Python dictionary (ie. a Python 2.7+
        or param.external OrderedDict). Otherwise the keys are sorted
        alphanumerically. Note that override_options is a list of
        key-value pairs.

        append_options is a list of key-value pairs placed after the
        ordered options; it is never mutated.
        """
        opt_dict = type(self.qsub_flag_options)()
        opt_dict.update(self.qsub_flag_options)
        opt_dict.update(override_options)

        # An exact type check is intentional here: OrderedDict is a
        # dict subclass and must keep its own key ordering.
        if type(self.qsub_flag_options) is dict:
            # Alphanumeric sort if vanilla Python dictionary
            ordered_options = [(k, opt_dict[k]) for k in sorted(opt_dict)]
        else:
            ordered_options = list(opt_dict.items())

        ordered_options = ordered_options + list(append_options)

        # A list value repeats the flag once per element.
        unpacked_groups = [[(k, v) for v in val] if isinstance(val, list)
                           else [(k, val)]
                           for (k, val) in ordered_options]
        unpacked_kvs = [el for group in unpacked_groups for el in group]

        # Adds '-' if missing (eg, keywords in dict constructor) and flattens lists.
        ordered_pairs = [(k, v) if k.startswith('-') else ('-%s' % (k), v)
                         for (k, v) in unpacked_kvs]
        ordered_options = [[k] + ([v] if isinstance(v, str) else list(v))
                           for (k, v) in ordered_pairs]
        flattened_options = [el for kvs in ordered_options for el in kvs]

        return (['qsub'] + self.qsub_switches
                + flattened_options + [pipes.quote(c) for c in cmd_args])

    def __call__(self):
        """
        Main entry point for the launcher. Collects the static
        information about the launch and sets up the stdout and stderr
        stream output directories. Generates the first call to
        collate_and_launch().
        """
        self._launchinfo = self._setup_launch()
        self.command.finalize(self._launchinfo)
        self.job_timestamp = time.strftime('%H%M%S')

        # Both streams are directed to the same 'streams' directory.
        streams_path = self._setup_streams_path()
        self.qsub_flag_options['-o'] = streams_path
        self.qsub_flag_options['-e'] = streams_path

        self.collate_and_launch()

        self._record_info(self._launchinfo)

    def collate_and_launch(self):
        """
        Method that collates the previous jobs and launches the next
        block of concurrent jobs when using DynamicArgs. This method
        is invoked on initial launch and then subsequently via a
        commandline call (to Python via qsub) to collate the
        previously run jobs and launch the next block of jobs.
        """
        try:
            specs = next(self.spec_iter)
        except StopIteration:
            # Nothing left to run: clean up the queue, run the
            # reduction and stamp the end time.
            self.qdel_batch()
            if self.reduction_fn is not None:
                self.reduction_fn(self._spec_log, self.root_directory)
            self._record_info()
            return

        tid_specs = [(self.last_tid + i, spec)
                     for (i, spec) in enumerate(specs)]
        self.last_tid += len(specs)
        self._append_log(tid_specs)

        # Updating the argument specifier (with the tids of the
        # previous block, before they are overwritten below).
        if self.dynamic:
            self.args.update(self.last_tids, self._launchinfo)
        self.last_tids = [tid for (tid, _) in tid_specs]

        output_dir = self.qsub_flag_options['-o']
        error_dir = self.qsub_flag_options['-e']
        self._qsub_block(output_dir, error_dir, tid_specs)

        # Pickle launcher before exit if necessary.
        if self.dynamic or (self.reduction_fn is not None):
            pickle_path = os.path.join(self.root_directory, 'qlauncher.pkl')
            # Close the file explicitly so the pickle is fully written
            # before the resumed job attempts to load it.
            with open(pickle_path, 'wb') as pickle_file:
                pickle.dump(self, pickle_file)

    def _qsub_collate_and_launch(self, output_dir, error_dir, job_names):
        """
        The method that actually runs qsub to invoke the python
        process with the necessary commands to trigger the next
        collation step and next block of jobs.

        Returns the name of the submitted collation job. Raises
        EnvironmentError if qsub exits with a non-zero code.
        """
        job_name = "%s_%s_collate_%d" % (self.batch_name,
                                         self.job_timestamp,
                                         self.collate_count)

        # The collation job is held until all jobs in job_names finish.
        overrides = [("-e", error_dir), ('-N', job_name), ("-o", output_dir),
                     ('-hold_jid', ','.join(job_names))]

        # NOTE: the resumed process unpickles the launcher from the
        # root directory; that file must be trusted.
        resume_cmds = ["import os, pickle, lancet",
                       ("pickle_path = os.path.join(%r, 'qlauncher.pkl')"
                        % self.root_directory),
                       "launcher = pickle.load(open(pickle_path,'rb'))",
                       "launcher.collate_and_launch()"]

        cmd_args = [self.command.executable,
                    '-c', ';'.join(resume_cmds)]
        popen_args = self._qsub_args(overrides, cmd_args)

        p = subprocess.Popen(popen_args, stdout=subprocess.PIPE)
        (stdout, stderr) = p.communicate()

        self.debug(stdout)
        if p.poll() != 0:
            raise EnvironmentError("qsub command exit with code: %d"
                                   % p.poll())

        self.collate_count += 1
        self.message("Invoked qsub for next batch.")
        return job_name

    def _qsub_block(self, output_dir, error_dir, tid_specs):
        """
        This method handles static argument specifiers and cases where
        the dynamic specifiers cannot be queued before the arguments
        are known.

        Submits one qsub job per (tid, spec) pair and raises
        EnvironmentError as soon as a qsub invocation fails.
        """
        processes = []
        job_names = []

        for (tid, spec) in tid_specs:
            job_name = "%s_%s_tid_%d" % (self.batch_name,
                                         self.job_timestamp, tid)
            job_names.append(job_name)
            cmd_args = self.command(self.command._formatter(spec),
                                    tid, self._launchinfo)

            popen_args = self._qsub_args([("-e", error_dir),
                                          ('-N', job_name),
                                          ("-o", output_dir)],
                                         cmd_args)
            p = subprocess.Popen(popen_args, stdout=subprocess.PIPE)
            (stdout, stderr) = p.communicate()

            self.debug(stdout)
            if p.poll() != 0:
                raise EnvironmentError("qsub command exit with code: %d"
                                       % p.poll())

            processes.append(p)

        self.message("Invoked qsub for %d commands" % len(processes))
        # A follow-up collation job is only needed when there is more
        # work to do after this block completes.
        if (self.reduction_fn is not None) or self.dynamic:
            self._qsub_collate_and_launch(output_dir, error_dir, job_names)

    def qdel_batch(self):
        """
        Runs qdel command to remove all remaining queued jobs using
        the <batch_name>* pattern . Necessary when StopIteration is
        raised with scheduled jobs left on the queue.
        Returns exit-code of qdel.
        """
        p = subprocess.Popen(['qdel', '%s_%s*' % (self.batch_name,
                                                  self.job_timestamp)],
                             stdout=subprocess.PIPE)
        (stdout, stderr) = p.communicate()
        return p.poll()
class ScriptLauncher(Launcher):
    """
    Script-based launcher. Calls a script with a path to a JSON file
    containing process group job options. This easily supports more
    environment-specific job-submission schemes without having to
    create a new Launcher every time.
    """

    # NOTE(review): the filename component of this default is empty in
    # the text this class was recovered from; it looks like the script
    # name was lost and should be confirmed against the upstream source.
    script_path = param.String(default=os.path.join(os.getcwd(), ''), doc='''
        Path to script which is called for every group, with JSON file,
        batch_name, number of commands for this group and
        max_concurrency as arguments.''')

    json_name = param.String(default='processes_%s.json', doc='''
        Name of the JSON file output per process group.''')

    def __init__(self, batch_name, args, command, **params):
        super(ScriptLauncher, self).__init__(batch_name, args, command,
                                             **params)

    def _launch_process_group(self, process_commands, streams_path):
        """
        Aggregates all process_commands and the designated output
        files into a list, and outputs it as JSON, after which the
        wrapper script is called.

        Raises EnvironmentError if the script exits with a non-zero
        code. An empty group is a no-op.
        """
        processes = []
        tid = None
        for cmd, tid in process_commands:
            job_timestamp = time.strftime('%H%M%S')
            basename = "%s_%s_tid_%d" % (self.batch_name, job_timestamp, tid)
            stdout_path = os.path.join(streams_path,
                                       "%s.o.%d" % (basename, tid))
            stderr_path = os.path.join(streams_path,
                                       "%s.e.%d" % (basename, tid))
            processes.append({'tid': tid,
                              'cmd': cmd,
                              'stdout': stdout_path,
                              'stderr': stderr_path})

        # With no processes there is no tid to name the JSON file
        # after and nothing for the script to run.
        if not processes:
            return

        # To make the JSON filename unique per group, we use the last tid in
        # this group.
        json_path = os.path.join(self.root_directory, self.json_name % (tid))
        with open(json_path, 'w') as json_file:
            json.dump(processes, json_file, sort_keys=True, indent=4)

        p = subprocess.Popen([self.script_path, json_path, self.batch_name,
                              str(len(processes)), str(self.max_concurrency)])
        if p.wait() != 0:
            raise EnvironmentError("Script command exit with code: %d"
                                   % p.poll())


#===============#
# Launch Helper #
#===============#
class review_and_launch(core.PrettyPrinted, param.Parameterized):
    """
    A helper decorator that always checks for consistency and can
    prompt the user for a full review of the launch configuration.
    """

    output_directory = param.String(default='.', doc='''
         The output directory - the directory that will contain all
         the root directories for the individual launches.''')

    review = param.Boolean(default=True, doc='''
         Whether or not to perform a detailed review of the launch.''')

    launch_args = param.ClassSelector(default=None, allow_None=True,
         class_=core.Args, doc='''
         An optional argument specifier to parameterise lancet,
         allowing multi-launch scripts. For instance, this may be
         useful for collecting statistics over runs that are not
         deterministic or are affected by a random input seed.''')

    launch_fn = param.Callable(doc='''
         The function that is to be decorated.''')

    def __init__(self, **params):
        self._pprint_args = ([], [], None, {})
        super(review_and_launch, self).__init__(**params)
        self.pprint_args(['output_directory', 'launch_fn'],
                         ['review', 'launch_args'])

    def cross_check_launchers(self, launchers):
        """
        Performs consistency checks across all the launchers.

        Raises an Exception if the list is empty, if the timestamps
        differ, if a root directory already exists or if two launchers
        share a root directory. Also invokes each command's verify
        method on the launcher's args.
        """
        if len(launchers) == 0:
            raise Exception('Empty launcher list')
        timestamps = [launcher.timestamp for launcher in launchers]

        if not all(timestamps[0] == tstamp for tstamp in timestamps):
            raise Exception("Launcher timestamps not all equal. "
                            "Consider setting timestamp explicitly.")

        root_directories = []
        for launcher in launchers:
            command = launcher.command
            args = launcher.args
            command.verify(args)
            root_directory = launcher.get_root_directory()
            if os.path.isdir(root_directory):
                raise Exception("Root directory already exists: %r"
                                % root_directory)
            if root_directory in root_directories:
                raise Exception("Each launcher requires a unique root "
                                "directory")
            root_directories.append(root_directory)

    def __call__(self, fn=None):
        """
        With fn supplied, register it as the launch function and
        return self (decorator usage). Without fn, build the
        launchers, cross-check them and either launch directly or run
        the interactive review, depending on the review parameter.
        """
        # On first call, simply wrap the provided launch function.
        if fn is not None:
            self.launch_fn = fn
            return self

        # Calling the wrapped function with appropriate arguments as
        # supplied by launch_args. Without launch_args the function is
        # called exactly once with no keyword arguments.
        kwargs = self.launch_args.specs if self.launch_args else [{}]
        launchers = [self.launch_fn(**kws) for kws in kwargs]

        current_timestamp = tuple(time.localtime())

        # Across all the launchers...
        for launcher in launchers:
            # Ensure a shared timestamp throughout
            launcher.timestamp = current_timestamp
            # Set the output directory appropriately
            if self.output_directory is not None:
                launcher.output_directory = self.output_directory

        # Cross check the launchers
        self.cross_check_launchers(launchers)

        if not self.review:
            return self._launch_all(launchers)
        else:
            return self._review_all(launchers)

    def _launch_all(self, launchers):
        """
        Launches all available launchers.
        """
        for launcher in launchers:
            print("== Launching  %s ==" % launcher.batch_name)
            launcher()
        return True

    def _review_all(self, launchers):
        """
        Runs the review process for all the launchers.

        Returns False if the user aborts at any stage, otherwise the
        result of launching all the launchers.
        """
        # Run review of launch args if necessary
        if self.launch_args is not None:
            proceed = self.review_args(self.launch_args,
                                       show_repr=True,
                                       heading='Meta Arguments')
            if not proceed:
                return False

        reviewers = [self.review_args,
                     self.review_command,
                     self.review_launcher]

        for (count, launcher) in enumerate(launchers):

            # Run reviews for all launchers if desired...
            if not all(reviewer(launcher) for reviewer in reviewers):
                print("\n == Aborting launch ==")
                return False
            # But allow the user to skip these extra reviews
            if len(launchers) != 1 and count < len(launchers) - 1:
                skip_remaining = self.input_options(['Y', 'n', 'quit'],
                                 '\nSkip remaining reviews?', default='y')

                if skip_remaining == 'y':
                    break
                elif skip_remaining == 'quit':
                    return False

        if self.input_options(['y', 'N'], 'Execute?', default='n') != 'y':
            return False
        else:
            return self._launch_all(launchers)

    def review_launcher(self, launcher):
        """
        Print the launcher summary and, on request, its full repr.
        Always returns True.
        """
        launcher_name = launcher.__class__.__name__
        print('%s\n' % self.summary_heading(launcher_name))
        launcher.summary()
        print('')
        if self.input_options(['Y', 'n'],
                              '\nShow complete launch repr?',
                              default='y') == 'y':
            print("\n%s\n" % launcher)
        return True

    def review_args(self, obj, show_repr=False, heading='Arguments'):
        """
        Reviews the given argument specification. Can review the
        meta-arguments (launch_args) or the arguments themselves.

        Returns False if the user chooses to quit, True otherwise.
        """
        args = obj.args if isinstance(obj, Launcher) else obj
        print('\n%s\n' % self.summary_heading(heading))
        args.summary()
        if show_repr:
            print("\n%s\n" % args)
        response = self.input_options(['y', 'N', 'quit'],
                '\nShow available argument specifier entries?', default='n')
        if response == 'quit':
            return False
        if response == 'y':
            # NOTE(review): presumably the argument specifier provides
            # a show() method, mirroring Command.show -- confirm
            # against lancet.core.
            args.show()
        print('')
        return True

    def review_command(self, launcher):
        """
        Print the command summary and optionally show the generated
        command lines or save them to a file chosen by the user.

        Returns False if the user chooses to quit, True otherwise.
        """
        command = launcher.command
        template_name = command.__class__.__name__
        print('%s\n' % self.summary_heading(template_name))
        command.summary()
        print("\n")
        response = self.input_options(['y', 'N', 'quit', 'save'],
                                      '\nShow available command entries?',
                                      default='n')
        args = launcher.args
        if response == 'quit':
            return False
        elif response == 'y':
            command.show(args)
        elif response == 'save':
            # Spaces are replaced so the entered name is a single token.
            fname = input('Filename: ').replace(' ', '_')
            with open(os.path.abspath(fname), 'w') as f:
                command.show(args, file_handle=f)
        print('')
        return True

    def summary_heading(self, text, car='=', carvert='|'):
        """
        Return a three-line boxed heading of the form
        '<text> Summary', using car for the horizontal rules and
        carvert for the vertical edges.
        """
        text = text + " Summary"
        length = len(text) + 4
        return '%s\n%s %s %s\n%s' % (car*length, carvert, text,
                                     carvert, car*length)

    def input_options(self, options, prompt='Select option', default=None):
        """
        Helper to prompt the user for input on the commandline.

        Returns the lowercased selected option. An empty response
        selects the (lowercased) default when one is given; any other
        unrecognised response re-prompts.
        """
        check_options = [x.lower() for x in options]
        while True:
            # Strip before matching so surrounding whitespace does not
            # cause a valid choice to be rejected.
            response = input('%s [%s]: '
                             % (prompt, ', '.join(options))).strip().lower()
            if response in check_options:
                return response
            elif response == '' and default is not None:
                return default.lower().strip()

Table Of Contents

This Page