pyfarm.jobtypes.core.jobtype module
Job Type Core
This module contains the core job type from which all
other job types are built. All other job types must
inherit from the JobType
class in this module.
-
class pyfarm.jobtypes.core.jobtype.CommandData(command, *arguments, **kwargs)
Bases: object
Stores data to be returned by JobType.get_command_data(). Instances of this class are also used by JobType.spawn_process_inputs() at execution time.
Note
This class does not perform any kind of path resolution by default. It is assumed this has already been done using something like JobType.map_path().
Parameters:
- command (string) – The command that will be executed when the process runs.
- arguments – Any additional arguments to be passed along to the command being launched.
- env (dict) – If provided, this will be the environment to launch the command with. If this value is not provided then a default environment will be set up using set_default_environment() when JobType.start() is called. JobType.start() itself will use JobType.set_default_environment() to generate the default environment.
- cwd (string) – The working directory the process should execute in. If not provided, the process will execute in whatever directory the agent is running inside of.
- user (string or integer) – The username or user id that the process should run as. On Windows this keyword is ignored and on Linux this requires the agent to be executing as root. The value provided here will be run through JobType.get_uid_gid() to map the incoming value to an integer.
- group (string or integer) – Same as user above except this sets the group the process will execute as.
- id – An arbitrary id to associate with the resulting process protocol. This can help identify
-
validate()
Validates that the attributes on an instance of this class contain values we expect. This method is called externally by the job type in JobType.start() and may correct some instance attributes.
-
class pyfarm.jobtypes.core.jobtype.JobType(assignment)
Bases: pyfarm.jobtypes.core.internals.Cache, pyfarm.jobtypes.core.internals.System, pyfarm.jobtypes.core.internals.Process, pyfarm.jobtypes.core.internals.TypeChecks
Base class for all other job types. This class is intended to abstract away much of the asynchronous work necessary to run a job type on an agent.
Variables:
- PERSISTENT_JOB_DATA (dict) – A dictionary of job ids and data that prepare_for_job() has produced. This is used during __init__() to set persistent_job_data.
- COMMAND_DATA (CommandData) – If you need to provide your own class to represent command data you should override this attribute. This attribute is used by methods within this class to do type checking.
- PROCESS_PROTOCOL (ProcessProtocol) – The protocol object used to communicate with each process spawned.
- ASSIGNMENT_SCHEMA (voluptuous.Schema) – The schema of an assignment. This object helps to validate the incoming assignment to ensure it’s not missing any data.
- uuid (UUID) – This is the unique identifier for the job type instance and is automatically set when the class is instanced. This is used by the agent to track assignments and job type instances.
- finished_tasks (set) – A set of tasks that have had their state changed to finished through set_task_state(). At the start of the assignment, this set is empty.
- failed_tasks (set) – This is analogous to finished_tasks except it contains failed tasks only.
Parameters: assignment (dict) – A dictionary with the keys “job”, “jobtype” and “tasks”. self.assignment[“job”] is itself a dict with keys “id”, “title”, “data”, “environ” and “by”. The most important of those is usually “data”, which is the dict specified when submitting the job and contains jobtype specific data. self.assignment[“tasks”] is a list of dicts representing the tasks in the current assignment. Each of these dicts has the keys “id” and “frame”. The list is ordered by frame number.
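As a rough illustration of the layout described above, the following sketch builds an assignment dictionary by hand and reads the job data and frame numbers from it. Every value, and the helper frames_for(), is made up for this example; a real assignment comes from the master and carries more keys than shown here.

```python
# Hypothetical assignment; all values below are invented for illustration.
assignment = {
    "job": {
        "id": 12,
        "title": "example render",
        "data": {"scene": "/shows/demo/scene.ma"},  # jobtype specific data
        "environ": {"FOO": "bar"},
        "by": 1,
    },
    "jobtype": {"name": "ExampleJobType", "version": 1},
    "tasks": [
        {"id": 101, "frame": 1},
        {"id": 102, "frame": 2},
        {"id": 103, "frame": 3},
    ],
}


def frames_for(assignment):
    """Return the frame numbers of all tasks, in assignment order."""
    return [task["frame"] for task in assignment["tasks"]]


job_data = assignment["job"]["data"]
frames = frames_for(assignment)
```

Inside a job type the same lookups would go through self.assignment rather than a module level variable.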
-
PERSISTENT_JOB_DATA = {}
-
COMMAND_DATA
alias of CommandData
-
PROCESS_PROTOCOL
alias of ProcessProtocol
-
ASSIGNMENT_SCHEMA
= <Schema({'job': <Schema({'cpus': Any([<type 'int'>, <type 'long'>]), 'ram': Any([<type 'int'>, <type 'long'>]), 'ram_max': Any([Any([<type 'int'>, <type 'long'>]), <type 'NoneType'>]), 'title': Any([<type 'str'>, <type 'unicode'>]), 'notes': Any([<type 'str'>, <type 'unicode'>]), 'num_tiles': Any([Any([<type 'int'>, <type 'long'>]), <type 'NoneType'>]), 'batch': Any([<type 'int'>, <type 'long'>]), 'by': Any([<type 'int'>, <type 'long'>, <type 'float'>, <class 'decimal.Decimal'>]), 'priority': Any([<type 'int'>, <type 'long'>]), 'notified_users': [<Schema({'username': Any([<type 'str'>, <type 'unicode'>]), 'on_failure': <type 'bool'>, 'on_success': <type 'bool'>, 'on_deletion': <type 'bool'>}, extra=PREVENT_EXTRA, required=False) object>], 'tags': [Any([<type 'str'>, <type 'unicode'>])], 'environ': <function validate_environment>, 'agent_id': Any([<type 'int'>, <type 'long'>]), 'job_group': Any([<type 'str'>, <type 'unicode'>]), 'job_group_id': Any([<type 'int'>, <type 'long'>]), 'data': <type 'dict'>, 'id': Any([<type 'int'>, <type 'long'>]), 'ram_warning': Any([Any([<type 'int'>, <type 'long'>]), <type 'NoneType'>]), 'user': Any([<type 'str'>, <type 'unicode'>])}, extra=PREVENT_EXTRA, required=False) object>, 'tasks': <function <lambda>>, 'id': <function validate_uuid>, 'jobtype': <Schema({'version': Any([<type 'int'>, <type 'long'>]), 'name': Any([<type 'str'>, <type 'unicode'>])}, extra=PREVENT_EXTRA, required=False) object>}, extra=PREVENT_EXTRA, required=False) object>¶
-
classmethod load(assignment)
Given an assignment this class method will load the job type either from cache or from the master.
Parameters: assignment (dict) – The dictionary containing the assignment. This will be passed into an instance of ASSIGNMENT_SCHEMA to validate that the internal data is correct.
-
classmethod prepare_for_job(job)
Note
This method is not yet implemented
Called before a job executes on the agent for the first time. Whatever this classmethod returns will be available as persistent_job_data on the job type instance.
By default this method does nothing.
Parameters: job (int) – The job id which prepare_for_job is being run for.
-
classmethod cleanup_after_job(persistent_data)
Note
This method is not yet implemented
This classmethod will be called after the last assignment from a given job has finished on this node.
Parameters: persistent_data – The persistent data that prepare_for_job() produced. The value for this data may be None if prepare_for_job() returned None or was not implemented.
-
classmethod spawn_persistent_process(job, command_data)
Note
This method is not yet implemented
Starts one child process using an instance of CommandData or similar input. This process is intended to keep running until the last task from this job has been processed, potentially spanning more than one assignment. If the spawned process is still running then we’ll clean up the process after cleanup_after_job().
-
node()
Returns live information about this host, the operating system, hardware, and several other pieces of global data which are useful inside of the job type. Currently data from this method includes:
- master_api - The base url the agent is using to communicate with the master.
- hostname - The hostname as reported to the master.
- agent_id - The unique identifier used to identify this agent to the master.
- id - The database id of the agent as given to us by the master on startup of the agent.
- cpus - The number of CPUs reported to the master.
- ram - The amount of ram reported to the master.
- total_ram - The amount of ram, in megabytes, that’s installed on the system regardless of what was reported to the master.
- free_ram - How much ram, in megabytes, is free for the entire system.
- consumed_ram - How much ram, in megabytes, is being consumed by the agent and any processes it has launched.
- admin - Set to True if the current user is an administrator or ‘root’.
- user - The username of the current user.
- case_sensitive_files - True if the file system is case sensitive.
- case_sensitive_env - True if environment variables are case sensitive.
- machine_architecture - The architecture of the machine the agent is running on. This will return 32 or 64.
- operating_system - The operating system the agent is executing on. This value will be ‘linux’, ‘mac’ or ‘windows’. In rare circumstances this could also be ‘other’.
Raises: KeyError – Raised if one or more keys are not present in the global configuration object.
This should rarely if ever be a problem under normal circumstances. The exception to this rule is in unit tests or standalone libraries where the global config object may not be populated.
-
tempdir(new=False, remove_on_finish=True)
Returns a temporary directory to be used within a job type. By default once called the directory will be created on disk and returned from this method.
Calling this method multiple times will return the same directory instead of creating a new directory unless new is set to True.
Parameters:
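The caching behavior described for tempdir() can be sketched with a small stand-in class. This is not the job type's implementation: TempDirCache, its attributes and the finish() method are hypothetical, and the handling of remove_on_finish is an assumption based only on the argument's name.

```python
import os
import shutil
import tempfile


class TempDirCache(object):
    """Hypothetical stand-in showing the caching behavior described above."""

    def __init__(self):
        self._tempdir = None   # directory returned by default
        self._to_remove = []   # directories flagged for removal on finish

    def tempdir(self, new=False, remove_on_finish=True):
        # Reuse the cached directory unless a new one is requested.
        if not new and self._tempdir is not None:
            return self._tempdir
        path = tempfile.mkdtemp()
        if remove_on_finish:
            self._to_remove.append(path)
        if self._tempdir is None:
            self._tempdir = path
        return path

    def finish(self):
        # Remove every directory that was flagged for cleanup.
        for path in self._to_remove:
            if os.path.isdir(path):
                shutil.rmtree(path)
        self._to_remove = []
```

Calling tempdir() twice on one instance returns the same path, while tempdir(new=True) creates a second directory and leaves the cached one in place.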
-
get_uid_gid(user, group)
Overridable. This method converts a named user and group into their respective user and group ids.
-
get_environment()
Constructs an environment dictionary that can be used when a process is spawned by a job type.
-
get_command_list(commands)
Convert a list of commands to a tuple with any environment variables expanded.
Parameters: commands (list) – A list of strings to expand. Each entry in the list will be passed into and returned from expandvars().
Raises: TypeError – Raised if commands is not a list or tuple.
Return type: tuple
Returns: Returns the expanded list of commands.
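The list-to-tuple conversion with variable expansion can be approximated as below. This is a sketch under assumptions: expand_command_list() is a hypothetical helper, and it uses string.Template from the standard library in place of the job type's own expandvars(), so details such as the handling of unknown variables may differ from the real method.

```python
from string import Template


def expand_command_list(commands, environment):
    """Expand $NAME and ${NAME} references in each entry; return a tuple."""
    if not isinstance(commands, (list, tuple)):
        raise TypeError("Expected a list or tuple for `commands`")
    # safe_substitute leaves unknown variables untouched instead of raising.
    return tuple(
        Template(entry).safe_substitute(environment) for entry in commands)


expanded = expand_command_list(
    ["render", "$SCENE_DIR/scene.ma", "${UNKNOWN}"],
    {"SCENE_DIR": "/tmp/demo"})
```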
-
get_csvlog_path(protocol_uuid, create_time=None)
Returns the path to the comma separated value (csv) log file. The agent stores logs from processes in a csv format so we can store additional information such as a timestamp, line number, stdout/stderr identification and the log message itself.
Note
This method should not attempt to create the parent directories of the resulting path. This is already handled by the logger pool in a non-blocking fashion.
Parameters:
- protocol_uuid (uuid.UUID) – The UUID of the job type’s protocol instance.
- create_time (datetime.datetime) – If provided then the create time of the log file will equal this value. Otherwise this method will use the current UTC time for create_time.
Raises: TypeError – Raised if protocol_uuid or create_time are not the correct types.
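A minimal sketch of the type checks and the UTC default described above follows. The function name csvlog_path(), the log_root argument and the file name layout are all hypothetical; the real method decides the directory and naming scheme itself.

```python
import os
from datetime import datetime, timezone
from uuid import UUID, uuid4


def csvlog_path(log_root, protocol_uuid, create_time=None):
    """Build a log path; the file name layout here is purely hypothetical."""
    if not isinstance(protocol_uuid, UUID):
        raise TypeError("Expected UUID for `protocol_uuid`")
    if create_time is None:
        create_time = datetime.now(timezone.utc)  # default to current UTC time
    if not isinstance(create_time, datetime):
        raise TypeError("Expected datetime for `create_time`")
    # No directories are created here; that is left to the logger pool.
    filename = "%s_%s.csv" % (
        create_time.strftime("%Y-%m-%d_%H-%M-%S"), protocol_uuid.hex)
    return os.path.join(log_root, filename)


example_uuid = uuid4()
example_path = csvlog_path(
    "/tmp/logs", example_uuid, create_time=datetime(2015, 1, 2, 3, 4, 5))
```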
-
get_command_data()
Overridable. This method returns the arguments necessary for executing a command. For job types which execute a single process per assignment, this is the most important method to implement.
Warning
This method should not be used when this jobtype requires more than one process for one assignment and may not get called at all if start() was overridden.
The default implementation does nothing. When overriding this method you should return an instance of COMMAND_DATA:
    return self.COMMAND_DATA(
        "/usr/bin/python", "-c", "print 'hello world'",
        env={"FOO": "bar"}, user="bob")
See CommandData’s class documentation for a full description of possible arguments.
Please note, however, that the default command data class, CommandData, does not perform path expansion, so you have to handle this yourself with map_path().
-
map_path(path)
Takes a string argument. Translates a given path for any OS to what it should be on this particular node. This does not communicate with the master.
Parameters: path (string) – The path to translate to an OS specific path for this node.
Raises: TypeError – Raised if path is not a string.
-
expandvars(value, environment=None, expand=None)
Expands variables inside of a string using an environment.
Parameters:
- value (string) – The path to expand.
- environment (dict) – The environment to use for expanding value. If this value is None (the default) then we’ll use get_environment() to build this value.
- expand (bool) – When not provided we use the jobtype_expandvars configuration value to set the default. When this value is True we’ll perform environment variable expansion, otherwise we return value untouched.
-
start()
This method is called when the job type should start working. Depending on the job type’s implementation this will prepare and start one or more processes.
-
stop(assignment_failed=False, avoid_reassignment=False, error=None, signal='KILL')
This method is called when the job type should stop running. This will terminate any processes associated with this job type and also inform the master of any state changes to an associated task or tasks.
Parameters:
- assignment_failed (boolean) – Whether this means the assignment has genuinely failed. By default, we assume that stopping this assignment was the result of deliberate user action (like stopping the job or shutting down the agent), and won’t treat it as a failed assignment.
- avoid_reassignment (boolean) – If set to true, the agent will add itself to the list of agents that failed the tasks in this assignment. Can be useful when we want to return the assignment to the master without increasing its failures counter, but still don’t want it to be reassigned to us.
- error (string) – If the assignment has failed, this string is uploaded as last_error for the failed tasks.
- signal (string) – The signal to send to any running processes. Valid options are KILL, TERM or INT.
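For readers who want to check a signal name before passing it to stop(), a small validation helper could look like the following. resolve_signal() and VALID_SIGNALS are hypothetical names, and mapping the strings onto the standard library's signal constants is an assumption for illustration; the source does not say how the job type translates these names internally.

```python
import signal

VALID_SIGNALS = ("KILL", "TERM", "INT")


def resolve_signal(name):
    """Map 'KILL', 'TERM' or 'INT' to the matching signal module constant."""
    if name not in VALID_SIGNALS:
        raise ValueError("Expected one of %s, got %r" % (VALID_SIGNALS, name))
    # SIGKILL is not defined on Windows, so this may return None there.
    return getattr(signal, "SIG" + name, None)
```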
-
format_error(error)
Takes some kind of object, typically an instance of Exception or Failure, and produces a human readable string.
Return type: string or None
Returns: Returns a string if we know how to format the error. Otherwise this method returns None and logs an error.
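The contract above (a string when the error is understood, otherwise None plus a logged error) might be sketched like this. The function below is a standalone approximation, not the method's actual body: the logger name is made up, and the branch for Failure-like objects only assumes they expose a getErrorMessage() method.

```python
import logging

logger = logging.getLogger("jobtype.example")  # hypothetical logger name


def format_error(error):
    """Return a readable string for `error`, or None if it is unsupported."""
    if isinstance(error, str):
        return error
    if isinstance(error, Exception):
        message = str(error)
        # Fall back to the class name when the exception has no message.
        return message if message else error.__class__.__name__
    # Duck-type Failure-like objects that expose getErrorMessage().
    get_message = getattr(error, "getErrorMessage", None)
    if callable(get_message):
        return get_message()
    logger.error("Don't know how to format %r as a string", error)
    return None
```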
-
set_states(tasks, state, error=None)
Wrapper around set_state() that allows you to set the state on the master for multiple tasks at once.
-
add_self_to_failed_on_agents(*args, **kwargs)
Adds this agent to the list of agents that failed to execute the given task, without explicitly setting this task to failed.
Parameters: task (dict) – The dictionary containing the task.
-
set_task_started_now(*args, **kwargs)
Sets the time_started of the given task to the current time on the master.
This method is useful for batched tasks, where the actual work on a single task may start much later than the work on the assignment as a whole.
Parameters: task (dict) – The dictionary containing the task we’re changing the start time for.
-
set_task_state(*args, **kwargs)
Sets the state of the given task.
Parameters:
- task (dict) – The dictionary containing the task we’re changing the state for.
- state (string) – The state to change task to.
- error (string, Exception) – If the state is changing to ‘error’ then also set the last_error column. Any exception instance that is passed to this keyword will be passed through format_exception() first to format it.
-
get_local_task_state(task_id)
Returns None if the state of this task has not been changed locally since this assignment has started. This method does not communicate with the master.
-
is_successful(protocol, reason)
Overridable. This method determines whether the process referred to by a protocol instance has exited successfully.
The default implementation returns True if the process’s return code was 0 and False in all other cases. If you need to modify this behavior please be aware that reason may be an integer or an instance of twisted.internet.error.ProcessTerminated if the process terminated without errors or an instance of twisted.python.failure.Failure if there were problems.
Raises: NotImplementedError – Raised if we encounter a condition that the base implementation is unable to handle.
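Because reason can arrive in several shapes, an override usually has to normalize it first. The sketch below shows one way to do that without importing Twisted: exited_successfully(), FakeTerminated and FakeFailure are hypothetical stand-ins, and the code assumes only that terminated-process objects carry an exitCode attribute and that Failure-like wrappers expose the wrapped exception as value.

```python
def exited_successfully(reason):
    """Return True when `reason` describes a zero exit code."""
    if isinstance(reason, int):
        return reason == 0
    # Failure-like objects wrap the real exception in `.value`;
    # terminated-process objects carry `.exitCode` directly.
    wrapped = getattr(reason, "value", reason)
    if hasattr(wrapped, "exitCode"):
        return wrapped.exitCode == 0
    raise NotImplementedError("Don't know how to handle %r" % (reason,))


class FakeTerminated(Exception):
    """Stand-in for an exception with an `exitCode` attribute."""

    def __init__(self, exitCode):
        Exception.__init__(self)
        self.exitCode = exitCode


class FakeFailure(object):
    """Stand-in for a Failure-like wrapper with a `value` attribute."""

    def __init__(self, value):
        self.value = value
```

A job type whose program exits non-zero on success could override is_successful() and compare against its own set of accepted exit codes in the same way.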
-
before_start()
Overridable. This method is called directly before start() itself is called.
The default implementation does nothing and values returned from this method are ignored.
-
before_spawn_process(command, protocol)
Overridable. This method is called directly before a process is spawned.
By default this method does nothing except log information about the command we’re about to launch both to the agent’s log and to the log file on disk.
Parameters:
- command (CommandData) – An instance of CommandData which contains the environment to use, command and arguments. Modifications to this object will be applied to the process being spawned.
- protocol (ProcessProtocol) – An instance of pyfarm.jobtypes.core.process.ProcessProtocol which contains the protocol used to communicate between the process and this job type.
-
process_stopped(protocol, reason)
Overridable. This method is called when a child process stopped running.
The default implementation will mark all tasks in the current assignment as done, or as failed if there was at least one failed process.
-
process_started(protocol)
Overridable. This method is called when a child process started running.
The default implementation will mark all tasks in the current assignment as running.
-
process_output(protocol, output, line_fragments, line_handler)
This is a mid-level method which takes output from a process protocol then splits and processes it to ensure we pass complete output lines to the other methods.
Implementors who wish to process the output line by line should override preprocess_stdout_line(), preprocess_stderr_line(), process_stdout_line() or process_stderr_line() instead. This method is a glue method between other parts of the job type and should only be overridden if there’s a problem or you want to change how lines are split.
Parameters:
- protocol (ProcessProtocol) – The protocol instance which produced output
- output (string) – The blob of text or line produced
- line_fragments (dict) – The line fragment dictionary containing individual line fragments. This will be either self._stdout_line_fragments or self._stderr_line_fragments.
- line_handler (callable) – The function to handle any lines produced. This will be either handle_stdout_line() or handle_stderr_line()
Returns: This method returns nothing by default and any return value produced by this method will not be consumed by other methods.
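The idea of buffering partial lines until a newline arrives can be sketched as follows. This is not the method's real implementation: split_complete_lines() is a hypothetical helper, the fragment dictionary is keyed by an arbitrary string here rather than by a protocol instance, and the choice to strip carriage returns is an assumption.

```python
def split_complete_lines(key, output, line_fragments):
    """Return complete lines from `output`, buffering any trailing fragment."""
    data = line_fragments.pop(key, "") + output
    lines = data.split("\n")
    # The last element is either "" (output ended on a newline) or a
    # partial line that must wait for more output.
    remainder = lines.pop()
    if remainder:
        line_fragments[key] = remainder
    return [line.rstrip("\r") for line in lines]


fragments = {}
first = split_complete_lines("proc-1", "hello wo", fragments)
second = split_complete_lines("proc-1", "rld\nfoo\r\nba", fragments)
third = split_complete_lines("proc-1", "r\n", fragments)
```

Each complete line returned this way would then be handed to the line handler, while the fragment stays in the dictionary until the rest of the line shows up.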
-
handle_stdout_line(protocol, stdout)
Takes a ProcessProtocol instance and stdout line produced by process_output() and runs it through all the steps necessary to preprocess, format, log and handle the line.
The default implementation will run stdout through several methods in order:
- preprocess_stdout_line()
- format_stdout_line()
- log_stdout_line()
- process_stdout_line()
Warning
This method is not private however it’s advisable to override the methods above instead of this one. Unlike this method, which is more generalized and invokes several other methods, the above provide more targeted functionality.
Parameters:
- protocol (ProcessProtocol) – The protocol instance which produced stdout
- stdout (string) – A complete line to stdout being emitted by the process
Returns: This method returns nothing by default and any return value produced by this method will not be consumed by other methods.
-
handle_stderr_line(protocol, stderr)
Overridable. Takes a ProcessProtocol instance and stderr produced by process_output() and runs it through all the steps necessary to preprocess, format, log and handle the line.
The default implementation will run stderr through several methods in order:
- preprocess_stderr_line()
- format_stderr_line()
- log_stderr_line()
- process_stderr_line()
Warning
This method is overridable however it’s advisable to override the methods above instead. Unlike this method, which is more generalized and invokes several other methods, the above provide more targeted functionality.
Parameters:
- protocol (ProcessProtocol) – The protocol instance which produced stderr
- stderr (string) – A complete line to stderr being emitted by the process
Returns: This method returns nothing by default and any return value produced by this method will not be consumed by other methods.
-
preprocess_stdout_line(protocol, stdout)
Overridable. Provides the ability to manipulate stdout or protocol before it’s passed into any other line handling methods.
The default implementation does nothing.
Parameters:
- protocol (ProcessProtocol) – The protocol instance which produced stdout
- stdout (string) – A complete line to stdout before any formatting or logging has occurred.
Returns: This method returns nothing by default but when overridden should return a string which will be used in line handling methods such as format_stdout_line(), log_stdout_line() and process_stdout_line().
-
preprocess_stderr_line(protocol, stderr)
Overridable. Provides the ability to manipulate stderr or protocol before it’s passed into any other line handling methods.
The default implementation does nothing.
Parameters:
- protocol (ProcessProtocol) – The protocol instance which produced stderr
- stderr (string) – A complete line to stderr before any formatting or logging has occurred.
Returns: This method returns nothing by default but when overridden should return a string which will be used in line handling methods such as format_stderr_line(), log_stderr_line() and process_stderr_line().
-
format_stdout_line(protocol, stdout)
Overridable. Formats a line from stdout before it’s passed onto methods such as log_stdout_line() and process_stdout_line().
The default implementation does nothing.
Parameters:
- protocol (ProcessProtocol) – The protocol instance which produced stdout
- stdout (string) – A complete line from the process to format and return.
Returns: This method returns nothing by default but when overridden should return a string which will be used in log_stdout_line() and process_stdout_line().
-
format_stderr_line(protocol, stderr)
Overridable. Formats a line from stderr before it’s passed onto methods such as log_stderr_line() and process_stderr_line().
The default implementation does nothing.
Parameters:
- protocol (ProcessProtocol) – The protocol instance which produced stderr
- stderr (string) – A complete line from the process to format and return.
Returns: This method returns nothing by default but when overridden should return a string which will be used in log_stderr_line() and process_stderr_line().
-
log_stdout_line(protocol, stdout)
Overridable. Called when we receive a complete line on stdout from the process.
The default implementation will use the global logging pool to log stdout to a file.
Parameters:
- protocol (ProcessProtocol) – The protocol instance which produced stdout
- stdout (string) – A complete line to stdout that has been formatted and is ready to log to a file.
Returns: This method returns nothing by default and any return value produced by this method will not be consumed by other methods.
-
log_stderr_line(protocol, stderr)
Overridable. Called when we receive a complete line on stderr from the process.
The default implementation will use the global logging pool to log stderr to a file.
Parameters:
- protocol (ProcessProtocol) – The protocol instance which produced stderr
- stderr (string) – A complete line to stderr that has been formatted and is ready to log to a file.
Returns: This method returns nothing by default and any return value produced by this method will not be consumed by other methods.
-
process_stderr_line(protocol, stderr)
Overridable. This method is called when we receive a complete line to stderr. The line will be preformatted and will already have been sent for logging.
The default implementation sends stderr and protocol to process_stdout_line().
Parameters:
- protocol (ProcessProtocol) – The protocol instance which produced stderr
- stderr (string) – A complete line to stderr after it has been formatted and logged.
Returns: This method returns nothing by default and any return value produced by this method will not be consumed by other methods.
-
process_stdout_line(protocol, stdout)
Overridable. This method is called when we receive a complete line to stdout. The line will be preformatted and will already have been sent for logging.
The default implementation does nothing.
Parameters:
- protocol (ProcessProtocol) – The protocol instance which produced stdout
- stdout (string) – A complete line to stdout after it has been formatted and logged.
Returns: This method returns nothing by default and any return value produced by this method will not be consumed by other methods.
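To show how the stdout hooks documented above fit together, here is a self-contained sketch of the preprocess, format, log and process chain. LinePipeline and ProgressPipeline are hypothetical stand-ins rather than pyfarm classes, logging is replaced by appending to a list, and treating a None return value as "keep the line unchanged" is an assumption about how the default hooks behave.

```python
class LinePipeline(object):
    """Hypothetical stand-in that mimics the stdout hooks described above."""

    def __init__(self):
        self.logged = []      # stands in for the log file
        self.processed = []   # lines seen by process_stdout_line

    def preprocess_stdout_line(self, protocol, stdout):
        return None  # default: leave the line unchanged

    def format_stdout_line(self, protocol, stdout):
        return None  # default: leave the line unchanged

    def log_stdout_line(self, protocol, stdout):
        self.logged.append(stdout)

    def process_stdout_line(self, protocol, stdout):
        self.processed.append(stdout)

    def handle_stdout_line(self, protocol, stdout):
        # Assumption: a hook that returns None keeps the previous value.
        for hook in (self.preprocess_stdout_line, self.format_stdout_line):
            result = hook(protocol, stdout)
            if result is not None:
                stdout = result
        self.log_stdout_line(protocol, stdout)
        self.process_stdout_line(protocol, stdout)


class ProgressPipeline(LinePipeline):
    """Example subclass: strip whitespace, then tag each line."""

    def preprocess_stdout_line(self, protocol, stdout):
        return stdout.strip()

    def format_stdout_line(self, protocol, stdout):
        return "[render] " + stdout


pipeline = ProgressPipeline()
pipeline.handle_stdout_line(None, "  frame 1 done \n")
```

Overriding only the narrow hooks, as ProgressPipeline does, leaves the ordering in handle_stdout_line() untouched, which is the approach the warnings above recommend.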