Source code for pyfarm.models.task

# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Task Models
===========

Models and interface classes related to tasks
"""

from functools import partial
from datetime import datetime

from sqlalchemy import event

from pyfarm.core.logger import getLogger
from pyfarm.core.enums import WorkState, _WorkState
from pyfarm.master.application import db
from pyfarm.master.config import config
from pyfarm.models.core.types import IDTypeAgent, IDTypeWork
from pyfarm.models.core.functions import work_columns, repr_enum
from pyfarm.models.core.mixins import (
    ValidatePriorityMixin, UtilityMixins, ReprMixin, ValidateWorkStateMixin)

__all__ = ("Task", )

logger = getLogger("models.task")


[docs]class Task(db.Model, ValidatePriorityMixin, ValidateWorkStateMixin, UtilityMixins, ReprMixin): """ Defines a task which a child of a :class:`Job`. This table represents rows which contain the individual work unit(s) for a job. """ __tablename__ = config.get("table_task") STATE_ENUM = list(WorkState) + [None] STATE_DEFAULT = None REPR_COLUMNS = ("id", "state", "frame", "project") REPR_CONVERT_COLUMN = {"state": partial(repr_enum, enum=STATE_ENUM)} # shared work columns id, state, priority, time_submitted, time_started, time_finished = \ work_columns(STATE_DEFAULT, "job.priority") agent_id = db.Column( IDTypeAgent, db.ForeignKey("%s.id" % config.get("table_agent")), doc="Foreign key which stores :attr:`Job.id`") job_id = db.Column( IDTypeWork, db.ForeignKey("%s.id" % config.get("table_job")), nullable=False, doc="Foreign key which stores :attr:`Job.id`") hidden = db.Column( db.Boolean, default=False, doc="When True this hides the task from queue and web ui") attempts = db.Column( db.Integer, nullable=False, default=0, doc="The number of attempts which have been made on this " "task. This value is auto incremented when " "``state`` changes to a value synonymous with a " "running state.") failures = db.Column( db.Integer, nullable=False, default=0, doc="The number of times this task has failed. This value " "is auto incremented when :attr:`state` changes to a " "value synonymous with a failed state.") frame = db.Column( db.Numeric(10, 4), nullable=False, doc="The frame this :class:`Task` will be executing.") tile = db.Column( db.Integer, nullable=True, doc="When using tiled rendering, the number of the tile this task " "refers to. The jobtype will have to translate that into an " "actual image region. This will be NULL if the job doesn't use " "tiled rendering.") last_error = db.Column( db.UnicodeText, nullable=True, doc="This column may be set when an error is " "present. The agent typically sets this " "column when the job type either can't or " "won't run a given task. This column will " "be cleared whenever the task's state is " "returned to a non-error state.") sent_to_agent = db.Column( db.Boolean, default=False, nullable=False, doc="Whether this task was already sent to the assigned agent") progress = db.Column( db.Float, default=0.0, doc="The progress for this task, as a value between " "0.0 and 1.0. Used purely for display purposes.") # # Relationships # job = db.relationship( "Job", backref=db.backref("tasks", lazy="dynamic"), doc="relationship attribute which retrieves the " "associated job for this task")
[docs] def running(self): return self.state == WorkState.RUNNING
[docs] def failed(self): return self.state == WorkState.FAILED
@staticmethod
[docs] def increment_attempts(target, new_value, old_value, initiator): if new_value is not None and new_value != old_value: target.attempts += 1
@staticmethod
[docs] def log_assign_change(target, new_value, old_value, initiator): logger.debug("Agent change for task %s: old %s new: %s", target.id, old_value, new_value)
@staticmethod
[docs] def update_failures(target, new_value, old_value, initiator): if new_value == WorkState.FAILED and new_value != old_value: target.failures += 1 if target not in target.agent.failed_tasks: target.agent.failed_tasks.append(target)
@staticmethod
[docs] def set_progress_on_success(target, new_value, old_value, initiator): if new_value == WorkState.DONE: target.progress = 1.0
@staticmethod
[docs] def update_agent_on_success(target, new_value, old_value, initiator): if new_value == WorkState.DONE: agent = target.agent if agent: agent.last_success_on = datetime.utcnow() db.session.add(agent)
@staticmethod
[docs] def reset_agent_if_failed_and_retry( target, new_value, old_value, initiator): # There's nothing else we should do here if # we don't have a parent job. This can happen if you're # testing or a job is disconnected from a task. if target.job is None: return new_value if (new_value == WorkState.FAILED and target.failures <= target.job.requeue): logger.info("Failed task %s will be retried", target.id) target.agent_id = None return None else: return new_value
@staticmethod
[docs] def clear_error_state(target, new_value, old_value, initiator): """ Sets ``last_error`` column to ``None`` if the task's state is 'done' """ if new_value == WorkState.DONE and target.last_error is not None: target.last_error = None
@staticmethod
[docs] def set_times(target, new_value, old_value, initiator): """update the datetime objects depending on the new value""" if (new_value == _WorkState.RUNNING and (old_value not in [_WorkState.RUNNING, _WorkState.PAUSED] or target.time_started == None)): if not target.job.jobtype_version.no_automatic_start_time: target.time_started = datetime.utcnow() target.time_finished = None elif (new_value in (_WorkState.DONE, _WorkState.FAILED) and not target.time_finished): target.time_finished = datetime.utcnow()
@staticmethod
[docs] def reset_finished_time(target, new_value, old_value, initiator): if (target.state not in (_WorkState.DONE, _WorkState.FAILED) or new_value is None): target.time_finished = None elif new_value is not None: if target.time_finished is not None: target.time_finished = max(target.time_finished, new_value) else: target.time_finished = max(new_value, datetime.utcnow())
event.listen(Task.state, "set", Task.clear_error_state) event.listen(Task.state, "set", Task.set_times) event.listen(Task.state, "set", Task.update_failures) event.listen(Task.state, "set", Task.set_progress_on_success) event.listen(Task.state, "set", Task.update_agent_on_success) event.listen(Task.agent_id, "set", Task.increment_attempts) event.listen(Task.agent_id, "set", Task.log_assign_change) event.listen(Task.state, "set", Task.reset_agent_if_failed_and_retry, retval=True) event.listen(Task.time_started, "set", Task.reset_finished_time)