Source code for pyfarm.master.api.tasklogs

# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
# Copyright 2014 Ambient Entertainment GmbH & Co. KG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Task Logs
---------

This module defines an API for managing and querying logs belonging to tasks
"""

try:
    from httplib import (
      OK, NOT_FOUND, CONFLICT, TEMPORARY_REDIRECT, CREATED, BAD_REQUEST,
      INTERNAL_SERVER_ERROR)
except ImportError:  # pragma: no cover
    from http.client import (
      OK, NOT_FOUND, CONFLICT, TEMPORARY_REDIRECT, CREATED, BAD_REQUEST,
      INTERNAL_SERVER_ERROR)

import tempfile
from gzip import GzipFile
from os import makedirs
from os.path import join, realpath
from errno import EEXIST

from flask.views import MethodView
from flask import g, redirect, send_file, request, Response

from pyfarm.core.logger import getLogger
from pyfarm.core.config import read_env
from pyfarm.models.tasklog import TaskLog, TaskTaskLogAssociation
from pyfarm.models.task import Task
from pyfarm.master.application import db
from pyfarm.master.utility import jsonify, validate_with_model, isuuid

logger = getLogger("api.tasklogs")

# TODO a temp directory might not be a good default for putting logs
LOGFILES_DIR = read_env(
    "PYFARM_LOGFILES_DIR", join(tempfile.gettempdir(), "task_logs"))

try:
    makedirs(LOGFILES_DIR)
except OSError as e:  # pragma: no cover
    if e.errno != EEXIST:
        raise


class LogsInTaskAttemptsIndexAPI(MethodView):
    def get(self, job_id, task_id, attempt):
        """
        A ``GET`` to this endpoint will return a list of all known logs that
        are associated with this attempt at running this task

        .. http:get:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/ HTTP/1.1

            **Request**

            .. sourcecode:: http

                GET /api/v1/jobs/4/tasks/1300/attempts/5/logs/ HTTP/1.1
                Accept: application/json

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: application/json

                [
                    {
                        "agent_id": "3087ada4-290a-45b0-8c1a-21db4cd284fc",
                        "created_on": "2014-09-03T10:58:59.754880",
                        "identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv"
                    }
                ]

        :statuscode 200: no error
        :statuscode 404: the specified task was not found
        """
        task = Task.query.filter_by(id=task_id, job_id=job_id).first()
        if not task:
            return jsonify(task_id=task_id, job_id=job_id,
                           error="Specified task not found"), NOT_FOUND

        association_objects = TaskTaskLogAssociation.query.filter(
            TaskTaskLogAssociation.task == task,
            TaskTaskLogAssociation.attempt == attempt)

        out = []
        for item in association_objects:
            log = item.log
            out.append({"identifier": log.identifier,
                        "created_on": log.created_on,
                        "agent_id": str(log.agent_id)})

        return jsonify(out), OK

    @validate_with_model(TaskLog, type_checks={"agent_id": isuuid})
    def post(self, job_id, task_id, attempt):
        """
        A ``POST`` to this endpoint will register a new logfile with the given
        attempt at running the given task

        A logfile has an identifier which must be unique in the system.  If
        two tasks get assigned a logfile with the same id, it is considered to
        be the same log.

        .. http:post:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/ HTTP/1.1

            **Request**

            .. sourcecode:: http

                POST /api/v1/jobs/4/tasks/1300/attempts/5/logs/ HTTP/1.1
                Content-Type: application/json

                {
                    "identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv",
                    "agent_id": "2dc2cb5a-35da-41d6-8864-329c0d7d5391"
                }

            **Response**

            .. sourcecode:: http

                HTTP/1.1 201 CREATED
                Content-Type: application/json

                {
                    "identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv",
                    "agent_id": "2dc2cb5a-35da-41d6-8864-329c0d7d5391",
                    "created_on": "2014-09-03T10:59:05.103005",
                    "id": 148
                }

        :statuscode 201: the association between this task attempt and logfile
                         has been created
        :statuscode 400: there was something wrong with the request (such as
                         invalid columns being included)
        :statuscode 404: the specified task does not exist
        :statuscode 409: the specified log was already registered on the
                         specified task
        """
        task = Task.query.filter_by(id=task_id, job_id=job_id).first()
        if not task:
            return jsonify(task_id=task_id, job_id=job_id,
                           error="Specified task not found"), NOT_FOUND

        path = realpath(join(LOGFILES_DIR, g.json["identifier"]))
        if not realpath(path).startswith(LOGFILES_DIR):
            return jsonify(error="Identifier is not acceptable"), BAD_REQUEST

        task_log = TaskLog.query.filter_by(
            identifier=g.json["identifier"]).first()
        if not task_log:
            task_log = TaskLog(**g.json)

        association = TaskTaskLogAssociation.query.filter_by(
            task=task, log=task_log, attempt=attempt).first()
        if association:
            return (jsonify(
                log=task_log, attempt=attempt, task_id=task_id,
                error="This log is already registered for this task"),
                CONFLICT)

        association = TaskTaskLogAssociation()
        association.task = task
        association.log = task_log
        association.attempt = attempt

        db.session.add(association)
        db.session.add(task_log)
        db.session.commit()

        logger.info("Registered task log %s with attempt %s for task %s",
                    task_log.identifier, attempt, task.id)

        return jsonify(task_log.to_dict(unpack_relationships=False)), CREATED


class SingleLogInTaskAttempt(MethodView):
    def get(self, job_id, task_id, attempt, log_identifier):
        """
        A ``GET`` to this endpoint will return metadata about the specified
        logfile

        .. http:get:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/<log_identifier> HTTP/1.1

            **Request**

            .. sourcecode:: http

                GET /api/v1/jobs/4/tasks/1300/attempts/5/logs/2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv HTTP/1.1
                Accept: application/json

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: application/json

                {
                    "id": 147,
                    "identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv",
                    "created_on": "2014-09-03T10:58:59.754880",
                    "agent_id": "836ce137-6ad4-443f-abb9-94c4465ff87c"
                }

        :statuscode 200: no error
        :statuscode 404: task or logfile not found
        """
        task = Task.query.filter_by(id=task_id, job_id=job_id).first()
        if not task:
            return jsonify(task_id=task_id, job_id=job_id,
                           error="Specified task not found"), NOT_FOUND

        log = TaskLog.query.filter_by(identifier=log_identifier).first()
        if not log:
            return jsonify(task_id=task_id, job_id=job_id,
                           error="Specified log not found"), NOT_FOUND

        association = TaskTaskLogAssociation.query.filter_by(
            task=task, log=log, attempt=attempt).first()
        if not association:
            return jsonify(task_id=task.id, log=log.identifier,
                           error="Specified log not found in task"), NOT_FOUND

        return jsonify(log.to_dict(unpack_relationships=False))


class TaskLogfileAPI(MethodView):
    def get(self, job_id, task_id, attempt, log_identifier):
        """
        A ``GET`` to this endpoint will return the actual logfile or a
        redirect to it.

        .. http:get:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/<log_identifier>/logfile HTTP/1.1

            **Request**

            .. sourcecode:: http

                GET /api/v1/jobs/4/tasks/1300/attempts/5/logs/2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv/logfile HTTP/1.1
                Accept: text/csv

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: text/csv

                <Content of the logfile>

        :statuscode 200: no error
        :statuscode 307: The logfile can be found in another location at this
                         point in time.  Independent future requests for the
                         same logfile should continue using the original URL
        :statuscode 400: the specified logfile identifier is not acceptable
        :statuscode 404: task or logfile not found
        """
        task = Task.query.filter_by(id=task_id, job_id=job_id).first()
        if not task:
            return jsonify(task_id=task_id, log=log_identifier,
                           error="Specified task not found"), NOT_FOUND

        log = TaskLog.query.filter_by(identifier=log_identifier).first()
        if not log:
            return jsonify(task_id=task_id, log=log_identifier,
                           error="Specified log not found"), NOT_FOUND

        association = TaskTaskLogAssociation.query.filter_by(
            task=task, log=log, attempt=attempt).first()
        if not association:
            return jsonify(task_id=task.id, log=log.identifier,
                           error="Specified log not found in task"), NOT_FOUND

        path = realpath(join(LOGFILES_DIR, log_identifier))
        if not realpath(path).startswith(LOGFILES_DIR):
            return jsonify(error="Identifier is not acceptable"), BAD_REQUEST

        try:
            logfile = open(path, "rb")
            return send_file(logfile)
        except IOError:
            try:
                compressed_logfile = GzipFile("%s.gz" % path, "rb")

                def logfile_generator():
                    eof = False
                    while not eof:
                        out = compressed_logfile.read(4096)  # 4096 == mempage
                        eof = len(out) == 0
                        yield out

                return Response(logfile_generator(), mimetype="text/csv")
            except IOError:
                agent = log.agent
                if not agent:
                    return (jsonify(
                        path=path,
                        log=log_identifier,
                        error="Logfile is not available on master and agent "
                              "is not known"), NOT_FOUND)
                return redirect(agent.api_url() + "/task_logs/" +
                                log_identifier, TEMPORARY_REDIRECT)

    def put(self, job_id, task_id, attempt, log_identifier):
        """
        A ``PUT`` to this endpoint will upload the request's body as the
        specified logfile

        .. http:put:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/<log_identifier>/logfile HTTP/1.1

            **Request**

            .. sourcecode:: http

                PUT /api/v1/jobs/4/tasks/1300/attempts/5/logs/2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv/logfile HTTP/1.1

                <content of the logfile>

            **Response**

            .. sourcecode:: http

                HTTP/1.1 201 CREATED

        :statuscode 201: the logfile was uploaded
        :statuscode 400: the specified logfile identifier is not acceptable
        :statuscode 404: task or logfile not found
        """
        task = Task.query.filter_by(id=task_id, job_id=job_id).first()
        if not task:
            return jsonify(task_id=task_id, log=log_identifier,
                           error="Specified task not found"), NOT_FOUND

        log = TaskLog.query.filter_by(identifier=log_identifier).first()
        if not log:
            return jsonify(task_id=task_id, log=log_identifier,
                           error="Specified log not found"), NOT_FOUND

        association = TaskTaskLogAssociation.query.filter_by(
            task=task, log=log, attempt=attempt).first()
        if not association:
            return jsonify(task_id=task_id, log=log.identifier,
                           error="Specified log not found in task"), NOT_FOUND

        path = realpath(join(LOGFILES_DIR, log_identifier))
        if not realpath(path).startswith(LOGFILES_DIR):
            return jsonify(error="Identifier is not acceptable"), BAD_REQUEST

        logger.info("Writing task log file for task %s, attempt %s to path %s",
                    task_id, attempt, path)
        try:
            with open(path, "wb+") as log_file:
                log_file.write(request.data)
        except (IOError, OSError) as e:
            logger.error("Could not write task log file: %s (%s)",
                         e.errno, e.strerror)
            return (jsonify(error="Could not write file %s to disk: %s" %
                            (path, e)), INTERNAL_SERVER_ERROR)

        return "", CREATED
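
The endpoints above can be exercised with any HTTP client. The sketch below is illustrative only and not part of this module: it assumes a master reachable at http://master:5000 with the /api/v1 prefix shown in the docstrings, uses the third-party requests library, and every id, identifier, and filename in it is made up.

# Client-side usage sketch (assumptions noted above; not part of pyfarm).
import requests

MASTER_API = "http://master:5000/api/v1"   # assumed master address
JOB_ID, TASK_ID, ATTEMPT = 4, 1300, 5      # illustrative ids
IDENTIFIER = "2014-09-03_10-58-59_example.csv"  # illustrative log identifier
AGENT_ID = "2dc2cb5a-35da-41d6-8864-329c0d7d5391"

logs_url = "%s/jobs/%s/tasks/%s/attempts/%s/logs/" % (
    MASTER_API, JOB_ID, TASK_ID, ATTEMPT)

# Register the logfile with this task attempt (handled by
# LogsInTaskAttemptsIndexAPI.post); 409 means it was already registered.
response = requests.post(
    logs_url, json={"identifier": IDENTIFIER, "agent_id": AGENT_ID})
assert response.status_code in (201, 409)

# Upload the log contents (handled by TaskLogfileAPI.put).
with open("local_task_log.csv", "rb") as logfile:  # hypothetical local file
    response = requests.put(logs_url + IDENTIFIER + "/logfile", data=logfile)
assert response.status_code == 201

# List every log known for this attempt (LogsInTaskAttemptsIndexAPI.get).
for entry in requests.get(logs_url).json():
    print(entry["identifier"], entry["agent_id"], entry["created_on"])

# Fetch the logfile back (TaskLogfileAPI.get); if the master no longer has
# the file it answers with a 307 redirect to the agent, which requests
# follows automatically for GET.
print(requests.get(logs_url + IDENTIFIER + "/logfile").text)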