# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
# Copyright 2014 Ambient Entertainment GmbH & Co. KG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Manager Service
---------------
Sends and receives information from the master and performs systems level tasks
such as log reading, system information gathering, and management of processes.
"""
import os
import sys
import time
import uuid
from datetime import datetime, timedelta
from functools import partial
from os.path import join
from platform import platform
try:
from httplib import (
responses, OK, CREATED, NOT_FOUND, INTERNAL_SERVER_ERROR, BAD_REQUEST)
except ImportError: # pragma: no cover
from http.client import (
responses, OK, CREATED, NOT_FOUND, INTERNAL_SERVER_ERROR, BAD_REQUEST)
import treq
from ntplib import NTPClient
from twisted.internet import reactor
from twisted.internet.defer import (
Deferred, inlineCallbacks, returnValue, DeferredLock)
from twisted.internet.error import ConnectionRefusedError
from twisted.internet.task import deferLater
from twisted.web._newclient import (
ResponseNeverReceived, RequestTransmissionFailed)
from pyfarm.core.enums import NUMERIC_TYPES, AgentState
from pyfarm.agent.config import config
from pyfarm.agent.http.api.assign import Assign
from pyfarm.agent.http.api.base import APIRoot, Versions
from pyfarm.agent.http.api.config import Config
from pyfarm.agent.http.api.state import Status, Stop, Restart
from pyfarm.agent.http.api.tasks import Tasks
from pyfarm.agent.http.api.tasklogs import TaskLogs
from pyfarm.agent.http.api.update import Update
from pyfarm.agent.http.api.software import CheckSoftware
from pyfarm.agent.http.core.client import http_retry_delay, post_direct
from pyfarm.agent.http.core.resource import Resource
from pyfarm.agent.http.core.server import Site, StaticPath
from pyfarm.agent.http.system import Index, Configuration
from pyfarm.agent.logger import getLogger
from pyfarm.agent.sysinfo import memory, network, system, cpu, graphics, disks
from pyfarm.agent import utility
svclog = getLogger("agent.service")
ntplog = getLogger("agent.service.ntp")
[docs]class Agent(object):
"""
Main class associated with getting getting the internals of the
agent's operations up and running including adding or updating
itself with the master, starting the periodic task manager,
and handling shutdown conditions.
"""
def __init__(self):
# so parts of this instance are accessible elsewhere
assert "agent" not in config
config["agent"] = self
self.services = {}
self.register_shutdown_events = False
self.last_free_ram_post = time.time()
self.repeating_call_counter = {}
self.shutdown_timeout = None
self.post_shutdown_lock = DeferredLock()
self.stop_lock = DeferredLock()
self.reannounce_lock = utility.TimedDeferredLock()
self.stopped = False
# Register a callback so self.shutdown_timeout is set when
# "shutting_down" is set or modified.
config.register_callback(
"shutting_down", self.callback_shutting_down_changed)
[docs] @classmethod
def agent_api(cls):
"""
Return the API url for this agent or None if `agent_id` has not
been set
"""
try:
return cls.agents_endpoint() + str(config["agent_id"])
except KeyError:
svclog.error(
"The `agent_id` configuration value has not been set yet")
return None
[docs] @classmethod
def agents_endpoint(cls):
"""
Returns the API endpoint for used for updating or creating
agents on the master
"""
return config["master_api"] + "/agents/"
@property
def shutting_down(self):
return config.get("shutting_down", False)
@shutting_down.setter
def shutting_down(self, value):
assert value in (True, False)
config["shutting_down"] = value
[docs] def repeating_call(
self, delay, function, function_args=None, function_kwargs=None,
now=True, repeat_max=None, function_id=None):
"""
Causes ``function`` to be called repeatedly up until ``repeat_max``
or until stopped.
:param int delay:
Number of seconds to delay between calls of ``function``.
.. note::
``delay`` is an approximate interval between when one call ends
and the next one begins. The exact time can vary due
to how the Twisted reactor runs, how long it takes
``function`` to run and what else may be going on in the
agent at the time.
:param function:
A callable function to run
:type function_args: tuple, list
:keyword function_args:
Arguments to pass into ``function``
:keyword dict function_kwargs:
Keywords to pass into ``function``
:keyword bool now:
If True then run ``function`` right now in addition
to scheduling it.
:keyword int repeat_max:
Repeat calling ``function`` this may times. If not provided
then we'll continue to repeat calling ``function`` until
the agent shuts down.
:keyword uuid.UUID function_id:
Used internally to track a function's execution count. This
keyword exists so if you call :meth:`repeating_call` multiple
times on the same function or method it will handle ``repeat_max``
properly.
"""
if self.shutting_down:
svclog.debug(
"Skipping task %s(*%r, **%r) [shutting down]",
function.__name__, function_args, function_kwargs
)
return
if function_args is None:
function_args = ()
if function_kwargs is None:
function_kwargs = {}
if function_id is None:
function_id = uuid.uuid4()
assert isinstance(delay, NUMERIC_TYPES[:-1])
assert callable(function)
assert isinstance(function_args, (list, tuple))
assert isinstance(function_kwargs, dict)
assert repeat_max is None or isinstance(repeat_max, int)
repeat_count = self.repeating_call_counter.setdefault(function_id, 0)
if repeat_max is None or repeat_count < repeat_max:
svclog.debug(
"Executing task %s(*%r, **%r). Next execution in %s seconds.",
function.__name__, function_args, function_kwargs, delay
)
# Run this function right now using another deferLater so
# it's scheduled by the reactor and executed before we schedule
# another.
if now:
deferLater(
reactor, 0, function, *function_args, **function_kwargs
)
self.repeating_call_counter[function_id] += 1
repeat_count = self.repeating_call_counter[function_id]
# Schedule the next call but only if we have not hit the max
if repeat_max is None or repeat_count < repeat_max:
deferLater(
reactor, delay, self.repeating_call, delay,
function, function_args=function_args,
function_kwargs=function_kwargs, now=True,
repeat_max=repeat_max, function_id=function_id
)
[docs] def should_reannounce(self):
"""Small method which acts as a trigger for :meth:`reannounce`"""
if self.reannounce_lock.locked or self.shutting_down:
return False
contacted = config.master_contacted(update=False)
if contacted is None:
return True
return utility.total_seconds(
datetime.utcnow() - contacted) > config["agent_master_reannounce"]
[docs] @inlineCallbacks
def reannounce(self, force=False):
"""
Method which is used to periodically contact the master. This
method is generally called as part of a scheduled task.
"""
# Attempt to acquire the reannounce lock but fail after 70%
# of the total time between reannouncements elapses. This should
# help prevent an accumulation of requests in the event the master
# is having issues.
try:
yield self.reannounce_lock.acquire(
config["agent_master_reannounce"] * .70
)
except utility.LockTimeoutError:
svclog.debug("Timed out while waiting to acquire reannounce_lock")
returnValue(None)
if not self.should_reannounce() and not force:
yield self.reannounce_lock.release()
returnValue(None)
svclog.debug("Announcing %s to master", config["agent_hostname"])
data = None
num_retry_errors = 0
while True: # for retries
try:
response = yield post_direct(
self.agent_api(),
data={
"state": config["state"],
"current_assignments": config.get(
"current_assignments", {} # may not be set yet
),
"free_ram": memory.free_ram(),
"disks": disks.disks(as_dict=True)
}
)
except (ResponseNeverReceived, RequestTransmissionFailed) as error:
num_retry_errors += 1
if num_retry_errors > config["broken_connection_max_retry"]:
svclog.error(
"Failed to announce self to the master, "
"caught try-again type errors %s times in a row.",
num_retry_errors)
break
else:
svclog.debug("While announcing self to master, caught "
"%s. Retrying immediately.",
error.__class__.__name__)
except Exception as error:
if force:
delay = http_retry_delay()
svclog.error(
"Failed to announce self to the master: %s. Will "
"retry in %s seconds.", error, delay)
deferred = Deferred()
reactor.callLater(delay, deferred.callback, None)
yield deferred
else:
# Don't retry because reannounce is called periodically
svclog.error(
"Failed to announce self to the master: %s. This "
"request will not be retried.", error)
break
else:
data = yield treq.json_content(response)
if response.code == OK:
config.master_contacted(announcement=True)
svclog.info("Announced self to the master server.")
break
elif response.code >= INTERNAL_SERVER_ERROR:
if not self.shutting_down:
delay = http_retry_delay()
svclog.warning(
"Could not announce self to the master server, "
"internal server error: %s. Retrying in %s "
"seconds.", data, delay)
deferred = Deferred()
reactor.callLater(delay, deferred.callback, None)
yield deferred
else:
svclog.warning(
"Could not announce to master. Not retrying "
"because of pending shutdown.")
break
elif response.code == NOT_FOUND:
svclog.warning("The master says it does not know about our "
"agent id. Posting as a new agent.")
yield self.post_agent_to_master()
break
# If this is a client problem retrying the request
# is unlikely to fix the issue so we stop here
elif response.code >= BAD_REQUEST:
svclog.error(
"Failed to announce self to the master, bad "
"request: %s. This request will not be retried.",
data)
break
else:
svclog.error(
"Unhandled error when posting self to the "
"master: %s (code: %s). This request will not be "
"retried.", data, response.code)
break
yield self.reannounce_lock.release()
returnValue(data)
[docs] def system_data(self, requery_timeoffset=False):
"""
Returns a dictionary of data containing information about the
agent. This is the information that is also passed along to
the master.
"""
# query the time offset and then cache it since
# this is typically a blocking operation
if config["agent_time_offset"] == "auto":
config["agent_time_offset"] = None
if requery_timeoffset or config["agent_time_offset"] is None:
ntplog.info(
"Querying ntp server %r for current time",
config["agent_ntp_server"])
ntp_client = NTPClient()
try:
pool_time = ntp_client.request(
config["agent_ntp_server"],
version=config["agent_ntp_server_version"])
except Exception as e:
ntplog.warning("Failed to determine network time: %s", e)
else:
config["agent_time_offset"] = \
int(pool_time.tx_time - time.time())
# format the offset for logging purposes
utcoffset = datetime.utcfromtimestamp(pool_time.tx_time)
iso_timestamp = utcoffset.isoformat()
ntplog.debug(
"network time: %s (local offset: %r)",
iso_timestamp, config["agent_time_offset"])
if config["agent_time_offset"] != 0:
ntplog.warning(
"Agent is %r second(s) off from ntp server at %r",
config["agent_time_offset"],
config["agent_ntp_server"])
data = {
"id": config["agent_id"],
"hostname": config["agent_hostname"],
"version": config.version,
"os_class": system.operating_system(),
"os_fullname": platform(),
"ram": int(config["agent_ram"]),
"cpus": config["agent_cpus"],
"cpu_name": cpu.cpu_name(),
"port": config["agent_api_port"],
"free_ram": memory.free_ram(),
"time_offset": config["agent_time_offset"] or 0,
"state": config["state"],
"mac_addresses": list(network.mac_addresses()),
"current_assignments": config.get(
"current_assignments", {}), # may not be set yet
"disks": disks.disks(as_dict=True)
}
try:
gpu_names = graphics.graphics_cards()
data["gpus"] = gpu_names
except graphics.GPULookupError:
pass
if "remote_ip" in config:
data.update(remote_ip=config["remote_ip"])
if config["farm_name"]:
data["farm_name"] = config["farm_name"]
return data
[docs] def build_http_resource(self):
svclog.debug("Building HTTP Service")
root = Resource()
# static endpoints to redirect resources
# to the right objects
root.putChild(
"favicon.ico",
StaticPath(join(config["agent_static_root"], "favicon.ico"),
defaultType="image/x-icon"))
root.putChild(
"static",
StaticPath(config["agent_static_root"]))
# external endpoints
root.putChild("", Index())
root.putChild("configuration", Configuration())
# api endpoints
api = root.putChild("api", APIRoot())
api.putChild("versions", Versions())
v1 = api.putChild("v1", APIRoot())
# Top level api endpoints
v1.putChild("assign", Assign(self))
v1.putChild("tasks", Tasks())
v1.putChild("config", Config())
v1.putChild("task_logs", TaskLogs())
# Endpoints which are generally used for status
# and operations.
v1.putChild("status", Status())
v1.putChild("stop", Stop())
v1.putChild("restart", Restart())
v1.putChild("update", Update())
v1.putChild("check_software", CheckSoftware())
return root
def _start_manhole(self, port, username, password):
"""
Starts the manhole server so we can connect to the agent
over telnet
"""
if "manhole" in self.services:
svclog.warning(
"Telnet manhole service is already running on port %s",
self.services["manhole"].port)
return
svclog.info("Starting telnet manhole on port %s", port)
# Since we don't always need this module we import
# it here to save on memory and other resources
from pyfarm.agent.manhole import manhole_factory
# Contains the things which will be in the top level
# namespace of the Python interpreter.
namespace = {
"config": config, "agent": self,
"jobtypes": config["jobtypes"],
"current_assignments": config["current_assignments"]}
factory = manhole_factory(namespace, username, password)
self.services["manhole"] = reactor.listenTCP(port, factory)
def _start_http_api(self, port):
"""
Starts the HTTP api so the master can communicate
with the agent.
"""
if "api" in self.services:
svclog.warning(
"HTTP API service already running on port %s",
self.services["api"].port)
return
http_resource = self.build_http_resource()
self.services["api"] = reactor.listenTCP(port, Site(http_resource))
[docs] def start(self, shutdown_events=True, http_server=True):
"""
Internal code which starts the agent, registers it with the master,
and performs the other steps necessary to get things running.
:param bool shutdown_events:
If True register all shutdown events so certain actions, such as
information the master we're going offline, can take place.
:param bool http_server:
If True then construct and serve the externally facing http
server.
"""
if config["agent_manhole"]:
self._start_manhole(config["agent_manhole_port"],
config["agent_manhole_username"],
config["agent_manhole_password"])
# setup the internal http server so external entities can
# interact with the service.
if http_server:
self._start_http_api(config["agent_api_port"])
# Update the configuration with this pid (which may be different
# than the original pid).
config["pids"].update(child=os.getpid())
# get ready to 'publish' the agent
config.register_callback(
"agent_id",
partial(
self.callback_agent_id_set, shutdown_events=shutdown_events))
return self.post_agent_to_master()
[docs] @inlineCallbacks
def stop(self):
"""
Internal code which stops the agent. This will terminate any running
processes, inform the master of the terminated tasks, update the
state of the agent on the master.
"""
yield self.stop_lock.acquire()
if self.stopped:
yield self.stop_lock.release()
svclog.warning("Agent is already stopped")
returnValue(None)
svclog.info("Stopping the agent")
self.shutting_down = True
self.shutdown_timeout = (
datetime.utcnow() + timedelta(
seconds=config["agent_shutdown_timeout"]))
if self.agent_api() is not None:
try:
yield self.post_shutdown_to_master()
except Exception as error: # pragma: no cover
svclog.warning(
"Error while calling post_shutdown_to_master()", error)
else:
svclog.warning("Cannot post shutdown, agent_api() returned None")
utility.remove_file(
config["agent_lock_file"], retry_on_exit=True, raise_=False)
svclog.debug("Stopping execution of jobtypes")
for jobtype_id, jobtype in config["jobtypes"].items():
try:
jobtype.stop()
except Exception as error: # pragma: no cover
svclog.warning(
"Error while calling stop() on %s (id: %s): %s",
jobtype, jobtype_id, error
)
config["jobtypes"].pop(jobtype_id)
svclog.info(
"Waiting on %s job types to terminate", len(config["jobtypes"]))
while config["jobtypes"] and datetime.utcnow() < self.shutdown_timeout:
for jobtype_id, jobtype in config["jobtypes"].copy().items():
if not jobtype._has_running_processes():
svclog.warning(
"%r has not removed itself, forcing removal",
jobtype)
config["jobtypes"].pop(jobtype_id)
# Brief delay so we don't tie up the cpu
delay = Deferred()
reactor.callLater(1, delay.callback, None)
yield delay
self.stopped = True
yield self.stop_lock.release()
returnValue(None)
[docs] def sigint_handler(self, *_):
utility.remove_file(
config["run_control_file"], retry_on_exit=True, raise_=False)
def errback(failure):
svclog.error(
"Error while attempting to shutdown the agent: %s", failure)
# Stop the reactor but handle the exit code ourselves otherwise
# Twisted will just exit with 0.
reactor.stop()
sys.exit(1)
# Call stop() and wait for it to finish before we stop
# the reactor.
# NOTE: We're not using inlineCallbacks here because reactor.stop()
# would be called in the middle of the generator unwinding
deferred = self.stop()
deferred.addCallbacks(lambda _: reactor.stop(), errback)
[docs] @inlineCallbacks
def post_shutdown_to_master(self):
"""
This method is called before the reactor shuts down and lets the
master know that the agent's state is now ``offline``
"""
# We're under the assumption that something's wrong with
# our code if we try to call this method before self.shutting_down
# is set.
assert self.shutting_down
yield self.post_shutdown_lock.acquire()
svclog.info("Informing master of shutdown")
# Because post_shutdown_to_master is blocking and needs to
# stop the reactor from finishing we perform the retry in-line
data = None
tries = 0
num_retry_errors = 0
response = None
timed_out = False
while True:
tries += 1
try:
response = yield post_direct(
self.agent_api(),
data={
"state": AgentState.OFFLINE,
"free_ram": memory.free_ram(),
"current_assignments": config["current_assignments"]})
except (ResponseNeverReceived, RequestTransmissionFailed) as error:
num_retry_errors += 1
if num_retry_errors > config["broken_connection_max_retry"]:
svclog.error(
"Failed to post shutdown to the master, "
"caught try-again errors %s times in a row.",
num_retry_errors)
break
elif self.shutdown_timeout < datetime.utcnow():
svclog.error("While posting shutdown to master, caught "
"%s. Shutdown timeout has been reached, not "
"retrying.",
error.__class__.__name__)
break
else:
svclog.debug("While posting shutdown to master, caught "
"%s. Retrying immediately.",
error.__class__.__name__)
# When we get a hard failure it could be an issue with the
# server, although it's unlikely, so we retry. Only retry
# for a set period of time though since the shutdown as a timeout
except Exception as failure:
if self.shutdown_timeout > datetime.utcnow():
delay = http_retry_delay()
svclog.warning(
"State update failed due to unhandled error: %s. "
"Retrying in %s seconds",
failure, delay)
# Wait for 'pause' to fire, introducing a delay
pause = Deferred()
reactor.callLater(delay, pause.callback, None)
yield pause
else:
timed_out = True
svclog.warning(
"State update failed due to unhandled error: %s. "
"Shutdown timeout reached, not retrying.",
failure)
break
else:
data = yield treq.json_content(response)
if response.code == NOT_FOUND:
svclog.warning(
"Agent %r no longer exists, cannot update state.",
config["agent_id"])
break
elif response.code == OK:
svclog.info(
"Agent %r has POSTed shutdown state change "
"successfully.",
config["agent_id"])
break
elif response.code >= INTERNAL_SERVER_ERROR:
if self.shutdown_timeout > datetime.utcnow():
delay = http_retry_delay()
svclog.warning(
"State update failed due to server error: %s. "
"Retrying in %s seconds.",
data, delay)
# Wait for 'pause' to fire, introducing a delay
pause = Deferred()
reactor.callLater(delay, pause.callback, None)
yield pause
else:
timed_out = True
svclog.warning(
"State update failed due to server error: %s. "
"Shutdown timeout reached, not retrying.",
data)
break
yield self.post_shutdown_lock.release()
extra_data = {
"response": response,
"timed_out": timed_out,
"tries": tries,
"retry_errors": num_retry_errors
}
if isinstance(data, dict):
data.update(extra_data)
else:
data = extra_data
returnValue(data)
[docs] @inlineCallbacks
def post_agent_to_master(self):
"""
Runs the POST request to contact the master. Running this method
multiple times should be considered safe but is generally something
that should be avoided.
"""
url = self.agents_endpoint()
data = self.system_data()
try:
response = yield post_direct(url, data=data)
except Exception as failure:
delay = http_retry_delay()
if isinstance(failure, ConnectionRefusedError):
svclog.error(
"Failed to POST agent to master, the connection was "
"refused. Retrying in %s seconds", delay)
else: # pragma: no cover
svclog.error(
"Unhandled error when trying to POST the agent to the "
"master. The error was %s.", failure)
if not self.shutting_down:
svclog.info(
"Retrying failed POST to master in %s seconds.", delay)
yield deferLater(reactor, delay, self.post_agent_to_master)
else:
svclog.warning("Not retrying POST to master, shutting down.")
else:
# Master might be down or have some other internal problems
# that might eventually be fixed. Retry the request.
if response.code >= INTERNAL_SERVER_ERROR:
if not self.shutting_down:
delay = http_retry_delay()
svclog.warning(
"Failed to post to master due to a server side error "
"error %s, retrying in %s seconds",
response.code, delay)
yield deferLater(reactor, delay, self.post_agent_to_master)
else:
svclog.warning(
"Failed to post to master due to a server side error "
"error %s. Not retrying, because the agent is "
"shutting down", response.code)
# Master is up but is rejecting our request because there's
# something wrong with it. Do not retry the request.
elif response.code >= BAD_REQUEST:
text = yield response.text()
svclog.error(
"%s accepted our POST request but responded with code %s "
"which is a client side error. The message the server "
"responded with was %r. Sorry, but we cannot retry this "
"request as it's an issue with the agent's request.",
url, response.code, text)
else:
data = yield treq.json_content(response)
config["agent_id"] = data["id"]
config.master_contacted()
if response.code == OK:
svclog.info(
"POST to %s was successful. Agent %s was updated.",
url, config["agent_id"])
elif response.code == CREATED:
svclog.info(
"POST to %s was successful. A new agent "
"with an id of %s was created.",
url, config["agent_id"])
returnValue(data)
[docs] def callback_agent_id_set(
self, change_type, key, new_value, old_value, shutdown_events=True):
"""
When `agent_id` is created we need to:
* Register a shutdown event so that when the agent is told to
shutdown it will notify the master of a state change.
* Star the scheduled task manager
"""
if key == "agent_id" and change_type == config.CREATED \
and not self.register_shutdown_events:
if shutdown_events:
self.register_shutdown_events = True
# set the initial free_ram
config["free_ram"] = memory.free_ram()
config.master_contacted()
svclog.debug(
"`%s` was %s, adding system event trigger for shutdown",
key, change_type)
self.repeating_call(
config["agent_master_reannounce"], self.reannounce)
[docs] def callback_shutting_down_changed(
self, change_type, key, new_value, old_value):
"""
When `shutting_down` is changed in the configuration, set or
reset self.shutdown_timeout
"""
if change_type not in (config.MODIFIED, config.CREATED):
return
if new_value is not True:
self.shutdown_timeout = None
return
self.shutdown_timeout = timedelta(
seconds=config["agent_shutdown_timeout"]) + datetime.utcnow()
svclog.debug("New shutdown_timeout is %s", self.shutdown_timeout)