# Source code for pyfarm.models.agent

# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Agent Models
============

Models and interface classes related to the agent.
"""

import re
import uuid
from datetime import datetime

from sqlalchemy import or_
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy.orm import validates
from netaddr import AddrFormatError, IPAddress

from pyfarm.core.enums import (
    AgentState, STRING_TYPES, UseAgentAddress, INTEGER_TYPES, WorkState)
from pyfarm.master.config import config
from pyfarm.master.application import db
from pyfarm.models.core.functions import repr_ip
from pyfarm.models.core.mixins import (
    ValidatePriorityMixin, UtilityMixins, ReprMixin, ValidateWorkStateMixin)
from pyfarm.models.core.types import (
    id_column, IPv4Address, IDTypeAgent, IDTypeWork, UseAgentAddressEnum,
    OperatingSystemEnum, AgentStateEnum, MACAddress)
from pyfarm.models.jobtype import JobTypeVersion
from pyfarm.models.job import Job


# Names exported by ``from <this module> import *``; only the Agent model
# is listed.
__all__ = ("Agent", )

# Read from the master configuration once, at import time.  When truthy,
# Agent.validate_ipv4_address skips its loopback-address rejection.
ALLOW_AGENT_LOOPBACK = config.get("allow_agents_from_loopback")
# Case-insensitive hostname pattern: one or more dot separated labels of
# 1-63 characters drawn from letters, digits and "-", where no label starts
# or ends with "-", optionally followed by a single trailing dot.
# Raw strings are used so ``\d`` and ``\.`` reach the regex engine without
# relying on Python passing unknown string escapes through unchanged.
REGEX_HOSTNAME = re.compile(
    r"^(?!-)[A-Z\d-]{1,63}(?<!-)"
    r"(\.(?!-)[A-Z\d-]{1,63}(?<!-))*\.?$",
    re.IGNORECASE)


# Association table pairing an agent id with a software version id.  Both
# columns are foreign keys and together form the primary key.  Used as the
# ``secondary`` table of ``Agent.software_versions``.
AgentSoftwareVersionAssociation = db.Table(
    config.get("table_agent_software_version_assoc"),
    db.metadata,
    db.Column(
        "agent_id",
        IDTypeAgent,
        db.ForeignKey("%s.id" % config.get("table_agent")),
        primary_key=True,
    ),
    db.Column(
        "software_version_id",
        db.Integer,
        db.ForeignKey("%s.id" % config.get("table_software_version")),
        primary_key=True,
    ),
)


# Association table pairing an agent id with a tag id.  Both columns are
# foreign keys and together form the primary key.  Used as the
# ``secondary`` table of ``Agent.tags``.
AgentTagAssociation = db.Table(
    config.get("table_agent_tag_assoc"),
    db.metadata,
    db.Column(
        "agent_id",
        IDTypeAgent,
        db.ForeignKey("%s.id" % config.get("table_agent")),
        primary_key=True,
    ),
    db.Column(
        "tag_id",
        db.Integer,
        db.ForeignKey("%s.id" % config.get("table_tag")),
        primary_key=True,
    ),
)


# Association table pairing an agent id with a task id.  Both columns are
# foreign keys and together form the primary key.  Used as the
# ``secondary`` table of ``Agent.failed_tasks``.
FailedTaskInAgent = db.Table(
    config.get("table_failed_task_in_agent"),
    db.metadata,
    db.Column(
        "agent_id",
        IDTypeAgent,
        db.ForeignKey("%s.id" % config.get("table_agent")),
        primary_key=True,
    ),
    db.Column(
        "task_id",
        IDTypeWork,
        db.ForeignKey("%s.id" % config.get("table_task")),
        primary_key=True,
    ),
)


# Association table pairing an agent id with a GPU id.  Both columns are
# foreign keys and together form the primary key.  Used as the
# ``secondary`` table of ``Agent.gpus``.
GPUInAgent = db.Table(
    config.get("table_gpu_in_agent"),
    db.metadata,
    db.Column(
        "agent_id",
        IDTypeAgent,
        db.ForeignKey("%s.id" % config.get("table_agent")),
        primary_key=True,
    ),
    db.Column(
        "gpu_id",
        db.Integer,
        db.ForeignKey("%s.id" % config.get("table_gpu")),
        primary_key=True,
    ),
)


class AgentTaggingMixin(object):
    """
    Mixin which supplies a shared validator for the ``tag`` and
    ``software`` columns, intended for :class:`.AgentTag` and
    :class:`.AgentSoftware`.
    """
    @validates("tag", "software")
    def validate_string_column(self, key, value):
        """
        Return `value` as a string.

        Integer values are converted with ``str()``, string values are
        returned untouched.

        :raises ValueError:
            Raised if `value` is neither an integer nor a string type
        """
        # Integers are accepted and stored in their string form.
        if isinstance(value, INTEGER_TYPES):
            return str(value)

        if isinstance(value, STRING_TYPES):
            return value

        raise ValueError("expected a string for `%s`" % key)


class AgentMacAddress(db.Model):
    """
    One MAC address recorded for an agent.  A row is identified by the
    pair (:attr:`agent_id`, :attr:`mac_address`).
    """
    __tablename__ = config.get("table_agent_mac_address")
    __table_args__ = (UniqueConstraint("agent_id", "mac_address"), )

    # Foreign key to the agent table's ``id`` column; first half of the
    # composite primary key.
    agent_id = db.Column(
        IDTypeAgent,
        db.ForeignKey("%s.id" % config.get("table_agent")),
        nullable=False,
        primary_key=True)

    # Second half of the composite primary key.
    mac_address = db.Column(
        MACAddress,
        nullable=False,
        primary_key=True,
        autoincrement=False)


class Agent(db.Model, ValidatePriorityMixin, ValidateWorkStateMixin,
            UtilityMixins, ReprMixin):
    """
    Stores information about an agent including its network address,
    state, allocation configuration, etc.

    .. note::
        This table enforces two forms of uniqueness.  The :attr:`id` column
        must be unique and the combination of these columns must also be
        unique to limit the frequency of duplicate data:

            * :attr:`hostname`
            * :attr:`port`
            * :attr:`id`
    """
    __tablename__ = config.get("table_agent")
    __table_args__ = (UniqueConstraint("hostname", "port", "id"), )
    STATE_ENUM = AgentState
    STATE_DEFAULT = "online"
    REPR_COLUMNS = (
        "id", "hostname", "port", "state", "remote_ip",
        "cpus", "ram", "free_ram")
    REPR_CONVERT_COLUMN = {"remote_ip": repr_ip}
    URL_TEMPLATE = config.get("agent_api_url_template")

    # Inclusive bounds looked up by name ("MIN_<KEY>" / "MAX_<KEY>") in
    # :meth:`validate_resource`.
    MIN_PORT = config.get("agent_min_port")
    MAX_PORT = config.get("agent_max_port")
    MIN_CPUS = config.get("agent_min_cpus")
    MAX_CPUS = config.get("agent_max_cpus")
    MIN_RAM = config.get("agent_min_ram")
    MAX_RAM = config.get("agent_max_ram")

    # quick check of the configured data
    # NOTE(review): these run once when the class body executes and are
    # skipped entirely under ``python -O``.
    assert MIN_PORT >= 1, "`agent_min_port` must be > 0"
    assert MAX_PORT >= 1, "`agent_max_port` must be > 0"
    assert MAX_PORT >= MIN_PORT, "MIN_PORT must be <= MAX_PORT"
    assert MIN_CPUS >= 1, "`agent_min_cpus` must be > 0"
    assert MAX_CPUS >= 1, "`agent_max_cpus` must be > 0"
    assert MAX_CPUS >= MIN_CPUS, "MIN_CPUS must be <= MAX_CPUS"
    assert MIN_RAM >= 1, "`agent_min_ram` must be > 0"
    assert MAX_RAM >= 1, "`agent_max_ram` must be > 0"
    assert MAX_RAM >= MIN_RAM, "`agent_min_ram` must be <= `agent_max_ram`"

    id = id_column(IDTypeAgent, default=uuid.uuid4, autoincrement=False)

    # basic host attribute information
    hostname = db.Column(
        db.String(config.get("max_hostname_length")),
        nullable=False,
        doc="The hostname we should use to talk to this host.  "
            "Preferably this value will be the fully qualified "
            "name instead of the base hostname alone.")
    notes = db.Column(
        db.Text,
        default="",
        doc="Free form notes about this agent")
    remote_ip = db.Column(
        IPv4Address,
        nullable=True,
        doc="the remote address which came in with the request")
    use_address = db.Column(
        UseAgentAddressEnum,
        nullable=False,
        default=UseAgentAddress.REMOTE,
        doc="The address we should use when communicating with the agent")
    # TODO Make non-nullable later
    os_class = db.Column(
        OperatingSystemEnum,
        doc="The type of operating system running on the "
            "agent; 'linux', 'windows', or 'mac'.")
    os_fullname = db.Column(
        db.String(config.get("max_osname_length")),
        doc="The full human-readable name of the agent's OS, as returned "
            "by platform.platform()")
    ram = db.Column(
        db.Integer,
        nullable=False,
        doc="The amount of ram installed on the agent in megabytes")
    free_ram = db.Column(
        db.Integer,
        nullable=False,
        doc="The amount of ram which was last considered free")
    cpus = db.Column(
        db.Integer,
        nullable=False,
        doc="The number of logical CPU cores installed on the agent")
    cpu_name = db.Column(
        db.String(config.get("max_cpuname_length")),
        doc="The make and model of CPUs in this agents")
    port = db.Column(
        db.Integer,
        nullable=False,
        doc="The port the agent is currently running on")
    time_offset = db.Column(
        db.Integer,
        nullable=False,
        default=0,
        doc="The offset in seconds the agent is from an official time server")
    version = db.Column(
        db.String(16),
        nullable=True,
        doc="The pyfarm version number this agent is running.")
    upgrade_to = db.Column(
        db.String(16),
        nullable=True,
        doc="The version this agent should upgrade to.")
    restart_requested = db.Column(
        db.Boolean,
        default=False,
        nullable=False,
        doc="If True, the agent will be restarted")

    # host state
    state = db.Column(
        AgentStateEnum,
        default=AgentState.ONLINE,
        nullable=False,
        doc="Stores the current state of the host.  This value can be "
            "changed either by a master telling the host to do "
            "something with a task or from the host via REST api.")
    last_heard_from = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        doc="Time we last had contact with this agent")
    last_success_on = db.Column(
        db.DateTime,
        nullable=True,
        doc="The last time this agent has set a task to `done`")
    last_polled = db.Column(
        db.DateTime,
        doc="Time we last tried to contact the agent")

    # Max allocation of the two primary resources which `1.0` is 100%
    # allocation.  For `cpu_allocation` 100% allocation typically means
    # one task per cpu.
    ram_allocation = db.Column(
        db.Float,
        default=config.get("agent_ram_allocation"),
        doc="The amount of ram the agent is allowed to allocate "
            "towards work. A value of 1.0 would mean to let the "
            "agent use all of the memory installed on the system "
            "when assigning work.")
    cpu_allocation = db.Column(
        db.Float,
        default=config.get("agent_cpu_allocation"),
        doc="The total amount of cpu space an agent is allowed to "
            "process work in. A value of 1.0 would mean an agent "
            "can handle as much work as the system could handle "
            "given the requirements of a task.  For example if "
            "an agent has 8 cpus, cpu_allocation is .5, and a "
            "task requires 4 cpus then only that task will "
            "run on the system.")

    #
    # Relationships
    #
    tasks = db.relationship(
        "Task",
        backref="agent",
        lazy="dynamic",
        doc="Relationship between an :class:`Agent` and any "
            ":class:`pyfarm.models.Task` objects")
    tags = db.relationship(
        "Tag",
        secondary=AgentTagAssociation,
        backref=db.backref("agents", lazy="dynamic"),
        lazy="dynamic",
        doc="Tags associated with this agent")
    software_versions = db.relationship(
        "SoftwareVersion",
        secondary=AgentSoftwareVersionAssociation,
        backref=db.backref("agents", lazy="dynamic"),
        lazy="dynamic",
        doc="software this agent has installed or is configured for")
    mac_addresses = db.relationship(
        "AgentMacAddress",
        backref="agent",
        lazy="dynamic",
        doc="The MAC addresses this agent has",
        cascade="save-update, merge, delete, delete-orphan")
    gpus = db.relationship(
        "GPU",
        secondary=GPUInAgent,
        backref=db.backref("agents", lazy="dynamic"),
        lazy="dynamic",
        doc="The graphics cards that are installed in this agent")
    disks = db.relationship(
        "AgentDisk",
        backref=db.backref("agent"),
        lazy="dynamic",
        doc="The known disks available to this agent",
        cascade="save-update, merge, delete, delete-orphan")
    failed_tasks = db.relationship(
        "Task",
        secondary=FailedTaskInAgent,
        backref=db.backref("failed_in_agents", lazy="dynamic"),
        lazy="dynamic",
        doc="The tasks this agents failed to execute")

    def is_offline(self):
        """Returns True if :attr:`state` equals ``AgentState.OFFLINE``"""
        return self.state == AgentState.OFFLINE

    def is_disabled(self):
        """Returns True if :attr:`state` equals ``AgentState.DISABLED``"""
        return self.state == AgentState.DISABLED

    def get_supported_types(self):
        """
        Returns a list of :class:`JobTypeVersion` ids this agent can run.

        Only job type versions having at least one job whose state is
        either unset or ``WorkState.RUNNING`` are considered, and of those
        only the ones passing :meth:`satisfies_jobtype_requirements` are
        kept.  The result is cached on the instance as
        ``support_jobtype_versions`` and returned as-is on later calls.
        """
        try:
            return self.support_jobtype_versions
        except AttributeError:
            pass

        # ``== None`` is deliberate here: this is a SQLAlchemy column
        # expression which is rendered as ``IS NULL``, not a Python
        # comparison.
        jobtype_versions_query = JobTypeVersion.query.filter(
            JobTypeVersion.jobs.any(
                or_(Job.state == None,  # noqa: E711
                    Job.state == WorkState.RUNNING)))

        # Build the list locally and only publish it once complete so an
        # exception part way through never leaves a partial result cached.
        supported = [
            jobtype_version.id
            for jobtype_version in jobtype_versions_query
            if self.satisfies_jobtype_requirements(jobtype_version)]

        self.support_jobtype_versions = supported
        return supported

    def satisfies_jobtype_requirements(self, jobtype_version):
        """
        Returns True if every software requirement of `jobtype_version`
        is met by at least one of this agent's :attr:`software_versions`.

        A requirement is met by a software version of the same software
        whose ``rank`` lies between the ranks of the requirement's
        ``min_version`` and ``max_version``; a bound which is ``None`` is
        treated as unbounded.
        """
        requirements_to_satisfy = list(jobtype_version.software_requirements)

        for software_version in self.software_versions:
            # Iterate over a snapshot because satisfied requirements are
            # removed from the list while looping.
            for requirement in list(requirements_to_satisfy):
                if (software_version.software == requirement.software
                        and (requirement.min_version is None
                             or requirement.min_version.rank
                             <= software_version.rank)
                        and (requirement.max_version is None
                             or requirement.max_version.rank
                             >= software_version.rank)):
                    requirements_to_satisfy.remove(requirement)

        return len(requirements_to_satisfy) == 0

    def satisfies_job_requirements(self, job):
        """
        Returns True if this agent is able to run `job`.

        The agent must satisfy the software requirements of the job's
        ``jobtype_version``, have at least ``job.cpus`` cpus and
        ``job.ram`` free ram, carry every tag required by the job and
        carry none of the tags the job negates.
        """
        if not self.satisfies_jobtype_requirements(job.jobtype_version):
            return False

        if self.cpus < job.cpus:
            return False

        if self.free_ram < job.ram:
            return False

        tag_requirements = list(job.tag_requirements)
        if tag_requirements:
            # ``tags`` is a dynamic relationship; load it once instead of
            # re-iterating it for every membership test below.
            agent_tags = list(self.tags)

            for tag_requirement in tag_requirements:
                has_tag = tag_requirement.tag in agent_tags

                if not tag_requirement.negate and not has_tag:
                    return False

                if tag_requirement.negate and has_tag:
                    return False

        return True

    @classmethod
    def validate_hostname(cls, key, value):
        """
        Ensures that the hostname provided by `value` matches a regular
        expression that expresses what a valid hostname is.

        :raises ValueError:
            Raised if `value` does not match ``REGEX_HOSTNAME``
        """
        # ensure hostname does not contain characters we can't use
        if not REGEX_HOSTNAME.match(value):
            raise ValueError("%s is not valid for %s" % (value, key))

        return value

    @classmethod
    def validate_resource(cls, key, value):
        """
        Ensure the ``value`` provided for ``key`` is within an expected
        range.  This classmethod retrieves the min and max values from
        the :class:`Agent` class directly using:

            >>> min_value = getattr(Agent, "MIN_%s" % key.upper())
            >>> max_value = getattr(Agent, "MAX_%s" % key.upper())

        :raises ValueError:
            Raised if ``value`` is outside of the inclusive range
            ``min_value`` to ``max_value``
        """
        min_value = getattr(cls, "MIN_%s" % key.upper())
        max_value = getattr(cls, "MAX_%s" % key.upper())

        # check the provided input
        if not min_value <= value <= max_value:
            msg = "value for `%s` must be between " % key
            msg += "%s and %s" % (min_value, max_value)
            raise ValueError(msg)

        return value

    @classmethod
    def validate_ipv4_address(cls, _, value):
        """
        Ensures the :attr:`remote_ip` address is valid.  ``None`` is
        accepted and returned unchanged.  Otherwise this checks to ensure
        that the value provided is:

            * parsable as an IP address
            * not a hostmask
            * not link local (:rfc:`3927`)
            * not a loopback address, unless the
              ``allow_agents_from_loopback`` configuration value is set
            * not used for multicast (:rfc:`1112`)
            * not a netmask (:rfc:`4632`)
            * not reserved (:rfc:`6052`)

        :raises ValueError:
            Raised if the address cannot be parsed or falls into one of
            the rejected categories above
        """
        if value is None:
            return value

        try:
            address = IPAddress(value)

        except (AddrFormatError, ValueError) as e:
            raise ValueError(
                "%s is not a valid address format: %s" % (value, e))

        # Loopback addresses are only rejected when not explicitly
        # allowed by the configuration.
        if ALLOW_AGENT_LOOPBACK:
            rejected_loopback = False
        else:
            rejected_loopback = address.is_loopback()

        if any([address.is_hostmask(), address.is_link_local(),
                rejected_loopback, address.is_multicast(),
                address.is_netmask(), address.is_reserved()]):
            raise ValueError("%s is not a valid address type" % value)

        return value

    def api_url(self):
        """
        Returns the base url which should be used to access the api
        of this specific agent.  The host portion is :attr:`remote_ip`
        when :attr:`use_address` is ``REMOTE`` and :attr:`hostname` when
        it is ``HOSTNAME``.

        :except ValueError:
            Raised if this function is called while the agent's
            :attr:`use_address` column is set to anything other than
            ``REMOTE`` or ``HOSTNAME``
        """
        if self.use_address == UseAgentAddress.REMOTE:
            host = self.remote_ip

        elif self.use_address == UseAgentAddress.HOSTNAME:
            host = self.hostname

        else:
            raise ValueError(
                "Cannot construct an agent API url using mode %r "
                "`use_address`" % self.use_address)

        return self.URL_TEMPLATE.format(host=host, port=self.port)

    @validates("hostname")
    def validate_hostname_column(self, key, value):
        """Validates the hostname column"""
        return self.validate_hostname(key, value)

    @validates("ram", "cpus", "port")
    def validate_numeric_column(self, key, value):
        """
        Validates several numerical columns.  Columns such as ram, cpus
        and port are validated with this method.
        """
        return self.validate_resource(key, value)

    @validates("remote_ip")
    def validate_remote_ip(self, key, value):
        """Validates the remote_ip column"""
        return self.validate_ipv4_address(key, value)