Source code for pyfarm.agent.utility

# No shebang line, this module is meant to be imported
#
# Copyright 2014 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Utilities
---------

Top level utilities for the agent to use internally.  Many of these
are copied over from the master (which we can't import here).
"""

import csv
import codecs
import cStringIO
import os
from decimal import Decimal
from datetime import datetime, timedelta
from errno import EEXIST, ENOENT
from json import dumps as _dumps
from os.path import join, dirname
from UserDict import UserDict
from uuid import UUID, uuid1, uuid4

try:
    from urlparse import urlsplit
    from urllib import quote
except ImportError:  # pragma: no cover
    from http.client import urlsplit
    from urllib.parse import quote

try:
    from httplib import OK
except ImportError:  # pragma: no cover
    from http.client import OK

from voluptuous import Schema, Any, Required, Optional, Invalid

from pyfarm.core.enums import STRING_TYPES
from pyfarm.agent.config import config
from pyfarm.agent.logger import getLogger

logger = getLogger("agent.util")
STRINGS = Any(*STRING_TYPES)
try:
    WHOLE_NUMBERS = Any(*(int, long))
    NUMBERS = Any(*(int, long, float, Decimal))
except NameError:  # pragma: no cover
    WHOLE_NUMBERS = int
    NUMBERS = Any(*(int, float, Decimal))


[docs]def validate_environment(values): """ Ensures that ``values`` is a dictionary and that it only contains string keys and values. """ if not isinstance(values, dict): raise Invalid("Expected a dictionary") for key, value in values.items(): if not isinstance(key, STRING_TYPES): raise Invalid("Key %r must be a string" % key) if not isinstance(value, STRING_TYPES): raise Invalid("Value %r for key %r must be a string" % (key, value))
[docs]def validate_uuid(value): """ Ensures that ``value`` can be converted to or is a UUID object. """ if isinstance(value, UUID): return value elif isinstance(value, STRING_TYPES): try: return UUID(hex=value) except ValueError: raise Invalid("%s cannot be converted to a UUID" % value) else: raise Invalid("Expected string or UUID instance for `value`") # Shared schema declarations
JOBTYPE_SCHEMA = Schema({ Required("name"): STRINGS, Required("version"): WHOLE_NUMBERS}) TASK_SCHEMA = Schema({ Required("id"): WHOLE_NUMBERS, Required("frame"): NUMBERS, Required("attempt", default=0): WHOLE_NUMBERS}) TASKS_SCHEMA = lambda values: map(TASK_SCHEMA, values) JOB_SCHEMA = Schema({ Required("id"): WHOLE_NUMBERS, Required("by"): NUMBERS, Optional("batch"): WHOLE_NUMBERS, Optional("user"): STRINGS, Optional("ram"): WHOLE_NUMBERS, Optional("ram_warning"): Any(WHOLE_NUMBERS, type(None)), Optional("ram_max"): Any(WHOLE_NUMBERS, type(None)), Optional("cpus"): WHOLE_NUMBERS, Optional("data"): dict, Optional("environ"): validate_environment, Optional("title"): STRINGS})
[docs]def default_json_encoder(obj, return_obj=False): if isinstance(obj, Decimal): return float(obj) elif isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, UUID): return str(obj) elif return_obj: return obj
[docs]def json_safe(source): """ Recursively converts ``source`` into something that should be safe for :func:`json.dumps` to handle. This is used in conjunction with :func:`default_json_encoder` to also convert keys to something the json encoder can understand. """ if not isinstance(source, dict): return source result = {} try: items = source.iteritems except AttributeError: # pragma: no cover items = source.items for k, v in items(): result[default_json_encoder(k, return_obj=True)] = \ default_json_encoder(json_safe(v), return_obj=True) return result
[docs]def quote_url(source_url): """ This function serves as a wrapper around :func:`.urlsplit` and :func:`.quote` and a url that has the path quoted. """ url = urlsplit(source_url) # If a url is just "/" then we don't want to add :// # since a scheme was not included. if url.scheme: result = "{scheme}://".format(scheme=url.scheme) else: result = "" result += url.netloc + quote(url.path, safe="/?&=") if url.query: result += "?" + url.query if url.fragment: result += "#" + url.fragment return result
[docs]def dumps(*args, **kwargs): """ Agent's implementation of :func:`json.dumps` or :func:`pyfarm.master.utility.jsonify` """ indent = None if config["agent_pretty_json"]: indent = 2 if len(args) == 1 and not isinstance(args[0], (dict, UserDict)): obj = args[0] else: obj = dict(*args, **kwargs) return _dumps(obj, default=default_json_encoder, indent=indent)
[docs]def request_from_master(request): """Returns True if the request appears to be coming from the master""" return request.getHeader("User-Agent") == config["master_user_agent"] # Unicode CSV reader/writers from the standard library docs: # https://docs.python.org/2/library/csv.html
[docs]class UTF8Recoder(object): """ Iterator that reads an encoded stream and reencodes the input to UTF-8 """ def __init__(self, f, encoding): self.reader = codecs.getreader(encoding)(f) def __iter__(self): return self
[docs] def next(self): return self.reader.next().encode("utf-8")
[docs]class UnicodeCSVReader(object): """ A CSV reader which will iterate over lines in the CSV file "f", which is encoded in the given encoding. """ def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds): f = UTF8Recoder(f, encoding) self.reader = csv.reader(f, dialect=dialect, **kwds)
[docs] def next(self): row = self.reader.next() return [unicode(s, "utf-8") for s in row]
def __iter__(self): return self
[docs]class UnicodeCSVWriter(object): """ A CSV writer which will write rows to CSV file "f", which is encoded in the given encoding. """ def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds): # Redirect output to a queue self.queue = cStringIO.StringIO() self.writer = csv.writer(self.queue, dialect=dialect, **kwds) self.stream = f self.encoder = codecs.getincrementalencoder(encoding)()
[docs] def writerow(self, row): self.writer.writerow([s.encode("utf-8") for s in row]) # Fetch UTF-8 output from the queue ... data = self.queue.getvalue() data = data.decode("utf-8") # ... and reencode it into the target encoding data = self.encoder.encode(data) # write to the target stream self.stream.write(data) # empty queue self.queue.truncate(0)
[docs] def writerows(self, rows): for row in rows: self.writerow(row)
[docs]def total_seconds(td): """ Returns the total number of seconds in the time delta object. This function is provided for backwards comparability with Python 2.6. """ assert isinstance(td, timedelta) try: return td.total_seconds() except AttributeError: # pragma: no cover return ( td.microseconds + (td.seconds + td.days * 24 * 3600) * 1e6) / 1e6
[docs]class AgentUUID(object): """ This class wraps all the functionality required to load, cache and retrieve an Agent's UUID. """ log = getLogger("agent.uuid") @classmethod
[docs] def load(cls, path): """ A classmethod to load a UUID object from a path. If the provided ``path`` does not exist or does not contain data which can be converted into a UUID object ``None`` will be returned. """ assert isinstance(path, STRING_TYPES), path cls.log.debug("Attempting to load agent UUID from %r", path) try: with open(path, "r") as loaded_file: data = loaded_file.read().strip() return UUID(data) except (OSError, IOError) as e: if e.errno == ENOENT: cls.log.warning("UUID file %s does not exist.", path) return None cls.log.error("Failed to load uuid from %s: %s", path, e) raise except ValueError: # pragma: no cover cls.log.error("%r does not contain valid data for a UUID", path) raise
@classmethod
[docs] def save(cls, agent_uuid, path): """ Saves ``agent_uuid`` to ``path``. This classmethod will also create the necessary parent directories and handle conversion from the input type :class:`uuid.UUID`. """ assert isinstance(agent_uuid, UUID) assert isinstance(path, STRING_TYPES) cls.log.debug("Saving %r to %r", agent_uuid, path) # Create directory if it does not exist parent_dir = dirname(path) if parent_dir.strip(): try: os.makedirs(parent_dir) except (OSError, IOError) as e: # pragma: no cover if e.errno != EEXIST: cls.log.warning("Failed to create %s, %s", parent_dir, e) raise # Write uuid to disk try: with open(path, "w") as output: output.write(str(agent_uuid)) cls.log.debug("Cached %s to %r", agent_uuid, path) except (OSError, IOError) as e: # pragma: no cover cls.log.error("Failed to write %s, %s", path, e) raise
@classmethod
[docs] def generate(cls): """ Generates a UUID object. This simply wraps :func:`uuid.uuid4` and logs a warning. """ agent_uuid = uuid4() cls.log.warning("Generated agent UUID: %s", agent_uuid) return agent_uuid