Source code for pyfarm.agent.http.api.assign

# No shebang line, this module is meant to be imported
#
# Copyright 2014 Oliver Palmer
# Copyright 2014 Ambient Entertainment GmbH & Co. KG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from uuid import uuid4

try:
    from httplib import (
        ACCEPTED, BAD_REQUEST, CONFLICT, SERVICE_UNAVAILABLE, OK)
except ImportError:  # pragma: no cover
    from http.client import (
        ACCEPTED, BAD_REQUEST, CONFLICT, SERVICE_UNAVAILABLE, OK)

from functools import partial

from twisted.web.server import NOT_DONE_YET
from twisted.internet import reactor
from voluptuous import Schema, Required

from pyfarm.core.enums import WorkState
from pyfarm.agent.config import config
from pyfarm.agent.http.core.client import post, http_retry_delay
from pyfarm.agent.http.api.base import APIResource
from pyfarm.agent.logger import getLogger
from pyfarm.agent.utility import request_from_master
from pyfarm.agent.sysinfo.memory import free_ram
from pyfarm.agent.utility import JOBTYPE_SCHEMA, TASKS_SCHEMA, JOB_SCHEMA
from pyfarm.jobtypes.core.jobtype import JobType

logger = getLogger("agent.http.assign")


[docs]class Assign(APIResource): isLeaf = False # this is not really a collection of things # Schemas used for validating the request before # the target function will handle it. These make # assertions about what kind of input data is required # or not based on the agent's internal code. SCHEMAS = { "POST": Schema({ Required("job"): JOB_SCHEMA, Required("jobtype"): JOBTYPE_SCHEMA, Required("tasks"): TASKS_SCHEMA})} def __init__(self, agent): self.agent = agent
[docs] def post(self, **kwargs): request = kwargs["request"] request_data = kwargs["data"] if request_from_master(request): config.master_contacted() # First, get the resources we have *right now*. In some cases # this means using the functions in pyfarm.core.sysinfo because # entries in `config` could be slightly out of sync with the system. memory_free = free_ram() cpus = config["agent_cpus"] requires_ram = request_data["job"].get("ram") requires_cpus = request_data["job"].get("cpus") if self.agent.shutting_down: logger.error("Rejecting assignment because the agent is in the " "process of shutting down.") request.setResponseCode(SERVICE_UNAVAILABLE) request.write( {"error": "Agent cannot accept assignments because it is " "shutting down"}) request.finish() return NOT_DONE_YET if "restart_requested" in config \ and config["restart_requested"] is True: logger.error("Rejecting assignment because of scheduled restart.") request.setResponseCode(SERVICE_UNAVAILABLE) request.write( {"error": "Agent cannot accept assignments because of a " "pending restart"}) request.finish() return NOT_DONE_YET elif "agent_id" not in config: logger.error( "Agent has not yet connected to the master or `agent_id` " "has not been set yet.") request.setResponseCode(SERVICE_UNAVAILABLE) request.write( {"error": "agent_id has not been set in the config"}) request.finish() return NOT_DONE_YET # Do we have enough ram? elif requires_ram is not None and requires_ram > memory_free: logger.error( "Task %s requires %sMB of ram, this agent has %sMB free. " "Rejecting Task %s.", request_data["job"]["id"], requires_ram, memory_free, request_data["job"]["id"]) request.setResponseCode(BAD_REQUEST) request.write( {"error": "Not enough ram", "agent_ram": memory_free, "requires_ram": requires_ram}) request.finish() # touch the config config["free_ram"] = memory_free return NOT_DONE_YET # Do we have enough cpus (count wise)? elif requires_cpus is not None and requires_cpus > cpus: logger.error( "Task %s requires %s CPUs, this agent has %s CPUs. " "Rejecting Task %s.", request_data["job"]["id"], requires_cpus, cpus, request_data["job"]["id"]) request.setResponseCode(BAD_REQUEST) request.write( {"error": "Not enough cpus", "agent_cpus": cpus, "requires_cpus": requires_cpus}) request.finish() return NOT_DONE_YET # Check for double assignments try: current_assignments = config["current_assignments"].itervalues except AttributeError: # pragma: no cover current_assignments = config["current_assignments"].values existing_task_ids = set() new_task_ids = set(task["id"] for task in request_data["tasks"]) for assignment in current_assignments(): for task in assignment["tasks"]: existing_task_ids.add(task["id"]) if existing_task_ids & new_task_ids: request.setResponseCode(CONFLICT) request.write( {"error": "Double assignment of tasks", "duplicate_tasks": list(existing_task_ids & new_task_ids)}) request.finish() return NOT_DONE_YET assignment_uuid = uuid4() request_data.update(id=assignment_uuid) config["current_assignments"][assignment_uuid] = request_data # In all other cases we have some work to do inside of # deferreds so we just have to respond # TODO Mark this agent as running on the master request.setResponseCode(ACCEPTED) request.write({"id": assignment_uuid}) request.finish() logger.info("Accepted assignment %s: %r", assignment_uuid, request_data) def assignment_failed(result, assign_id): logger.error( "Assignment %s failed, removing.", assign_id) logger.error(result.getTraceback()) assignment = config["current_assignments"].pop(assign_id) if "jobtype" in assignment: jobtype_id = assignment["jobtype"].pop("id", None) if jobtype_id: config["jobtypes"].pop(jobtype_id, None) def assignment_started(_, assign_id): logger.debug("Assignment %s has started", assign_id) def assignment_stopped(_, assign_id): logger.debug("Assignment %s has stopped", assign_id) assignment = config["current_assignments"].pop(assign_id) if "jobtype" in assignment: jobtype_id = assignment["jobtype"].pop("id", None) if jobtype_id: config["jobtypes"].pop(jobtype_id, None) def restart_if_necessary(_): # pragma: no cover if "restart_requested" in config and config["restart_requested"]: config["agent"].stop() def load_jobtype_failed(result, assign_id): logger.error( "Loading jobtype for assignment %s failed, removing.", assign_id) traceback = result.getTraceback() logger.debug("Got traceback") logger.error(traceback) assignment = config["current_assignments"].pop(assign_id) # Mark all tasks as failed on master and set an error message logger.debug("Marking tasks in assignment as failed") def post_update(post_url, post_data, task, delay=0): post_func = partial(post, post_url, data=post_data, callback=lambda x: result_callback( post_url, post_data, task, x), errback=lambda x: error_callback( post_url, post_data, task, x)) reactor.callLater(delay, post_func) def result_callback(cburl, cbdata, task, response): if 500 <= response.code < 600: logger.error( "Error while marking task %s as failed on master, " "retrying", task["id"]) post_update(cburl, cbdata, task, delay=http_retry_delay()) elif response.code != OK: logger.error( "Could not mark task %s as failed, server response " "code was %s", task["id"], response.code) else: logger.info( "Marked task %s as failed on master", task["id"]) def error_callback(cburl, cbdata, task, failure_reason): logger.error( "Error while marking task %s as failed, retrying", task["id"], failure_reason) post_update(cburl, cbdata, task, delay=http_retry_delay()) for task in assignment["tasks"]: url = "%s/jobs/%s/tasks/%s" % ( config["master_api"], assignment["job"]["id"], task["id"]) data = { "state": WorkState.FAILED, "last_error": traceback} post_update(url, data, task) # If the loading was partially successful for some reason, there # might already be an entry for this jobtype in the config. # Remove it if it exists. if "jobtype" in assignment: jobtype_id = assignment["jobtype"].pop("id", None) if jobtype_id: config["jobtypes"].pop(jobtype_id, None) def loaded_jobtype(jobtype_class, assign_id): # TODO: report error to master if hasattr(jobtype_class, "getTraceback"): logger.error(jobtype_class.getTraceback()) return # TODO: add call to prepare_for_job # TODO: add call to spawn_persistent_process # Instance the job type and pass in the assignment data. instance = jobtype_class(request_data) if not isinstance(instance, JobType): raise TypeError( "Expected a subclass of " "pyfarm.jobtypes.core.jobtype.JobType") # TODO: add callback to cleanup_after_job # TODO: add callback to stop persistent process try: started_deferred, stopped_deferred = instance._start() started_deferred.addCallback(assignment_started, assign_id) started_deferred.addErrback(assignment_failed, assign_id) stopped_deferred.addCallback(assignment_stopped, assign_id) stopped_deferred.addErrback(assignment_failed, assign_id) stopped_deferred.addBoth(restart_if_necessary) stopped_deferred.addBoth( lambda *args: instance._remove_tempdirs()) except Exception as e: logger.error("Error on starting jobtype, stopping it now. " "Error was: %s", e) instance.stop(assignment_failed=True, error="Error while loading jobtype: %s" % e) assignment = config["current_assignments"].pop(assign_id) if "jobtype" in assignment: jobtype_id = assignment["jobtype"].pop("id", None) if jobtype_id: config["jobtypes"].pop(jobtype_id, None) # Load the job type then pass the class along to the # callback. No errback here because all the errors # are handled internally in this case. jobtype_loader = JobType.load(request_data) jobtype_loader.addCallback(loaded_jobtype, assignment_uuid) jobtype_loader.addErrback(load_jobtype_failed, assignment_uuid) return NOT_DONE_YET