# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
# Copyright 2014 Ambient Entertainment GmbH & Co. KG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Task Logs
---------
This module defines an API for managing and querying logs belonging to tasks
"""
try:
from httplib import (
OK, NOT_FOUND, CONFLICT, TEMPORARY_REDIRECT, CREATED, BAD_REQUEST,
INTERNAL_SERVER_ERROR)
except ImportError: # pragma: no cover
from http.client import (
OK, NOT_FOUND, CONFLICT, TEMPORARY_REDIRECT, CREATED, BAD_REQUEST,
INTERNAL_SERVER_ERROR)
from gzip import GzipFile
from os import makedirs
from os.path import join, realpath
from errno import EEXIST
from flask.views import MethodView
from flask import g, redirect, send_file, request, Response
from sqlalchemy.exc import IntegrityError
from pyfarm.core.logger import getLogger
from pyfarm.master.config import config
from pyfarm.models.tasklog import TaskLog, TaskTaskLogAssociation
from pyfarm.models.task import Task
from pyfarm.master.application import db
from pyfarm.master.utility import jsonify, validate_with_model, isuuid
logger = getLogger("api.tasklogs")
LOGFILES_DIR = config.get("tasklogs_dir")
try:
makedirs(LOGFILES_DIR)
except OSError as e: # pragma: no cover
if e.errno != EEXIST:
raise
[docs]class LogsInTaskAttemptsIndexAPI(MethodView):
[docs] def get(self, job_id, task_id, attempt):
"""
A ``GET`` to this endpoint will return a list of all known logs that are
associated with this attempt at running this task
.. http:get:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/ HTTP/1.1
**Request**
.. sourcecode:: http
GET /api/v1/jobs/4/tasks/1300/attempts/5/logs/ HTTP/1.1
Accept: application/json
**Response**
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
[
{
"agent_id": "3087ada4-290a-45b0-8c1a-21db4cd284fc",
"created_on": "2014-09-03T10:58:59.754880",
"identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv"
}
]
:statuscode 200: no error
:statuscode 404: the specified task was not found
"""
task = Task.query.filter_by(id=task_id, job_id=job_id).first()
if not task:
return jsonify(task_id=task_id, job_id=job_id,
error="Specified task not found"), NOT_FOUND
association_objects = TaskTaskLogAssociation.query.filter(
TaskTaskLogAssociation.task == task,
TaskTaskLogAssociation.attempt == attempt)
out = []
for item in association_objects:
log = item.log
out.append({"identifier": log.identifier,
"created_on": log.created_on,
"agent_id": str(log.agent_id)})
return jsonify(out), OK
@validate_with_model(TaskLog, type_checks={"agent_id": isuuid})
[docs] def post(self, job_id, task_id, attempt):
"""
A ``POST`` to this endpoint will register a new logfile with the given
attempt at running the given task
A logfile has an identifier which must be unique in the system. If two
tasks get assigned a logfile with the same id, it is considered to be the
same log.
.. http:post:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/ HTTP/1.1
**Request**
.. sourcecode:: http
POST /api/v1/jobs/4/tasks/1300/attempts/5/logs/ HTTP/1.1
Content-Type: application/json
{
"identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv",
"agent_id": "2dc2cb5a-35da-41d6-8864-329c0d7d5391"
}
**Response**
.. sourcecode:: http
HTTP/1.1 201 CREATED
Content-Type: application/json
{
"identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv",
"agent_id": "2dc2cb5a-35da-41d6-8864-329c0d7d5391",
"created_on": "2014-09-03T10:59:05.103005",
"id": 148
}
:statuscode 201: the association between this task attempt and logfile
has been created
:statuscode 400: there was something wrong with the request (such as
invalid columns being included)
:statuscode 404: the specified task does not exist
:statuscode 409: the specified log was already registered on the
specified task
"""
task = Task.query.filter_by(id=task_id, job_id=job_id).first()
if not task:
return jsonify(task_id=task_id, job_id=job_id,
error="Specified task not found"), NOT_FOUND
path = realpath(join(LOGFILES_DIR, g.json["identifier"]))
if not realpath(path).startswith(LOGFILES_DIR):
return jsonify(error="Identifier is not acceptable"), BAD_REQUEST
attempts = 0
registered = False
while not registered:
try:
task_log = TaskLog.query.filter_by(
identifier=g.json["identifier"]).first()
if task_log:
association = TaskTaskLogAssociation.query.filter_by(
task=task, log=task_log, attempt=attempt).first()
if association:
return (jsonify(
log=task_log, attempt=attempt, task_id=task_id,
error="This log is already registered for this "
"task."),
CONFLICT)
if not task_log:
task_log = TaskLog(**g.json)
association = TaskTaskLogAssociation()
association.task = task
association.log = task_log
association.attempt = attempt
db.session.add(association)
db.session.add(task_log)
db.session.commit()
registered = True
except IntegrityError:
if attempts < 3:
attempts += 1
db.session.rollback()
else:
raise
logger.info("Registered task log %s with attempt %s for task %s",
task_log.identifier, attempt, task.id)
return jsonify(task_log.to_dict(unpack_relationships=False)), CREATED
[docs]class SingleLogInTaskAttempt(MethodView):
[docs] def get(self, job_id, task_id, attempt, log_identifier):
"""
A ``GET`` to this endpoint will return metadata about the specified
logfile
.. http:get:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/<log_identifier> HTTP/1.1
**Request**
.. sourcecode:: http
GET /api/v1/jobs/4/tasks/1300/attempts/5/logs/2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv HTTP/1.1
Accept: application/json
**Response**
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"id": 147,
"identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv",
"created_on": "2014-09-03T10:58:59.754880",
"agent_id": "836ce137-6ad4-443f-abb9-94c4465ff87c"
}
:statuscode 200: no error
:statuscode 404: task or logfile not found
"""
task = Task.query.filter_by(id=task_id, job_id=job_id).first()
if not task:
return jsonify(task_id=task_id, job_id=job_id,
error="Specified task not found"), NOT_FOUND
log = TaskLog.query.filter_by(identifier=log_identifier).first()
if not log:
return jsonify(task_id=task_id, job_id=job_id,
error="Specified log not found"), NOT_FOUND
association = TaskTaskLogAssociation.query.filter_by(
task=task,
log=log,
attempt=attempt).first()
if not association:
return jsonify(task_id=task.id, log=log.identifier,
error="Specified log not found in task"), NOT_FOUND
return jsonify(log.to_dict(unpack_relationships=False))
[docs] def post(self, job_id, task_id, attempt, log_identifier):
"""
A ``POST`` to this endpoint will update metadata about the specified
logfile
.. http:post:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/<log_identifier> HTTP/1.1
**Request**
.. sourcecode:: http
POST /api/v1/jobs/4/tasks/1300/attempts/5/logs/2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv HTTP/1.1
Accept: application/json
Content-Type: application/json
{
"state": "done"
}
**Response**
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: application/json
{
"id": 147,
"identifier": "2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv",
"created_on": "2014-09-03T10:58:59.754880",
"agent_id": "836ce137-6ad4-443f-abb9-94c4465ff87c"
}
:statuscode 200: no error
:statuscode 404: task or logfile not found
"""
task = Task.query.filter_by(id=task_id, job_id=job_id).first()
if not task:
return jsonify(task_id=task_id, job_id=job_id,
error="Specified task not found"), NOT_FOUND
log = TaskLog.query.filter_by(identifier=log_identifier).first()
if not log:
return jsonify(task_id=task_id, job_id=job_id,
error="Specified log not found"), NOT_FOUND
association = TaskTaskLogAssociation.query.filter_by(
task=task,
log=log,
attempt=attempt).first()
if not association:
return jsonify(task_id=task.id, log=log.identifier,
error="Specified log not found in task"), NOT_FOUND
if "state" in g.json:
new_state = g.json.pop("state")
if new_state != "queued":
association.state = new_state
else:
association.state = None
db.session.add(association)
if "agent_id" in g.json:
association.agent_id = g.json.pop("agent_id")
db.session.add(association)
if "identifier" in g.json:
return (jsonify(error="The tasklog identifier cannot be changed"),
BAD_REQUEST)
if "created_on" in g.json:
return (jsonify(error="The created_on timestampt cannot be changed"),
BAD_REQUEST)
db.session.commit()
return jsonify(log.to_dict(unpack_relationships=False))
[docs]class TaskLogfileAPI(MethodView):
[docs] def get(self, job_id, task_id, attempt, log_identifier):
"""
A ``GET`` to this endpoint will return the actual logfile or a redirect
to it.
.. http:get:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/<log_identifier>/logfile HTTP/1.1
**Request**
.. sourcecode:: http
GET /api/v1/jobs/4/tasks/1300/attempts/5/logs/2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv/logfile HTTP/1.1
Accept: text/csv
**Response**
.. sourcecode:: http
HTTP/1.1 200 OK
Content-Type: text/csv
<Content of the logfile>
:statuscode 200: no error
:statuscode 307: The logfile can be found in another location at this
point in time. Independent future requests for the same
logfile should continue using the original URL
:statuscode 400: the specified logfile identifier is not acceptable
:statuscode 404: task or logfile not found
"""
task = Task.query.filter_by(id=task_id, job_id=job_id).first()
if not task:
return jsonify(task_id=task_id, log=log_identifier,
error="Specified task not found"), NOT_FOUND
log = TaskLog.query.filter_by(identifier=log_identifier).first()
if not log:
return jsonify(task_id=task_id, log=log_identifier,
error="Specified log not found"), NOT_FOUND
association = TaskTaskLogAssociation.query.filter_by(
task=task,
log=log,
attempt=attempt).first()
if not association:
return jsonify(task_id=task.id, log=log.identifier,
error="Specified log not found in task"), NOT_FOUND
path = realpath(join(LOGFILES_DIR, log_identifier))
if not realpath(path).startswith(LOGFILES_DIR):
return jsonify(error="Identifier is not acceptable"), BAD_REQUEST
try:
logfile = open(path, "rb")
return send_file(logfile)
except IOError:
try:
compressed_logfile = GzipFile("%s.gz" % path, "rb")
def logfile_generator():
eof = False
while not eof:
out = compressed_logfile.read(4096) # 4096 == mempage
eof = len(out) == 0
yield out
return Response(logfile_generator(), mimetype="text/csv")
except IOError:
agent = log.agent
if not agent:
return (jsonify(
path=path, log=log_identifier,
error="Logfile is not available on master and agent "
"is not known"), NOT_FOUND)
return redirect(agent.api_url() + "/task_logs/" +
log_identifier, TEMPORARY_REDIRECT)
[docs] def put(self, job_id, task_id, attempt, log_identifier):
"""
A ``PUT`` to this endpoint will upload the request's body as the
specified logfile
.. http:put:: /api/v1/jobs/<job_id>/tasks/<task_id>/attempts/<attempt>/logs/<log_identifier>/logfile HTTP/1.1
**Request**
.. sourcecode:: http
PUT /api/v1/jobs/4/tasks/1300/attempts/5/logs/2014-09-03_10-58-59_4_4ee02475335911e4a935c86000cbf5fb.csv/logfile HTTP/1.1
<content of the logfile>
**Response**
.. sourcecode:: http
HTTP/1.1 201 CREATED
:statuscode 201: lofile was uploaded
:statuscode 400: the specified logfile identifier is not acceptable
:statuscode 404: task or logfile not found
"""
task = Task.query.filter_by(id=task_id, job_id=job_id).first()
if not task:
return jsonify(task_id=task_id, log=log_identifier,
error="Specified task not found"), NOT_FOUND
log = TaskLog.query.filter_by(identifier=log_identifier).first()
if not log:
return jsonify(task_id=task_id, log=log_identifier,
error="Specified log not found"), NOT_FOUND
association = TaskTaskLogAssociation.query.filter_by(
task=task,
log=log,
attempt=attempt).first()
if not association:
return jsonify(task_id=task_id, log=log.identifier,
error="Specified log not found in task"), NOT_FOUND
path = realpath(join(LOGFILES_DIR, log_identifier))
if not realpath(path).startswith(LOGFILES_DIR):
return jsonify(error="Identifier is not acceptable"), BAD_REQUEST
logger.info("Writing task log file for task %s, attempt %s to path %s",
task_id, attempt, path)
try:
with open(path, "wb+") as log_file:
log_file.write(request.data)
except (IOError, OSError) as e:
logger.error("Could not write task log file: %s (%s)", e.errno,
e.strerror)
return (jsonify(error="Could not write file %s to disk: %s"
% (path, e)),
INTERNAL_SERVER_ERROR)
return "", CREATED