Source code for pyfarm.master.api.agents

# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Agents
------

Contained within this module are an API handling functions which can
manage or query agents using JSON.
"""

import re
import uuid
from datetime import datetime
import json

try:
    from httplib import (
        NOT_FOUND, NO_CONTENT, OK, CREATED, BAD_REQUEST, CONFLICT,
        INTERNAL_SERVER_ERROR)
except ImportError:  # pragma: no cover
    from http.client import (
        NOT_FOUND, NO_CONTENT, OK, CREATED, BAD_REQUEST, CONFLICT,
        INTERNAL_SERVER_ERROR)


from flask import request, g
from flask.views import MethodView

from sqlalchemy import or_, not_

from pyfarm.core.logger import getLogger
from pyfarm.core.enums import WorkState, AgentState
from pyfarm.core.config import read_env
from pyfarm.scheduler.tasks import (
    assign_tasks, update_agent, assign_tasks_to_agent, send_tasks_to_agent)
from pyfarm.models.agent import Agent, AgentMacAddress
from pyfarm.models.gpu import GPU
from pyfarm.models.task import Task
from pyfarm.models.software import Software, SoftwareVersion
from pyfarm.master.application import db
from pyfarm.master.utility import (
    jsonify, validate_with_model, get_ipaddr_argument, get_integer_argument,
    get_hostname_argument, get_port_argument, isuuid)

logger = getLogger("api.agents")

DEFAULT_AGENT_SOFTWARE = json.loads(
    read_env("PYFARM_DEFAULT_AGENT_SOFTWARE", "{}"))
MAC_RE = re.compile("^([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}$")


[docs]def fail_missing_assignments(agent, current_assignments): known_task_ids = [] for assignment in current_assignments.values(): for task in assignment["tasks"]: known_task_ids.append(task["id"]) tasks_query = Task.query.filter(Task.agent == agent, or_(Task.state == None, ~Task.state.in_( [WorkState.FAILED, WorkState.DONE]))) if known_task_ids: tasks_query = tasks_query.filter(not_(Task.id.in_(known_task_ids))) failed_tasks = [] unsent_tasks = False for task in tasks_query: if task.sent_to_agent: task.state = WorkState.FAILED db.session.add(task) failed_tasks.append(task) logger.warning("Task %s (frame %s from job %r (%s)) was not in the " "current assignments of agent %r (id %s) when it " "should be. Marking it as failed.", task.id, task.frame, task.job.title, task.job_id, agent.hostname, agent.id) else: unsent_tasks = True if unsent_tasks: send_tasks_to_agent.delay(agent.id) return failed_tasks
[docs]def schema(): """ Returns the basic schema of :class:`.Agent` .. http:get:: /api/v1/agents/schema HTTP/1.1 **Request** .. sourcecode:: http GET /api/v1/agents/schema HTTP/1.1 Accept: application/json **Response** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json { "ram": "INTEGER", "free_ram": "INTEGER", "time_offset": "INTEGER", "use_address": "INTEGER", "hostname": "VARCHAR(255)", "cpus": "INTEGER", "port": "INTEGER", "state": "INTEGER", "ram_allocation": "FLOAT", "cpu_allocation": "FLOAT", "id": "UUIDType", "remote_ip": "IPv4Address" } :statuscode 200: no error """ return jsonify(Agent.to_schema())
[docs]class AgentIndexAPI(MethodView): @validate_with_model(Agent, ignore=("current_assignments", "id"))
[docs] def post(self): """ A ``POST`` to this endpoint will either create or update an existing agent. The ``port`` and ``id`` columns will determine if an agent already exists. * If an agent is found matching the ``port`` and ``id`` columns from the request the existing model will be updated and the resulting data and the ``OK`` code will be returned. * If we don't find an agent matching the ``port`` and ``id`` however a new agent will be created and the resulting data and the ``CREATED`` code will be returned. .. note:: The ``remote_ip`` field is not required and should typically not be included in a request. When not provided ``remote_ip`` is be populated by the server based off of the ip of the incoming request. Providing ``remote_ip`` in your request however will override this behavior. .. http:post:: /api/v1/agents/ HTTP/1.1 **Request** .. sourcecode:: http POST /api/v1/agents/ HTTP/1.1 Accept: application/json { "cpu_allocation": 1.0, "cpus": 14, "free_ram": 133, "hostname": "agent1", "id": "6a0c11df-660f-4c1e-9fb4-5fe2b8cd2437", "remote_ip": "10.196.200.115", "port": 64994, "ram": 2157, "ram_allocation": 0.8, "state": 8 } **Response (agent created)** .. sourcecode:: http HTTP/1.1 201 CREATED Content-Type: application/json { "cpu_allocation": 1.0, "cpus": 14, "use_address": "remote", "free_ram": 133, "time_offset": 0, "hostname": "agent1", "id": "6a0c11df-660f-4c1e-9fb4-5fe2b8cd2437", "port": 64994, "ram": 2157, "ram_allocation": 0.8, "state": "online", "remote_ip": "10.196.200.115" } **Response (existing agent updated)** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json { "cpu_allocation": 1.0, "cpus": 14, "use_address": "remote", "free_ram": 133, "time_offset": 0, "hostname": "agent1", "id": "6a0c11df-660f-4c1e-9fb4-5fe2b8cd2437", "port": 64994, "ram": 2157, "ram_allocation": 0.8, "state": "online", "remote_ip": "10.196.200.115" } :statuscode 201: a new agent was created :statuscode 200: an existing agent is updated with data from the request :statuscode 400: there was something wrong with the request (such as invalid columns being included) """ # Read in and convert the id field try: g.json["id"] = uuid.UUID(g.json["id"]) except KeyError: return jsonify(error="`id` not provided"), BAD_REQUEST # Set remote_ip if it did not come in with the request g.json.setdefault("remote_ip", request.remote_addr) current_assignments = g.json.pop("current_assignments", None) mac_addresses = g.json.pop("mac_addresses", None) # TODO return BAD_REQUEST on bad mac addresses if mac_addresses is not None: mac_addresses = [x.lower() for x in mac_addresses if MAC_RE.match(x)] gpus = g.json.pop("gpus", None) agent = Agent.query.filter_by( port=g.json["port"], id=g.json["id"]).first() if agent is None: try: agent = Agent(**g.json) # There may be something wrong with one of the fields # that's causing our sqlalchemy model raise a ValueError. except ValueError as e: return jsonify(error=str(e)), BAD_REQUEST for item in DEFAULT_AGENT_SOFTWARE: software = Software.query.filter_by( software=item["software"]).first() if not software: return (jsonify( error="Default agent software %r not found" % item["software"]), INTERNAL_SERVER_ERROR) version = SoftwareVersion.query.filter_by( software=software, version=item["version"]).first() if not version: return (jsonify( error="Default agent software %r version %s not found" % (item["software"], item["version"])), INTERNAL_SERVER_ERROR) agent.software_versions.append(version) if mac_addresses is not None: for address in mac_addresses: mac_address = AgentMacAddress(agent=agent, mac_address=address) db.session.add(mac_address) if gpus is not None: for gpu_name in gpus: gpu = GPU.query.filter_by( fullname=gpu_name).first() if not gpu: gpu = GPU(fullname=gpu_name) db.session.add(gpu) agent.gpus.append(gpu) db.session.add(agent) try: db.session.commit() except Exception as e: e = e.args[0].lower() error = "Unhandled error: %s. This is often an issue " \ "with the agent's data for `ip`, `hostname` and/or " \ "`port` not being unique enough. In other cases " \ "this can sometimes happen if the underlying " \ "database driver is either non-compliant with " \ "expectations or we've encountered a database error " \ "that we don't know how to handle yet. If the " \ "latter is the case, please report this as a bug." % e return jsonify(error=error), INTERNAL_SERVER_ERROR else: agent_data = agent.to_dict(unpack_relationships=False) logger.info("Created agent %r: %r", agent.id, agent_data) assign_tasks.delay() return jsonify(agent_data), CREATED else: updated = False for key in g.json.copy(): value = g.json.pop(key) if not hasattr(agent, key): return jsonify( error="Agent has no such column `%s`" % key), \ BAD_REQUEST if getattr(agent, key) != value: try: setattr(agent, key, value) except Exception as e: return jsonify( error="Error while setting `%s`: %s" % (key, e)), \ BAD_REQUEST else: updated = True if mac_addresses is not None: updated = True for existing_address in agent.mac_addresses: if existing_address.mac_address.lower() not in mac_addresses: logger.debug("Existing address %s is not in supplied " "mac addresses, for agent %s, removing it.", existing_address.mac_address, agent.hostname) agent.mac_addresses.remove(existing_address) else: mac_addresses.remove( existing_address.mac_address.lower()) for new_address in mac_addresses: mac_address = AgentMacAddress( agent=agent, mac_address=new_address) db.session.add(mac_address) if gpus is not None: updated = True for existing_gpu in agent.gpus: if existing_gpu.fullname not in gpus: logger.debug("Existing gpu %s is not in supplied " "gpus, for agent %s, removing it.", existing_address.mac_address, agent.hostname) agent.gpus.remove(existing_gpu) else: gpus.remove(existing_gpu.fullname) for gpu_name in gpus: gpu = GPU.query.filter_by(fullname=gpu_name).first() if not gpu: gpu = GPU(fullname=gpu_name) db.session.add(gpu) agent.gpus.append(gpu) # TODO Only do that if this is really the agent speaking to us. failed_tasks = [] if (current_assignments is not None and agent.state != AgentState.OFFLINE): fail_missing_assignments(agent, current_assignments) if updated or failed_tasks: agent.last_heard_from = datetime.utcnow() db.session.add(agent) try: db.session.commit() except Exception as e: return jsonify(error="Unhandled error: %s" % e), \ INTERNAL_SERVER_ERROR else: agent_data = agent.to_dict(unpack_relationships=False) logger.info("Updated agent %r: %r", agent.id, agent_data) for task in failed_tasks: task.job.update_state() db.session.commit() assign_tasks.delay() return jsonify(agent_data), OK
[docs] def get(self): """ A ``GET`` to this endpoint will return a list of known agents, with id and name. .. http:get:: /api/v1/agents/ HTTP/1.1 **Request** .. sourcecode:: http GET /api/v1/agents/ HTTP/1.1 Accept: application/json **Response** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json [ { "hostname": "agent1", "id": "dd0c6da2-0c91-42cf-a82f-6d503aae43d3" }, { "hostname": "agent2", "id": "8326779e-90b5-447c-8da8-1eaa154771d9" }, { "hostname": "agent3.local", "id": "14b28230-64a1-4b62-803e-5fd1baa209e4" } ] **Request (with filters)** .. sourcecode:: http GET /api/v1/agents/?min_ram=4096&min_cpus=4 HTTP/1.1 Accept: application/json **Response** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json [ { "hostname": "foobar", "port": 50000, "remote_ip": "127.0.0.1", "id": "e20bae92-6472-442e-98a8-0ea4c9ee41cd" } ] :qparam min_ram: If set, list only agents with ``min_ram`` ram or more :qparam max_ram: If set, list only agents with ``max_ram`` ram or less :qparam min_cpus: If set, list only agents with ``min_cpus`` cpus or more :qparam max_cpus: If set, list only agents with ``max_cpus`` cpus or less :qparam hostname: If set, list only agents matching ``hostname`` :qparam remote_ip: If set, list only agents matching ``remote_ip`` :qparam port: If set, list only agents matching ``port``. :statuscode 200: no error, host may or may not have been found """ query = db.session.query( Agent.id, Agent.hostname, Agent.port, Agent.remote_ip) # parse url arguments min_ram = get_integer_argument("min_ram") max_ram = get_integer_argument("max_ram") min_cpus = get_integer_argument("min_cpus") max_cpus = get_integer_argument("max_cpus") hostname = get_hostname_argument("hostname") remote_ip = get_ipaddr_argument("remote_ip") port = get_port_argument("port") # construct query if min_ram is not None: query = query.filter(Agent.ram >= min_ram) if max_ram is not None: query = query.filter(Agent.ram <= max_ram) if min_cpus is not None: query = query.filter(Agent.cpus >= min_cpus) if max_cpus is not None: query = query.filter(Agent.cpus <= max_cpus) if hostname is not None: query = query.filter(Agent.hostname == hostname) if remote_ip is not None: query = query.filter(Agent.remote_ip == remote_ip) if port is not None: query = query.filter(Agent.port == port) # run query and convert the results output = [] for host in query: host = dict(zip(host.keys(), host)) # convert the IPAddress object, if set if host["remote_ip"] is not None: host["remote_ip"] = str(host["remote_ip"]) output.append(host) return jsonify(output), OK
[docs]class SingleAgentAPI(MethodView): """ API view which is used for retrieving information about and updating single agents. """
[docs] def get(self, agent_id): """ Return basic information about a single agent .. http:get:: /api/v1/agents/(str:agent_id) HTTP/1.1 **Request (agent exists)** .. sourcecode:: http GET /api/v1/agents/4eefca76-1127-4c17-a3df-c1a7de685541 HTTP/1.1 Accept: application/json **Response** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json { "cpu_allocation": 1.0, "cpus": 14, "use_address": 311, "free_ram": 133, "time_offset": 0, "hostname": "agent1", "id": "322360ad-976f-4103-9acc-a811d43fd24d", "ip": "10.196.200.115", "port": 64994, "ram": 2157, "ram_allocation": 0.8, "state": 202, "remote_ip": "10.196.200.115" } **Request (no such agent)** .. sourcecode:: http GET /api/v1/agents/4eefca76-1127-4c17-a3df-c1a7de685541 HTTP/1.1 Accept: application/json **Response** .. sourcecode:: http HTTP/1.1 404 NOT FOUND Content-Type: application/json {"error": "Agent `4eefca76-1127-4c17-a3df-c1a7de685541` not " "found"} :statuscode 200: no error :statuscode 400: something within the request is invalid :statuscode 404: no agent could be found using the given id """ agent = Agent.query.filter_by(id=agent_id).first() if agent is not None: return jsonify(agent.to_dict(unpack_relationships=False)) else: return jsonify(error="Agent %s not found" % agent_id), NOT_FOUND
@validate_with_model( Agent, type_checks={"id": isuuid}, ignore=("current_assignments", ), ignore_missing=( "ram", "cpus", "port", "free_ram", "hostname"))
[docs] def post(self, agent_id): """ Update an agent's columns with new information by merging the provided data with the agent's current definition in the database. .. http:post:: /api/v1/agents/(str:agent_id) HTTP/1.1 **Request** .. sourcecode:: http POST /api/v1/agents/29d466a5-34f8-408a-b613-e6c2715077a0 HTTP/1.1 Accept: application/json {"ram": 1234} **Response** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json { "cpu_allocation": 1.0, "cpus": 14, "use_address": 311, "free_ram": 133, "time_offset": 0, "hostname": "agent1", "id": "29d466a5-34f8-408a-b613-e6c2715077a0", "ip": "10.196.200.115", "port": 64994, "ram": 1234, "ram_allocation": 0.8, "state": "running", "remote_ip": "10.196.200.115" } :statuscode 200: no error :statuscode 400: something within the request is invalid :statuscode 404: no agent could be found using the given id """ agent = Agent.query.filter_by(id=agent_id).first() if agent is None: return jsonify(error="Agent %s not found" % agent_id), NOT_FOUND if "remote_ip" not in g.json: g.json["remote_ip"] = request.remote_addr current_assignments = g.json.pop("current_assignments", None) mac_addresses = g.json.pop("mac_addresses", None) # TODO return BAD_REQUEST on bad mac addresses if mac_addresses is not None: mac_addresses = [x.lower() for x in mac_addresses if MAC_RE.match(x)] gpus = g.json.pop("gpus", None) try: items = g.json.iteritems except AttributeError: items = g.json.items modified = {} for key, value in items(): if value != getattr(agent, key): try: setattr(agent, key, value) # There may be something wrong with one of the fields # that's causing our sqlalchemy model to raise a ValueError. except ValueError as e: return jsonify(error=str(e)), BAD_REQUEST modified[key] = value agent.last_heard_from = datetime.utcnow() if "upgrade_to" in modified: update_agent.delay(agent.id) if mac_addresses is not None: modified["mac_addresses"] = mac_addresses for existing_address in agent.mac_addresses: if existing_address.mac_address.lower() not in mac_addresses: logger.debug("Existing address %s is not in supplied " "mac addresses, for agent %s, removing it.", existing_address.mac_address, agent.hostname) agent.mac_addresses.remove(existing_address) else: mac_addresses.remove( existing_address.mac_address.lower()) for new_address in mac_addresses: mac_address = AgentMacAddress( agent=agent, mac_address=new_address) db.session.add(mac_address) if gpus is not None: modified["gpus"] = gpus for existing_gpu in agent.gpus: if existing_gpu.fullname not in gpus: logger.debug("Existing gpu %s is not in supplied " "gpus, for agent %s, removing it.", existing_address.mac_address, agent.hostname) agent.gpus.remove(existing_gpu) else: gpus.remove(existing_gpu.fullname) for gpu_name in gpus: gpu = GPU.query.filter_by(fullname=gpu_name).first() if not gpu: gpu = GPU(fullname=gpu_name) db.session.add(gpu) agent.gpus.append(gpu) # TODO Only do that if this is really the agent speaking to us. failed_tasks = [] if (current_assignments is not None and agent.state != AgentState.OFFLINE): failed_tasks = fail_missing_assignments(agent, current_assignments) logger.debug( "Updated agent %r: %r", agent.id, modified) db.session.add(agent) db.session.commit() for task in failed_tasks: task.job.update_state() db.session.commit() assign_tasks_to_agent.delay(agent_id) return jsonify(agent.to_dict(unpack_relationships=False)), OK
[docs] def delete(self, agent_id): """ Delete a single agent .. http:delete:: /api/v1/agents/(uuid:agent_id) HTTP/1.1 **Request (agent exists)** .. sourcecode:: http DELETE /api/v1/agents/b25ee7eb-9586-439a-b131-f5d022e0d403 HTTP/1.1 Accept: application/json **Response** .. sourcecode:: http HTTP/1.1 204 NO CONTENT Content-Type: application/json :statuscode 204: the agent was deleted or did not exist """ agent = Agent.query.filter_by(id=agent_id).first() if agent is None: return jsonify(None), NO_CONTENT else: db.session.delete(agent) db.session.commit() assign_tasks.delay() return jsonify(None), NO_CONTENT
[docs]class TasksInAgentAPI(MethodView):
[docs] def get(self, agent_id): """ A ``GET`` to this endpoint will return a list of all tasks assigned to this agent. .. http:get:: /api/v1/agents/<str:agent_id>/tasks/ HTTP/1.1 **Request** .. sourcecode:: http GET /api/v1/agents/bbf55143-f2b1-4c15-9d41-139bd8057931/tasks/ HTTP/1.1 Accept: application/json **Response** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json [ { "state": "assign", "priority": 0, "job": { "jobtype": "TestJobType", "id": 1, "title": "Test Job", "jobtype_version": 1, "jobtype_id": 1 }, "hidden": false, "time_started": null, "project_id": null, "frame": 2.0 "agent_id": "bbf55143-f2b1-4c15-9d41-139bd8057931", "id": 2, "attempts": 2, "project": null, "time_finished": null, "time_submitted": "2014-03-06T15:40:58.338904", "job_id": 1 } ] :statuscode 200: no error :statuscode 404: agent not found """ agent = Agent.query.filter_by(id=agent_id).first() if agent is None: return jsonify(error="Agent %r not found" % agent_id), NOT_FOUND out = [] for task in agent.tasks: task_dict = task.to_dict(unpack_relationships=False) task_dict["job"] = { "id": task.job.id, "title": task.job.title, "jobtype": task.job.jobtype_version.jobtype.name, "jobtype_id": task.job.jobtype_version.jobtype_id, "jobtype_version": task.job.jobtype_version.version } out.append(task_dict) return jsonify(out), OK
[docs] def post(self, agent_id): """ A ``POST`` to this endpoint will assign am existing task to the agent. .. http:post:: /api/v1/agents/<str:agent_id>/tasks/ HTTP/1.1 **Request** .. sourcecode:: http POST /api/v1/agents/238d7334-8ca5-4469-9f54-e76c66614a43/tasks/ HTTP/1.1 Accept: application/json { "id": 2 } **Response** .. sourcecode:: http HTTP/1.1 200 OK Content-Type: application/json { "agent_id": 1, "parents": [], "attempts": 2, "children": [], "job": { "title": "Test Job", "id": 1 }, "project_id": null, "agent": { "ip": null, "hostname": "agent1", "port": 50000, "id": "238d7334-8ca5-4469-9f54-e76c66614a43" }, "hidden": false, "job_id": 1, "time_submitted": "2014-03-06T15:40:58.338904", "frame": 2.0, "priority": 0, "state": "assign", "time_finished": null, "id": 2, "project": null, "time_started": null } :statuscode 200: no error :statuscode 404: agent not found """ if "id" not in g.json: return jsonify(error="No id given for task"), BAD_REQUEST if len(g.json) > 1: return jsonify(error="Unknown keys in request"), BAD_REQUEST agent = Agent.query.filter_by(id=agent_id).first() if agent is None: return jsonify(error="Agent %r not found" % agent_id), NOT_FOUND task = Task.query.filter_by(id=g.json["id"]).first() if not task: return jsonify(error="Task not found"), NOT_FOUND task.agent = agent db.session.add(task) db.session.commit() logger.info("Assigned task %s (frame %s, job %s) to agent %s (%s)", task.id, task.frame, task.job.title, agent.id, agent.hostname) assign_tasks.delay() return jsonify(task.to_dict()), OK