# Source code for pyfarm.agent.utility

# No shebang line, this module is meant to be imported
#
# Copyright 2014 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Utilities
---------

Top level utilities for the agent to use internally.  Many of these
are copied over from the master (which we can't import here).
"""

import atexit
import csv
import codecs
import os
import shutil
from decimal import Decimal
from datetime import datetime, timedelta
from errno import EEXIST, ENOENT, errorcode
from json import dumps as _dumps
from os.path import dirname
from UserDict import UserDict
from uuid import UUID, uuid4

try:
    from urlparse import urlsplit
    from urllib import quote
except ImportError:  # pragma: no cover
    # Python 3: both helpers are part of the public ``urllib.parse`` API.
    # ``http.client`` is not a documented source for ``urlsplit`` so it
    # should not be relied on here.
    from urllib.parse import urlsplit
    from urllib.parse import quote

try:
    from httplib import OK
except ImportError:  # pragma: no cover
    from http.client import OK

try:
    WindowsError
except NameError:  # pragma: no cover
    WindowsError = OSError

from voluptuous import Schema, Any, Required, Optional, Invalid
from twisted.internet import reactor
from twisted.internet.defer import AlreadyCalledError, DeferredLock, Deferred
from twisted.internet.error import AlreadyCalled

from pyfarm.core.enums import STRING_TYPES
from pyfarm.agent.config import config
from pyfarm.agent.logger import getLogger

logger = getLogger("agent.util")
STRINGS = Any(*STRING_TYPES)

# Python 2 has a separate ``long`` integer type which has to be accepted
# wherever a number is expected.  Probe for the name first and build the
# validators to match the interpreter we're running under.
try:
    long
except NameError:  # pragma: no cover
    WHOLE_NUMBERS = int
    NUMBERS = Any(int, float, Decimal)
else:
    WHOLE_NUMBERS = Any(int, long)
    NUMBERS = Any(int, long, float, Decimal)


def validate_environment(values):
    """
    Ensures that ``values`` is a dictionary and that it only
    contains string keys and values.

    :param dict values:
        The environment mapping to validate.

    :raises voluptuous.Invalid:
        Raised if ``values`` is not a dictionary or if any key or value
        is not an instance of one of the ``STRING_TYPES``.
    """
    if not isinstance(values, dict):
        raise Invalid("Expected a dictionary")

    for key, value in values.items():
        if not isinstance(key, STRING_TYPES):
            # Wrap the key in a tuple so a key which is itself a tuple is
            # formatted as one value instead of being unpacked by ``%``.
            raise Invalid("Key %r must be a string" % (key, ))

        if not isinstance(value, STRING_TYPES):
            # The message names the value first and the key second so
            # the arguments have to be supplied in that order.
            raise Invalid(
                "Value %r for key %r must be a string" % (value, key))

    # NOTE(review): nothing is returned on success.  This function is used
    # directly as a validator inside JOB_SCHEMA; confirm whether the schema
    # output for "environ" is expected to carry the original dictionary.
def validate_uuid(value):
    """
    Returns ``value`` as a :class:`uuid.UUID`.  Existing UUID objects are
    returned untouched and strings are converted from their hexadecimal
    form.

    :raises voluptuous.Invalid:
        Raised if ``value`` is a string which can't be converted or if it
        is neither a string nor a UUID.
    """
    # Already the right type, there's nothing to convert.
    if isinstance(value, UUID):
        return value

    if not isinstance(value, STRING_TYPES):
        raise Invalid("Expected string or UUID instance for `value`")

    try:
        converted = UUID(hex=value)
    except ValueError:
        raise Invalid("%s cannot be converted to a UUID" % value)

    return converted
# Shared schema declarations
JOBTYPE_SCHEMA = Schema({
    Required("name"): STRINGS,
    Required("version"): WHOLE_NUMBERS})

TASK_SCHEMA = Schema({
    Required("id"): WHOLE_NUMBERS,
    Required("frame"): NUMBERS,
    Optional("tile"): Any(WHOLE_NUMBERS, type(None)),
    Required("attempt", default=0): WHOLE_NUMBERS})

# Build the list eagerly.  ``map`` is lazy on Python 3 which would defer
# (or entirely skip) validation of the individual tasks until something
# happened to iterate over the result.
TASKS_SCHEMA = lambda values: [TASK_SCHEMA(value) for value in values]

NOTIFIED_USERS_SCHEMA = Schema({
    Required("username"): STRINGS,
    Required("on_success", default=True): bool,
    Required("on_failure", default=True): bool,
    Required("on_deletion", default=False): bool})

JOB_SCHEMA = Schema({
    Required("id"): WHOLE_NUMBERS,
    Required("by"): NUMBERS,
    Optional("num_tiles"): Any(WHOLE_NUMBERS, type(None)),
    Optional("agent_id"): WHOLE_NUMBERS,
    Optional("batch"): WHOLE_NUMBERS,
    Optional("user"): STRINGS,
    Optional("ram"): WHOLE_NUMBERS,
    Optional("ram_warning"): Any(WHOLE_NUMBERS, type(None)),
    Optional("ram_max"): Any(WHOLE_NUMBERS, type(None)),
    Optional("cpus"): WHOLE_NUMBERS,
    Optional("data"): dict,
    Optional("environ"): validate_environment,
    Optional("title"): STRINGS,
    Optional("notified_users"): [NOTIFIED_USERS_SCHEMA],
    Optional("priority"): WHOLE_NUMBERS,
    Optional("job_group_id"): WHOLE_NUMBERS,
    Optional("job_group"): STRINGS,
    Optional("notes"): STRINGS,
    Optional("tags"): [STRINGS]})
def default_json_encoder(obj, return_obj=False):
    """
    Converts the types :func:`json.dumps` can't handle on its own into
    something it can.

    :param obj:
        The object to convert.  :class:`decimal.Decimal` becomes a float,
        :class:`datetime.datetime` becomes an ISO formatted string and
        :class:`uuid.UUID` becomes a string.

    :param bool return_obj:
        If True, return ``obj`` itself when it's not one of the types
        above.  Otherwise ``None`` is returned for those objects.
    """
    if isinstance(obj, Decimal):
        return float(obj)

    if isinstance(obj, datetime):
        return obj.isoformat()

    if isinstance(obj, UUID):
        return str(obj)

    # Not a type we convert, hand back the input only if asked to.
    return obj if return_obj else None
def json_safe(source):
    """
    Produces a copy of ``source`` which should be safe to hand off
    to :func:`json.dumps`.  Both the keys and the values of a dictionary
    are passed through :func:`default_json_encoder` and values which are
    dictionaries themselves are processed recursively.

    Anything that is not a dictionary is returned as-is.
    """
    if not isinstance(source, dict):
        return source

    # Prefer the lazy ``iteritems`` of Python 2 when it's available
    iter_pairs = getattr(source, "iteritems", source.items)

    converted = {}
    for key, value in iter_pairs():
        safe_value = default_json_encoder(json_safe(value), return_obj=True)
        converted[default_json_encoder(key, return_obj=True)] = safe_value

    return converted
def quote_url(source_url):
    """
    This function serves as a wrapper around :func:`.urlsplit`
    and :func:`.quote` and returns a url that has the path quoted.  The
    scheme, network location, query and fragment are passed through
    unmodified.
    """
    parsed = urlsplit(source_url)
    pieces = []

    # If a url is just "/" then we don't want to add ://
    # since a scheme was not included.
    if parsed.scheme:
        pieces.append("{scheme}://".format(scheme=parsed.scheme))

    pieces.append(parsed.netloc)
    pieces.append(quote(parsed.path, safe="/?&="))

    if parsed.query:
        pieces.append("?" + parsed.query)

    if parsed.fragment:
        pieces.append("#" + parsed.fragment)

    return "".join(pieces)
def dumps(*args, **kwargs):
    """
    Agent's implementation of :func:`json.dumps` or
    :func:`pyfarm.master.utility.jsonify`

    When a single positional argument which is not a dictionary is
    provided it's serialized directly and any keyword arguments are
    ignored.  In all other cases the arguments are used to construct the
    dictionary that gets serialized.  The output is indented by two spaces
    if ``agent_pretty_json`` is enabled in the configuration.
    """
    indent = 2 if config["agent_pretty_json"] else None

    single_non_mapping = (
        len(args) == 1 and not isinstance(args[0], (dict, UserDict)))
    obj = args[0] if single_non_mapping else dict(*args, **kwargs)

    return _dumps(obj, default=default_json_encoder, indent=indent)
def request_from_master(request):
    """
    Returns True if the ``User-Agent`` header of ``request`` is equal
    to the configured ``master_user_agent`` value.
    """
    user_agent = request.getHeader("User-Agent")
    return user_agent == config["master_user_agent"]
# Unicode CSV readers/writers, taken from the standard library docs:
#   https://docs.python.org/2/library/csv.html
class UTF8Recoder(object):
    """
    Iterator which wraps the stream ``f``, decodes what it reads using
    ``encoding`` and hands each item back encoded as UTF-8.
    """
    def __init__(self, f, encoding):
        reader_factory = codecs.getreader(encoding)
        self.reader = reader_factory(f)

    def __iter__(self):
        return self

    def next(self):
        """Returns the next item from the stream, encoded as UTF-8"""
        decoded = self.reader.next()
        return decoded.encode("utf-8")
class UnicodeCSVReader(object):
    """
    A CSV reader which will iterate over lines in the CSV file "f",
    which is encoded in the given encoding.  Every row is returned as
    a list of unicode strings.
    """
    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # The csv module is handed UTF-8 so recode the stream first
        recoded = UTF8Recoder(f, encoding)
        self.reader = csv.reader(recoded, dialect=dialect, **kwds)

    def next(self):
        """Returns the next row with every cell decoded from UTF-8"""
        decoded = []
        for cell in self.reader.next():
            decoded.append(unicode(cell, "utf-8"))
        return decoded

    def __iter__(self):
        return self
class UnicodeCSVWriter(object):
    """
    A CSV writer which will write rows to CSV file "f".  Unicode cells
    are encoded as UTF-8 before being written, everything else is
    passed to the underlying writer untouched.
    """
    def __init__(self, f, dialect=csv.excel, **kwds):
        self.writer = csv.writer(f, dialect=dialect, **kwds)

    def writerow(self, row):
        """Writes a single row, encoding any unicode cells as UTF-8"""
        encoded = []
        for cell in row:
            if isinstance(cell, unicode):
                cell = cell.encode("utf-8")
            encoded.append(cell)

        self.writer.writerow(encoded)

    def writerows(self, rows):
        """Writes every row in ``rows`` using :meth:`writerow`"""
        for row in rows:
            self.writerow(row)
def total_seconds(td):
    """
    Returns the total number of seconds in the time delta object.  This
    function is provided for backwards compatibility with Python 2.6
    where :meth:`timedelta.total_seconds` does not exist.
    """
    assert isinstance(td, timedelta)

    if hasattr(td, "total_seconds"):
        return td.total_seconds()

    # Python 2.6: compute the value from the individual fields
    whole_seconds = td.seconds + td.days * 24 * 3600  # pragma: no cover
    return (td.microseconds + whole_seconds * 1e6) / 1e6  # pragma: no cover
class AgentUUID(object):
    """
    This class wraps all the functionality required to load, cache and
    retrieve an Agent's UUID.
    """
    log = getLogger("agent.uuid")

    @classmethod
    def load(cls, path):
        """
        A classmethod to load a UUID object from a path.

        :param str path:
            The file to read the UUID from.  Leading and trailing
            whitespace in the file is ignored.

        :returns:
            The :class:`uuid.UUID` stored in the file or ``None`` if
            ``path`` does not exist.

        :raises ValueError:
            Raised if the file's contents can't be converted to a UUID.
            Errors other than a missing file which occur while reading
            are logged and raised as well.
        """
        assert isinstance(path, STRING_TYPES), path
        cls.log.debug("Attempting to load agent UUID from %r", path)

        try:
            with open(path, "r") as stream:
                contents = stream.read()

            return UUID(contents.strip())

        except (WindowsError, OSError, IOError) as error:
            # Anything but a missing file is unexpected
            if error.errno != ENOENT:
                cls.log.error("Failed to load uuid from %s: %s", path, error)
                raise

            cls.log.warning("UUID file %s does not exist.", path)
            return None

        except ValueError:  # pragma: no cover
            cls.log.error("%r does not contain valid data for a UUID", path)
            raise

    @classmethod
    def save(cls, agent_uuid, path):
        """
        Saves ``agent_uuid`` to ``path``.  This classmethod will
        also create the necessary parent directories and handle conversion
        from the input type :class:`uuid.UUID`.

        :param uuid.UUID agent_uuid:
            The UUID to write, it's stored in its string form.

        :param str path:
            The file to write the UUID to.
        """
        assert isinstance(agent_uuid, UUID)
        assert isinstance(path, STRING_TYPES)
        cls.log.debug("Saving %r to %r", agent_uuid, path)

        # Make sure the parent directory is present.  A bare filename
        # has no parent to create.
        directory = dirname(path)
        if directory.strip():
            try:
                os.makedirs(directory)
            except (OSError, IOError) as error:  # pragma: no cover
                # An existing directory is fine, anything else is not
                already_exists = error.errno == EEXIST
                if not already_exists:
                    cls.log.warning(
                        "Failed to create %s, %s", directory, error)
                    raise

        # Write uuid to disk
        try:
            with open(path, "w") as stream:
                stream.write(str(agent_uuid))
            cls.log.debug("Cached %s to %r", agent_uuid, path)
        except (WindowsError, OSError, IOError) as error:  # pragma: no cover
            cls.log.error("Failed to write %s, %s", path, error)
            raise

    @classmethod
    def generate(cls):
        """
        Generates a UUID object.  This simply wraps :func:`uuid.uuid4` and
        logs a warning containing the new value.
        """
        generated = uuid4()
        cls.log.warning("Generated agent UUID: %s", generated)
        return generated
# Paths for which :func:`remove_file` has already scheduled a retry on exit.
_FILES_PENDING_REMOVAL = set()


def remove_file(
        path, retry_on_exit=False, raise_=True, ignored_errnos=(ENOENT, )):
    """
    Simple function to remove the provided file or retry on exit
    if requested.  This function standardizes the log output, ensures
    it's only called once per path on exit and handles platform
    specific exceptions (ie. ``WindowsError``).

    :param str path:
        The file to remove.

    :param bool retry_on_exit:
        If True, retry removal of the file when Python exits.

    :param bool raise_:
        If True, raise any exception produced.  This will always be
        False if :func:`remove_file` is being executed by :mod:`atexit`

    :param tuple ignored_errnos:
        A tuple of ignored error numbers.  By default this function
        only ignores ENOENT.
    """
    try:
        os.remove(path)
    except (WindowsError, OSError, IOError) as error:
        if error.errno in ignored_errnos:
            return

        # Not every error number has a symbolic name so fall back to the
        # raw number rather than failing while reporting the problem.
        error_name = errorcode.get(error.errno, error.errno)

        if retry_on_exit:
            logger.warning(
                "Failed to remove %s (%s), will retry on exit",
                path, error_name)

            # Make sure we're not added multiple times.  The private
            # ``atexit._exithandlers`` list only exists on Python 2 so we
            # keep track of the paths we've scheduled ourselves and only
            # consult the private list when it's present.
            keywords = dict(retry_on_exit=False, raise_=False)
            signature = (remove_file, (path, ), keywords)
            scheduled = (
                path in _FILES_PENDING_REMOVAL or
                signature in getattr(atexit, "_exithandlers", ()))
            if not scheduled:
                atexit.register(remove_file, path, **keywords)
                _FILES_PENDING_REMOVAL.add(path)
        else:
            logger.error("Failed to remove %s (%s)", path, error_name)

        # NOTE(review): the original layout of this function was lost, this
        # assumes the exception is raised for both branches above - confirm.
        if raise_:
            raise
    else:
        logger.debug("Removed %s", path)
# Paths for which :func:`remove_directory` has already scheduled a retry
# on exit.
_DIRECTORIES_PENDING_REMOVAL = set()


def remove_directory(
        path, retry_on_exit=False, raise_=True, ignored_errnos=(ENOENT, )):
    """
    Simple function to **recursively** remove the provided directory or
    retry on exit if requested.  This function standardizes the log output,
    ensures it's only called once per path on exit and handles platform
    specific exceptions (ie. ``WindowsError``).

    :param str path:
        The directory to remove.

    :param bool retry_on_exit:
        If True, retry removal of the directory when Python exits.

    :param bool raise_:
        If True, raise any exception produced.  This will always be
        False if :func:`remove_directory` is being executed by :mod:`atexit`

    :param tuple ignored_errnos:
        A tuple of ignored error numbers.  By default this function
        only ignores ENOENT.
    """
    try:
        shutil.rmtree(path)
    except (WindowsError, OSError, IOError) as error:
        if error.errno in ignored_errnos:
            return

        # Not every error number has a symbolic name so fall back to the
        # raw number rather than failing while reporting the problem.
        error_name = errorcode.get(error.errno, error.errno)

        if retry_on_exit:
            logger.warning(
                "Failed to remove %s (%s), will retry on exit",
                path, error_name)

            # Make sure we're not added multiple times.  The private
            # ``atexit._exithandlers`` list only exists on Python 2 so we
            # keep track of the paths we've scheduled ourselves and only
            # consult the private list when it's present.
            keywords = dict(retry_on_exit=False, raise_=False)
            signature = (remove_directory, (path, ), keywords)
            scheduled = (
                path in _DIRECTORIES_PENDING_REMOVAL or
                signature in getattr(atexit, "_exithandlers", ()))
            if not scheduled:
                atexit.register(remove_directory, path, **keywords)
                _DIRECTORIES_PENDING_REMOVAL.add(path)
        else:
            logger.error("Failed to remove %s (%s)", path, error_name)

        # NOTE(review): the original layout of this function was lost, this
        # assumes the exception is raised for both branches above - confirm.
        if raise_:
            raise
    else:
        logger.debug("Removed %s", path)
class LockTimeoutError(Exception):
    """
    Raised when a deferred lock could not be acquired before the
    requested timeout expired.
    """
class TimedDeferredLock(DeferredLock):
    """
    A subclass of :class:`DeferredLock` which has a timeout
    for the :meth:`acquire` call.
    """
    def _timeout(self, deferred):
        """
        Raise a :class:`LockTimeoutError` in the given :class:`Deferred`
        instance.  This ensures that when a timeout is triggered it's
        propagated all the way up the calling stack.
        """
        # Raise and catch the exception so the instance handed to the
        # deferred has been through a real ``raise``.
        try:
            raise LockTimeoutError
        except LockTimeoutError as timeout:
            deferred.errback(fail=timeout)

    def _cancel_timeout(self, _, scheduled_call):
        """
        Cancels a scheduled call, ignoring cases where the scheduled
        call has already run.
        """
        try:
            scheduled_call.cancel()
        except AlreadyCalled:  # pragma: no cover
            pass

    def _call_callback(self, _, deferred):
        """
        Calls the callback() on the given deferred instance.  If the
        deferred has already been fired, which is the case when the
        timeout was reached before the lock was handed to us, the lock
        is released again right away.
        """
        try:
            deferred.callback(None)
        except AlreadyCalledError:  # pragma: no cover
            # The caller was already told that acquiring the lock failed
            # so it will never call release().  We still own the lock at
            # this point and have to give it up ourselves, otherwise it
            # would stay locked forever.
            self.release()

    def acquire(self, timeout=None):
        """
        This method operates the same as :meth:`DeferredLock.acquire` does
        except it accepts an optional timeout.

        :param int timeout:
            The number of seconds to wait before timing out.  If not
            provided the deferred from :meth:`DeferredLock.acquire` is
            returned directly and there's no timeout.

        :returns:
            When ``timeout`` is provided, a :class:`Deferred` which is
            called back with ``None`` once the lock has been acquired.

        :raises LockTimeoutError:
            Raised, through the returned deferred, if the timeout was
            reached before we could acquire the lock.
        """
        assert timeout is None \
            or isinstance(timeout, (int, float)) and timeout > 0

        lock = DeferredLock.acquire(self)
        if timeout is None:
            return lock

        # Schedule a call to trigger finished.errback() which will raise
        # an exception.  If lock finishes first however cancel the timeout
        # and fire finished so the caller knows it holds the lock.
        finished = Deferred()
        lock.addCallback(
            self._cancel_timeout,
            reactor.callLater(timeout, self._timeout, finished))
        lock.addCallback(self._call_callback, finished)

        return finished