# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Agent Models
============
Models and interface classes related to the agent.
"""
import re
import uuid
from textwrap import dedent
from datetime import datetime
from sqlalchemy import or_
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy.orm import validates
from netaddr import AddrFormatError, IPAddress
from pyfarm.core.enums import (
AgentState, STRING_TYPES, UseAgentAddress, INTEGER_TYPES, WorkState)
from pyfarm.core.config import read_env_number, read_env_int, read_env
from pyfarm.master.application import db, app
from pyfarm.models.core.functions import repr_ip
from pyfarm.models.core.mixins import (
ValidatePriorityMixin, UtilityMixins, ReprMixin, ValidateWorkStateMixin)
from pyfarm.models.core.types import (
id_column, IPv4Address, IDTypeAgent, UseAgentAddressEnum,
OperatingSystemEnum, AgentStateEnum, MACAddress)
from pyfarm.models.core.cfg import (
TABLE_AGENT, TABLE_SOFTWARE_VERSION, TABLE_TAG, TABLE_AGENT_MAC_ADDRESS,
TABLE_AGENT_TAG_ASSOC, MAX_HOSTNAME_LENGTH, MAX_CPUNAME_LENGTH,
TABLE_AGENT_SOFTWARE_VERSION_ASSOC, MAX_OSNAME_LENGTH, TABLE_GPU_IN_AGENT,
TABLE_GPU)
from pyfarm.models.jobtype import JobTypeVersion
from pyfarm.models.job import Job
__all__ = ("Agent", )
# Pattern for a valid hostname: one or more dot-separated labels made of
# 1-63 letters, digits or hyphens, where no label starts or ends with a
# hyphen, optionally followed by a single trailing dot (fully qualified
# form).  Matching is case-insensitive.
# Raw strings keep ``\d`` and ``\.`` as regex escapes rather than invalid
# Python string escapes (which emit warnings on modern interpreters).
REGEX_HOSTNAME = re.compile(r"^(?!-)[A-Z\d-]{1,63}(?<!-)"
                            r"(\.(?!-)[A-Z\d-]{1,63}(?<!-))*\.?$",
                            re.IGNORECASE)
def _agent_association_table(table_name, column_name, target_table):
    """
    Build a many-to-many association table linking agents to rows of
    ``target_table``.

    The table has an ``agent_id`` column (foreign key to the agent table)
    and a ``column_name`` integer column (foreign key to
    ``target_table``); both columns are part of the primary key.
    """
    agent_fk = db.ForeignKey("{0}.id".format(TABLE_AGENT))
    agent_column = db.Column("agent_id", IDTypeAgent, agent_fk,
                             primary_key=True)
    target_fk = db.ForeignKey("{0}.id".format(target_table))
    target_column = db.Column(column_name, db.Integer, target_fk,
                              primary_key=True)
    return db.Table(table_name, db.metadata, agent_column, target_column)


# agent <-> software version
AgentSoftwareVersionAssociation = _agent_association_table(
    TABLE_AGENT_SOFTWARE_VERSION_ASSOC, "software_version_id",
    TABLE_SOFTWARE_VERSION)

# agent <-> tag
AgentTagAssociation = _agent_association_table(
    TABLE_AGENT_TAG_ASSOC, "tag_id", TABLE_TAG)

# agent <-> GPU
GPUInAgent = _agent_association_table(
    TABLE_GPU_IN_AGENT, "gpu_id", TABLE_GPU)
class AgentTaggingMixin(object):
    """
    Shared validation for models that store a tag or software name,
    intended for :class:`.AgentTag` and :class:`.AgentSoftware`.
    """

    @validates("tag", "software")
    def validate_string_column(self, key, value):
        """
        Normalise the value assigned to the ``tag`` or ``software`` column.

        Integer values are converted with :func:`str`; string values are
        returned unchanged.

        :raises ValueError:
            if ``value`` is neither an integer nor a string
        """
        if isinstance(value, INTEGER_TYPES):
            return str(value)
        if isinstance(value, STRING_TYPES):
            return value
        raise ValueError("expected a string for `%s`" % key)
class AgentMacAddress(db.Model):
    """
    A MAC address belonging to an agent.

    The primary key is the (``agent_id``, ``mac_address``) pair, and a
    unique constraint over the same pair is declared as well.
    """
    __tablename__ = TABLE_AGENT_MAC_ADDRESS
    __table_args__ = (
        UniqueConstraint("agent_id", "mac_address"),
    )

    # owning agent; part of the composite primary key
    agent_id = db.Column(
        IDTypeAgent,
        db.ForeignKey("{0}.id".format(TABLE_AGENT)),
        primary_key=True,
        nullable=False)

    # the address itself; not auto-generated
    mac_address = db.Column(
        MACAddress,
        primary_key=True,
        nullable=False,
        autoincrement=False)
class Agent(db.Model, ValidatePriorityMixin, ValidateWorkStateMixin,
            UtilityMixins, ReprMixin):
    """
    Stores information about an agent including its network address,
    state, allocation configuration, etc.

    .. note::
        This table enforces two forms of uniqueness.  The :attr:`id` column
        must be unique and the combination of these columns must also be
        unique to limit the frequency of duplicate data:

            * :attr:`hostname`
            * :attr:`port`
            * :attr:`id`
    """
    __tablename__ = TABLE_AGENT
    __table_args__ = (UniqueConstraint("hostname", "port", "id"), )
    STATE_ENUM = AgentState
    STATE_DEFAULT = "online"
    REPR_COLUMNS = (
        "id", "hostname", "port", "state", "remote_ip",
        "cpus", "ram", "free_ram")
    REPR_CONVERT_COLUMN = {
        "remote_ip": repr_ip}

    # Bounds used by validate_resource(); looked up as MIN_<KEY>/MAX_<KEY>.
    MIN_PORT = read_env_int("PYFARM_AGENT_MIN_PORT", 1024)
    MAX_PORT = read_env_int("PYFARM_AGENT_MAX_PORT", 65535)
    MIN_CPUS = read_env_int("PYFARM_AGENT_MIN_CPUS", 1)
    MAX_CPUS = read_env_int("PYFARM_AGENT_MAX_CPUS", 256)
    MIN_RAM = read_env_int("PYFARM_AGENT_MIN_RAM", 16)
    MAX_RAM = read_env_int("PYFARM_AGENT_MAX_RAM", 262144)

    # quick check of the configured data (note: skipped under ``python -O``)
    assert MIN_PORT >= 1, "$PYFARM_AGENT_MIN_PORT must be > 0"
    assert MAX_PORT >= 1, "$PYFARM_AGENT_MAX_PORT must be > 0"
    assert MAX_PORT >= MIN_PORT, "MIN_PORT must be <= MAX_PORT"
    assert MIN_CPUS >= 1, "$PYFARM_AGENT_MIN_CPUS must be > 0"
    assert MAX_CPUS >= 1, "$PYFARM_AGENT_MAX_CPUS must be > 0"
    assert MAX_CPUS >= MIN_CPUS, "MIN_CPUS must be <= MAX_CPUS"
    assert MIN_RAM >= 1, "$PYFARM_AGENT_MIN_RAM must be > 0"
    assert MAX_RAM >= 1, "$PYFARM_AGENT_MAX_RAM must be > 0"
    assert MAX_RAM >= MIN_RAM, "MIN_RAM must be <= MAX_RAM"

    id = id_column(IDTypeAgent, default=uuid.uuid4, autoincrement=False)

    # basic host attribute information
    hostname = db.Column(db.String(MAX_HOSTNAME_LENGTH), nullable=False,
                         doc=dedent("""
                         The hostname we should use to talk to this host.
                         Preferably this value will be the fully qualified
                         name instead of the base hostname alone."""))
    remote_ip = db.Column(IPv4Address, nullable=True,
                          doc="the remote address which came in with the "
                              "request")
    use_address = db.Column(UseAgentAddressEnum, nullable=False,
                            default=UseAgentAddress.REMOTE,
                            doc="The address we should use when communicating "
                                "with the agent")
    # TODO Make non-nullable later
    os_class = db.Column(OperatingSystemEnum,
                         doc="The type of operating system running on the "
                             "agent; \"linux\", \"windows\", or \"mac\".")
    os_fullname = db.Column(db.String(MAX_OSNAME_LENGTH),
                            doc="The full human-readable name of the agent's "
                                "OS, as returned by platform.platform()")
    ram = db.Column(db.Integer, nullable=False,
                    doc="The amount of ram installed on the agent in megabytes")
    free_ram = db.Column(db.Integer, nullable=False,
                         doc="The amount of ram which was last considered free")
    cpus = db.Column(db.Integer, nullable=False,
                     doc="The number of logical CPU cores installed on the "
                         "agent")
    cpu_name = db.Column(db.String(MAX_CPUNAME_LENGTH),
                         doc="The make and model of CPUs in this agents")
    port = db.Column(db.Integer, nullable=False,
                     doc="The port the agent is currently running on")
    time_offset = db.Column(db.Integer, nullable=False, default=0,
                            doc="The offset in seconds the agent is from "
                                "an official time server")
    version = db.Column(db.String(16), nullable=True,
                        doc="The pyfarm version number this agent is running.")
    upgrade_to = db.Column(db.String(16), nullable=True,
                           doc="The version this agent should upgrade to.")
    restart_requested = db.Column(db.Boolean, default=False, nullable=False,
                                  doc="If True, the agent will be restarted")

    # host state
    state = db.Column(AgentStateEnum, default=AgentState.ONLINE,
                      nullable=False,
                      doc=dedent("""
                      Stores the current state of the host. This value can be
                      changed either by a master telling the host to do
                      something with a task or from the host via REST api."""))
    last_heard_from = db.Column(db.DateTime,
                                default=datetime.utcnow,
                                doc="Time we last had contact with this agent")
    last_polled = db.Column(db.DateTime,
                            doc="Time we last tried to contact the agent")

    # Max allocation of the two primary resources which `1.0` is 100%
    # allocation.  For `cpu_allocation` 100% allocation typically means
    # one task per cpu.
    ram_allocation = db.Column(db.Float,
                               default=read_env_number(
                                   "PYFARM_AGENT_RAM_ALLOCATION", .8),
                               doc=dedent("""
                               The amount of ram the agent is allowed to
                               allocate towards work. A value of 1.0 would
                               mean to let the agent use all of the memory
                               installed on the system when assigning work."""))
    cpu_allocation = db.Column(db.Float,
                               default=read_env_number(
                                   "PYFARM_AGENT_CPU_ALLOCATION", 1.0),
                               doc=dedent("""
                               The total amount of cpu space an agent is
                               allowed to process work in. A value of 1.0
                               would mean an agent can handle as much work
                               as the system could handle given the
                               requirements of a task. For example if an agent
                               has 8 cpus, cpu_allocation is .5, and a task
                               requires 4 cpus then only that task will run
                               on the system."""))

    # relationships
    tasks = db.relationship("Task", backref="agent", lazy="dynamic",
                            doc=dedent("""
                            Relationship between an :class:`Agent`
                            and any :class:`pyfarm.models.Task`
                            objects"""))
    tags = db.relationship("Tag", secondary=AgentTagAssociation,
                           backref=db.backref("agents", lazy="dynamic"),
                           lazy="dynamic",
                           doc="Tags associated with this agent")
    software_versions = db.relationship("SoftwareVersion",
                                        secondary=AgentSoftwareVersionAssociation,
                                        backref=db.backref("agents",
                                                           lazy="dynamic"),
                                        lazy="dynamic",
                                        doc="software this agent has installed "
                                            "or is configured for")
    mac_addresses = db.relationship("AgentMacAddress", backref="agent",
                                    lazy="dynamic",
                                    doc="The MAC addresses this agent has",
                                    cascade="save-update, merge, delete, "
                                            "delete-orphan")
    gpus = db.relationship("GPU",
                           secondary=GPUInAgent,
                           backref=db.backref("agents", lazy="dynamic"),
                           lazy="dynamic",
                           doc="The graphics cards that are installed in this "
                               "agent")

    def is_offline(self):
        """Return ``True`` if :attr:`state` is ``AgentState.OFFLINE``."""
        return self.state == AgentState.OFFLINE

    def get_supported_types(self):
        """
        Return the ids of the :class:`JobTypeVersion` objects this agent
        can run.

        Only job type versions that have at least one job whose state is
        unset (``NULL``) or ``RUNNING`` are considered.  Of those, the ones
        accepted by :meth:`satisfies_jobtype_requirements` are returned.

        The result is cached on the instance as ``support_jobtype_versions``
        and returned as-is by later calls.  The cache is only stored once the
        whole list has been computed, so a failure part-way through does not
        leave a partial result behind.
        """
        try:
            return self.support_jobtype_versions
        except AttributeError:
            pass  # not computed yet for this instance

        # ``== None`` is deliberate: SQLAlchemy renders it as ``IS NULL``.
        jobtype_versions_query = JobTypeVersion.query.filter(
            JobTypeVersion.jobs.any(
                or_(Job.state == None,  # noqa: E711
                    Job.state == WorkState.RUNNING)))

        supported = [
            jobtype_version.id for jobtype_version in jobtype_versions_query
            if self.satisfies_jobtype_requirements(jobtype_version)]
        self.support_jobtype_versions = supported
        return supported

    def satisfies_jobtype_requirements(self, jobtype_version):
        """
        Return ``True`` if every software requirement of
        ``jobtype_version`` is met by at least one of this agent's
        :attr:`software_versions`.

        See :meth:`_version_satisfies` for when a single software version
        meets a single requirement.  A job type version with no software
        requirements is always satisfied.
        """
        # Materialise once: ``software_versions`` is a dynamic relationship,
        # so iterating it repeatedly would re-run its query each time.
        installed = list(self.software_versions)
        return all(
            any(self._version_satisfies(version, requirement)
                for version in installed)
            for requirement in jobtype_version.software_requirements)

    @staticmethod
    def _version_satisfies(software_version, requirement):
        """
        Return ``True`` if ``software_version`` is for the same software as
        ``requirement`` and its ``rank`` lies within the requirement's
        optional, inclusive ``min_version``/``max_version`` bounds.
        """
        return (
            software_version.software == requirement.software and
            (requirement.min_version is None or
             requirement.min_version.rank <= software_version.rank) and
            (requirement.max_version is None or
             requirement.max_version.rank >= software_version.rank))

    @classmethod
    def validate_hostname(cls, key, value):
        """
        Ensures that the hostname provided by `value` matches a regular
        expression that expresses what a valid hostname is.

        :raises ValueError:
            if ``value`` does not match :data:`REGEX_HOSTNAME`
        """
        # ensure hostname does not contain characters we can't use
        if not REGEX_HOSTNAME.match(value):
            raise ValueError("%s is not valid for %s" % (value, key))
        return value

    @classmethod
    def validate_resource(cls, key, value):
        """
        Ensure the ``value`` provided for ``key`` is within an expected
        range (inclusive).  This classmethod retrieves the min and max
        values from the :class:`Agent` class directly using:

            >>> min_value = getattr(Agent, "MIN_%s" % key.upper())
            >>> max_value = getattr(Agent, "MAX_%s" % key.upper())

        :raises ValueError:
            if ``value`` is outside ``[min_value, max_value]``
        """
        min_value = getattr(cls, "MIN_%s" % key.upper())
        max_value = getattr(cls, "MAX_%s" % key.upper())

        # check the provided input
        if not min_value <= value <= max_value:
            msg = "value for `%s` must be between " % key
            msg += "%s and %s" % (min_value, max_value)
            raise ValueError(msg)

        return value

    @classmethod
    def validate_ipv4_address(cls, _, value):
        """
        Ensures the :attr:`remote_ip` address is valid.  ``None`` is
        accepted unchanged; any other value must parse as an address and
        must not be:

            * a hostmask
            * link local (:rfc:`3927`)
            * loopback, unless the ``ALLOW_AGENT_LOOPBACK_ADDRESSES``
              application setting is enabled
            * used for multicast (:rfc:`1112`)
            * a netmask (:rfc:`4632`)
            * reserved (:rfc:`6052`)

        .. note::
            The address is *not* required to be private (:rfc:`1918`);
            public addresses pass this check.

        :raises ValueError:
            if ``value`` cannot be parsed or is one of the rejected types
        """
        if value is None:
            return value

        try:
            address = IPAddress(value)
        except (AddrFormatError, ValueError) as e:
            raise ValueError(
                "%s is not a valid address format: %s" % (value, e))

        if app.config.get("ALLOW_AGENT_LOOPBACK_ADDRESSES"):
            loopback = lambda: False
        else:
            loopback = address.is_loopback

        if any([address.is_hostmask(), address.is_link_local(),
                loopback(), address.is_multicast(),
                address.is_netmask(), address.is_reserved()]):
            raise ValueError("%s is not a valid address type" % value)

        return value

    def api_url(self,
                scheme=read_env("PYFARM_AGENT_API_SCHEME", "http"),
                version=read_env_int("PYFARM_AGENT_API_VERSION", 1)):
        """
        Returns the base url which should be used to access the api
        of this specific agent.

        The default ``scheme`` and ``version`` are read from
        ``$PYFARM_AGENT_API_SCHEME`` and ``$PYFARM_AGENT_API_VERSION`` once,
        when the class body is executed, not on every call.

        :param scheme: ``"http"`` or ``"https"``
        :param version: integer api version placed in ``/api/v<version>``

        :except ValueError:
            Raised if the agent's :attr:`use_address` column is neither
            ``REMOTE`` nor ``HOSTNAME`` (for example ``PASSIVE``), or if it
            is ``REMOTE`` while :attr:`remote_ip` is not set.
        """
        assert scheme in ("http", "https")
        assert isinstance(version, int)

        if self.use_address == UseAgentAddress.REMOTE:
            # Without this check the url would silently contain "None".
            if self.remote_ip is None:
                raise ValueError(
                    "Cannot construct an agent API url using `remote_ip` "
                    "because it is not set")
            host = self.remote_ip
        elif self.use_address == UseAgentAddress.HOSTNAME:
            host = self.hostname
        else:
            raise ValueError(
                "Cannot construct an agent API url using mode %r "
                "`use_address`" % self.use_address)

        return "%s://%s:%d/api/v%d" % (scheme, host, self.port, version)

    @validates("hostname")
    def validate_hostname_column(self, key, value):
        """Validates the hostname column"""
        return self.validate_hostname(key, value)

    @validates("ram", "cpus", "port")
    def validate_numeric_column(self, key, value):
        """
        Validates several numerical columns.  Columns such as ram, cpus
        and port are validated with this method.
        """
        return self.validate_resource(key, value)

    @validates("remote_ip")
    def validate_remote_ip(self, key, value):
        """Validates the remote_ip column"""
        return self.validate_ipv4_address(key, value)