Source code for pyfarm.models.task

# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Task Models
===========

Models and interface classes related to tasks
"""

from functools import partial
from textwrap import dedent

from sqlalchemy import event
from sqlalchemy.orm.attributes import NO_VALUE, NO_CHANGE
from sqlalchemy.sql import or_, and_

from pyfarm.core.logger import getLogger
from pyfarm.core.enums import WorkState, _WorkState
from pyfarm.master.application import db
from pyfarm.models.core.types import IDTypeAgent, IDTypeWork
from pyfarm.models.core.functions import work_columns, repr_enum
from pyfarm.models.core.cfg import (
    TABLE_JOB, TABLE_TASK, TABLE_AGENT)
from pyfarm.models.core.mixins import (
    ValidatePriorityMixin, WorkStateChangedMixin, UtilityMixins, ReprMixin,
    ValidateWorkStateMixin)

__all__ = ("Task", )

logger = getLogger("models.task")


[docs]class Task(db.Model, ValidatePriorityMixin, ValidateWorkStateMixin, WorkStateChangedMixin, UtilityMixins, ReprMixin): """ Defines a task which a child of a :class:`Job`. This table represents rows which contain the individual work unit(s) for a job. """ __tablename__ = TABLE_TASK STATE_ENUM = list(WorkState) + [None] STATE_DEFAULT = None REPR_COLUMNS = ("id", "state", "frame", "project") REPR_CONVERT_COLUMN = {"state": partial(repr_enum, enum=STATE_ENUM)} # shared work columns id, state, priority, time_submitted, time_started, time_finished = \ work_columns(STATE_DEFAULT, "job.priority") agent_id = db.Column(IDTypeAgent, db.ForeignKey("%s.id" % TABLE_AGENT), doc="Foreign key which stores :attr:`Job.id`") job_id = db.Column(IDTypeWork, db.ForeignKey("%s.id" % TABLE_JOB), nullable=False, doc="Foreign key which stores :attr:`Job.id`") hidden = db.Column(db.Boolean, default=False, doc=dedent(""" hides the task from queue and web ui""")) attempts = db.Column(db.Integer, nullable=False, default=0, doc=dedent(""" The number of attempts which have been made on this task. This value is auto incremented when :attr:`state` changes to a value synonymous with a running state.""")) failures = db.Column(db.Integer, nullable=False, default=0, doc=dedent(""" The number of times this task has failed. This value is auto incremented when :attr:`state` changes to a value synonymous with a failed state.""")) frame = db.Column(db.Numeric(10, 4), nullable=False, doc="The frame this :class:`Task` will be executing.") last_error = db.Column(db.UnicodeText, nullable=True, doc="This column may be set when an error is " "present. The agent typically sets this " "column when the job type either can't or " "won't run a given task. This column will " "be cleared whenever the task's state is " "returned to a non-error state.") sent_to_agent = db.Column(db.Boolean, default=False, nullable=False, doc="Whether this task was already sent to the " "assigned agent") # relationships job = db.relationship("Job", backref=db.backref("tasks", lazy="dynamic"), doc=dedent(""" relationship attribute which retrieves the associated job for this task""")) @staticmethod
[docs] def increment_attempts(target, new_value, old_value, initiator): if new_value is not None and new_value != old_value: target.attempts += 1
@staticmethod
[docs] def update_failures(target, new_value, old_value, initiator): if new_value == WorkState.FAILED and new_value != old_value: target.failures += 1
@staticmethod
[docs] def reset_agent_if_failed_and_retry( target, new_value, old_value, initiator): # There's nothing else we should do here if # we don't have a parent job. This can happen if you're # testing or a job is disconnected from a task. if target.job is None: return new_value if (new_value == WorkState.FAILED and target.failures <= target.job.requeue): logger.info("Failed task %s will be retried", target.id) target.agent_id = None return None else: return new_value
@staticmethod
[docs] def clear_error_state(target, new_value, old_value, initiator): """ Sets ``last_error`` column to ``None`` if the task's state is 'done' """ if new_value == WorkState.DONE and target.last_error is not None: target.last_error = None
event.listen(Task.state, "set", Task.clear_error_state) event.listen(Task.state, "set", Task.state_changed) event.listen(Task.state, "set", Task.update_failures) event.listen(Task.agent_id, "set", Task.increment_attempts) event.listen(Task.state, "set", Task.reset_agent_if_failed_and_retry, retval=True)