Source code for pyfarm.master.api.jobqueues

# No shebang line, this module is meant to be imported
#
# Copyright 2014 Ambient Entertainment GmbH & Co. KG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Job Queues
----------

This module defines an API for managing and querying job queues
"""
try:
    from httplib import OK, CREATED, CONFLICT, NOT_FOUND, NO_CONTENT, BAD_REQUEST
except ImportError:  # pragma: no cover
    from http.client import (
        OK, CREATED, CONFLICT, NOT_FOUND, NO_CONTENT, BAD_REQUEST)

from flask import g
from flask.views import MethodView

from pyfarm.core.logger import getLogger
from pyfarm.core.enums import STRING_TYPES

from pyfarm.models.job import Job
from pyfarm.models.jobqueue import JobQueue
from pyfarm.master.application import db
from pyfarm.master.utility import jsonify, validate_with_model


logger = getLogger("api.jobqueues")


# Load model mappings once per process
JOBQUEUE_MODEL_MAPPINGS = JobQueue.types().mappings


[docs]def schema(): """ Returns the basic schema of :class:`.JobQueue` .. http:get:: /api/v1/jobqueues/schema HTTP/1.1 **Request** .. sourcecode:: http GET /api/v1/jobqueues/schema HTTP/1.1 Accept: application/json **Response** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json { "id": "INTEGER", "name": VARCHAR(255)", "minimum_agents": "INTEGER", "maximum_agents": "INTEGER", "priority": "INTEGER", "weight": "INTEGER", "parent_jobqueue_id": "INTEGER" } :statuscode 200: no error """ return jsonify(JobQueue.to_schema()), OK
[docs]class JobQueueIndexAPI(MethodView): @validate_with_model(JobQueue)
[docs] def post(self): """ A ``POST`` to this endpoint will create a new job queue. .. http:post:: /api/v1/jobqueues/ HTTP/1.1 **Request** .. sourcecode:: http POST /api/v1/jobqueues/ HTTP/1.1 Accept: application/json { "name": "Test Queue" } **Response** .. sourcecode:: http HTTP/1.1 201 CREATED Content-Type: application/json { "weight": 10, "jobs": [], "minimum_agents": null, "priority": 5, "name": "Test Queue", "maximum_agents": null, "id": 1, "parent": null, "parent_jobqueue_id": null } :statuscode 201: a new job queue was created :statuscode 400: there was something wrong with the request (such as invalid columns being included) :statuscode 409: a job queue with that name already exists """ jobqueue = JobQueue.query.filter_by(name=g.json["name"]).first() if jobqueue: return (jsonify(error="Job queue %s already exists" % g.json["name"]), CONFLICT) jobqueue = JobQueue(**g.json) db.session.add(jobqueue) db.session.flush() jobqueue.fullpath = jobqueue.path() db.session.add(jobqueue) db.session.commit() jobqueue_data = jobqueue.to_dict() logger.info("Created job queue %s: %r", jobqueue.name, jobqueue_data) return jsonify(jobqueue_data), CREATED
[docs] def get(self): """ A ``GET`` to this endpoint will return a list of known job queues. .. http:get:: /api/v1/jobqueues/ HTTP/1.1 **Request** .. sourcecode:: http GET /api/v1/jobqueues/ HTTP/1.1 Accept: application/json **Response** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json [ { "priority": 5, "weight": 10, "parent_jobqueue_id": null, "name": "Test Queue", "minimum_agents": null, "id": 1, "maximum_agents": null }, { "priority": 5, "weight": 10, "parent_jobqueue_id": null, "name": "Test Queue 2", "minimum_agents": null, "id": 2, "maximum_agents": null } ] :statuscode 200: no error """ out = [] for jobqueue in JobQueue.query: out.append(jobqueue.to_dict(unpack_relationships=False)) return jsonify(out), OK
[docs]class SingleJobQueueAPI(MethodView):
[docs] def get(self, queue_rq): """ A ``GET`` to this endpoint will return the requested job queue .. http:get:: /api/v1/jobqueues/[<str:name>|<int:id>] HTTP/1.1 **Request** .. sourcecode:: http GET /api/v1/jobqueues/Test%20Queue HTTP/1.1 Accept: application/json **Response** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json { "id": 1, "parent": [], "jobs": [], "weight": 10, "parent_jobqueue_id": null, "priority": 5, "minimum_agents": null, "name": "Test Queue", "maximum_agents": null } :statuscode 200: no error :statuscode 404: the requested job queue was not found """ if isinstance(queue_rq, STRING_TYPES): jobqueue = JobQueue.query.filter_by(name=queue_rq).first() else: jobqueue = JobQueue.query.filter_by(id=queue_rq).first() if not jobqueue: return (jsonify(error="Requested job queue %r not found" % queue_rq), NOT_FOUND) return jsonify(jobqueue.to_dict()), OK
[docs] def post(self, queue_rq): """ A ``POST`` to this endpoint will update the specified queue with the data in the request. Columns not specified in the request will be left as they are. .. http:post:: /api/v1/jobqueues/[<str:name>|<int:id>] HTTP/1.1 **Request** .. sourcecode:: http POST /api/v1/jobqueues/Test%20Queue HTTP/1.1 Accept: application/json { "priority": 6 } **Response** .. sourcecode:: http HTTP/1.1 201 OK Content-Type: application/json { "id": 1, "parent": [], "jobs": [], "weight": 10, "parent_jobqueue_id": null, "priority": 6, "minimum_agents": null, "name": "Test Queue", "maximum_agents": null } :statuscode 200: the job queue was updated :statuscode 400: there was something wrong with the request (such as invalid columns being included) """ if isinstance(queue_rq, STRING_TYPES): jobqueue = JobQueue.query.filter_by(name=queue_rq).first() else: jobqueue = JobQueue.query.filter_by(id=queue_rq).first() if not jobqueue: return (jsonify(error="Requested job queue %r not found" % queue_rq), NOT_FOUND) # This would allow users to create circles in the job queue tree if "parent_jobqueue_id" in g.json: return (jsonify(error="The parent queue cannot be changed"), BAD_REQUEST) for name in JOBQUEUE_MODEL_MAPPINGS: if name in g.json: expected_type = JOBQUEUE_MODEL_MAPPINGS[name] value = g.json.pop(name) if not isinstance(value, expected_type): return (jsonify(error="Column `%s` is of type %r, but we " "expected %r" % (name, type(value), expected_type)), BAD_REQUEST) setattr(jobqueue, name, value) if g.json: return jsonify(error="Unkown columns: %s" % g.json), BAD_REQUEST # It is possible for a call to this to change a queue's name db.session.add(jobqueue) db.session.flush() jobqueue.fullpath = jobqueue.path() for childqueue in jobqueue.children: childqueue.fullpath = childqueue.path() db.session.add(childqueue) db.session.add(jobqueue) db.session.commit() jobqueue_data = jobqueue.to_dict() logger.info("Updated job queue %s: %r", jobqueue.name, jobqueue_data) return jsonify(jobqueue_data), OK
[docs] def delete(self, queue_rq): """ A ``DELETE`` to this endpoint will delete the specified job queue .. http:delete:: /api/v1/jobqueue/[<str:name>|<int:id>] **Request** .. sourcecode:: http DELETE /api/v1/jobqueues/Test%20Queue HTTP/1.1 Accept: application/json **Response** .. sourcecode:: http HTTP/1.1 204 NO_CONTENT :statuscode 204: the job queue was deleted or didn't exist :statuscode 409: the job queue cannot be deleted because it still contains jobs or child queues """ if isinstance(queue_rq, STRING_TYPES): jobqueue = JobQueue.query.filter_by(name=queue_rq).first() else: jobqueue = JobQueue.query.filter_by(id=queue_rq).first() if not jobqueue: return jsonify(), NO_CONTENT num_sub_queues = JobQueue.query.filter_by(parent=jobqueue).count() if num_sub_queues > 0: return (jsonify(error="Cannot delete: job queue has child queues"), CONFLICT) num_jobs = Job.query.filter_by(queue=jobqueue).count() if num_jobs > 0: return (jsonify(error="Cannot delete: job queue has jobs assigned"), CONFLICT) db.session.delete(jobqueue) db.session.commit() logger.info("Deleted job queue %s", jobqueue.name) return jsonify(), NO_CONTENT