# No shebang line, this module is meant to be imported
#
# Copyright 2014 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Main
----
The main module which constructs the entrypoint for the
``pyfarm-agent`` command line tool.
"""
from __future__ import division
import os
import sys
import pdb
import time
from errno import ENOENT, EEXIST
from json import dumps
from os.path import dirname, isfile, isdir, expandvars, expanduser
try:
from httplib import ACCEPTED, OK, responses
except ImportError: # pragma: no cover
from http.client import ACCEPTED, OK, responses
# Platform specific imports. These should either all fail or
# import without problems so we're grouping them together.
try:
from grp import getgrgid
from pwd import getpwuid
from os import setuid, getuid, setgid, getgid, fork
except ImportError: # pragma: no cover
getgrgid = NotImplemented
getpwuid = NotImplemented
setuid = NotImplemented
getuid = NotImplemented
setgid = NotImplemented
getgid = NotImplemented
fork = NotImplemented
import psutil
import requests
import signal
from requests import ConnectionError
from twisted.internet import reactor
from pyfarm.core.enums import (
OS, WINDOWS, AgentState, INTEGER_TYPES, operating_system)
from pyfarm.agent.logger import getLogger
from pyfarm.agent.logger.twistd import Observer
from pyfarm.agent.config import config
from pyfarm.agent.entrypoints.parser import (
AgentArgumentParser, ip, port, uidgid, enum, number, uuid_type)
from pyfarm.agent.entrypoints.utility import start_daemon_posix
from pyfarm.agent.sysinfo import memory, cpu
from pyfarm.agent.utility import AgentUUID, remove_file
try:
WindowsError
except NameError: # pragma: no cover
WindowsError = OSError
logger = getLogger("agent.cmd")
config["start"] = time.time()
[docs]class AgentEntryPoint(object):
"""Main object for parsing command line options"""
def __init__(self):
self.args = None
self.parser = AgentArgumentParser(
usage="%(prog)s [status|start|stop]",
epilog="%(prog)s is a command line client for working with a "
"local agent. You can use it to stop, start, and report "
"the general status of a running agent process.")
# main subparser for start/stop/status/etc
subparsers = self.parser.add_subparsers(
help="individual operations %(prog)s can run")
start = subparsers.add_parser(
"start", help="starts the agent")
stop = subparsers.add_parser(
"stop", help="stops the agent")
status = subparsers.add_parser(
"status", help="query the 'running' state of the agent")
# relate a name and function to each subparser
start.set_defaults(target_name="start", target_func=self.start)
stop.set_defaults(target_name="stop", target_func=self.stop)
status.set_defaults(target_name="status", target_func=self.status)
# command line flags which configure the agent's network service
global_network = self.parser.add_argument_group(
"Agent Network Service",
description="Main flags which control the network services running "
"on the agent.")
global_network.add_argument(
"--port", config="agent_api_port", type=port,
type_kwargs=dict(get_uid=lambda: self.args.uid == 0),
help="The port number which the agent is either running on or "
"will run on when started. This port is also reported the "
"master when an agent starts. [default: %(default)s]")
global_network.add_argument(
"--host", config="agent_hostname",
help="The host to communicate with or hostname to present to the "
"master when starting. Defaults to the fully qualified "
"hostname.")
global_network.add_argument(
"--agent-api-username", default="agent", config=False,
help="The username required to access or manipulate the agent "
"using REST. [default: %(default)s]")
global_network.add_argument(
"--agent-api-password", default="agent", config=False,
help="The password required to access manipulate the agent "
"using REST. [default: %(default)s]")
global_network.add_argument(
"--agent-id", config="agent_id", type=uuid_type,
default=None,
help="The UUID used to identify this agent to the master. By "
"default the agent will attempt to load a cached value "
"however a specific UUID could be provided with this flag.")
global_network.add_argument(
"--agent-id-file", config="agent_id_file",
default=expanduser(expandvars(
config["agent_id_file_platform_defaults"][operating_system()])),
help="The location to store the agent's id. By default the path "
"is platform specific and defined by the "
"`agent_id_file_platform_defaults` key in the configuration. "
"[default: %(default)s]")
# command line flags for the connecting the master apis
global_apis = self.parser.add_argument_group(
"Network Resources",
description="Resources which the agent will be communicating with.")
global_apis.add_argument(
"--master", config="master",
help="This is a convenience flag which will allow you to set the "
"hostname for the master. By default this value will be "
"substituted in --master-api")
global_apis.add_argument(
"--master-api", config="master_api",
help="The location where the master's REST api is located. "
"[default: %(default)s]")
global_apis.add_argument(
"--master-api-version", config="master_api_version",
help="Sets the version of the master's REST api the agent should"
"use [default: %(default)s]")
# global command line flags which apply to top level
# process control
global_process = self.parser.add_argument_group(
"Process Control",
description="These settings apply to the parent process of the "
"agent and contribute to allowing the process to run "
"as other users or remain isolated in an environment. "
"They also assist in maintaining the 'running state' "
"via a process id file.")
global_process.add_argument(
"--pidfile", config="agent_lock_file",
help="The file to store the process id in. [default: %(default)s]")
global_process.add_argument(
"-n", "--no-daemon", default=False, action="store_true",
config=False,
help="If provided then do not run the process in the background.")
global_process.add_argument(
"--chdir", config="agent_chdir", type=isdir,
help="The working directory to change the agent into upon launch")
global_process.add_argument(
"--uid", type=uidgid, config=False,
type_kwargs=dict(get_id=getuid, check_id=getpwuid, set_id=setuid),
help="The user id to run the agent as. *This setting is "
"ignored on Windows.*")
global_process.add_argument(
"--gid", type=uidgid, config=False,
type_kwargs=dict(get_id=getgid, check_id=getgrgid, set_id=setgid),
help="The group id to run the agent as. *This setting is "
"ignored on Windows.*")
global_process.add_argument(
"--pdb-on-unhandled", action="store_true",
help="When set pdb.set_trace() will be called if an unhandled "
"error is caught in the logger")
# start general group
start_general_group = start.add_argument_group(
"General Configuration",
description="These flags configure parts of the agent related to "
"hardware, state, and certain timing and scheduling "
"attributes.")
start_general_group.add_argument(
"--state", default=AgentState.ONLINE, config=False,
type=enum, type_kwargs=dict(enum=AgentState),
help="The current agent state, valid values are "
"" + str(list(AgentState)) + ". [default: %(default)s]")
start_general_group.add_argument(
"--time-offset", config="agent_time_offset",
type=int, type_kwargs=dict(min_=0),
help="If provided then don't talk to the NTP server at all to "
"calculate the time offset. If you know for a fact that this "
"host's time is always up to date then setting this to 0 is "
"probably a safe bet.")
start_general_group.add_argument(
"--ntp-server", config="agent_ntp_server",
help="The default network time server this agent should query to "
"retrieve the real time. This will be used to help determine "
"the agent's clock skew if any. Setting this value to '' "
"will effectively disable this query. [default: %(default)s]")
start_general_group.add_argument(
"--ntp-server-version", config="agent_ntp_server_version",
type=int,
help="The version of the NTP server in case it's running an older"
"or newer version. [default: %(default)s]")
start_general_group.add_argument(
"--no-pretty-json", config="agent_pretty_json",
action="store_false",
help="If provided do not dump human readable json via the agent's "
"REST api")
start_general_group.add_argument(
"--shutdown-timeout", config="agent_shutdown_timeout",
type=int, type_kwargs=dict(min_=0),
help="How many seconds the agent should spend attempting to inform "
"the master that it's shutting down.")
start_general_group.add_argument(
"--updates-drop-dir", config="agent_updates_dir",
help="The directory to drop downloaded updates in. This should be "
"the same directory pyfarm-supervisor will look for updates in. "
"[default: %(default)s]")
start_general_group.add_argument(
"--run-control-file", config="run_control_file",
default=expanduser(expandvars(
config["run_control_file_by_platform"][operating_system()])),
help="The path to a file that will signal to the supervisor that "
"agent is supposed to be restarted if it stops for whatever "
"reason."
"[default: %(default)s]")
start_general_group.add_argument(
"--farm-name", config="farm_name",
default=None,
help="The name of the farm the agent should join. If unset, the "
"agent will join any farm.")
# start hardware group
start_hardware_group = start.add_argument_group(
"Physical Hardware",
description="Command line flags which describe the hardware of "
"the agent.")
start_hardware_group.add_argument(
"--cpus", default=cpu.total_cpus(),
config="agent_cpus", type=int,
help="The total amount of cpus installed on the "
"system. Defaults to the number of cpus installed "
"on the system.")
start_hardware_group.add_argument(
"--ram", default=memory.total_ram(),
config="agent_ram", type=int,
help="The total amount of ram installed on the system in "
"megabytes. Defaults to the amount of ram the "
"system has installed.")
# start interval controls
start_interval_group = start.add_argument_group(
"Interval Controls",
description="Controls which dictate when certain internal "
"intervals should occur.")
start_interval_group.add_argument(
"--ram-check-interval",
config="agent_ram_check_interval", type=int,
help="How often ram resources should be checked for changes. "
"The amount of memory currently being consumed on the system "
"is checked after certain events occur such as a process but "
"this flag specifically controls how often we should check "
"when no such events are occurring. [default: %(default)s]")
start_interval_group.add_argument(
"--ram-max-report-frequency",
config="agent_ram_max_report_frequency", type=int,
help="This is a limiter that prevents the agent from reporting "
"memory changes to the master more often than a specific "
"time interval. This is done in order to ensure that when "
"100s of events fire in a short period of time cause changes "
"in ram usage only one or two will be reported to the "
"master. [default: %(default)s]")
start_interval_group.add_argument(
"--ram-report-delta", config="agent_ram_report_delta", type=int,
help="Only report a change in ram if the value has changed "
"at least this many megabytes. [default: %(default)s]")
start_interval_group.add_argument(
"--master-reannounce", config="agent_master_reannounce", type=int,
help="Controls how often the agent should reannounce itself "
"to the master. The agent may be in contact with the master "
"more often than this however during long period of "
"inactivity this is how often the agent will 'inform' the "
"master the agent is still online.")
# start logging options
logging_group = start.add_argument_group(
"Logging Options",
description="Settings which control logging of the agent's parent "
"process and/or any subprocess it runs.")
logging_group.add_argument(
"--log", config="agent_log",
help="If provided log all output from the agent to this path. "
"This will append to any existing log data. [default: "
"%(default)s]")
logging_group.add_argument(
"--capture-process-output", config="jobtype_capture_process_output",
action="store_true",
help="If provided then all log output from each process launched "
"by the agent will be sent through agent's loggers.")
logging_group.add_argument(
"--task-log-dir", config="jobtype_task_logs",
type=isdir, type_kwargs=dict(create=True),
help="The directory tasks should log to.")
# network options for the agent when start is called
start_network = start.add_argument_group(
"Network Service",
description="Controls how the agent is seen or interacted with "
"by external services such as the master.")
start_network.add_argument(
"--ip-remote", type=ip, config=False,
help="The remote IPv4 address to report. In situation where the "
"agent is behind a firewall this value will typically be "
"different.")
start_manhole = start.add_argument_group(
"Manhole Service",
description="Controls the manhole service which allows a telnet "
"connection to be made directly into the agent as "
"it's running.")
start_manhole.add_argument(
"--enable-manhole", config="agent_manhole",
action="store_true",
help="When provided the manhole service will be started once the "
"reactor is running.")
start_manhole.add_argument(
"--manhole-port", config="agent_manhole_port", type=port,
type_kwargs=dict(get_uid=lambda: self.args.uid == 0),
help="The port the manhole service should run on if enabled.")
start_manhole.add_argument(
"--manhole-username", config="agent_manhole_username",
help="The telnet username that's allowed to connect to the "
"manhole service running on the agent.")
start_manhole.add_argument(
"--manhole-password", config="agent_manhole_password",
help="The telnet password to use when connecting to the "
"manhole service running on the agent.")
# various options for how the agent will interact with the
# master server
start_http_group = start.add_argument_group(
"HTTP Configuration",
description="Options for how the agent will interact with the "
"master's REST api and how it should run it's own "
"REST api.")
start_http_group.add_argument(
"--html-templates-reload", config="agent_html_template_reload",
action="store_true",
help="If provided then force Jinja2, the html template system, "
"to check the file system for changes with every request. "
"This flag should not be used in production but is useful "
"for development and debugging purposes.")
start_http_group.add_argument(
"--static-files", config="agent_static_root", type=isdir,
help="The default location where the agent's http server should "
"find static files to serve.")
start_http_group.add_argument(
"--http-retry-delay-offset",
config="agent_http_retry_delay_offset", type=number,
help="If a http request to the master has failed, wait at least "
"this amount of time before resending the request.")
start_http_group.add_argument(
"--http-retry-delay-factor",
config="agent_http_retry_delay_factor", type=number,
help="The value provided here is used in combination with "
"--http-retry-delay-offset to calculate the retry delay. "
"This is used as a multiplier against random() before being "
"added to the offset.")
jobtype_group = start.add_argument_group("Job Types")
jobtype_group.add_argument(
"--jobtype-no-cache", config="jobtype_enable_cache",
action="store_true",
help="If provided then do not cache job types, always directly "
"retrieve them. This is beneficial if you're testing the "
"agent or a new job type class.")
# options when stopping the agent
stop_group = stop.add_argument_group(
"optional flags",
description="Flags that control how the agent is stopped")
stop_group.add_argument(
"--no-wait", default=False, action="store_true", config=False,
help="If provided then don't wait on the agent to shut itself "
"down. By default we would want to wait on each task to stop "
"so we can catch any errors and then finally wait on the "
"agent to shutdown too. If you're in a hurry or stopping a "
"bunch of agents at once then setting this flag will let the "
"agent continue to stop itself without waiting for each agent")
def __call__(self):
logger.debug("Parsing command line arguments")
self.args = self.parser.parse_args()
# No daemon mode must be set with --pdb-on-unhanded, without this
# you could end up with a blocking agent and no way of knowing
# about it.
if not self.args.no_daemon and self.args.pdb_on_unhandled:
self.parser.error(
"You cannot set --pdb-on-unhandled without --no-daemon")
if self.args.pdb_on_unhandled:
Observer.PDB_ON_UNHANDLED_ERROR = True
def excepthook(exctype, value, traceback):
pdb.set_trace()
sys.excepthook = excepthook
if not config["master"] and self.args.target_name == "start":
self.parser.error(
"--master must be provided (ex. "
"'pyfarm-agent --master=foobar start')")
# if we're on windows, produce some warnings about
# flags which are not supported
if WINDOWS and self.args.uid:
logger.warning("--uid is not currently supported on Windows")
if WINDOWS and self.args.gid:
logger.warning("--gid is not currently supported on Windows")
if WINDOWS and self.args.no_daemon:
logger.warning("--no-daemon is not currently supported on Windows")
if self.args.agent_id is None:
agent_id = AgentUUID.load(self.args.agent_id_file)
# No agent id saved, generate one then try to save it. If we
# can't then an error will be raised when AgentUUID.save is called.
if agent_id is None:
agent_id = AgentUUID.generate()
AgentUUID.save(agent_id, self.args.agent_id_file)
self.args.agent_id = agent_id
# A custom --agent-id was provided, warn if it varies from one
# we load from disk. We won't try to save it however because
# that could cause conflicts if someone is using --agent-id
# and trying to run multiple agents.
else:
saved_agent_id = AgentUUID.load(self.args.agent_id_file)
if (saved_agent_id is not None
and saved_agent_id != self.args.agent_id):
logger.warning(
"Custom agent ID has been provided by --agent-id")
config["agent_id"] = self.args.agent_id
if self.args.target_name == "start":
# update configuration with values from the command line
config_flags = {
"state": self.args.state,
"pids": {
"parent": os.getpid()}}
config.update(config_flags)
return_code = self.args.target_func()
# If a specific return code is provided then use it
# directly in sys.exit
if isinstance(return_code, INTEGER_TYPES):
sys.exit(return_code)
@property
def agent_api(self):
return "http://%s:%s/api/v1" % (
config["agent_hostname"], config["agent_api_port"])
[docs] def start(self):
url = self.agent_api + "/status"
try:
response = requests.get(
url, headers={"Content-Type": "application/json"})
except ConnectionError:
pid = None
remove_lock_file = False
# Try to open an existing lock file
try:
with open(config["agent_lock_file"], "r") as pidfile:
try:
pid = int(pidfile.read().strip())
except ValueError:
logger.warning(
"Could not convert pid in %s to an integer.",
config["agent_lock_file"])
remove_lock_file = True
# If the file is missing we ignore the error and read
# pid/remove_lock_file later on.
except (OSError, IOError) as e:
if e.errno == ENOENT:
logger.debug(
"Process ID file %s does not exist",
config["agent_lock_file"])
else:
raise
else:
assert pid is not None, "pid was not set"
logger.debug(
"Process ID file %s exists", config["agent_lock_file"])
if pid is not None:
try:
process = psutil.Process(pid)
# Process does exist, does it appear to be our agent?
if process.name() == "pyfarm-agent":
logger.error(
"Agent is already running, pid %s", pid)
return 1
# Not our agent so the lock file is probably wrong.
else:
logger.debug(
"Process %s does not appear to be the "
"agent.", pid)
remove_lock_file = True
# Process in the pid file does not exist or we don't have the
# rights to access it
except (psutil.NoSuchProcess, psutil.AccessDenied):
logger.debug(
"Process ID in %s is stale.",
config["agent_lock_file"])
remove_lock_file = True
if remove_lock_file:
logger.debug(
"Attempting to remove PID file %s",
config["agent_lock_file"])
try:
remove_file(
config["agent_lock_file"],
retry_on_exit=True, raise_=True)
except (WindowsError, OSError, IOError):
return 1
else:
code = "%s %s" % (
response.status_code, responses[response.status_code])
pid = response.json()["pids"]["parent"]
logger.error(
"Agent at pid %s is already running, got %s from %s.",
pid, code, url)
return 1
logger.info("Starting agent")
if not isdir(config["jobtype_task_logs"]):
logger.debug("Creating %s", config["jobtype_task_logs"])
try:
os.makedirs(config["jobtype_task_logs"])
except (OSError, WindowsError):
logger.error("Failed to create %s", config["jobtype_task_logs"])
return 1
# create the directory for log
if not self.args.no_daemon and not isfile(config["agent_log"]):
try:
os.makedirs(dirname(config["agent_log"]))
except (OSError, WindowsError) as e:
# Not an error because it could be created later on
logger.warning(
"failed to create %s: %r", dirname(config["agent_log"]), e)
# If we don't explicitly clear this exception here, twisted will
# keep displaying it with its stacktrace every time a callback
# raises an exception.
sys.exc_clear()
# so long as fork could be imported and --no-daemon was not set
# then setup the log files
if not self.args.no_daemon and fork is not NotImplemented:
logger.info("sending log output to %s" % config["agent_log"])
daemon_start_return_code = start_daemon_posix(
config["agent_log"], config["agent_chdir"],
self.args.uid, self.args.gid)
if isinstance(daemon_start_return_code, INTEGER_TYPES):
return daemon_start_return_code
elif not self.args.no_daemon and fork is NotImplemented:
logger.warning(
"`fork` is not implemented on %s, starting in "
"foreground" % OS.title())
# PID file should not exist now. Either the last agent instance
# should have removed it or we should hae above.
if isfile(config["agent_lock_file"]):
logger.error("PID file should not exist on disk at this point.")
return 1
# Create the directory for the pid file if necessary
pid_dirname = dirname(config["agent_lock_file"])
if not isdir(pid_dirname):
try:
os.makedirs(pid_dirname)
except OSError: # pragma: no cover
logger.error(
"Failed to create parent directory for %s",
config["agent_lock_file"])
return 1
else:
logger.debug("Created directory %s", pid_dirname)
# Write the PID file
pid = os.getpid()
try:
with open(config["agent_lock_file"], "w") as pidfile:
pidfile.write(str(pid))
except OSError as e:
logger.error(
"Failed to write PID file %s: %s", config["agent_lock_file"], e)
return 1
else:
logger.debug("Wrote PID to %s", config["agent_lock_file"])
logger.info("pid: %s", pid)
if not isfile(config["run_control_file"]):
directory = dirname(config["run_control_file"])
try:
os.makedirs(directory)
except (OSError, IOError) as e: # pragma: no cover
if e.errno != EEXIST:
logger.error(
"Failed to create parent directory for %s: %s: %s",
config["run_control_file"], type(e).__name__, e)
raise
else:
logger.debug("Created directory %s", directory)
try:
with open(config["run_control_file"], "a"):
pass
except (OSError, IOError) as e:
logger.error("Failed to create run control file %s: %s: %s",
config["run_control_file"], type(e).__name__, e)
else:
logger.info("Created run control file %s",
config["run_control_file"])
if getuid is not NotImplemented:
logger.info("uid: %s" % getuid())
if getgid is not NotImplemented:
logger.info("gid: %s" % getgid())
from pyfarm.agent.service import Agent
# Setup the agent, register stop(), then run the agent
service = Agent()
signal.signal(signal.SIGINT, service.sigint_handler)
reactor.callWhenRunning(service.start)
reactor.run()
[docs] def stop(self):
logger.info("Stopping the agent")
url = self.agent_api + "/stop"
try:
# TODO: this NEEDS to be an authenticated request
response = requests.post(
url,
data=dumps({"wait": not self.args.no_wait}),
headers={"Content-Type": "application/json"})
except Exception as e:
logger.error("Failed to contact %s: %s", url, e)
return 1
if response.status_code == ACCEPTED:
logger.info("Agent is stopping")
elif response.status_code == OK:
logger.info("Agent has stopped")
else:
logger.error(
"Received code %s when attempting to access %s: %s",
response.status_code, url, response.text)
[docs] def status(self):
url = self.agent_api + "/status"
logger.debug("Checking agent status via api using 'GET %s'", url)
try:
response = requests.get(
url, headers={"Content-Type": "application/json"})
# REST request failed for some reason, try with the pid file
except Exception as e:
logger.debug(str(e))
logger.warning(
"Failed to communicate with %s's API. We can only roughly "
"determine if agent is offline or online.",
config["agent_hostname"])
# TODO: use config for pidfile, --pidfile should be an override
if not isfile(config["agent_lock_file"]):
logger.debug(
"Process ID file %s does not exist",
config["agent_lock_file"])
logger.info("Agent is offline")
return 1
else:
with open(config["agent_lock_file"], "r") as pidfile:
try:
pid = int(pidfile.read().strip())
except ValueError:
logger.error(
"Could not convert pid in %s to an integer.",
config["agent_lock_file"])
return 1
try:
process = psutil.Process(pid)
except psutil.NoSuchProcess:
logger.info("Agent is offline.")
return 1
else:
if process.name() == "pyfarm-agent":
logger.info("Agent is online.")
else:
logger.warning(
"Process %s does not appear to be the agent", pid)
logger.info("Agent is offline.")
return
data = response.json()
pid_parent = data["pids"]["parent"]
pid_child = data["pids"]["child"]
# Print some general information about the agent
logger.info("Agent %(agent_hostname)s is %(state)s" % data)
logger.info(" Uptime: %(uptime)s seconds" % data)
logger.info(
" Last Master Contact: %(last_master_contact)s seconds" % data)
logger.info(" Parent Process ID: %(pid_parent)s" % locals())
logger.info(" Process ID: %(pid_child)s" % locals())
logger.info(" Database ID: %(id)s" % data)
logger.info(" System ID: %(agent_id)s" % data)
logger.info(
" Child Processes: %(child_processes)s "
"(+%(grandchild_processes)s grandchildren)" % data)
logger.info(" Memory Consumption: %(consumed_ram)sMB" % data)
agent = AgentEntryPoint()