# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
# Copyright 2014 Ambient Entertainment GmbH & Co. KG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Jobs
----
This module defines an API for managing and querying jobs
"""
from decimal import Decimal
from json import loads
from datetime import datetime
try:
from httplib import (
OK, BAD_REQUEST, NOT_FOUND, INTERNAL_SERVER_ERROR, CREATED, NO_CONTENT)
except ImportError: # pragma: no cover
from http.client import (
OK, BAD_REQUEST, NOT_FOUND, INTERNAL_SERVER_ERROR, CREATED, NO_CONTENT)
from flask.views import MethodView
from flask import g, request
from sqlalchemy.sql import func, or_
from pyfarm.core.logger import getLogger
from pyfarm.core.enums import STRING_TYPES, NUMERIC_TYPES, WorkState, _WorkState
from pyfarm.scheduler.tasks import (
assign_tasks_to_agent, assign_tasks, delete_job)
from pyfarm.models.statistics.task_event_count import TaskEventCount
from pyfarm.models.jobtype import JobType, JobTypeVersion
from pyfarm.models.task import Task
from pyfarm.models.user import User
from pyfarm.models.job import Job, JobNotifiedUser
from pyfarm.models.software import (
Software, SoftwareVersion, JobSoftwareRequirement)
from pyfarm.models.tag import Tag, JobTagRequirement
from pyfarm.models.jobqueue import JobQueue
from pyfarm.models.agent import Agent
from pyfarm.master.application import db
from pyfarm.master.utility import (
jsonify, validate_with_model, get_request_argument)
from pyfarm.master.config import config
# Types accepted for the ``start``, ``end`` and ``by`` frame-range values:
# every entry of NUMERIC_TYPES except the last one, plus Decimal.
# NOTE(review): the dropped last entry is presumably ``complex``; confirm
# against pyfarm.core.enums.
RANGE_TYPES = NUMERIC_TYPES[:-1] + (Decimal, )

# ``xrange`` on Python 2, plain ``range`` on Python 3.
try:
    range_ = xrange  # pylint: disable=undefined-variable
except NameError:
    range_ = range

logger = getLogger("api.jobs")

# Column name -> accepted Python type(s) for Task; JobSingleTaskAPI.post uses
# it to type-check updates.  Load model mappings once per process.
TASK_MODEL_MAPPINGS = Task.types().mappings

# Master configuration values, read once at import time.
AUTOCREATE_USERS = config.get("autocreate_users")
AUTO_USER_EMAIL = config.get("autocreate_user_email")
DEFAULT_JOB_DELETE_TIME = config.get("default_job_delete_time")
class ObjectNotFound(Exception):
    """
    Raised by :func:`parse_requirements` when a referenced software or
    software version does not exist.  The API endpoints translate it into
    a ``404 NOT FOUND`` response.
    """
def _find_software_version(software, software_name, version_str):
    """
    Return the :class:`SoftwareVersion` named ``version_str`` of ``software``.

    :raises ObjectNotFound:
        Raised if ``software`` has no such version
    """
    version = SoftwareVersion.query.filter(
        SoftwareVersion.software == software,
        SoftwareVersion.version == version_str).first()
    if not version:
        raise ObjectNotFound("Version %s of software %s not found" %
                             (version_str, software_name))
    return version


def parse_requirements(requirements):
    """
    Takes a list of dicts specifying a software and optional min- and
    max-versions and returns a list of :class:`JobSoftwareRequirement`
    objects.

    The dicts in ``requirements`` are not modified.  Each one is copied
    before its keys are consumed.  The returned objects are not added to
    the database session by this function.

    :param list requirements:
        A list of dicts, each with a mandatory ``software`` key and the
        optional keys ``min_version`` and ``max_version``.

    :raises TypeError:
        Raised if ``requirements`` is not a list or if an entry in
        ``requirements`` is not a dictionary.

    :raises ValueError:
        Raised if an entry does not name a software or contains keys other
        than ``software``, ``min_version`` and ``max_version``.

    :raises ObjectNotFound:
        Raised if the referenced software or version was not found

    :return:
        A list of :class:`JobSoftwareRequirement` objects in the same order
        as ``requirements``.
    """
    if not isinstance(requirements, list):
        raise TypeError("software_requirements must be a list")

    out = []
    for entry in requirements:
        if not isinstance(entry, dict):
            raise TypeError("Every software_requirement must be a dict")

        # Work on a copy so the caller's request data is left untouched.
        entry = dict(entry)
        software_name = entry.pop("software", None)
        if software_name is None:
            raise ValueError(
                "Software requirement does not specify a software.")
        min_version_str = entry.pop("min_version", None)
        max_version_str = entry.pop("max_version", None)

        # Reject malformed entries before touching the database.
        if entry:
            raise ValueError("Unexpected keys in software requirement: %r" %
                             list(entry))

        software = Software.query.filter_by(software=software_name).first()
        if not software:
            raise ObjectNotFound("Software %s not found" % software_name)

        requirement = JobSoftwareRequirement()
        requirement.software = software
        if min_version_str is not None:
            requirement.min_version = _find_software_version(
                software, software_name, min_version_str)
        if max_version_str is not None:
            requirement.max_version = _find_software_version(
                software, software_name, max_version_str)
        out.append(requirement)
    return out
def schema():
    """
    Returns the basic schema of :class:`.Job`

    .. http:get:: /api/v1/jobs/schema HTTP/1.1

        **Request**

        .. sourcecode:: http

            GET /api/v1/jobs/schema HTTP/1.1
            Accept: application/json

        **Response**

        .. sourcecode:: http

            HTTP/1.1 200 OK
            Content-Type: application/json

            {
                "batch": "INTEGER",
                "by": "NUMERIC(10, 4)",
                "cpus": "INTEGER",
                "data": "JSONDict",
                "end": "NUMERIC(10,4)",
                "environ": "JSONDict",
                "hidden": "BOOLEAN",
                "id": "INTEGER",
                "jobtype": "VARCHAR(64)",
                "jobtype_version": "INTEGER",
                "jobqueue": "VARCHAR(255)",
                "notes": "TEXT",
                "priority": "INTEGER",
                "project_id": "INTEGER",
                "ram": "INTEGER",
                "ram_max": "INTEGER",
                "ram_warning": "INTEGER",
                "requeue": "INTEGER",
                "start": "NUMERIC(10,4)",
                "state": "WorkStateEnum",
                "time_finished": "DATETIME",
                "time_started": "DATETIME",
                "time_submitted": "DATETIME",
                "title": "VARCHAR(255)",
                "user": "VARCHAR(255)"
            }

    :statuscode 200: no error
    """
    schema_dict = Job.to_schema()

    # Columns that are not part of the database model; the API computes or
    # resolves them itself.
    computed_columns = (
        ("start", "NUMERIC(10,4)"),
        ("end", "NUMERIC(10,4)"),
        ("user", "VARCHAR(%s)" % config.get("max_username_length")),
        ("jobqueue", "VARCHAR(%s)" % config.get("max_queue_name_length")),
    )
    for column_name, column_type in computed_columns:
        schema_dict[column_name] = column_type

    # The database stores foreign-key ids, but over the wire the jobtype,
    # user and queue are identified by name, so hide the id columns...
    for id_column in ("jobtype_version_id", "user_id", "job_queue_id"):
        del schema_dict[id_column]

    # ...and describe the jobtype by its name plus version number instead.
    jobtype_columns = (
        ("jobtype",
         "VARCHAR(%s)" % config.get("job_type_max_name_length")),
        ("jobtype_version", "INTEGER"),
    )
    for column_name, column_type in jobtype_columns:
        schema_dict[column_name] = column_type

    return jsonify(schema_dict), OK
class JobIndexAPI(MethodView):
    """
    Collection endpoint ``/api/v1/jobs/``: submit new jobs (``POST``) and
    list existing ones (``GET``).
    """

    @validate_with_model(Job,
                         type_checks={"by": lambda x: isinstance(
                             x, RANGE_TYPES)},
                         ignore=["start", "end", "jobtype", "jobtype_version",
                                 "user", "jobqueue", "tag_requirements"],
                         disallow=["jobtype_version_id", "time_submitted",
                                   "time_started", "time_finished",
                                   "job_queue_id"])
    def post(self):
        """
        A ``POST`` to this endpoint will submit a new job.

        If ``jobtype_version`` is omitted, the highest version of the given
        jobtype is used.

        .. http:post:: /api/v1/jobs/ HTTP/1.1

            **Request**

            .. sourcecode:: http

                POST /api/v1/jobs/ HTTP/1.1
                Accept: application/json

                {
                    "end": 2.0,
                    "title": "Test Job 2",
                    "jobtype": "TestJobType",
                    "data": {
                        "foo": "bar"
                    },
                    "software_requirements": [
                        {
                            "software": "blender"
                        }
                    ],
                    "start": 1.0
                }

            **Response**

            .. sourcecode:: http

                HTTP/1.1 201 CREATED
                Content-Type: application/json

                {
                    "time_finished": null,
                    "time_started": null,
                    "end": 2.0,
                    "time_submitted": "2014-03-06T15:40:58.335259",
                    "jobtype_version": 1,
                    "jobtype": "TestJobType",
                    "jobqueue": null,
                    "start": 1.0,
                    "priority": 0,
                    "state": "queued",
                    "parents": [],
                    "hidden": false,
                    "project_id": null,
                    "ram_warning": null,
                    "title": "Test Job 2",
                    "tags": [],
                    "user": null,
                    "by": 1.0,
                    "data": {
                        "foo": "bar"
                    },
                    "ram_max": null,
                    "notes": "",
                    "batch": 1,
                    "project": null,
                    "environ": null,
                    "requeue": 3,
                    "software_requirements": [
                        {
                            "min_version": null,
                            "max_version": null,
                            "max_version_id": null,
                            "software_id": 1,
                            "min_version_id": null,
                            "software": "blender"
                        }
                    ],
                    "tag_requirements": [
                        {
                            "tag": "workstation",
                            "negate": true
                        }
                    ],
                    "id": 2,
                    "ram": 32,
                    "cpus": 1,
                    "children": []
                }

        :statuscode 201: a new job item was created
        :statuscode 400: there was something wrong with the request (such as
                         invalid columns being included or an invalid frame
                         range)
        :statuscode 404: a referenced object, like a jobtype, software,
                         software version, parent job, user or jobqueue,
                         does not exist
        :statuscode 409: a conflicting job already exists
        """
        if "jobtype" not in g.json:
            return jsonify(error="No jobtype specified"), BAD_REQUEST
        if not isinstance(g.json["jobtype"], STRING_TYPES):
            return jsonify(error="jobtype must be of type string"), BAD_REQUEST

        q = JobTypeVersion.query.filter(
            JobTypeVersion.jobtype.has(JobType.name == g.json["jobtype"]))
        del g.json["jobtype"]

        if "jobtype_version" in g.json:
            if not isinstance(g.json["jobtype_version"], int):
                return (jsonify(error="jobtype_version must be of type int"),
                        BAD_REQUEST)
            q = q.filter(JobTypeVersion.version == g.json["jobtype_version"])
            del g.json["jobtype_version"]

        # Without an explicit version this picks the highest one.
        jobtype_version = q.order_by(JobTypeVersion.version.desc()).first()
        if not jobtype_version:
            return (jsonify(error="Jobtype or version not found"),
                    NOT_FOUND)

        # Validate the frame range before any object is created or added to
        # the session.  The raw request body is re-parsed with
        # parse_float=Decimal so the frame values are Decimal objects.
        custom_json = loads(request.data.decode(), parse_float=Decimal)
        if "end" in custom_json and "start" not in custom_json:
            return (jsonify(error="`end` is specified while `start` is not"),
                    BAD_REQUEST)
        start = custom_json.get("start", Decimal("1.0"))
        end = custom_json.get("end", start)
        if (not isinstance(start, RANGE_TYPES) or
                not isinstance(end, RANGE_TYPES)):
            return (jsonify(error="`start` and `end` need to be of type "
                                  "decimal or int"), BAD_REQUEST)
        if not end >= start:
            return (jsonify(error="`end` must be larger than or equal to "
                                  "start"), BAD_REQUEST)
        by = custom_json.pop("by", Decimal("1.0"))
        if not isinstance(by, RANGE_TYPES):
            return (jsonify(error="`by` needs to be of type decimal or int"),
                    BAD_REQUEST)

        num_tiles = g.json.get("num_tiles", None)
        if not jobtype_version.supports_tiling and num_tiles is not None:
            return (jsonify(error="`num_tiles` is set, but this "
                                  "jobtype does not support tiling."),
                    BAD_REQUEST)

        software_requirements = []
        if "software_requirements" in g.json:
            try:
                software_requirements = parse_requirements(
                    g.json["software_requirements"])
            except (TypeError, ValueError) as e:
                return jsonify(error=e.args), BAD_REQUEST
            except ObjectNotFound as e:
                return jsonify(error=e.args), NOT_FOUND
            del g.json["software_requirements"]

        parents = []
        if "parents" in g.json:
            for parent_job_data in g.json["parents"]:
                parent_job = Job.query.filter_by(
                    id=parent_job_data["id"]).first()
                if not parent_job:
                    return (jsonify(error="Parent job %s not found" %
                                          parent_job_data["id"]),
                            NOT_FOUND)
                parents.append(parent_job)
            del g.json["parents"]

        # Tags that do not exist yet are created on the fly.
        tags = []
        for tag_name in g.json.pop("tags", None) or []:
            tag = Tag.query.filter_by(tag=tag_name).first()
            if not tag:
                tag = Tag(tag=tag_name)
            tags.append(tag)

        user = None
        username = g.json.pop("user", None)
        if username:
            user = self._lookup_user(username)
            if not user:
                return (jsonify(
                    error="User %s not found" % username), NOT_FOUND)

        jobqueue = None
        jobqueue_name = g.json.pop("jobqueue", None)
        if jobqueue_name:
            # Walk the "/"-separated path one level at a time; the first
            # lookup is done with parent=None.
            for element in jobqueue_name.split("/"):
                jobqueue = JobQueue.query.filter_by(
                    parent=jobqueue, name=element).first()
                if not jobqueue:
                    return (jsonify(error="Jobqueue %s not found" %
                                          jobqueue_name),
                            NOT_FOUND)

        notified_usernames = g.json.pop("notified_users", None)
        tag_requirements = g.json.pop("tag_requirements", None)
        # start/end are not Job columns; they were read from custom_json.
        g.json.pop("start", None)
        g.json.pop("end", None)

        job = Job(**g.json)
        job.jobtype_version = jobtype_version
        job.software_requirements = software_requirements
        job.parents = parents
        job.tags = tags
        job.user = user
        job.queue = jobqueue
        job.autodelete_time = g.json.get("autodelete_time",
                                         DEFAULT_JOB_DELETE_TIME)

        for entry in notified_usernames or []:
            notified = self._lookup_user(entry["username"], flush=True)
            if not notified:
                return (jsonify(
                    error="User %s not found" % entry["username"]),
                    NOT_FOUND)
            notified_user = JobNotifiedUser(user=notified, job=job)
            for flag in ("on_success", "on_failure", "on_deletion"):
                if flag in entry:
                    setattr(notified_user, flag, entry[flag])
            db.session.add(notified_user)

        for entry in tag_requirements or []:
            tag = Tag.query.filter_by(tag=entry["tag"]).first()
            if not tag:
                tag = Tag(tag=entry["tag"])
                db.session.add(tag)
            tag_requirement = JobTagRequirement(job=job, tag=tag)
            # "negate" is optional; a missing key means a positive match.
            if entry.get("negate"):
                tag_requirement.negate = True
            db.session.add(tag_requirement)

        # Same handling as SingleJobAPI.post: an invalid range is a client
        # error, not a server error.
        try:
            job.alter_frame_range(start, end, by)
        except ValueError as e:
            return jsonify(error=str(e)), BAD_REQUEST

        db.session.add(job)
        db.session.add_all(software_requirements)
        db.session.commit()

        job_data = job.to_dict(unpack_relationships=["tags",
                                                     "data",
                                                     "software_requirements",
                                                     "parents",
                                                     "children",
                                                     "notified_users",
                                                     "tag_requirements"])
        job_data["start"] = start
        job_data["end"] = end
        # Replace foreign-key ids with the names used over the wire.
        del job_data["jobtype_version_id"]
        job_data["jobtype"] = job.jobtype_version.jobtype.name
        job_data["jobtype_version"] = job.jobtype_version.version
        job_data["user"] = job.user.username if job.user else None
        del job_data["user_id"]
        job_data["jobqueue"] = job.queue.path() if job.queue else None
        del job_data["job_queue_id"]
        job_data["jobgroup"] = job.group.title if job.group else None
        if job.state is None:
            # `!= None` is the SQLAlchemy operator overload (IS NOT NULL).
            num_assigned_tasks = Task.query.filter(
                Task.job == job, Task.agent != None).count()  # noqa: E711
            if num_assigned_tasks > 0:
                job_data["state"] = "running"
            else:
                job_data["state"] = "queued"

        logger.info("Created new job %r", job_data)
        assign_tasks.delay()
        return jsonify(job_data), CREATED

    def get(self):
        """
        A ``GET`` to this endpoint will return a list of all jobs.

        The list can be narrowed with the query arguments ``jobtype``,
        ``user``, ``title`` (case-insensitive substring match) and
        ``jobqueue`` (full queue path, may be repeated).

        .. http:get:: /api/v1/jobs/ HTTP/1.1

            **Request**

            .. sourcecode:: http

                GET /api/v1/jobs/ HTTP/1.1
                Accept: application/json

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: application/json

                [
                    {
                        "title": "Test Job",
                        "state": "queued",
                        "id": 1
                    },
                    {
                        "title": "Test Job 2",
                        "state": "queued",
                        "id": 2
                    }
                ]

        :statuscode 200: no error
        :statuscode 404: the requested jobtype, user or jobqueue does not
                         exist
        """
        jobtype_name = get_request_argument("jobtype")
        user_name = get_request_argument("user")
        job_title = get_request_argument("title")
        # getlist() returns an empty list when the argument is absent.
        jobqueue_names = request.args.getlist("jobqueue")

        # Number of tasks per job that currently have an agent assigned.
        assigned_counts = db.session.query(
            Task.job_id,
            func.count(Task.id).label("assigned_tasks_count")).\
            filter(Task.agent_id != None).\
            group_by(Task.job_id).subquery()  # noqa: E711
        q = db.session.query(Job.id, Job.title, Job.state,
                             assigned_counts.c.assigned_tasks_count).\
            outerjoin(assigned_counts, Job.id == assigned_counts.c.job_id)

        if jobtype_name is not None:
            jobtype = JobType.query.filter_by(name=jobtype_name).first()
            if not jobtype:
                return (jsonify(error="Jobtype %s not found" % jobtype_name),
                        NOT_FOUND)
            q = q.join(JobTypeVersion,
                       Job.jobtype_version_id == JobTypeVersion.id)
            q = q.filter(JobTypeVersion.jobtype == jobtype)

        if user_name is not None:
            user = User.query.filter_by(username=user_name).first()
            if not user:
                return (jsonify(error="User %s not found" % user_name),
                        NOT_FOUND)
            q = q.filter(Job.user == user)

        if jobqueue_names:
            jobqueue_ids = []
            for jobqueue_name in jobqueue_names:
                jobqueue = JobQueue.query.filter_by(
                    fullpath=jobqueue_name).first()
                if not jobqueue:
                    return (
                        jsonify(error="Jobqueue %s not found" % jobqueue_name),
                        NOT_FOUND)
                jobqueue_ids.append(jobqueue.id)
            q = q.filter(Job.job_queue_id.in_(jobqueue_ids))

        if job_title:
            q = q.filter(Job.title.ilike("%%%s%%" % job_title))

        out = []
        for job_id, title, state, assigned_tasks_count in q:
            data = {"id": job_id, "title": title}
            if state is not None:
                data["state"] = str(state)
            elif assigned_tasks_count:
                data["state"] = "assigned"
            else:
                data["state"] = "queued"
            out.append(data)
        return jsonify(out), OK

    @staticmethod
    def _lookup_user(username, flush=False):
        """
        Return the :class:`User` called ``username``.

        If no such user exists and ``autocreate_users`` is enabled, a new
        user is created, given an email address from
        ``autocreate_user_email`` if that is set, added to the session
        (and flushed when ``flush`` is true) and returned.  Otherwise
        ``None`` is returned.
        """
        user = User.query.filter_by(username=username).first()
        if not user and AUTOCREATE_USERS:
            user = User(username=username)
            if AUTO_USER_EMAIL:
                user.email = AUTO_USER_EMAIL.format(username=username)
            db.session.add(user)
            if flush:
                db.session.flush()
            logger.warning("User %s was autocreated on job submit", username)
        return user
class SingleJobAPI(MethodView):
    """
    Endpoint for a single job, addressed by title (string) or id (int):
    retrieve (``GET``), update (``POST``) or delete (``DELETE``) it.
    """

    def get(self, job_name):
        """
        A ``GET`` to this endpoint will return the specified job, by name or id.

        .. http:get:: /api/v1/jobs/[<str:name>|<int:id>] HTTP/1.1

            **Request**

            .. sourcecode:: http

                GET /api/v1/jobs/Test%20Job%202 HTTP/1.1
                Accept: application/json

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: application/json

                {
                    "ram_warning": null,
                    "title": "Test Job",
                    "state": "queued",
                    "jobtype_version": 1,
                    "jobtype": "TestJobType",
                    "environ": null,
                    "user": null,
                    "priority": 0,
                    "time_finished": null,
                    "start": 2.0,
                    "id": 1,
                    "notes": "",
                    "notified_users": [],
                    "ram": 32,
                    "tags": [],
                    "hidden": false,
                    "data": {
                        "foo": "bar"
                    },
                    "software_requirements": [
                        {
                            "software": "blender",
                            "software_id": 1,
                            "min_version": null,
                            "max_version": null,
                            "min_version_id": null,
                            "max_version_id": null
                        }
                    ],
                    "tag_requirements": [
                        {
                            "tag": "workstation",
                            "negate": true
                        }
                    ],
                    "batch": 1,
                    "time_started": null,
                    "time_submitted": "2014-03-06T15:40:58.335259",
                    "requeue": 3,
                    "end": 4.0,
                    "parents": [],
                    "cpus": 1,
                    "ram_max": null,
                    "children": [],
                    "by": 1.0,
                    "project_id": null
                }

        :statuscode 200: no error
        :statuscode 404: job not found
        """
        job = self._find_job(job_name)
        if not job:
            return jsonify(error="Job not found"), NOT_FOUND

        first_task, last_task = self._first_and_last_task(job)
        if not first_task or not last_task:  # pragma: no cover
            return (jsonify(error="Job does not have any tasks"),
                    INTERNAL_SERVER_ERROR)

        job_data = self._serialize(job, first_task.frame, last_task.frame)
        return jsonify(job_data), OK

    def post(self, job_name):
        """
        A ``POST`` to this endpoint will update the specified job with the data
        in the request. Columns not specified in the request will be left as
        they are.

        If the "start", "end" or "by" columns are updated, tasks will be created
        or deleted as required.

        .. http:post:: /api/v1/jobs/[<str:name>|<int:id>] HTTP/1.1

            **Request**

            .. sourcecode:: http

                POST /api/v1/jobs/Test%20Job HTTP/1.1
                Accept: application/json

                {
                    "start": 2.0
                }

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: application/json

                {
                    "end": 4.0,
                    "children": [],
                    "jobtype_version": 1,
                    "jobtype": "TestJobType",
                    "time_started": null,
                    "tasks_failed": [],
                    "project_id": null,
                    "id": 1,
                    "software_requirements": [
                        {
                            "software": "blender",
                            "min_version": null,
                            "max_version_id": null,
                            "software_id": 1,
                            "max_version": null,
                            "min_version_id": null
                        }
                    ],
                    "tags": [],
                    "environ": null,
                    "requeue": 3,
                    "start": 2.0,
                    "ram_warning": null,
                    "title": "Test Job",
                    "batch": 1,
                    "time_submitted": "2014-03-06T15:40:58.335259",
                    "ram_max": null,
                    "user": null,
                    "notes": "",
                    "notified_users": [],
                    "data": {
                        "foo": "bar"
                    },
                    "tag_requirements": [
                        {
                            "tag": "workstation",
                            "negate": true
                        }
                    ],
                    "ram": 32,
                    "parents": [],
                    "hidden": false,
                    "priority": 0,
                    "cpus": 1,
                    "state": "queued",
                    "by": 1.0,
                    "time_finished": null
                }

        :statuscode 200: the job was updated
        :statuscode 400: there was something wrong with the request (such as
                         invalid columns being included)
        :statuscode 404: the job or a referenced object (parent job, user,
                         jobqueue, software or software version) does not
                         exist
        """
        job = self._find_job(job_name)
        if not job:
            return jsonify(error="Job not found"), NOT_FOUND

        # Reject fields that may not be written before anything is modified.
        if "tiles" in g.json:
            return (jsonify(error="Frame tiling cannot be updated after job "
                                  "creation."), BAD_REQUEST)
        for column in ("time_started", "time_finished", "time_submitted",
                       "jobtype_version_id"):
            if column in g.json:
                return (jsonify(error="`%s` cannot be set manually" % column),
                        BAD_REQUEST)
        if "jobgroup" in g.json:
            return (jsonify(error="`jobgroup` cannot be set directly, use "
                                  "job_group_id."), BAD_REQUEST)

        first_task, last_task = self._first_and_last_task(job)
        if not first_task or not last_task:  # pragma: no cover
            return (jsonify(error="Job does not have any tasks"),
                    INTERNAL_SERVER_ERROR)
        start = first_task.frame
        end = last_task.frame

        if "start" in g.json or "end" in g.json or "by" in g.json:
            # Re-parse the raw body with parse_float=Decimal so the new
            # frame-range values are Decimal objects.
            frame_json = loads(request.data.decode(), parse_float=Decimal)
            try:
                start = Decimal(frame_json.pop("start", first_task.frame))
                end = Decimal(frame_json.pop("end", last_task.frame))
                by = Decimal(frame_json.pop("by", job.by))
            except (TypeError, ValueError, ArithmeticError):
                # decimal.InvalidOperation is an ArithmeticError subclass.
                return (jsonify(error="`start`, `end` and `by` need to be "
                                      "numbers"), BAD_REQUEST)
            try:
                job.alter_frame_range(start, end, by)
            except ValueError as e:
                return jsonify(error=str(e)), BAD_REQUEST

        parent_jobs_data = g.json.pop("parents", None)
        if parent_jobs_data is not None:
            parents = []
            for parent_job_data in parent_jobs_data:
                parent_job = Job.query.filter_by(
                    id=parent_job_data["id"]).first()
                if not parent_job:
                    return (jsonify(error="Parent job %s not found" %
                                          parent_job_data["id"]),
                            NOT_FOUND)
                parents.append(parent_job)
            job.parents = parents

        username = g.json.pop("user", None)
        if username:
            user = User.query.filter_by(username=username).first()
            if not user:
                return (jsonify(
                    error="User %s not found" % username), NOT_FOUND)
            job.user = user

        jobqueue_name = g.json.pop("jobqueue", None)
        if jobqueue_name:
            # Walk the "/"-separated path one level at a time; the first
            # lookup is done with parent=None.
            jobqueue = None
            for element in jobqueue_name.split("/"):
                jobqueue = JobQueue.query.filter_by(
                    parent=jobqueue, name=element).first()
                if not jobqueue:
                    return (jsonify(error="Jobqueue %s not found" %
                                          jobqueue_name),
                            NOT_FOUND)
            job.queue = jobqueue

        # Plain columns are type-checked against the model before being set.
        job_types = Job.types()
        for name in job_types.columns:
            if name not in g.json:
                continue
            expected_types = job_types.mappings[name]
            value = g.json.pop(name)
            if not isinstance(value, expected_types):
                return (jsonify(error="Column `%s` is of type %r, but we "
                                      "expected %r" % (name, type(value),
                                                       expected_types)),
                        BAD_REQUEST)
            setattr(job, name, value)

        if "software_requirements" in g.json:
            try:
                job.software_requirements = parse_requirements(
                    g.json["software_requirements"])
            except (TypeError, ValueError) as e:
                return jsonify(error=e.args), BAD_REQUEST
            except ObjectNotFound as e:
                return jsonify(error=e.args), NOT_FOUND
            del g.json["software_requirements"]

        # Like software_requirements, a given list (even an empty one)
        # replaces the existing tag requirements.
        tag_requirements = g.json.pop("tag_requirements", None)
        if tag_requirements is not None:
            job.tag_requirements = []
            for entry in tag_requirements:
                tag = Tag.query.filter_by(tag=entry["tag"]).first()
                if not tag:
                    tag = Tag(tag=entry["tag"])
                    db.session.add(tag)
                tag_requirement = JobTagRequirement(job=job, tag=tag)
                if entry.get("negate"):
                    tag_requirement.negate = True
                db.session.add(tag_requirement)

        # Only the version can change; the jobtype itself stays the same.
        jobtype_version_number = g.json.pop("jobtype_version", None)
        if jobtype_version_number is not None:
            jobtype_version = JobTypeVersion.query.filter_by(
                jobtype=job.jobtype_version.jobtype,
                version=jobtype_version_number).first()
            if not jobtype_version:
                return (jsonify(error="Unknown jobtype version: %s" %
                                      jobtype_version_number),
                        BAD_REQUEST)
            job.jobtype_version = jobtype_version

        g.json.pop("start", None)
        g.json.pop("end", None)
        if g.json:
            return (jsonify(error="Unknown columns: %r" % (g.json, )),
                    BAD_REQUEST)

        db.session.add(job)
        db.session.commit()

        job_data = self._serialize(job, start, end)
        logger.info("Job %s has been updated to: %r", job.id, job_data)
        assign_tasks.delay()
        return jsonify(job_data), OK

    def delete(self, job_name):
        """
        A ``DELETE`` to this endpoint will mark the specified job for deletion
        and remove it after stopping and removing all of its tasks.

        The job's direct children are marked for deletion as well.

        .. http:delete:: /api/v1/jobs/[<str:name>|<int:id>] HTTP/1.1

            **Request**

            .. sourcecode:: http

                DELETE /api/v1/jobs/1 HTTP/1.1
                Accept: application/json

            **Response**

            .. sourcecode:: http

                HTTP/1.1 204 NO_CONTENT

        :statuscode 204: the specified job was marked for deletion
        :statuscode 404: the job does not exist
        """
        job = self._find_job(job_name)
        if not job:
            return jsonify(error="Job not found"), NOT_FOUND

        job.to_be_deleted = True
        child_job_ids = []
        for child_job in job.children:
            child_job.to_be_deleted = True
            child_job_ids.append(child_job.id)
            db.session.add(child_job)
        db.session.add(job)
        db.session.commit()

        # NOTE(review): delete_job is called directly here, while the other
        # scheduler tasks in this module are queued with .delay(); confirm
        # that the synchronous call is intended.
        for id_ in child_job_ids + [job.id]:
            logger.info("Marking job %s for deletion", id_)
            delete_job(id_)

        return jsonify(None), NO_CONTENT

    @staticmethod
    def _find_job(job_name):
        """
        Return the job whose title is ``job_name`` (for strings) or whose id
        is ``job_name`` (otherwise), or ``None`` if there is no such job.
        """
        if isinstance(job_name, STRING_TYPES):
            return Job.query.filter_by(title=job_name).first()
        return Job.query.filter_by(id=job_name).first()

    @staticmethod
    def _first_and_last_task(job):
        """Return the tasks of ``job`` with the lowest and highest frame."""
        tasks = Task.query.filter_by(job=job)
        return (tasks.order_by(Task.frame.asc()).first(),
                tasks.order_by(Task.frame.desc()).first())

    @staticmethod
    def _serialize(job, start, end):
        """
        Build the response dict for ``job``.

        The foreign-key ids are replaced by the jobtype name and version,
        the username, the queue path and the group title, and ``start`` and
        ``end`` are added.  A job without an explicit state is reported as
        "running" if any of its tasks has an agent, otherwise "queued".
        """
        job_data = job.to_dict(unpack_relationships=["tags",
                                                     "data",
                                                     "software_requirements",
                                                     "parents",
                                                     "children",
                                                     "notified_users",
                                                     "tag_requirements"])
        job_data["start"] = start
        job_data["end"] = end
        del job_data["jobtype_version_id"]
        job_data["jobtype"] = job.jobtype_version.jobtype.name
        job_data["jobtype_version"] = job.jobtype_version.version
        del job_data["user_id"]
        job_data["user"] = job.user.username if job.user else None
        del job_data["job_queue_id"]
        job_data["jobqueue"] = job.queue.path() if job.queue else None
        job_data["jobgroup"] = job.group.title if job.group else None
        if job.state is None:
            # `!= None` is the SQLAlchemy operator overload (IS NOT NULL).
            num_assigned_tasks = Task.query.filter(
                Task.job == job, Task.agent != None).count()  # noqa: E711
            if num_assigned_tasks > 0:
                job_data["state"] = "running"
            else:
                job_data["state"] = "queued"
        return job_data
class JobTasksIndexAPI(MethodView):
    """Endpoint listing the tasks of a single job."""

    def get(self, job_name):
        """
        A ``GET`` to this endpoint will return a list of all tasks in a job,
        ordered by frame.

        Tasks without an explicit state are reported as "queued" if no agent
        is assigned and as "assigned" otherwise.

        .. http:get:: /api/v1/jobs/[<str:name>|<int:id>]/tasks HTTP/1.1

            **Request**

            .. sourcecode:: http

                GET /api/v1/jobs/Test%20Job%202/tasks/ HTTP/1.1
                Accept: application/json

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: application/json

                [
                    {
                        "hidden": false,
                        "id": 3,
                        "attempts": 0,
                        "priority": 0,
                        "time_started": null,
                        "time_submitted": "2014-03-06T15:49:51.892228",
                        "frame": 1.0,
                        "time_finished": null,
                        "job_id": 2,
                        "project_id": null,
                        "state": "queued",
                        "agent_id": null
                    },
                    {
                        "hidden": false,
                        "id": 4,
                        "attempts": 0,
                        "priority": 0,
                        "time_started": null,
                        "time_submitted": "2014-03-06T15:49:51.892925",
                        "frame": 2.0,
                        "time_finished": null,
                        "job_id": 2,
                        "project_id": null,
                        "state": "queued",
                        "agent_id": null
                    }
                ]

        :statuscode 200: no error
        :statuscode 404: job not found
        """
        if isinstance(job_name, STRING_TYPES):
            job = Job.query.filter_by(title=job_name).first()
        else:
            job = Job.query.filter_by(id=job_name).first()
        if not job:
            return jsonify(error="Job not found",
                           id=job_name), NOT_FOUND

        out = []
        for task in Task.query.filter_by(job=job).order_by(Task.frame.asc()):
            data = task.to_dict(unpack_relationships=False)
            if task.state is None:
                data["state"] = "queued" if task.agent is None else "assigned"
            out.append(data)
        return jsonify(out), OK
[docs]class JobSingleTaskAPI(MethodView):
[docs] def post(self, job_name, task_id):
"""
A ``POST`` to this endpoint will update the specified task with the data
in the request. Columns not specified in the request will be left as
they are.
The agent will use this endpoint to inform the master of its progress.
.. http:post:: /api/v1/jobs/[<str:name>|<int:id>]/tasks/<int:task_id> HTTP/1.1
**Request**
.. sourcecode:: http
PUT /api/v1/job/Test%20Job/tasks/1 HTTP/1.1
Accept: application/json
{
"state": "running"
}
**Response**
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"time_finished": null,
"agent": null,
"attempts": 0,
"failures": 0,
"frame": 2.0,
"agent_id": null,
"job": {
"id": 1,
"title": "Test Job"
},
"time_started": null,
"state": "running",
"project_id": null,
"id": 2,
"time_submitted": "2014-03-06T15:40:58.338904",
"project": null,
"parents": [],
"job_id": 1,
"hidden": false,
"children": [],
"priority": 0
}
:statuscode 200: the task was updated
:statuscode 400: there was something wrong with the request (such as
invalid columns being included)
"""
task_query = Task.query.filter_by(id=task_id)
if isinstance(job_name, STRING_TYPES):
task_query.filter(Task.job.has(Job.title == job_name))
else:
task_query.filter(Task.job.has(Job.id == job_name))
task = task_query.first()
if not task:
return jsonify(error="Task not found"), NOT_FOUND
if "time_started" in g.json and g.json["time_started"] != "now":
return (jsonify(error="`time_started` cannot be set manually"),
BAD_REQUEST)
elif "time_started" in g.json and g.json["time_started"] == "now":
g.json["time_started"] = datetime.utcnow()
if "time_finished" in g.json:
return (jsonify(error="`time_finished` cannot be set manually"),
BAD_REQUEST)
if "time_submitted" in g.json:
return (jsonify(error="`time_submitted` cannot be set manually"),
BAD_REQUEST)
if "job_id" in g.json:
return jsonify(error="`job_id` cannot be changed"), BAD_REQUEST
if "frame" in g.json:
return jsonify(error="`frame` cannot be changed"), BAD_REQUEST
if (("state" in g.json or "progress" in g.json) and
request.headers.get("User-Agent", "") == "PyFarm/1.0 (agent)" and
(task.agent is None or
request.remote_addr != task.agent.remote_ip)):
logger.error("Agent with IP address %s tried to set state or "
"progress for task %s. IP address for assigned agent "
"is %s. Request rejected.",
request.remote_addr, task.id,
(task.agent.remote_ip if task.agent else "(n/a)"))
return jsonify(error="`state` and `progress` can only be changed "
"by the agent owning this task"), BAD_REQUEST
if (task.state == _WorkState.DONE and
"progress" in g.json and
g.json["progress"] != 1.0):
return jsonify(error="Cannot set progress: task is already in "
"state `done`"), BAD_REQUEST
new_state = g.json.pop("state", None)
agent = task.agent
state_transition = False
if new_state is not None and new_state != task.state:
logger.info("Task %s of job %s: state transition \"%s\" -> \"%s\"",
task_id, task.job.title, task.state, new_state)
state_transition = True
if new_state != "queued":
task.state = new_state
else:
task.state = None
# Iterate over all keys in the request
for key in list(g.json):
if key in TASK_MODEL_MAPPINGS:
value = g.json.pop(key)
expected_types = TASK_MODEL_MAPPINGS[key]
# incorrect type for `value`
if not isinstance(value, (expected_types, type(None))):
return (jsonify(
error="Column %r is of type %r but we expected "
"type(s) %r" % (key, type(value),
expected_types)), BAD_REQUEST)
# correct type for `value`
setattr(task, key, value)
if g.json:
return (jsonify(error="Unknown columns in request: %r" % g.json),
BAD_REQUEST)
db.session.add(task)
db.session.commit()
task_data = task.to_dict(unpack_relationships=("job", "agent",
"children", "parents",
"project"))
if task.state is None and task.agent is None:
task_data["state"] = "queued"
elif task.state is None:
task_data["state"] = "assigned"
logger.info("Task %s of job %s has been updated, new data: %r",
task_id, task.job.title, task_data)
if agent:
task_count = Task.query.filter(
Task.agent == agent,
or_(Task.state == None,
Task.state == WorkState.RUNNING)).\
order_by(Task.job_id, Task.frame).count()
if task_count == 0:
assign_tasks_to_agent.delay(agent.id)
# This needs to be done after the transaction in which the task state
# was set has committed, so that the new transaction will see the results
# of other threads that were running concurrently but finished earlier.
if task.job and state_transition:
old_state = task.job.state
task.job.update_state()
db.session.commit()
if task.job.state != old_state and task.job.state == WorkState.DONE:
assign_tasks.delay()
if config.get("enable_statistics") and task.job and state_transition:
task_event_count = TaskEventCount(
job_queue_id=task.job.job_queue_id)
if new_state == "queued":
task_event_count.num_restarted = 1
elif new_state == "running":
task_event_count.num_started = 1
elif new_state == "done":
task_event_count.num_done = 1
elif new_state == "failed":
task_event_count.num_failed = 1
task_event_count.time_start = datetime.utcnow()
task_event_count.time_end = datetime.utcnow()
db.session.add(task_event_count)
db.session.commit()
return jsonify(task_data), OK
def get(self, job_name, task_id):
    """
    A ``GET`` to this endpoint will return the requested task

    .. http:get:: /api/v1/jobs/[<str:name>|<int:id>]/tasks/<int:task_id> HTTP/1.1

        **Request**

        .. sourcecode:: http

            GET /api/v1/jobs/Test%20Job/tasks/2 HTTP/1.1
            Accept: application/json

        **Response**

        .. sourcecode:: http

            HTTP/1.1 200 OK
            Content-Type: application/json

            {
                "time_finished": null,
                "agent": null,
                "attempts": 0,
                "frame": 2.0,
                "agent_id": null,
                "job": {
                    "id": 1,
                    "title": "Test Job"
                },
                "time_started": null,
                "state": "running",
                "project_id": null,
                "id": 2,
                "time_submitted": "2014-03-06T15:40:58.338904",
                "project": null,
                "parents": [],
                "job_id": 1,
                "hidden": false,
                "children": [],
                "priority": 0
            }

        :statuscode 200: no error
        :statuscode 404: the task does not exist or does not belong to the
                         given job
    """
    task_query = Task.query.filter_by(id=task_id)
    # ``Query.filter()`` is generative: it returns a new query instead of
    # modifying the existing one, so the result must be reassigned for the
    # job constraint to actually be applied.
    if isinstance(job_name, STRING_TYPES):
        task_query = task_query.filter(Task.job.has(Job.title == job_name))
    else:
        task_query = task_query.filter(Task.job.has(Job.id == job_name))
    task = task_query.first()
    if not task:
        return jsonify(error="Task not found"), NOT_FOUND

    task_data = task.to_dict(unpack_relationships=("job", "agent",
                                                   "children", "parents",
                                                   "project"))
    # A NULL state is reported as "queued" while no agent is set and as
    # "assigned" once an agent is set.
    if task.state is None and task.agent is None:
        task_data["state"] = "queued"
    elif task.state is None:
        task_data["state"] = "assigned"

    return jsonify(task_data), OK
class TaskFailedOnAgentsIndexAPI(MethodView):
    """
    Endpoints for listing and extending the set of agents on which a
    specific task has failed.
    """

    def get(self, job_id, task_id):
        """
        A ``GET`` to this endpoint will return a list of all agents that failed
        to execute this task

        .. http:get:: /api/v1/jobs/<int:job_id>/tasks/<int:task_id>/failed_on_agents/ HTTP/1.1

            **Request**

            .. sourcecode:: http

                GET /api/v1/jobs/12/tasks/1234/failed_on_agents/ HTTP/1.1
                Accept: application/json

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: application/json

                [
                    {
                        "id": "02f08241-c556-4355-9e5e-33243d8c4577",
                        "hostname": "agent1"
                    }
                ]

            :statuscode 200: no error
            :statuscode 404: job or task not found
        """
        task_query = Task.query.filter_by(id=task_id)
        # ``Query.filter()`` returns a new query; reassign it so the job
        # constraint is really part of the lookup.
        task_query = task_query.filter(Task.job.has(Job.id == job_id))
        task = task_query.first()
        if not task:
            return jsonify(error="Task not found"), NOT_FOUND

        out = [{"hostname": agent.hostname, "id": agent.id}
               for agent in task.failed_in_agents]
        return jsonify(out), OK

    def post(self, job_id, task_id):
        """
        A ``POST`` to this endpoint will add the specified agent to the list
        of agents that failed to execute this task

        .. http:post:: /api/v1/jobs/<int:id>/tasks/<int:task_id>/failed_on_agents/ HTTP/1.1

            **Request**

            .. sourcecode:: http

                POST /api/v1/jobs/12/tasks/1234/failed_on_agents/ HTTP/1.1
                Accept: application/json
                Content-Type: application/json

                {
                    "id": "02f08241-c556-4355-9e5e-33243d8c4577"
                }

            **Response**

            .. sourcecode:: http

                HTTP/1.1 201 CREATED
                Content-Type: application/json

                {
                    "id": "02f08241-c556-4355-9e5e-33243d8c4577",
                    "hostname": "agent1"
                }

            :statuscode 201: a new entry was created (also returned if the
                             agent was already in the list)
            :statuscode 400: there was something wrong with the request (such
                             as no agent id being specified)
            :statuscode 404: the job, task or agent specified does not exist
        """
        task_query = Task.query.filter_by(id=task_id)
        # ``Query.filter()`` returns a new query; reassign it so the job
        # constraint is really part of the lookup.
        task_query = task_query.filter(Task.job.has(Job.id == job_id))
        task = task_query.first()
        if not task:
            return jsonify(error="Task not found"), NOT_FOUND

        # Reject a missing id explicitly instead of letting it fall through
        # to a misleading "Agent not found".
        if "id" not in g.json:
            return jsonify(error="No agent id specified"), BAD_REQUEST
        agent_id = g.json["id"]

        agent = Agent.query.filter_by(id=agent_id).first()
        if not agent:
            return jsonify(error="Agent not found"), NOT_FOUND

        # Only append once so the relationship does not get duplicates.
        if agent not in task.failed_in_agents:
            task.failed_in_agents.append(agent)
            db.session.add(task)
            db.session.commit()

        return jsonify({"id": agent_id, "hostname": agent.hostname}), CREATED
class SingleTaskOnAgentFailureAPI(MethodView):
    """
    Endpoint for removing a single agent from the set of agents on which a
    specific task has failed.
    """

    def delete(self, job_id, task_id, agent_id):
        """
        A ``DELETE`` to this endpoint will remove the specified agent from the
        list of agents that failed to execute this task

        .. http:delete:: /api/v1/jobs/<int:id>/tasks/<int:task_id>/failed_on_agents/<str:agent_id> HTTP/1.1

            **Request**

            .. sourcecode:: http

                DELETE /api/v1/jobs/12/tasks/1234/failed_on_agents/02f08241-c556-4355-9e5e-33243d8c4577 HTTP/1.1

            **Response**

            .. sourcecode:: http

                HTTP/1.1 204 NO_CONTENT

            :statuscode 204: the given agent was removed from the list of
                             agents that failed to execute this task (or was
                             not in the list to begin with)
            :statuscode 404: the job, task or agent does not exist
        """
        task_query = Task.query.filter_by(id=task_id)
        # ``Query.filter()`` returns a new query; reassign it so the job
        # constraint is really part of the lookup.
        task_query = task_query.filter(Task.job.has(Job.id == job_id))
        task = task_query.first()
        if not task:
            return jsonify(error="Task not found"), NOT_FOUND

        agent = Agent.query.filter_by(id=agent_id).first()
        if not agent:
            return jsonify(error="Agent not found"), NOT_FOUND

        if agent in task.failed_in_agents:
            task.failed_in_agents.remove(agent)
            # Without a commit the removal is never persisted.
            db.session.add(task)
            db.session.commit()

        return jsonify(), NO_CONTENT
class JobNotifiedUsersIndexAPI(MethodView):
    """
    Endpoints for listing and adding users that are notified about events
    of a job.
    """

    def get(self, job_name):
        """
        A ``GET`` to this endpoint will return a list of all users to be notified
        on events in this job.

        .. http:get:: /api/v1/jobs/[<str:name>|<int:id>]/notified_users/ HTTP/1.1

            **Request**

            .. sourcecode:: http

                GET /api/v1/jobs/Test%20Job%202/notified_users/ HTTP/1.1
                Accept: application/json

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: application/json

                [
                    {
                        "id": 1,
                        "username": "testuser",
                        "email": "testuser@localhost",
                        "on_success": true,
                        "on_failure": true,
                        "on_deletion": false
                    }
                ]

            :statuscode 200: no error
            :statuscode 404: job not found
        """
        if isinstance(job_name, STRING_TYPES):
            job = Job.query.filter_by(title=job_name).first()
        else:
            job = Job.query.filter_by(id=job_name).first()
        if not job:
            return jsonify(error="Job not found"), NOT_FOUND

        out = [{"id": notified_user.user_id,
                "username": notified_user.user.username,
                "email": notified_user.user.email,
                "on_success": notified_user.on_success,
                "on_failure": notified_user.on_failure,
                "on_deletion": notified_user.on_deletion}
               for notified_user in job.notified_users]
        return jsonify(out), OK

    def post(self, job_name):
        """
        A ``POST`` to this endpoint will add the specified user to the list of
        notified users for this job.

        .. http:post:: /api/v1/jobs/[<str:name>|<int:id>]/notified_users/ HTTP/1.1

            **Request**

            .. sourcecode:: http

                POST /api/v1/jobs/Test%20Job/notified_users/ HTTP/1.1
                Accept: application/json
                Content-Type: application/json

                {
                    "username": "testuser",
                    "on_success": true,
                    "on_failure": true,
                    "on_deletion": false
                }

            **Response**

            .. sourcecode:: http

                HTTP/1.1 201 CREATED
                Content-Type: application/json

                {
                    "id": 1,
                    "username": "testuser",
                    "email": "testuser@example.com",
                    "on_success": true,
                    "on_failure": true,
                    "on_deletion": false
                }

            :statuscode 201: a new notified user entry was created
            :statuscode 400: there was something wrong with the request (such
                             as a missing username or unknown fields)
            :statuscode 404: the job or the specified user does not exist
        """
        if isinstance(job_name, STRING_TYPES):
            job = Job.query.filter_by(title=job_name).first()
        else:
            job = Job.query.filter_by(id=job_name).first()
        if not job:
            return jsonify(error="Job not found"), NOT_FOUND

        if "username" not in g.json:
            return jsonify(error="No username specified"), BAD_REQUEST
        # Known keys are popped from g.json so that anything left over at
        # the end can be reported as an unknown field.
        username = g.json.pop("username")
        user = User.query.filter_by(username=username).first()
        if not user:
            # Use the same ``{"error": ...}`` shape as every other error.
            return jsonify(error="User %s not found" % username), NOT_FOUND

        notified_user = JobNotifiedUser(job=job, user=user)
        if "on_success" in g.json:
            notified_user.on_success = g.json.pop("on_success")
        if "on_failure" in g.json:
            notified_user.on_failure = g.json.pop("on_failure")
        if "on_deletion" in g.json:
            notified_user.on_deletion = g.json.pop("on_deletion")
        if g.json:
            return jsonify(error="Unknown fields in request"), BAD_REQUEST

        db.session.add(notified_user)
        db.session.commit()
        logger.info("Added user %s (id %s) to notified users for job %s (%s)",
                    user.username,
                    user.id,
                    job.title,
                    job.id)

        return (jsonify({
                    "id": user.id,
                    "username": user.username,
                    "email": user.email,
                    "on_success": notified_user.on_success,
                    "on_failure": notified_user.on_failure,
                    "on_deletion": notified_user.on_deletion}),
                CREATED)
class JobSingleNotifiedUserAPI(MethodView):
    """
    Endpoint for removing one user from the notification list of a job.
    """

    def delete(self, job_name, username):
        """
        A ``DELETE`` to this endpoint will remove the specified user from the
        list of notified users for this job.

        .. http:delete:: /api/v1/jobs/[<str:name>|<int:id>]/notified_users/<str:username> HTTP/1.1

            **Request**

            .. sourcecode:: http

                DELETE /api/v1/jobs/Test%20Job/notified_users/testuser HTTP/1.1
                Accept: application/json

            **Response**

            .. sourcecode:: http

                HTTP/1.1 204 NO_CONTENT

            :statuscode 204: the notified user was removed from this job or
                             wasn't in the list in the first place
            :statuscode 404: the job or the specified user does not exist
        """
        # String names address a job by title, anything else by id.
        if isinstance(job_name, STRING_TYPES):
            job_lookup = {"title": job_name}
        else:
            job_lookup = {"id": job_name}
        job = Job.query.filter_by(**job_lookup).first()
        if not job:
            return jsonify(error="Job not found"), NOT_FOUND

        user = User.query.filter_by(username=username).first()
        if not user:
            return jsonify(error="User %s not found" % username), NOT_FOUND

        entry = JobNotifiedUser.query.filter_by(job=job, user=user).first()
        # Deleting is idempotent: a user who was never on the list still
        # yields 204.
        if entry:
            db.session.delete(entry)
            db.session.commit()
            logger.info(
                "Removed user %s (id %s) from notified users for job %s (%s)",
                user.username, user.id, job.title, job.id)

        return jsonify(), NO_CONTENT