# No shebang line, this module is meant to be imported
#
# Copyright 2014 Ambient Entertainment GmbH & Co. KG
# Copyright 2015 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Job Queue Model
===============
Model for job queues
"""
from sys import maxsize
from functools import reduce
from logging import DEBUG
from sqlalchemy import event, distinct, or_, and_
from sqlalchemy.schema import UniqueConstraint
from pyfarm.core.logger import getLogger
from pyfarm.core.enums import WorkState, _WorkState, AgentState
from pyfarm.master.application import db
from pyfarm.master.config import config
from pyfarm.models.core.mixins import UtilityMixins, ReprMixin
from pyfarm.models.core.types import id_column, IDTypeWork
from pyfarm.models.agent import Agent
PREFER_RUNNING_JOBS = config.get("queue_prefer_running_jobs")
USE_TOTAL_RAM = config.get("use_total_ram_for_scheduling")
logger = getLogger("pf.models.jobqueue")
if config.get("debug_queue"):
logger.setLevel(DEBUG)
[docs]class JobQueue(db.Model, UtilityMixins, ReprMixin):
"""
Stores information about a job queue. Used for flexible, configurable
distribution of computing capacity to jobs.
"""
__tablename__ = config.get("table_job_queue")
__table_args__ = (UniqueConstraint("parent_jobqueue_id", "name"),)
REPR_COLUMNS = ("id", "name")
id = id_column(IDTypeWork)
parent_jobqueue_id = db.Column(
IDTypeWork,
db.ForeignKey("%s.id" % config.get("table_job_queue")),
nullable=True,
doc="The parent queue of this queue. If NULL, this is a top "
"level queue.")
name = db.Column(
db.String(config.get("max_queue_name_length")),
nullable=False,
doc="The name of the job queue")
minimum_agents = db.Column(
db.Integer,
nullable=True,
doc="The scheduler will try to assign at least this number of "
"agents to jobs in or below this queue as long as it "
"can use them, before any other considerations.")
maximum_agents = db.Column(
db.Integer,
nullable=True,
doc="The scheduler will never assign more than this number of "
"agents to jobs in or below this queue.")
priority = db.Column(
db.Integer,
nullable=False,
default=config.get("queue_default_priority"),
doc="The priority of this job queue. The scheduler will not "
"assign any nodes to other job queues or jobs with the "
"same parent and a lower priority as long as this one "
"can still use nodes. The minimum_agents column takes "
"precedence over this.")
weight = db.Column(
db.Integer,
nullable=False,
default=config.get("queue_default_weight"),
doc="The weight of this job queue. The scheduler will "
"distribute available agents between jobs and job "
"queues in the same queue in proportion to their "
"weights.")
fullpath = db.Column(
db.String(config.get("max_queue_path_length")),
doc="The path of this jobqueue. This column is a "
"database denormalization. It is technically "
"redundant, but faster to access than recursively "
"querying all parent queues. If set to NULL, the "
"path must be computed by recursively querying "
"the parent queues.")
#
# Relationship
#
parent = db.relationship(
"JobQueue",
remote_side=[id],
backref=db.backref("children", lazy="dynamic"),
doc="Relationship between this queue its parent")
[docs] def path(self):
# Import here instead of at the top to break circular dependency
from pyfarm.scheduler.tasks import cache_jobqueue_path
if self.fullpath:
return self.fullpath
else:
cache_jobqueue_path.delay(self.id)
path = "/%s" % (self.name or "")
if self.parent:
return self.parent.path() + path
else:
return path
[docs] def child_queues_sorted(self):
"""
Return child queues sorted by number of currently assigned agents with
priority as a secondary sort key.
"""
queues = [x for x in self.children]
return sorted(queues, key=lambda x: x.num_assigned_agents(),
reverse=True)
[docs] def child_jobs(self, filters):
# Import down here instead of at the top to avoid circular import
from pyfarm.models.job import Job
jobs_query = Job.query
if self.id:
jobs_query = jobs_query.filter_by(queue=self)
wanted_states = []
if filters["state_paused"]:
wanted_states.append(WorkState.PAUSED)
if filters["state_running"]:
wanted_states.append(WorkState.RUNNING)
if filters["state_done"]:
wanted_states.append(WorkState.DONE)
if filters["state_failed"]:
wanted_states.append(WorkState.FAILED)
if filters["state_queued"]:
jobs_query = jobs_query.filter(or_(
Job.state == None,
Job.state.in_(wanted_states)))
else:
jobs_query = jobs_query.filter(
Job.state.in_(wanted_states))
return sorted(jobs_query.all(), key=lambda x: x.num_assigned_agents(),
reverse=True)
[docs] def num_assigned_agents(self):
try:
return self.assigned_agents_count
except AttributeError:
# Import down here instead of at the top to avoid circular import
from pyfarm.models.task import Task
from pyfarm.models.job import Job
self.assigned_agents_count = 0
for queue in self.children:
self.assigned_agents_count += queue.num_assigned_agents()
self.assigned_agents_count +=\
db.session.query(distinct(Task.agent_id)).\
filter(Task.job.has(Job.queue == self),
Task.agent_id != None,
Task.agent.has(
and_(Agent.state != AgentState.OFFLINE,
Agent.state != AgentState.DISABLED)),
or_(Task.state == None,
Task.state == WorkState.RUNNING)).count()
return self.assigned_agents_count
[docs] def clear_assigned_counts(self):
try:
del self.assigned_agents_count
except AttributeError:
pass
if self.parent:
self.parent.clear_assigned_counts()
[docs] def get_job_for_agent(self, agent, unwanted_job_ids=None):
# Import down here instead of at the top to avoid circular import
from pyfarm.models.job import Job
supported_types = agent.get_supported_types()
if not supported_types:
return None
available_ram = agent.ram if USE_TOTAL_RAM else agent.free_ram
child_jobs = Job.query.filter(or_(Job.state == WorkState.RUNNING,
Job.state == None),
Job.job_queue_id == self.id,
~Job.parents.any(or_(
Job.state == None,
Job.state != WorkState.DONE)),
Job.jobtype_version_id.in_(
supported_types),
Job.ram <= available_ram).all()
child_jobs = [x for x in child_jobs if
(agent.satisfies_job_requirements(x) and
x.id not in unwanted_job_ids)]
if unwanted_job_ids:
child_jobs = [x for x in child_jobs if x.id not in unwanted_job_ids]
child_queues = JobQueue.query.filter(
JobQueue.parent_jobqueue_id == self.id).all()
# Before anything else, enforce minimums
for job in child_jobs:
if job.state == _WorkState.RUNNING:
if (job.num_assigned_agents() < (job.minimum_agents or 0) and
job.num_assigned_agents() <
(job.maximum_agents or maxsize) and
job.can_use_more_agents()):
return job
elif job.minimum_agents and job.minimum_agents > 0:
return job
for queue in child_queues:
if (queue.num_assigned_agents() < (queue.minimum_agents or 0) and
queue.num_assigned_agents() <
(queue.maximum_agents or maxsize)):
job = queue.get_job_for_agent(agent, unwanted_job_ids)
if job:
return job
objects_by_priority = {}
for queue in child_queues:
if queue.priority in objects_by_priority:
objects_by_priority[queue.priority] += [queue]
else:
objects_by_priority[queue.priority] = [queue]
for job in child_jobs:
if job.priority in objects_by_priority:
objects_by_priority[job.priority] += [job]
else:
objects_by_priority[job.priority] = [job]
available_priorities = sorted(objects_by_priority.keys(), reverse=True)
# Work through the priorities in descending order
for priority in available_priorities:
objects = objects_by_priority[priority]
active_objects = [x for x in objects if
(type(x) != Job or x.state == _WorkState.RUNNING)]
weight_sum = reduce(lambda a, b: a + b.weight, active_objects, 0)
total_assigned = reduce(lambda a, b: a + b.num_assigned_agents(),
objects, 0)
objects.sort(key=(lambda x:
((float(x.num_assigned_agents()) / total_assigned)
if total_assigned else 0) /
((float(x.weight) / weight_sum)
if weight_sum and x.weight else 1)))
selected_job = None
for item in objects:
if isinstance(item, Job):
if item.state == _WorkState.RUNNING:
if (item.can_use_more_agents() and
item.num_assigned_agents() <
(item.maximum_agents or maxsize)):
if PREFER_RUNNING_JOBS:
return item
elif (selected_job is None or
selected_job.time_submitted >
item.time_submitted):
selected_job = item
elif (selected_job is None or
selected_job.time_submitted > item.time_submitted):
# If this job is not running yet, remember it, but keep
# looking for already running or queued but older jobs
selected_job = item
if isinstance(item, JobQueue):
if (item.num_assigned_agents() <
(item.maximum_agents or maxsize)):
job = item.get_job_for_agent(agent, unwanted_job_ids)
if job:
return job
if selected_job:
return selected_job
return None
@staticmethod
[docs] def top_level_unique_check(mapper, connection, target):
if target.parent_jobqueue_id is None:
count = JobQueue.query.filter_by(parent_jobqueue_id=None,
name=target.name).count()
if count > 0:
raise ValueError("Cannot have two jobqueues named %r at the "
"top level" % target.name)
event.listen(JobQueue, "before_insert", JobQueue.top_level_unique_check)