pyfarm.jobtypes.core.jobtype module

Job Type Core

This module contains the core job type from which all other job types are built. All other job types must inherit from the JobType class in this module.

exception pyfarm.jobtypes.core.jobtype.TaskNotFound[source]

Bases: exceptions.Exception

exception pyfarm.jobtypes.core.jobtype.ConnectionBroken[source]

Bases: exceptions.Exception

class pyfarm.jobtypes.core.jobtype.CommandData(command, *arguments, **kwargs)[source]

Bases: object

Stores data to be returned by JobType.get_command_data(). Instances of this class are also used by JobType.spawn_process_inputs() at execution time.

Note

This class does not perform any kind of path resolution by default. It is assumed this has already been done using something like JobType.map_path().

Parameters:
  • command (string) – The command that will be executed when the process runs.
  • arguments – Any additional arguments to be passed along to the command being launched.
  • env (dict) – If provided, this will be the environment to launch the command with. If this value is not provided then a default environment will be set up using set_default_environment() when JobType.start() is called; JobType.start() passes in the result of JobType.get_environment() to generate the default environment.
  • cwd (string) – The working directory the process should execute in. If not provided the process will execute in whatever directory the agent is running inside of.
  • user (string or integer) – The username or user id that the process should run as. On Windows this keyword is ignored and on Linux this requires the agent to be executing as root. The value provided here will be run through JobType.get_uid_gid() to map the incoming value to an integer.
  • group (string or integer) – Same as user above except this sets the group the process will execute as.
  • id – An arbitrary id to associate with the resulting process protocol. This can help identify
validate()[source]

Validates that the attributes on an instance of this class contain values we expect. This method is called externally by the job type in JobType.start() and may correct some instance attributes.

set_default_environment(value)[source]

Sets the environment to value if the internal env attribute is None. By default this method is called by the job type and passed in the results from pyfarm.jobtype.core.JobType.get_environment()
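
The relationship between env and set_default_environment() can be illustrated with a small stand-in class. This is only a sketch: CommandDataSketch is a hypothetical name, not the real CommandData, and it models nothing beyond the "fill in env only when it is None" rule described above.

```python
class CommandDataSketch(object):
    """Hypothetical stand-in that models only the env-defaulting rule."""

    def __init__(self, command, *arguments, **kwargs):
        self.command = command
        self.arguments = arguments
        self.env = kwargs.get("env")  # None means "not provided"
        self.cwd = kwargs.get("cwd")

    def set_default_environment(self, value):
        # Apply the default only when no environment was supplied.
        if self.env is None:
            self.env = value


explicit = CommandDataSketch("render", "-f", "1", env={"FOO": "bar"})
implicit = CommandDataSketch("render", "-f", "1")

# In the real job type the default would come from get_environment();
# here it is an invented dictionary.
default_env = {"PATH": "/usr/bin"}
explicit.set_default_environment(default_env)
implicit.set_default_environment(default_env)
```

In this sketch an explicitly supplied env is left untouched, while the instance created without one picks up the default.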

class pyfarm.jobtypes.core.jobtype.JobType(assignment)[source]

Bases: pyfarm.jobtypes.core.internals.Cache, pyfarm.jobtypes.core.internals.System, pyfarm.jobtypes.core.internals.Process, pyfarm.jobtypes.core.internals.TypeChecks

Base class for all other job types. This class is intended to abstract away many of the asynchronous operations necessary to run a job type on an agent.

Variables:
  • PERSISTENT_JOB_DATA (dict) – A dictionary of job ids and data that prepare_for_job() has produced. This is used during __init__() to set persistent_job_data.
  • COMMAND_DATA (CommandData) – If you need to provide your own class to represent command data you should override this attribute. This attribute is used by methods within this class to do type checking.
  • PROCESS_PROTOCOL (ProcessProtocol) – The protocol object used to communicate with each process spawned
  • ASSIGNMENT_SCHEMA (voluptuous.Schema) – The schema of an assignment. This object helps to validate the incoming assignment to ensure it’s not missing any data.
  • uuid (UUID) – This is the unique identifier for the job type instance and is automatically set when the class is instanced. This is used by the agent to track assignments and job type instances.
  • finished_tasks (set) – A set of tasks that have had their state changed to finished through set_task_state(). At the start of the assignment, this set is empty.
  • failed_tasks (set) – This is analogous to finished_tasks except it contains failed tasks only.
Parameters:

assignment (dict) – This attribute is a dictionary with the keys “job”, “jobtype” and “tasks”. self.assignment[“job”] is itself a dict with keys “id”, “title”, “data”, “environ” and “by”. The most important of those is usually “data”, which is the dict specified when submitting the job and contains jobtype specific data. self.assignment[“tasks”] is a list of dicts representing the tasks in the current assignment. Each of these dicts has the keys “id” and “frame”. The list is ordered by frame number.
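
As a rough illustration of the shape described above, an assignment might look like the dictionary below. Every value is invented for the example, the "name" and "version" keys under "jobtype" are taken from ASSIGNMENT_SCHEMA, and the other keys the schema accepts are omitted, so this dictionary is not guaranteed to pass validation as written.

```python
# Illustrative only: all values below are made up.
assignment = {
    "job": {
        "id": 1,
        "title": "example render",
        "data": {"scene": "/projects/demo/scene.ma"},  # jobtype specific
        "environ": {"FOO": "bar"},
        "by": 1,
    },
    "jobtype": {"name": "ExampleJobType", "version": 1},
    "tasks": [
        {"id": 10, "frame": 1},
        {"id": 11, "frame": 2},
        {"id": 12, "frame": 3},
    ],
}

# Typical access patterns inside a job type would use self.assignment.
job_data = assignment["job"]["data"]
frames = [task["frame"] for task in assignment["tasks"]]
```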

PERSISTENT_JOB_DATA = {}
COMMAND_DATA

alias of CommandData

PROCESS_PROTOCOL

alias of ProcessProtocol

ASSIGNMENT_SCHEMA = <Schema({'job': <Schema({'cpus': Any([<type 'int'>, <type 'long'>]), 'ram': Any([<type 'int'>, <type 'long'>]), 'ram_max': Any([Any([<type 'int'>, <type 'long'>]), <type 'NoneType'>]), 'title': Any([<type 'str'>, <type 'unicode'>]), 'notes': Any([<type 'str'>, <type 'unicode'>]), 'num_tiles': Any([Any([<type 'int'>, <type 'long'>]), <type 'NoneType'>]), 'batch': Any([<type 'int'>, <type 'long'>]), 'by': Any([<type 'int'>, <type 'long'>, <type 'float'>, <class 'decimal.Decimal'>]), 'priority': Any([<type 'int'>, <type 'long'>]), 'notified_users': [<Schema({'username': Any([<type 'str'>, <type 'unicode'>]), 'on_failure': <type 'bool'>, 'on_success': <type 'bool'>, 'on_deletion': <type 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], 'tags': [Any([<type 'str'>, <type 'unicode'>])], 'environ': <function validate_environment>, 'agent_id': Any([<type 'int'>, <type 'long'>]), 'job_group': Any([<type 'str'>, <type 'unicode'>]), 'job_group_id': Any([<type 'int'>, <type 'long'>]), 'data': <type 'dict'>, 'id': Any([<type 'int'>, <type 'long'>]), 'ram_warning': Any([Any([<type 'int'>, <type 'long'>]), <type 'NoneType'>]), 'user': Any([<type 'str'>, <type 'unicode'>])}, extra=PREVENT_EXTRA, required=False) object>, 'tasks': <function <lambda>>, 'id': <function validate_uuid>, 'jobtype': <Schema({'version': Any([<type 'int'>, <type 'long'>]), 'name': Any([<type 'str'>, <type 'unicode'>])}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>
classmethod load(assignment)[source]

Given an assignment this class method will load the job type either from cache or from the master.

Parameters:assignment (dict) – The dictionary containing the assignment. This will be passed into an instance of ASSIGNMENT_SCHEMA to validate that the internal data is correct.
classmethod prepare_for_job(job)[source]

Note

This method is not yet implemented

Called before a job executes on the agent for the first time. Whatever this classmethod returns will be available as persistent_job_data on the job type instance.

Parameters:job (int) – The job id which prepare_for_job is being run for

By default this method does nothing.

classmethod cleanup_after_job(persistent_data)[source]

Note

This method is not yet implemented

This classmethod will be called after the last assignment from a given job has finished on this node.

Parameters:persistent_data – The persistent data that prepare_for_job() produced. The value for this data may be None if prepare_for_job() returned None or was not implemented.
classmethod spawn_persistent_process(job, command_data)[source]

Note

This method is not yet implemented

Starts one child process using an instance of CommandData or similar input. This process is intended to keep running until the last task from this job has been processed, potentially spanning more than one assignment. If the spawned process is still running then we’ll clean up the process after cleanup_after_job().

node()[source]

Returns live information about this host, the operating system, hardware, and several other pieces of global data which is useful inside of the job type. Currently data from this method includes:

  • master_api - The base url the agent is using to communicate with the master.
  • hostname - The hostname as reported to the master.
  • agent_id - The unique identifier used to identify this agent to the master.
  • id - The database id of the agent as given to us by the master on startup of the agent.
  • cpus - The number of CPUs reported to the master
  • ram - The amount of ram reported to the master.
  • total_ram - The amount of ram, in megabytes, that’s installed on the system regardless of what was reported to the master.
  • free_ram - How much ram, in megabytes, is free for the entire system.
  • consumed_ram - How much ram, in megabytes, is being consumed by the agent and any processes it has launched.
  • admin - Set to True if the current user is an administrator or ‘root’.
  • user - The username of the current user.
  • case_sensitive_files - True if the file system is case sensitive.
  • case_sensitive_env - True if environment variables are case sensitive.
  • machine_architecture - The architecture of the machine the agent is running on. This will return 32 or 64.
  • operating_system - The operating system the agent is executing on. This value will be ‘linux’, ‘mac’ or ‘windows’. In rare circumstances this could also be ‘other’.
Raises:KeyError

Raised if one or more keys are not present in the global configuration object.

This should rarely if ever be a problem under normal circumstances. The exception to this rule is in unittests or standalone libraries where the global config object may not be populated.

assignments()[source]

Shortcut method to access tasks.

tempdir(new=False, remove_on_finish=True)[source]

Returns a temporary directory to be used within a job type. By default once called the directory will be created on disk and returned from this method.

Calling this method multiple times will return the same directory instead of creating a new directory unless new is set to True.

Parameters:
  • new (bool) – If set to True then return a new directory when called. This however will not replace the ‘default’ temp directory.
  • remove_on_finish (bool) – If True then keep track of the directory returned so it can be removed when the job type finishes.
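
The caching behaviour of tempdir() described above can be sketched with the standard library. TempDirSketch and its cleanup() method are hypothetical names used only for this illustration; the real method may create and track directories differently.

```python
import os
import shutil
import tempfile


class TempDirSketch(object):
    """Hypothetical model of the 'default directory' caching rule."""

    def __init__(self):
        self._default = None
        self._to_remove = set()

    def tempdir(self, new=False, remove_on_finish=True):
        # Repeated calls without new=True return the cached directory.
        if not new and self._default is not None:
            return self._default
        path = tempfile.mkdtemp()
        if remove_on_finish:
            self._to_remove.add(path)
        # A directory requested with new=True never replaces the default.
        if not new:
            self._default = path
        return path

    def cleanup(self):
        # Stand-in for whatever happens when the job type finishes.
        for path in self._to_remove:
            shutil.rmtree(path, ignore_errors=True)
        self._to_remove.clear()


sketch = TempDirSketch()
first = sketch.tempdir()
second = sketch.tempdir()
extra = sketch.tempdir(new=True)
existed_before_cleanup = os.path.isdir(first) and os.path.isdir(extra)
sketch.cleanup()
exists_after_cleanup = os.path.isdir(first) or os.path.isdir(extra)
```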
get_uid_gid(user, group)[source]

Overridable. This method converts a named user and group into their respective user and group ids.

get_environment()[source]

Constructs an environment dictionary that can be used when a process is spawned by a job type.

get_command_list(commands)[source]

Convert a list of commands to a tuple with any environment variables expanded.

Parameters:commands (list) – A list of strings to expand. Each entry in list will be passed into and returned from expandvars().
Raises:TypeError – Raised if commands is not a list or tuple.
Return type:tuple
Returns:Returns the expanded list of commands.
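
A minimal sketch of the contract described above, assuming string.Template as a stand-in for the job type's own expandvars(). The function name and the explicit environment argument are hypothetical; the real method takes only commands.

```python
from string import Template


def get_command_list_sketch(commands, environment):
    """Expand variables in each entry and return the result as a tuple."""
    if not isinstance(commands, (list, tuple)):
        raise TypeError("Expected a list or tuple for `commands`")

    # safe_substitute leaves unknown variables in place instead of raising.
    return tuple(
        Template(entry).safe_substitute(environment) for entry in commands)


expanded = get_command_list_sketch(
    ["$TOOL", "--scene", "${ROOT}/scene.ma"],
    {"TOOL": "render", "ROOT": "/projects/demo"})
```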
get_csvlog_path(protocol_uuid, create_time=None)[source]

Returns the path to the comma separated value (csv) log file. The agent stores logs from processes in a csv format so we can store additional information such as a timestamp, line number, stdout/stderr identification and the log message itself.

Note

This method should not attempt to create the parent directories of the resulting path. This is already handled by the logger pool in a non-blocking fashion.

Parameters:
  • protocol_uuid (uuid.UUID) – The UUID of the job type’s protocol instance.
  • create_time (datetime.datetime) – If provided then the create time of the log file will equal this value. Otherwise this method will use the current UTC time for create_time
Raises:

TypeError – Raised if protocol_uuid or create_time are not the correct types.

get_command_data()[source]

Overridable. This method returns the arguments necessary for executing a command. For job types which execute a single process per assignment, this is the most important method to implement.

Warning

This method should not be used when this jobtype requires more than one process for one assignment and may not get called at all if start() was overridden.

The default implementation does nothing. When overriding this method you should return an instance of COMMAND_DATA:

return self.COMMAND_DATA(
    "/usr/bin/python", "-c", "print 'hello world'",
    env={"FOO": "bar"}, user="bob")

See CommandData’s class documentation for a full description of possible arguments.

Please note, however, that the default command data class, CommandData, does not perform path expansion, so you have to handle this yourself with map_path().

map_path(path)[source]

Takes a string argument. Translates a given path for any OS to what it should be on this particular node. This does not communicate with the master.

Parameters:path (string) – The path to translate to an OS specific path for this node.
Raises:TypeError – Raised if path is not a string.
expandvars(value, environment=None, expand=None)[source]

Expands variables inside of a string using an environment.

Parameters:
  • value (string) – The path to expand.
  • environment (dict) – The environment to use for expanding value. If this value is None (the default) then we’ll use get_environment() to build this value.
  • expand (bool) – When not provided we use the jobtype_expandvars configuration value to set the default. When this value is True we’ll perform environment variable expansion otherwise we return value untouched.
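
The fallback rules for environment and expand can be sketched as follows. This is an illustration under stated assumptions, not the real implementation: CONFIG_SKETCH is a hypothetical stand-in for the agent configuration, os.environ stands in for get_environment(), and string.Template stands in for the actual expansion logic.

```python
import os
from string import Template

# Hypothetical stand-in for the agent's configuration object.
CONFIG_SKETCH = {"jobtype_expandvars": True}


def expandvars_sketch(value, environment=None, expand=None):
    # When expand is not provided, fall back to the configured default.
    if expand is None:
        expand = CONFIG_SKETCH["jobtype_expandvars"]
    if not expand:
        return value  # returned untouched
    # When no environment is provided, build one (os.environ here).
    if environment is None:
        environment = dict(os.environ)
    return Template(value).safe_substitute(environment)


expanded = expandvars_sketch("$ROOT/scene.ma", {"ROOT": "/projects/demo"})
untouched = expandvars_sketch(
    "$ROOT/scene.ma", {"ROOT": "/projects/demo"}, expand=False)
```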
start()[source]

This method is called when the job type should start working. Depending on the job type’s implementation this will prepare and start one or more processes.

stop(assignment_failed=False, avoid_reassignment=False, error=None, signal='KILL')[source]

This method is called when the job type should stop running. This will terminate any processes associated with this job type and also inform the master of any state changes to an associated task or tasks.

Parameters:
  • assignment_failed (boolean) – Whether this means the assignment has genuinely failed. By default, we assume that stopping this assignment was the result of deliberate user action (like stopping the job or shutting down the agent), and won’t treat it as a failed assignment.
  • avoid_reassignment (boolean) – If set to true, the agent will add itself to the lists of agents that failed the tasks in this assignment. Can be useful when we want to return the assignment to the master without increasing its failures counter, but still don’t want it to be reassigned to us.
  • error (string) – If the assignment has failed, this string is uploaded as last_error for the failed tasks.
  • signal (string) – The signal to send to any running processes. Valid options are KILL, TERM or INT.
format_error(error)[source]

Takes some kind of object, typically an instance of Exception or Failure, and produces a human readable string.

Return type:string or None
Returns:Returns a string if we know how to format the error. Otherwise this method returns None and logs an error.
set_states(tasks, state, error=None)[source]

Wrapper around set_task_state() that allows you to set the state on the master for multiple tasks at once.

set_task_progress(*args, **kwargs)[source]

Sets the progress of the given task

Parameters:
  • task (dict) – The dictionary containing the task we’re changing the progress for.
  • progress (float) – The progress to set on task
add_self_to_failed_on_agents(*args, **kwargs)[source]

Adds this agent to the list of agents that failed to execute the given task, without explicitly setting this task to failed.

Parameters:task (dict) – The dictionary containing the task
set_task_started_now(*args, **kwargs)[source]

Sets the time_started of the given task to the current time on the master.

This method is useful for batched tasks, where the actual work on a single task may start much later than the work on the assignment as a whole.

Parameters:task (dict) – The dictionary containing the task we’re changing the start time for.
set_task_state(*args, **kwargs)[source]

Sets the state of the given task

Parameters:
  • task (dict) – The dictionary containing the task we’re changing the state for.
  • state (string) – The state to change task to
  • error (string, Exception) – If the state is changing to ‘error’ then also set the last_error column. Any exception instance that is passed to this keyword will be passed through format_error() first to format it.
get_local_task_state(task_id)[source]

Returns None if the state of this task has not been changed locally since this assignment has started. This method does not communicate with the master.

is_successful(protocol, reason)[source]

Overridable. This method determines whether the process referred to by a protocol instance has exited successfully.

The default implementation returns True if the process’s return code was 0 and False in all other cases. If you need to modify this behavior please be aware that reason may be an integer or an instance of twisted.internet.error.ProcessTerminated if the process terminated without errors or an instance of twisted.python.failure.Failure if there were problems.

Raises:NotImplementedError – Raised if we encounter a condition that the base implementation is unable to handle.
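
When overriding is_successful(), the main difficulty is normalising the different shapes reason can take. The sketch below assumes only two attributes that Twisted provides: exitCode on ProcessTerminated and value on Failure. The two Fake classes are stand-ins so the example runs without Twisted installed, and the function names and the accepted_codes parameter are hypothetical.

```python
class FakeProcessTerminated(Exception):
    """Stand-in exposing the exitCode attribute of ProcessTerminated."""

    def __init__(self, exitCode=None):
        Exception.__init__(self, "process ended")
        self.exitCode = exitCode


class FakeFailure(object):
    """Stand-in exposing the value attribute of Failure."""

    def __init__(self, value):
        self.value = value


def exit_code_from_reason(reason):
    """Return an integer exit code, or None if one cannot be determined."""
    if isinstance(reason, int):
        return reason
    # Failure-like objects wrap the underlying exception in .value
    wrapped = getattr(reason, "value", reason)
    return getattr(wrapped, "exitCode", None)


def is_successful_sketch(reason, accepted_codes=(0,)):
    # Some tools exit non-zero on success; accepted_codes covers that case.
    return exit_code_from_reason(reason) in accepted_codes
```

A job type wrapping a tool that returns, say, exit code 2 for "finished with warnings" could pass accepted_codes=(0, 2) in a helper like this from its own is_successful() override.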
before_start()[source]

Overridable. This method is called directly before start() itself is called.

The default implementation does nothing and values returned from this method are ignored.

before_spawn_process(command, protocol)[source]

Overridable. This method is called directly before a process is spawned.

By default this method does nothing except log information about the command we’re about to launch both to the agent’s log and to the log file on disk.

Parameters:
process_stopped(protocol, reason)[source]

Overridable. This method is called when a child process has stopped running.

The default implementation will mark all tasks in the current assignment as done, or as failed if there was at least one failed process.

process_started(protocol)[source]

Overridable. This method is called when a child process started running.

The default implementation will mark all tasks in the current assignment as running.

process_output(protocol, output, line_fragments, line_handler)[source]

This is a mid-level method which takes output from a process protocol then splits and processes it to ensure we pass complete output lines to the other methods.

Implementors who wish to process the output line by line should override preprocess_stdout_line(), preprocess_stderr_line(), process_stdout_line() or process_stderr_line() instead. This method is a glue method between other parts of the job type and should only be overridden if there’s a problem or you want to change how lines are split.

Parameters:
  • protocol (ProcessProtocol) – The protocol instance which produced output
  • output (string) – The blob of text or line produced
  • line_fragments (dict) – The line fragment dictionary containing individual line fragments. This will be either self._stdout_line_fragments or self._stderr_line_fragments.
  • line_handler (callable) – The function to handle any lines produced. This will be either handle_stdout_line() or handle_stderr_line()
Returns:

This method returns nothing by default and any return value produced by this method will not be consumed by other methods.
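
The idea of buffering incomplete lines between calls can be sketched as below. This is not the real implementation: the function name is hypothetical, key stands in for whatever identifies the protocol in the fragment dictionary, and splitting on "\n" is an assumption about how lines are delimited.

```python
def process_output_sketch(key, output, line_fragments, line_handler):
    """Pass only complete lines to line_handler, buffering the rest."""
    # Prepend whatever was left over from the previous chunk.
    buffered = line_fragments.pop(key, "") + output
    lines = buffered.split("\n")

    # The last element is an incomplete line, or "" if the chunk ended
    # exactly on a newline. Keep it for the next call.
    remainder = lines.pop()
    if remainder:
        line_fragments[key] = remainder

    for line in lines:
        line_handler(key, line.rstrip("\r"))


fragments = {}
received = []


def collect(key, line):
    received.append(line)


# Two chunks that split "frame 1 done" in the middle of the line.
process_output_sketch("proc-1", "frame 1 do", fragments, collect)
process_output_sketch("proc-1", "ne\nframe 2 done\nfra", fragments, collect)
```

After both calls the handler has seen the two complete lines, and the trailing "fra" remains buffered until more output arrives.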

handle_stdout_line(protocol, stdout)[source]

Takes a ProcessProtocol instance and stdout line produced by process_output() and runs it through all the steps necessary to preprocess, format, log and handle the line.

The default implementation will run stdout through several methods in order:

  • preprocess_stdout_line()
  • format_stdout_line()
  • log_stdout_line()
  • process_stdout_line()

Warning

This method is not private; however, it’s advisable to override the methods above instead of this one. Unlike this method, which is more generalized and invokes several other methods, the above provide more targeted functionality.

Parameters:
  • protocol (ProcessProtocol) – The protocol instance which produced stdout
  • stdout (string) – A complete line to stdout being emitted by the process
Returns:

This method returns nothing by default and any return value produced by this method will not be consumed by other methods.
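
The order suggested by the description of handle_stdout_line() (preprocess, format, log, then process) can be sketched as a small pipeline. LinePipelineSketch is a hypothetical class, and treating a None return from preprocess or format as "keep the line unchanged" is an assumption made for this example.

```python
class LinePipelineSketch(object):
    """Hypothetical model of the stdout line handling order."""

    def __init__(self):
        self.logged = []
        self.processed = []

    def preprocess_stdout_line(self, protocol, stdout):
        return None  # mirrors a default that does nothing

    def format_stdout_line(self, protocol, stdout):
        return stdout.strip()  # example override: trim whitespace

    def log_stdout_line(self, protocol, stdout):
        self.logged.append(stdout)

    def process_stdout_line(self, protocol, stdout):
        self.processed.append(stdout)

    def handle_stdout_line(self, protocol, stdout):
        # Steps that may transform the line; None leaves it unchanged.
        for hook in (self.preprocess_stdout_line, self.format_stdout_line):
            result = hook(protocol, stdout)
            if result is not None:
                stdout = result
        # Steps that only consume the final line.
        self.log_stdout_line(protocol, stdout)
        self.process_stdout_line(protocol, stdout)


pipeline = LinePipelineSketch()
pipeline.handle_stdout_line(None, "  frame 1 done  ")
```

Overriding one of the targeted hooks, as format_stdout_line() does here, changes what the later steps receive without touching the glue method.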

handle_stderr_line(protocol, stderr)[source]

Overridable. Takes a ProcessProtocol instance and stderr produced by process_output() and runs it through all the steps necessary to preprocess, format, log and handle the line.

The default implementation will run stderr through several methods in order:

  • preprocess_stderr_line()
  • format_stderr_line()
  • log_stderr_line()
  • process_stderr_line()

Warning

This method is overridable; however, it’s advisable to override the methods above instead. Unlike this method, which is more generalized and invokes several other methods, the above provide more targeted functionality.

Parameters:
  • protocol (ProcessProtocol) – The protocol instance which produced stderr
  • stderr (string) – A complete line to stderr being emitted by the process
Returns:

This method returns nothing by default and any return value produced by this method will not be consumed by other methods.

preprocess_stdout_line(protocol, stdout)[source]

Overridable. Provides the ability to manipulate stdout or protocol before it’s passed into any other line handling methods.

The default implementation does nothing.

Parameters:
  • protocol (ProcessProtocol) – The protocol instance which produced stdout
  • stdout (string) – A complete line to stdout before any formatting or logging has occurred.
Return type:

string

Returns:

This method returns nothing by default but when overridden should return a string which will be used in line handling methods such as format_stdout_line(), log_stdout_line() and process_stdout_line().

preprocess_stderr_line(protocol, stderr)[source]

Overridable. Provides the ability to manipulate stderr or protocol before it’s passed into any other line handling methods.

The default implementation does nothing.

Parameters:
  • protocol (ProcessProtocol) – The protocol instance which produced stderr
  • stderr (string) – A complete line to stderr before any formatting or logging has occurred.
Return type:

string

Returns:

This method returns nothing by default but when overridden should return a string which will be used in line handling methods such as format_stderr_line(), log_stderr_line() and process_stderr_line().

format_stdout_line(protocol, stdout)[source]

Overridable. Formats a line from stdout before it’s passed onto methods such as log_stdout_line() and process_stdout_line().

The default implementation does nothing.

Parameters:
  • protocol (ProcessProtocol) – The protocol instance which produced stdout
  • stdout (string) – A complete line from process to format and return.
Return type:

string

Returns:

This method returns nothing by default but when overridden should return a string which will be used in log_stdout_line() and process_stdout_line()

format_stderr_line(protocol, stderr)[source]

Overridable. Formats a line from stderr before it’s passed onto methods such as log_stderr_line() and process_stderr_line().

The default implementation does nothing.

Parameters:
  • protocol (ProcessProtocol) – The protocol instance which produced stderr
  • stderr (string) – A complete line from the process to format and return.
Return type:

string

Returns:

This method returns nothing by default but when overridden should return a string which will be used in log_stderr_line() and process_stderr_line()

log_stdout_line(protocol, stdout)[source]

Overridable. Called when we receive a complete line on stdout from the process.

The default implementation will use the global logging pool to log stdout to a file.

Parameters:
  • protocol (ProcessProtocol) – The protocol instance which produced stdout
  • stdout (string) – A complete line to stdout that has been formatted and is ready to log to a file.
Returns:

This method returns nothing by default and any return value produced by this method will not be consumed by other methods.

log_stderr_line(protocol, stderr)[source]

Overridable. Called when we receive a complete line on stderr from the process.

The default implementation will use the global logging pool to log stderr to a file.

Parameters:
  • protocol (ProcessProtocol) – The protocol instance which produced stderr
  • stderr (string) – A complete line to stderr that has been formatted and is ready to log to a file.
Returns:

This method returns nothing by default and any return value produced by this method will not be consumed by other methods.

process_stderr_line(protocol, stderr)[source]

Overridable. This method is called when we receive a complete line to stderr. The line will be preformatted and will already have been sent for logging.

The default implementation sends stderr and protocol to process_stdout_line().

Parameters:
  • protocol (ProcessProtocol) – The protocol instance which produced stderr
  • stderr (string) – A complete line to stderr after it has been formatted and logged.
Returns:

This method returns nothing by default and any return value produced by this method will not be consumed by other methods.

process_stdout_line(protocol, stdout)[source]

Overridable. This method is called when we receive a complete line to stdout. The line will be preformatted and will already have been sent for logging.

The default implementation does nothing.

Parameters:
  • protocol (ProcessProtocol) – The protocol instance which produced stdout
  • stdout (string) – A complete line to stdout after it has been formatted and logged.
Returns:

This method returns nothing by default and any return value produced by this method will not be consumed by other methods.
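
One common reason to override process_stdout_line() is to extract progress from a tool's output. The sketch below is purely illustrative: the "Progress: NN%" log format is invented, parse_progress is a hypothetical helper, and expressing progress as a float between 0.0 and 1.0 is an assumption about what set_task_progress() expects.

```python
import re

# Invented log format, e.g. "Progress: 50%".
PROGRESS_PATTERN = re.compile(r"Progress:\s*(\d+(?:\.\d+)?)%")


def parse_progress(stdout):
    """Return progress as a float capped at 1.0, or None if absent."""
    match = PROGRESS_PATTERN.search(stdout)
    if match is None:
        return None
    return min(float(match.group(1)) / 100.0, 1.0)


halfway = parse_progress("[render] Progress: 50%")
unrelated = parse_progress("loading textures")
```

Inside an overridden process_stdout_line(), a non-None result could then be handed to set_task_progress() for the relevant task.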