# Source code for pyfarm.models.jobqueue

# No shebang line, this module is meant to be imported
#
# Copyright 2014 Ambient Entertainment GmbH & Co. KG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Job Queue Model
===============

Model for job queues
"""

from sys import maxsize
from textwrap import dedent
from functools import reduce
from logging import DEBUG

from sqlalchemy import event, desc, asc, func, distinct, or_
from sqlalchemy.schema import UniqueConstraint

from pyfarm.core.config import read_env_int, read_env_bool

from pyfarm.core.logger import getLogger
from pyfarm.core.enums import WorkState, _WorkState
from pyfarm.master.application import db
from pyfarm.models.core.cfg import (
    TABLE_JOB_QUEUE, MAX_JOBQUEUE_NAME_LENGTH, MAX_JOBQUEUE_PATH_LENGTH)
from pyfarm.models.core.mixins import UtilityMixins, ReprMixin
from pyfarm.models.core.types import id_column, IDTypeWork

# Module-level logger.  DEBUG verbosity is switched on unless the
# PYFARM_JOBQUEUE_DEBUG setting evaluates to false (its default here is True).
logger = getLogger("pf.models.jobqueue")
if read_env_bool("PYFARM_JOBQUEUE_DEBUG", True):
    logger.setLevel(DEBUG)

# When true, JobQueue.get_job_for_agent() returns the first eligible job that
# is already running instead of comparing submission times across candidates.
PREFER_RUNNING_JOBS = read_env_bool("PYFARM_PREFER_RUNNING_JOBS", True)


class JobQueue(db.Model, UtilityMixins, ReprMixin):
    """
    Stores information about a job queue. Used for flexible, configurable
    distribution of computing capacity to jobs.

    Queues form a tree through ``parent_jobqueue_id``; a queue whose parent
    is NULL is a top level queue.
    """
    __tablename__ = TABLE_JOB_QUEUE
    __table_args__ = (UniqueConstraint("parent_jobqueue_id", "name"),)

    REPR_COLUMNS = ("id", "name")

    id = id_column(IDTypeWork)
    parent_jobqueue_id = db.Column(
        IDTypeWork,
        db.ForeignKey("%s.id" % TABLE_JOB_QUEUE),
        nullable=True,
        doc="The parent queue of this queue. If "
            "NULL, this is a top level queue.")
    name = db.Column(db.String(MAX_JOBQUEUE_NAME_LENGTH), nullable=False)
    minimum_agents = db.Column(
        db.Integer,
        nullable=True,
        doc=dedent("""
            The scheduler will try to assign at least this number of agents to
            jobs in or below this queue as long as it can use them, before any
            other considerations."""))
    maximum_agents = db.Column(
        db.Integer,
        nullable=True,
        doc=dedent("""
            The scheduler will never assign more than this number of agents to
            jobs in or below this queue."""))
    priority = db.Column(
        db.Integer,
        nullable=False,
        default=read_env_int("PYFARM_QUEUE_DEFAULT_PRIORITY", 0),
        doc=dedent("""
            The priority of this job queue. The scheduler will not assign any
            nodes to other job queues or jobs with the same parent and a lower
            priority as long as this one can still use nodes. The
            minimum_agents column takes precedence over this."""))
    weight = db.Column(
        db.Integer,
        nullable=False,
        default=read_env_int("PYFARM_QUEUE_DEFAULT_WEIGHT", 10),
        doc=dedent("""
            The weight of this job queue. The scheduler will distribute
            available agents between jobs and job queues in the same queue in
            proportion to their weights.
            """))
    fullpath = db.Column(
        db.String(MAX_JOBQUEUE_PATH_LENGTH),
        doc="The path of this jobqueue.  This column is a "
            "database denormalization.  It is technically "
            "redundant, but faster to access than recursively "
            "querying all parent queues. \nIf set to NULL, the "
            "path must be computed by recursively querying "
            "the parent queues.")

    parent = db.relationship(
        "JobQueue",
        remote_side=[id],
        backref=db.backref("children", lazy="dynamic"),
        doc="Relationship between this queue its parent")

    def path(self):
        """
        Return the slash separated path of this queue, e.g. ``/root/child``.

        The denormalized ``fullpath`` column is returned when it is set.
        Otherwise a ``cache_jobqueue_path`` task is dispatched for this
        queue's id and the path is built by walking up the parent chain.
        """
        # Import here instead of at the top to break circular dependency
        from pyfarm.scheduler.tasks import cache_jobqueue_path

        if self.fullpath:
            return self.fullpath

        cache_jobqueue_path.delay(self.id)
        own_segment = "/%s" % (self.name or "")
        if self.parent:
            return self.parent.path() + own_segment
        return own_segment

    def num_assigned_agents(self):
        """
        Return the number of agents working in or below this queue.

        The value is the sum of the counts of all child queues plus the
        number of distinct agents holding a task (state NULL or RUNNING) of
        a job directly in this queue.  The result is memoized on the
        instance as ``assigned_agents_count``, so later calls on the same
        object return the stored value without querying again.
        """
        try:
            return self.assigned_agents_count
        except AttributeError:
            # Import down here instead of at the top to avoid circular import
            from pyfarm.models.task import Task
            from pyfarm.models.job import Job

            # Accumulate locally and store once at the end so that an error
            # part-way through cannot leave a partial count cached.
            total = 0
            for queue in self.children:
                total += queue.num_assigned_agents()

            # ``!= None`` / ``== None`` are SQLAlchemy column expressions
            # here (rendered as IS [NOT] NULL), not Python comparisons.
            total += db.session.query(distinct(Task.agent_id)).filter(
                Task.job.has(Job.queue == self),
                Task.agent_id != None,  # noqa: E711
                or_(Task.state == None,  # noqa: E711
                    Task.state == WorkState.RUNNING)).count()

            self.assigned_agents_count = total
            return total

    @staticmethod
    def _below_maximum(item):
        """
        Return True if ``item`` (a job or a queue) may take one more agent.

        A falsy ``maximum_agents`` (NULL or 0) is treated as "no limit".
        The comparison is a plain ``<`` so the configured maximum itself can
        be reached; comparing ``assigned + 1 < maximum`` would stop one
        agent short and would starve anything with a maximum of 1.
        """
        return item.num_assigned_agents() < (item.maximum_agents or maxsize)

    def get_job_for_agent(self, agent):
        """
        Pick the job in or below this queue that ``agent`` should work on.

        Selection order:

        1. Jobs and child queues that have not reached their
           ``minimum_agents`` yet.
        2. Remaining jobs and child queues by descending priority.  Within
           one priority, candidates are ordered by how far their share of
           assigned agents lags behind their share of the weights.

        Only jobs directly in this queue that are RUNNING or have no state,
        have no unfinished parent jobs and use a jobtype version supported
        by the agent are considered.  Child queues are searched recursively.

        :param agent:
            The agent to find work for; must provide
            ``get_supported_types()``.

        :return:
            The selected job, or None if there is nothing suitable.
        """
        # Import down here instead of at the top to avoid circular import
        from pyfarm.models.job import Job

        supported_types = agent.get_supported_types()
        if not supported_types:
            return None

        child_jobs = Job.query.filter(
            or_(Job.state == WorkState.RUNNING,
                Job.state == None),  # noqa: E711
            Job.job_queue_id == self.id,
            ~Job.parents.any(or_(Job.state == None,  # noqa: E711
                                 Job.state != WorkState.DONE)),
            Job.jobtype_version_id.in_(supported_types)).all()

        child_queues = JobQueue.query.filter(
            JobQueue.parent_jobqueue_id == self.id).all()

        # Before anything else, enforce minimums
        for job in child_jobs:
            if job.state == _WorkState.RUNNING:
                if (job.num_assigned_agents() < (job.minimum_agents or 0) and
                        self._below_maximum(job) and
                        job.can_use_more_agents()):
                    return job
            elif job.minimum_agents and job.minimum_agents > 0:
                # Not running yet but it has a minimum: start it right away
                return job

        for queue in child_queues:
            if (queue.num_assigned_agents() < (queue.minimum_agents or 0) and
                    self._below_maximum(queue)):
                job = queue.get_job_for_agent(agent)
                if job:
                    return job

        # Group queues and jobs by priority (queues first, as before)
        objects_by_priority = {}
        for item in child_queues + child_jobs:
            objects_by_priority.setdefault(item.priority, []).append(item)

        # Work through the priorities in descending order
        for priority in sorted(objects_by_priority, reverse=True):
            objects = objects_by_priority[priority]
            # Jobs that have not started yet do not count towards the weights
            active_objects = [
                x for x in objects
                if not isinstance(x, Job) or x.state == _WorkState.RUNNING]
            weight_sum = sum(x.weight for x in active_objects)
            total_assigned = sum(x.num_assigned_agents() for x in objects)

            def share_deficit(x):
                # Share of the weights minus share of the assigned agents:
                # the larger the value, the more under-served the item is.
                weight_share = (
                    float(x.weight) / weight_sum if weight_sum else 0)
                agent_share = (
                    float(x.num_assigned_agents()) / total_assigned
                    if total_assigned else 0)
                return weight_share - agent_share

            objects.sort(key=share_deficit, reverse=True)

            selected_job = None
            for item in objects:
                if isinstance(item, Job):
                    if item.state == _WorkState.RUNNING:
                        if (item.can_use_more_agents() and
                                self._below_maximum(item)):
                            if PREFER_RUNNING_JOBS:
                                return item
                            elif (selected_job is None or
                                  selected_job.time_submitted >
                                  item.time_submitted):
                                selected_job = item
                    elif (selected_job is None or
                          selected_job.time_submitted > item.time_submitted):
                        # If this job is not running yet, remember it, but
                        # keep looking for already running or queued but
                        # older jobs
                        selected_job = item
                elif isinstance(item, JobQueue):
                    if self._below_maximum(item):
                        job = item.get_job_for_agent(agent)
                        if job:
                            return job

            if selected_job:
                return selected_job

        return None

    @staticmethod
    def top_level_unique_check(mapper, connection, target):
        """
        Reject a second top level queue with the same name.

        Has the ``(mapper, connection, target)`` signature of a SQLAlchemy
        mapper event listener.  Only queues without a parent are checked.

        :raises ValueError:
            If ``target`` has no parent and a queue without a parent and
            with the same name already exists.
        """
        if target.parent_jobqueue_id is None:
            count = JobQueue.query.filter_by(parent_jobqueue_id=None,
                                             name=target.name).count()
            if count > 0:
                raise ValueError("Cannot have two jobqueues named %r at the "
                                 "top level" % target.name)
# Run JobQueue.top_level_unique_check before every JobQueue insert.
# NOTE(review): presumably needed because the database unique constraint on
# (parent_jobqueue_id, name) does not treat NULL parents as equal - confirm.
event.listen(JobQueue, "before_insert", JobQueue.top_level_unique_check)