Source code for pyfarm.master.api.tasklogs

# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
# Copyright 2014 Ambient Entertainment GmbH & Co. KG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Task Logs
---------

This module defines an API for managing and querying logs belonging to tasks
"""

try:
    from httplib import (
      OK, NOT_FOUND, CONFLICT, TEMPORARY_REDIRECT, CREATED, BAD_REQUEST,
      INTERNAL_SERVER_ERROR)
except ImportError:  # pragma: no cover
    from http.client import (
      OK, NOT_FOUND, CONFLICT, TEMPORARY_REDIRECT, CREATED, BAD_REQUEST,
      INTERNAL_SERVER_ERROR)

from gzip import GzipFile
from os import makedirs
from os.path import join, realpath
from errno import EEXIST

from flask.views import MethodView
from flask import g, redirect, send_file, request, Response

from sqlalchemy.exc import IntegrityError

from pyfarm.core.logger import getLogger
from pyfarm.master.config import config
from pyfarm.models.tasklog import TaskLog, TaskTaskLogAssociation
from pyfarm.models.task import Task
from pyfarm.master.application import db
from pyfarm.master.utility import jsonify, validate_with_model, isuuid

logger = getLogger("api.tasklogs")

LOGFILES_DIR = config.get("tasklogs_dir")

try:
    makedirs(LOGFILES_DIR)
except OSError as e:  # pragma: no cover
    if e.errno != EEXIST:
        raise


class LogsInTaskAttemptsIndexAPI(MethodView):
    def get(self, job_id, task_id, attempt):
        """
        A ``GET`` to this endpoint will return a list of all known logs that
        are associated with this attempt at running this task

        .. http:get:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/ HTTP/1.1

            **Request**

            .. sourcecode:: http

                GET /api/v1/jobs/4/tasks/1300/attempts/5/logs/ HTTP/1.1
                Accept: application/json

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: application/json

                [
                    {
                        "agent_id": "3087ada4-290a-45b0-8c1a-21db4cd284fc",
                        "created_on": "2014-09-03T10:58:59.754880",
                        "identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv"
                    }
                ]

        :statuscode 200: no error
        :statuscode 404: the specified task was not found
        """
        task = Task.query.filter_by(id=task_id, job_id=job_id).first()
        if not task:
            return jsonify(task_id=task_id, job_id=job_id,
                           error="Specified task not found"), NOT_FOUND

        association_objects = TaskTaskLogAssociation.query.filter(
            TaskTaskLogAssociation.task == task,
            TaskTaskLogAssociation.attempt == attempt)

        out = []
        for item in association_objects:
            log = item.log
            out.append({
                "identifier": log.identifier,
                "created_on": log.created_on,
                "agent_id": str(log.agent_id)})

        return jsonify(out), OK

    @validate_with_model(TaskLog, type_checks={"agent_id": isuuid})
    def post(self, job_id, task_id, attempt):
        """
        A ``POST`` to this endpoint will register a new logfile with the given
        attempt at running the given task

        A logfile has an identifier which must be unique in the system.  If
        two tasks get assigned a logfile with the same id, it is considered to
        be the same log.

        .. http:post:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/ HTTP/1.1

            **Request**

            .. sourcecode:: http

                POST /api/v1/jobs/4/tasks/1300/attempts/5/logs/ HTTP/1.1
                Content-Type: application/json

                {
                    "identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv",
                    "agent_id": "2dc2cb5a-35da-41d6-8864-329c0d7d5391"
                }

            **Response**

            .. sourcecode:: http

                HTTP/1.1 201 CREATED
                Content-Type: application/json

                {
                    "identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv",
                    "agent_id": "2dc2cb5a-35da-41d6-8864-329c0d7d5391",
                    "created_on": "2014-09-03T10:59:05.103005",
                    "id": 148
                }

        :statuscode 201: the association between this task attempt and logfile
                         has been created
        :statuscode 400: there was something wrong with the request (such as
                         invalid columns being included)
        :statuscode 404: the specified task does not exist
        :statuscode 409: the specified log was already registered on the
                         specified task
        """
        task = Task.query.filter_by(id=task_id, job_id=job_id).first()
        if not task:
            return jsonify(task_id=task_id, job_id=job_id,
                           error="Specified task not found"), NOT_FOUND

        path = realpath(join(LOGFILES_DIR, g.json["identifier"]))
        if not realpath(path).startswith(LOGFILES_DIR):
            return jsonify(error="Identifier is not acceptable"), BAD_REQUEST

        attempts = 0
        registered = False
        while not registered:
            try:
                task_log = TaskLog.query.filter_by(
                    identifier=g.json["identifier"]).first()
                if task_log:
                    association = TaskTaskLogAssociation.query.filter_by(
                        task=task, log=task_log, attempt=attempt).first()
                    if association:
                        return (jsonify(
                            log=task_log, attempt=attempt, task_id=task_id,
                            error="This log is already registered for this "
                                  "task."), CONFLICT)

                if not task_log:
                    task_log = TaskLog(**g.json)

                association = TaskTaskLogAssociation()
                association.task = task
                association.log = task_log
                association.attempt = attempt

                db.session.add(association)
                db.session.add(task_log)
                db.session.commit()
                registered = True
            except IntegrityError:
                if attempts < 3:
                    attempts += 1
                    db.session.rollback()
                else:
                    raise

        logger.info("Registered task log %s with attempt %s for task %s",
                    task_log.identifier, attempt, task.id)

        return jsonify(task_log.to_dict(unpack_relationships=False)), CREATED
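

# A minimal client-side sketch of registering a log against the endpoint
# above, assuming the ``requests`` library and a reachable master at
# ``base_url`` (both assumptions, not part of this module).  A 201 response
# means the association was created; a 409 means the log was already
# registered for this attempt.
def _example_register_log(base_url, job_id, task_id, attempt,
                          identifier, agent_id):
    import requests
    url = "%s/jobs/%s/tasks/%s/attempts/%s/logs/" % (
        base_url, job_id, task_id, attempt)
    response = requests.post(
        url, json={"identifier": identifier, "agent_id": agent_id})
    response.raise_for_status()
    return response.json()
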
class SingleLogInTaskAttempt(MethodView):
    def get(self, job_id, task_id, attempt, log_identifier):
        """
        A ``GET`` to this endpoint will return metadata about the specified
        logfile

        .. http:get:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/<log_identifier> HTTP/1.1

            **Request**

            .. sourcecode:: http

                GET /api/v1/jobs/4/tasks/1300/attempts/5/logs/2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv HTTP/1.1
                Accept: application/json

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: application/json

                {
                    "id": 147,
                    "identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv",
                    "created_on": "2014-09-03T10:58:59.754880",
                    "agent_id": "836ce137-6ad4-443f-abb9-94c4465ff87c"
                }

        :statuscode 200: no error
        :statuscode 404: task or logfile not found
        """
        task = Task.query.filter_by(id=task_id, job_id=job_id).first()
        if not task:
            return jsonify(task_id=task_id, job_id=job_id,
                           error="Specified task not found"), NOT_FOUND

        log = TaskLog.query.filter_by(identifier=log_identifier).first()
        if not log:
            return jsonify(task_id=task_id, job_id=job_id,
                           error="Specified log not found"), NOT_FOUND

        association = TaskTaskLogAssociation.query.filter_by(
            task=task, log=log, attempt=attempt).first()
        if not association:
            return jsonify(task_id=task.id, log=log.identifier,
                           error="Specified log not found in task"), NOT_FOUND

        return jsonify(log.to_dict(unpack_relationships=False))

    def post(self, job_id, task_id, attempt, log_identifier):
        """
        A ``POST`` to this endpoint will update metadata about the specified
        logfile

        .. http:post:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/<log_identifier> HTTP/1.1

            **Request**

            .. sourcecode:: http

                POST /api/v1/jobs/4/tasks/1300/attempts/5/logs/2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv HTTP/1.1
                Accept: application/json
                Content-Type: application/json

                {
                    "state": "done"
                }

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: application/json

                {
                    "id": 147,
                    "identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv",
                    "created_on": "2014-09-03T10:58:59.754880",
                    "agent_id": "836ce137-6ad4-443f-abb9-94c4465ff87c"
                }

        :statuscode 200: no error
        :statuscode 404: task or logfile not found
        """
        task = Task.query.filter_by(id=task_id, job_id=job_id).first()
        if not task:
            return jsonify(task_id=task_id, job_id=job_id,
                           error="Specified task not found"), NOT_FOUND

        log = TaskLog.query.filter_by(identifier=log_identifier).first()
        if not log:
            return jsonify(task_id=task_id, job_id=job_id,
                           error="Specified log not found"), NOT_FOUND

        association = TaskTaskLogAssociation.query.filter_by(
            task=task, log=log, attempt=attempt).first()
        if not association:
            return jsonify(task_id=task.id, log=log.identifier,
                           error="Specified log not found in task"), NOT_FOUND

        if "state" in g.json:
            new_state = g.json.pop("state")
            if new_state != "queued":
                association.state = new_state
            else:
                association.state = None
            db.session.add(association)

        if "agent_id" in g.json:
            association.agent_id = g.json.pop("agent_id")
            db.session.add(association)

        if "identifier" in g.json:
            return (jsonify(error="The tasklog identifier cannot be changed"),
                    BAD_REQUEST)

        if "created_on" in g.json:
            return (jsonify(error="The created_on timestamp cannot be changed"),
                    BAD_REQUEST)

        db.session.commit()

        return jsonify(log.to_dict(unpack_relationships=False))
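

# A minimal client-side sketch of updating the state of a registered log
# through the POST endpoint above (``requests`` and ``base_url`` are
# assumptions).  Sending ``{"state": "done"}`` marks the attempt's log as
# finished; ``identifier`` and ``created_on`` are rejected with 400.
def _example_mark_log_done(base_url, job_id, task_id, attempt, log_identifier):
    import requests
    url = "%s/jobs/%s/tasks/%s/attempts/%s/logs/%s" % (
        base_url, job_id, task_id, attempt, log_identifier)
    response = requests.post(url, json={"state": "done"})
    response.raise_for_status()
    return response.json()
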
class TaskLogfileAPI(MethodView):
    def get(self, job_id, task_id, attempt, log_identifier):
        """
        A ``GET`` to this endpoint will return the actual logfile or a
        redirect to it.

        .. http:get:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/<log_identifier>/logfile HTTP/1.1

            **Request**

            .. sourcecode:: http

                GET /api/v1/jobs/4/tasks/1300/attempts/5/logs/2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv/logfile HTTP/1.1
                Accept: text/csv

            **Response**

            .. sourcecode:: http

                HTTP/1.1 200 OK
                Content-Type: text/csv

                <Content of the logfile>

        :statuscode 200: no error
        :statuscode 307: the logfile can currently be found in another
                         location.  Independent future requests for the same
                         logfile should continue to use the original URL
        :statuscode 400: the specified logfile identifier is not acceptable
        :statuscode 404: task or logfile not found
        """
        task = Task.query.filter_by(id=task_id, job_id=job_id).first()
        if not task:
            return jsonify(task_id=task_id, log=log_identifier,
                           error="Specified task not found"), NOT_FOUND

        log = TaskLog.query.filter_by(identifier=log_identifier).first()
        if not log:
            return jsonify(task_id=task_id, log=log_identifier,
                           error="Specified log not found"), NOT_FOUND

        association = TaskTaskLogAssociation.query.filter_by(
            task=task, log=log, attempt=attempt).first()
        if not association:
            return jsonify(task_id=task.id, log=log.identifier,
                           error="Specified log not found in task"), NOT_FOUND

        path = realpath(join(LOGFILES_DIR, log_identifier))
        if not realpath(path).startswith(LOGFILES_DIR):
            return jsonify(error="Identifier is not acceptable"), BAD_REQUEST

        try:
            logfile = open(path, "rb")
            return send_file(logfile)
        except IOError:
            try:
                compressed_logfile = GzipFile("%s.gz" % path, "rb")

                def logfile_generator():
                    eof = False
                    while not eof:
                        # Stream the compressed log in 4096 byte (one memory
                        # page) chunks.
                        out = compressed_logfile.read(4096)
                        eof = len(out) == 0
                        yield out

                return Response(logfile_generator(), mimetype="text/csv")
            except IOError:
                agent = log.agent
                if not agent:
                    return (jsonify(
                        path=path,
                        log=log_identifier,
                        error="Logfile is not available on master and agent "
                              "is not known"), NOT_FOUND)
                return redirect(agent.api_url() + "/task_logs/" +
                                log_identifier, TEMPORARY_REDIRECT)

    def put(self, job_id, task_id, attempt, log_identifier):
        """
        A ``PUT`` to this endpoint will upload the request's body as the
        specified logfile

        .. http:put:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/<log_identifier>/logfile HTTP/1.1

            **Request**

            .. sourcecode:: http

                PUT /api/v1/jobs/4/tasks/1300/attempts/5/logs/2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv/logfile HTTP/1.1

                <content of the logfile>

            **Response**

            .. sourcecode:: http

                HTTP/1.1 201 CREATED

        :statuscode 201: the logfile was uploaded
        :statuscode 400: the specified logfile identifier is not acceptable
        :statuscode 404: task or logfile not found
        """
        task = Task.query.filter_by(id=task_id, job_id=job_id).first()
        if not task:
            return jsonify(task_id=task_id, log=log_identifier,
                           error="Specified task not found"), NOT_FOUND

        log = TaskLog.query.filter_by(identifier=log_identifier).first()
        if not log:
            return jsonify(task_id=task_id, log=log_identifier,
                           error="Specified log not found"), NOT_FOUND

        association = TaskTaskLogAssociation.query.filter_by(
            task=task, log=log, attempt=attempt).first()
        if not association:
            return jsonify(task_id=task_id, log=log.identifier,
                           error="Specified log not found in task"), NOT_FOUND

        path = realpath(join(LOGFILES_DIR, log_identifier))
        if not realpath(path).startswith(LOGFILES_DIR):
            return jsonify(error="Identifier is not acceptable"), BAD_REQUEST

        logger.info("Writing task log file for task %s, attempt %s to path %s",
                    task_id, attempt, path)
        try:
            with open(path, "wb+") as log_file:
                log_file.write(request.data)
        except (IOError, OSError) as e:
            logger.error("Could not write task log file: %s (%s)",
                         e.errno, e.strerror)
            return (jsonify(error="Could not write file %s to disk: %s"
                                  % (path, e)), INTERNAL_SERVER_ERROR)

        return "", CREATED
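

# A minimal client-side sketch of uploading a logfile body and fetching it
# back through the /logfile endpoint (``requests`` and ``base_url`` are
# assumptions, not part of this module).  When the master no longer holds the
# file locally it answers with a temporary redirect to the agent, which
# ``requests`` follows automatically for GET requests.
def _example_upload_and_fetch_logfile(base_url, job_id, task_id, attempt,
                                      log_identifier, data):
    import requests
    url = "%s/jobs/%s/tasks/%s/attempts/%s/logs/%s/logfile" % (
        base_url, job_id, task_id, attempt, log_identifier)
    put_response = requests.put(url, data=data)  # expects 201 CREATED
    put_response.raise_for_status()
    get_response = requests.get(url)  # 200 OK, possibly via a 307 redirect
    get_response.raise_for_status()
    return get_response.content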