Source code for pyfarm.models.job

# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Job Models
==========

Models and interface classes related to jobs.

"""

from datetime import datetime

try:
    import pwd
except ImportError:  # pragma: no cover
    pwd = None

from sys import maxsize

from sqlalchemy import event, distinct, or_, and_
from sqlalchemy.orm import validates

from pyfarm.core.logger import getLogger
from pyfarm.core.enums import WorkState, DBWorkState, _WorkState, AgentState
from pyfarm.master.application import db
from pyfarm.master.config import config
from pyfarm.models.core.functions import work_columns
from pyfarm.models.core.types import JSONDict, IDTypeWork

from pyfarm.models.core.mixins import (
    ValidatePriorityMixin, WorkStateChangedMixin, ReprMixin,
    ValidateWorkStateMixin, UtilityMixins)
from pyfarm.models.statistics.task_event_count import TaskEventCount
from pyfarm.models.jobtype import JobType, JobTypeVersion
from pyfarm.models.task import Task

try:
    # pylint: disable=undefined-variable
    range_ = xrange
except NameError:
    range_ = range

__all__ = ("Job", )

logger = getLogger("models.job")


JobTagAssociation = db.Table(
    config.get("table_job_tag_assoc"),
    db.metadata,
    db.Column(
        "job_id",
        IDTypeWork,
        db.ForeignKey("%s.id" % config.get("table_job")),
        primary_key=True,
        doc="The id of the job associated with this task"),
    db.Column(
        "tag_id",
        db.Integer,
        db.ForeignKey("%s.id" % config.get("table_tag")),
        primary_key=True,
        doc="The id of the tag being associated with the job")
)
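
# Illustrative sketch (not part of the schema itself): JobTagAssociation is a
# bare association table rather than a mapped model; rows are created
# implicitly through the many-to-many ``Job.tags`` relationship defined on
# the Job model below, e.g.:
#
#     job.tags.append(tag)   # inserts a (job_id, tag_id) row on commit
#     db.session.commit()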


JobDependency = db.Table(
    config.get("table_job_dependency"), db.metadata,
    db.Column(
        "parentid",
        IDTypeWork,
        db.ForeignKey("%s.id" % config.get("table_job")),
        primary_key=True,
        doc="The parent job id of the job dependency"),
    db.Column(
        "childid",
        IDTypeWork,
        db.ForeignKey("%s.id" % config.get("table_job")),
        primary_key=True,
        doc="The child job id of the job dependency")
)
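
# Illustrative sketch: JobDependency backs the self-referential many-to-many
# ``Job.parents``/``Job.children`` relationship declared on the Job model
# below, e.g.:
#
#     child.parents.append(parent)   # the child job waits on the parent
#     db.session.commit()
#     assert child in parent.children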


class JobNotifiedUser(db.Model):
    """
    Defines the table containing users to be notified of certain
    events pertaining to jobs.
    """
    __tablename__ = config.get("table_job_notified_users")

    user_id = db.Column(
        db.Integer,
        db.ForeignKey("%s.id" % config.get("table_user")),
        primary_key=True,
        doc="The id of the user to be notified")

    job_id = db.Column(
        IDTypeWork,
        db.ForeignKey("%s.id" % config.get("table_job")),
        primary_key=True,
        doc="The id of the associated job")

    on_success = db.Column(
        db.Boolean,
        nullable=False, default=True,
        doc="True if a user should be notified on successful "
            "completion of a job")

    on_failure = db.Column(
        db.Boolean,
        nullable=False, default=True,
        doc="True if a user should be notified of a job's failure")

    on_deletion = db.Column(
        db.Boolean,
        nullable=False, default=False,
        doc="True if a user should be notified on deletion of "
            "a job")

    user = db.relationship(
        "User",
        backref=db.backref("subscribed_jobs", lazy="dynamic"))


class Job(db.Model, ValidatePriorityMixin, ValidateWorkStateMixin,
          WorkStateChangedMixin, ReprMixin, UtilityMixins):
    """
    Defines the attributes and environment for a job.  Individual
    commands are kept track of by :class:`Task`
    """
    __tablename__ = config.get("table_job")
    REPR_COLUMNS = ("id", "state", "project")
    REPR_CONVERT_COLUMN = {"state": repr}
    STATE_ENUM = list(WorkState) + [None]

    # shared work columns
    id, state, priority, time_submitted, time_started, time_finished = \
        work_columns(None, "job.priority")

    jobtype_version_id = db.Column(
        IDTypeWork,
        db.ForeignKey("%s.id" % config.get("table_job_type_version")),
        nullable=False,
        doc="The foreign key which stores :class:`JobTypeVersion.id`")

    job_queue_id = db.Column(
        IDTypeWork,
        db.ForeignKey("%s.id" % config.get("table_job_queue")),
        nullable=True,
        doc="The foreign key which stores :class:`JobQueue.id`")

    job_group_id = db.Column(
        IDTypeWork,
        db.ForeignKey("%s.id" % config.get("table_job_group")),
        nullable=True,
        doc="The foreign key which stores :class:`JobGroup.id`")

    user_id = db.Column(
        db.Integer,
        db.ForeignKey("%s.id" % config.get("table_user")),
        doc="The id of the user who owns this job")

    minimum_agents = db.Column(
        db.Integer,
        nullable=True,
        doc="The scheduler will try to assign at least this number "
            "of agents to this job as long as it can use them, "
            "before any other considerations.")

    maximum_agents = db.Column(
        db.Integer,
        nullable=True,
        doc="The scheduler will never assign more than this number "
            "of agents to this job.")

    weight = db.Column(
        db.Integer,
        nullable=False,
        default=config.get("queue_default_weight"),
        doc="The weight of this job. The scheduler will distribute "
            "available agents between jobs and job queues in the "
            "same queue in proportion to their weights.")

    title = db.Column(
        db.String(config.get("jobtitle_max_length")),
        nullable=False,
        doc="The title of this job")

    notes = db.Column(
        db.Text,
        default="",
        doc="Notes that are provided on submission or added after "
            "the fact. This column is only provided for human "
            "consumption and is not scanned, indexed, or used when "
            "searching.")

    output_link = db.Column(
        db.Text,
        nullable=True,
        doc="An optional link to a URI where this job's output can "
            "be viewed.")

    # task data
    by = db.Column(
        db.Numeric(10, 4),
        default=1,
        doc="The number of frames to count by between `start` and "
            "`end`. This column may also sometimes be referred to "
            "as 'step' by other software.")

    num_tiles = db.Column(
        db.Integer,
        nullable=True,
        doc="How many regions to split frames into for rendering.")

    batch = db.Column(
        db.Integer,
        default=config.get("job_default_batch"),
        doc="Number of tasks to run on a single agent at once. Depending "
            "on the capabilities of the software being run this will "
            "either cause a single process to execute on the agent "
            "or multiple processes one after the other.")

    requeue = db.Column(
        db.Integer,
        default=config.get("job_requeue_default"),
        doc="Number of times to requeue failed tasks"
            ""
            ".. csv-table:: **Special Values**"
            "   :header: Value, Result"
            "   :widths: 10, 50"
            ""
            "   0, never requeue failed tasks"
            "   -1, requeue failed tasks indefinitely")

    cpus = db.Column(
        db.Integer,
        default=config.get("job_default_cpus"),
        doc="Number of cpus or threads each task should consume on "
            "each agent. Depending on the job type being executed "
            "this may result in additional cpu consumption, longer "
            "wait times in the queue (2 cpus means 2 'fewer' cpus on "
            "an agent), or all of the above."
            ""
            ".. csv-table:: **Special Values**"
            "   :header: Value, Result"
            "   :widths: 10, 50"
            ""
            "   0, minimum number of cpu resources not required"
            "   -1, agent cpu is exclusive for a task from this job")

    ram = db.Column(
        db.Integer,
        default=config.get("job_default_ram"),
        doc="Amount of ram a task from this job will require to be "
            "free in order to run. A task exceeding this value will "
            "not result in any special behavior."
            ""
            ".. csv-table:: **Special Values**"
            "   :header: Value, Result"
            "   :widths: 10, 50"
            ""
            "   0, minimum amount of free ram not required"
            "   -1, agent ram is exclusive for a task from this job")

    ram_warning = db.Column(
        db.Integer,
        nullable=True,
        doc="Amount of ram used by a task before a warning is raised. "
            "A task exceeding this value will not cause any work "
            "stopping behavior.")

    ram_max = db.Column(
        db.Integer,
        nullable=True,
        doc="Maximum amount of ram a task is allowed to consume on "
            "an agent."
            ""
            ".. warning::"
            "   If set, the task will be **terminated** if the ram in "
            "   use by the process exceeds this value.")

    hidden = db.Column(
        db.Boolean,
        default=False, nullable=False,
        doc="If True, keep the job hidden from the queue and web "
            "ui. This is typically set to True if you either want "
            "to save a job for later viewing or if the job's data "
            "is being populated in a deferred manner.")

    environ = db.Column(
        JSONDict,
        doc="Dictionary containing information about the environment "
            "in which the job will execute."
            ""
            ".. note::"
            "   Changes made directly to this object are **not** "
            "   applied to the session.")

    data = db.Column(
        JSONDict,
        doc="Json blob containing additional data for a job"
            ""
            ".. note::"
            "   Changes made directly to this object are **not** "
            "   applied to the session.")

    to_be_deleted = db.Column(
        db.Boolean,
        nullable=False, default=False,
        doc="If true, the master will stop all running tasks for "
            "this job and then delete it.")

    completion_notify_sent = db.Column(
        db.Boolean,
        nullable=False, default=False,
        doc="Whether or not the finish notification mail has already "
            "been sent out.")

    autodelete_time = db.Column(
        db.Integer,
        nullable=True, default=None,
        doc="If not None, this job will be automatically deleted this "
            "number of seconds after it finishes.")

    #
    # Relationships
    #
    queue = db.relationship(
        "JobQueue",
        backref=db.backref("jobs", lazy="dynamic"),
        doc="The queue for this job")

    group = db.relationship(
        "JobGroup",
        backref=db.backref("jobs", lazy="dynamic"),
        doc="The job group this job belongs to")

    user = db.relationship(
        "User",
        backref=db.backref("jobs", lazy="dynamic"),
        doc="The owner of this job")

    # self-referential many-to-many relationship
    parents = db.relationship(
        "Job",
        secondary=JobDependency,
        primaryjoin=id == JobDependency.c.childid,
        secondaryjoin=id == JobDependency.c.parentid,
        backref="children")

    notified_users = db.relationship(
        "JobNotifiedUser",
        lazy="dynamic",
        backref=db.backref("job"),
        cascade="all,delete")

    tasks_queued = db.relationship(
        "Task",
        lazy="dynamic",
        primaryjoin="(Task.state == None) & "
                    "(Task.job_id == Job.id)",
        doc="Relationship between this job and any :class:`Task` "
            "objects which are queued.")

    tasks_running = db.relationship(
        "Task",
        lazy="dynamic",
        primaryjoin="(Task.state == %s) & "
                    "(Task.job_id == Job.id)" % DBWorkState.RUNNING,
        doc="Relationship between this job and any :class:`Task` "
            "objects which are running.")

    tasks_done = db.relationship(
        "Task",
        lazy="dynamic",
        primaryjoin="(Task.state == %s) & "
                    "(Task.job_id == Job.id)" % DBWorkState.DONE,
        doc="Relationship between this job and any :class:`Task` "
            "objects which are done.")

    tasks_failed = db.relationship(
        "Task",
        lazy="dynamic",
        primaryjoin="(Task.state == %s) & "
                    "(Task.job_id == Job.id)" % DBWorkState.FAILED,
        doc="Relationship between this job and any :class:`Task` "
            "objects which have failed.")

    # resource relationships
    tags = db.relationship(
        "Tag",
        backref="jobs",
        lazy="dynamic",
        secondary=JobTagAssociation,
        doc="Relationship between this job and :class:`.Tag` objects")

    def paused(self):
        return self.state == WorkState.PAUSED

    def update_state(self):
        # Import here instead of at the top of the file to avoid a
        # circular import
        from pyfarm.scheduler.tasks import send_job_completion_mail
        from pyfarm.models.agent import Agent

        num_active_tasks = db.session.query(Task).\
            filter(Task.job == self,
                   or_(Task.state == None,
                       and_(Task.state != WorkState.DONE,
                            Task.state != WorkState.FAILED))).count()
        if num_active_tasks == 0:
            num_failed_tasks = db.session.query(Task).filter(
                Task.job == self,
                Task.state == WorkState.FAILED).count()
            if num_failed_tasks == 0:
                if self.state != _WorkState.DONE:
                    logger.info("Job %r (id %s): state transition %r -> "
                                "'done'", self.title, self.id, self.state)
                    self.state = WorkState.DONE
                    send_job_completion_mail.apply_async(
                        args=[self.id, True], countdown=5)
            else:
                if self.state != _WorkState.FAILED:
                    logger.info("Job %r (id %s): state transition %r -> "
                                "'failed'", self.title, self.id, self.state)
                    self.state = WorkState.FAILED
                    send_job_completion_mail.apply_async(
                        args=[self.id, False], countdown=5)
            db.session.add(self)
        elif self.state != _WorkState.PAUSED:
            num_running_tasks = db.session.query(Task).\
                filter(Task.job == self,
                       Task.agent_id != None,
                       Task.agent.has(
                           and_(Agent.state != AgentState.OFFLINE,
                                Agent.state != AgentState.DISABLED)),
                       or_(Task.state == WorkState.RUNNING,
                           Task.state == None)).count()
            if num_running_tasks == 0:
                logger.debug("No running tasks in job %s (id %s), "
                             "setting it to queued", self.title, self.id)
                self.state = None
                db.session.add(self)
            elif self.state != _WorkState.RUNNING:
                self.state = WorkState.RUNNING
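
    # Illustrative sketch (call site hypothetical): update_state() is meant
    # to be invoked after a task's state changes so the job can recompute
    # its own state from its tasks, e.g.:
    #
    #     task.state = WorkState.DONE
    #     db.session.add(task)
    #     task.job.update_state()
    #     db.session.commit()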

    # Methods used by the scheduler
    def num_assigned_agents(self):
        # Import here instead of at the top of the file to avoid a
        # circular import
        from pyfarm.models.agent import Agent

        # Optimization: blindly assume that we have no agents assigned if
        # the job is not running
        if self.state != _WorkState.RUNNING:
            return 0

        try:
            return self.assigned_agents_count
        except AttributeError:
            self.assigned_agents_count = \
                db.session.query(distinct(Task.agent_id)).\
                    filter(Task.job == self,
                           Task.agent_id != None,
                           or_(Task.state == None,
                               Task.state == WorkState.RUNNING),
                           Task.agent.has(
                               and_(Agent.state != AgentState.OFFLINE,
                                    Agent.state != AgentState.DISABLED)))\
                    .count()
            return self.assigned_agents_count

    def clear_assigned_counts(self):
        try:
            del self.assigned_agents_count
        except AttributeError:
            pass
        if self.queue:
            self.queue.clear_assigned_counts()

    def can_use_more_agents(self):
        # Import here instead of at the top of the file to avoid a
        # circular import
        from pyfarm.models.agent import Agent

        unassigned_tasks = Task.query.filter(
            Task.job == self,
            or_(Task.state == None,
                ~Task.state.in_([WorkState.DONE, WorkState.FAILED])),
            or_(Task.agent == None,
                Task.agent.has(Agent.state.in_(
                    [AgentState.OFFLINE, AgentState.DISABLED])))).count()

        return unassigned_tasks > 0

    def get_batch(self, agent):
        # Import here instead of at the top of the file to avoid a
        # circular import
        from pyfarm.models.agent import Agent

        tasks_query = Task.query.filter(
            Task.job == self,
            ~Task.failed_in_agents.any(id=agent.id),
            or_(Task.state == None,
                ~Task.state.in_([WorkState.DONE, WorkState.FAILED])),
            or_(Task.agent == None,
                Task.agent.has(Agent.state.in_(
                    [AgentState.OFFLINE, AgentState.DISABLED])))).\
            order_by("frame asc, tile asc")

        batch = []
        for task in tasks_query:
            if (len(batch) < self.batch and
                len(batch) < (self.jobtype_version.max_batch or maxsize) and
                (not self.jobtype_version.batch_contiguous or
                 (len(batch) == 0 or
                  batch[-1].frame + self.by == task.frame))):
                batch.append(task)

        return batch
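
    # Worked example of the batching rules above (values hypothetical,
    # assuming batch=10, by=1 and jobtype_version.batch_contiguous=True):
    # for queued frames [1, 2, 3, 5], get_batch() returns the tasks for
    # frames [1, 2, 3]; frame 5 is skipped because
    # batch[-1].frame + self.by != 5.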

    def alter_frame_range(self, start, end, by):
        # We have to import this down here instead of at the top to break
        # a circular dependency between the modules
        from pyfarm.scheduler.tasks import delete_task

        if end < start:
            raise ValueError("`end` must be greater than or equal to "
                             "`start`")

        self.by = by

        required_frames = []
        current_frame = start
        while current_frame <= end:
            required_frames.append(current_frame)
            current_frame += by

        existing_tasks = Task.query.filter_by(job=self).all()
        frames_to_create = required_frames
        num_created = 0
        for task in existing_tasks:
            if task.frame not in required_frames:
                delete_task.delay(task.id)
            else:
                frames_to_create.remove(task.frame)

        for frame in frames_to_create:
            if self.num_tiles:
                for tile in range_(self.num_tiles - 1):
                    num_created += 1
                    task = Task()
                    task.job = self
                    task.frame = frame
                    task.tile = tile
                    task.priority = self.priority
                    db.session.add(task)
            else:
                num_created += 1
                task = Task()
                task.job = self
                task.frame = frame
                task.priority = self.priority
                db.session.add(task)

        if frames_to_create:
            if self.state != WorkState.RUNNING:
                self.state = None

        if config.get("enable_statistics"):
            task_event_count = TaskEventCount(
                num_new=num_created, job_queue_id=self.job_queue_id)
            task_event_count.time_start = datetime.utcnow()
            task_event_count.time_end = datetime.utcnow()
            db.session.add(task_event_count)
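
    # Worked example: alter_frame_range(1, 10, 2) requires frames
    # [1, 3, 5, 7, 9]; existing tasks for any other frame are deleted
    # asynchronously via delete_task, and missing frames are created as
    # new queued tasks (one task per tile for tiled jobs).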

    def rerun(self):
        """
        Makes this job rerun all of its tasks.  Tasks that are currently
        running are left untouched.
        """
        num_restarted = 0
        for task in self.tasks:
            if task.state != _WorkState.RUNNING and task.state is not None:
                task.state = None
                task.agent = None
                task.failures = 0
                db.session.add(task)
                num_restarted += 1

        self.completion_notify_sent = False
        self.update_state()
        db.session.add(self)

        if config.get("enable_statistics"):
            task_event_count = TaskEventCount(
                job_queue_id=self.job_queue_id,
                num_restarted=num_restarted)
            task_event_count.time_start = datetime.utcnow()
            task_event_count.time_end = datetime.utcnow()
            db.session.add(task_event_count)

        db.session.commit()

        for child in self.children:
            child.rerun()

    def rerun_failed(self):
        """
        Makes this job rerun all of its failed tasks.  Tasks that are done
        or are currently running are left untouched.
        """
        num_restarted = 0
        for task in self.tasks:
            if task.state == _WorkState.FAILED:
                task.state = None
                task.agent = None
                task.failures = 0
                db.session.add(task)
                num_restarted += 1

        self.completion_notify_sent = False
        self.update_state()
        db.session.add(self)

        if config.get("enable_statistics"):
            task_event_count = TaskEventCount(
                job_queue_id=self.job_queue_id,
                num_restarted=num_restarted)
            task_event_count.time_start = datetime.utcnow()
            task_event_count.time_end = datetime.utcnow()
            # Persist the statistics row with the commit below, as
            # rerun() does
            db.session.add(task_event_count)

        db.session.commit()

        for child in self.children:
            child.rerun_failed()
@validates("ram", "cpus")
[docs] def validate_resource(self, key, value): """ Validation that ensures that the value provided for either :attr:`.ram` or :attr:`.cpus` is a valid value with a given range """ assert isinstance(value, int), "%s must be an integer" % key min_value = config.get("agent_min_%s" % key) max_value = config.get("agent_max_%s" % key) # check the provided input if min_value > value or value > max_value: msg = "value for `%s` must be between " % key msg += "%s and %s" % (min_value, max_value) raise ValueError(msg) return value
@validates("progress")
[docs] def validate_progress(self, key, value): if value < 0.0 or value > 1.0: raise ValueError("Progress must be between 0.0 and 1.0")
event.listen(Job.state, "set", Job.state_changed)
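
# Note: Job.state_changed is provided by WorkStateChangedMixin (imported at
# the top of this module); the listener above runs it every time Job.state
# is assigned.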