Source code for pyfarm.master.user_interface.statistics.task_events

# No shebang line, this module is meant to be imported
#
# Copyright 2015 Ambient Entertainment GmbH & Co. KG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from calendar import timegm
from datetime import timedelta, datetime

from flask import render_template, request

from sqlalchemy import or_

from pyfarm.core.logger import getLogger
from pyfarm.models.jobqueue import JobQueue
from pyfarm.models.statistics.task_event_count import TaskEventCount
from pyfarm.models.statistics.task_count import TaskCount
from pyfarm.master.config import config
from pyfarm.master.application import db

logger = getLogger("ui.task_events")


class TotalsAverage(object):
    """Accumulates running averages of task totals, kept per job queue.

    Averages are updated with the incremental mean recurrence
    ``avg_n = x_n / n + ((n - 1) / n) * avg_(n-1)``, so no sample
    history needs to be kept in memory.
    """
    def __init__(self, first_sample, last_sum=None):
        self.time_start = first_sample.counted_time
        self.num_samples_by_queue = {first_sample.job_queue_id: 1}
        # Carry the previous period's averages forward so that queues
        # without a sample in this period keep their last known values.
        self.avg_queued_by_queue = (
            last_sum.avg_queued_by_queue if last_sum else {})
        self.avg_queued_by_queue[first_sample.job_queue_id] = (
            first_sample.total_queued)
        self.avg_running_by_queue = (
            last_sum.avg_running_by_queue if last_sum else {})
        self.avg_running_by_queue[first_sample.job_queue_id] = (
            first_sample.total_running)
        self.avg_done_by_queue = (
            last_sum.avg_done_by_queue if last_sum else {})
        self.avg_done_by_queue[first_sample.job_queue_id] = (
            first_sample.total_done)
        self.avg_failed_by_queue = (
            last_sum.avg_failed_by_queue if last_sum else {})
        self.avg_failed_by_queue[first_sample.job_queue_id] = (
            first_sample.total_failed)

    def add_sample(self, sample):
        self.num_samples_by_queue[sample.job_queue_id] = (
            self.num_samples_by_queue.get(sample.job_queue_id, 0) + 1)
        num_samples = self.num_samples_by_queue[sample.job_queue_id]
        self.avg_queued_by_queue[sample.job_queue_id] = (
            float(sample.total_queued) / num_samples +
            ((float(num_samples) - 1) / num_samples) *
            self.avg_queued_by_queue.get(sample.job_queue_id, 0.0))
        self.avg_running_by_queue[sample.job_queue_id] = (
            float(sample.total_running) / num_samples +
            ((float(num_samples) - 1) / num_samples) *
            self.avg_running_by_queue.get(sample.job_queue_id, 0.0))
        self.avg_done_by_queue[sample.job_queue_id] = (
            float(sample.total_done) / num_samples +
            ((float(num_samples) - 1) / num_samples) *
            self.avg_done_by_queue.get(sample.job_queue_id, 0.0))
        self.avg_failed_by_queue[sample.job_queue_id] = (
            float(sample.total_failed) / num_samples +
            ((float(num_samples) - 1) / num_samples) *
            self.avg_failed_by_queue.get(sample.job_queue_id, 0.0))

    def avg_queued(self):
        return sum(self.avg_queued_by_queue.values())

    def avg_running(self):
        return sum(self.avg_running_by_queue.values())

    def avg_done(self):
        return sum(self.avg_done_by_queue.values())

    def avg_failed(self):
        return sum(self.avg_failed_by_queue.values())
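
# A minimal sketch of how ``TotalsAverage`` accumulates a per-queue running
# average, using a stand-in namedtuple instead of a real ``TaskCount`` row.
# ``FakeSample`` and its field values are hypothetical, for illustration only:
#
#     from collections import namedtuple
#     from datetime import datetime
#
#     FakeSample = namedtuple(
#         "FakeSample",
#         ["counted_time", "job_queue_id", "total_queued", "total_running",
#          "total_done", "total_failed"])
#
#     s1 = FakeSample(datetime.utcnow(), 1, 10, 2, 0, 0)
#     s2 = FakeSample(datetime.utcnow(), 1, 20, 4, 0, 0)
#     avg = TotalsAverage(s1)   # queue 1: avg_queued = 10, n = 1
#     avg.add_sample(s2)        # avg_queued = 20/2 + (1/2) * 10 = 15.0
#     assert avg.avg_queued() == 15.0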


def task_events():
    """Renders a statistics page of task event counts and task totals,
    consolidated into fixed-width time buckets, for the job queues
    selected via the request arguments.
    """
    consolidate_interval = timedelta(**config.get(
        "task_event_count_consolidate_interval"))
    minutes_resolution = int(consolidate_interval.total_seconds() / 60)
    if "minutes_resolution" in request.args:
        minutes_resolution = int(request.args.get("minutes_resolution"))
        consolidate_interval = timedelta(minutes=minutes_resolution)

    days_back = int(request.args.get("days_back", 7))
    time_back = timedelta(days=days_back)

    task_event_count_query = TaskEventCount.query.order_by(
        TaskEventCount.time_start).filter(
            TaskEventCount.time_start > datetime.utcnow() - time_back)
    task_count_query = TaskCount.query.order_by(
        TaskCount.counted_time).filter(
            TaskCount.counted_time > datetime.utcnow() - time_back)

    # Optionally restrict both queries to the selected job queues; if
    # ``no_queue`` is set, samples with no job queue are included as well.
    jobqueue_ids = []
    no_queue = ("no_queue" in request.args and
                request.args["no_queue"].lower() == "true")
    if "queue" in request.args or no_queue:
        jobqueue_ids = request.args.getlist("queue")
        jobqueue_ids = [int(x) for x in jobqueue_ids]
        if no_queue:
            task_event_count_query = task_event_count_query.filter(or_(
                TaskEventCount.job_queue_id.in_(jobqueue_ids),
                TaskEventCount.job_queue_id == None))
            task_count_query = task_count_query.filter(or_(
                TaskCount.job_queue_id.in_(jobqueue_ids),
                TaskCount.job_queue_id == None))
        else:
            task_event_count_query = task_event_count_query.filter(
                TaskEventCount.job_queue_id.in_(jobqueue_ids))
            task_count_query = task_count_query.filter(
                TaskCount.job_queue_id.in_(jobqueue_ids))

    # Consolidate event counts into buckets of ``consolidate_interval``.
    # Each series is a list of [timestamp, value] pairs; deleted, failed
    # and done counts are negated for display.
    tasks_new = []
    tasks_deleted = []
    tasks_restarted = []
    tasks_failed = []
    tasks_done = []
    current_period_start = None
    for sample in task_event_count_query:
        if not current_period_start:
            current_period_start = sample.time_start
            timestamp = timegm(current_period_start.utctimetuple())
            tasks_new.append([timestamp, sample.num_new])
            tasks_deleted.append([timestamp, -sample.num_deleted])
            tasks_restarted.append([timestamp, sample.num_restarted])
            tasks_failed.append([timestamp, -sample.num_failed])
            tasks_done.append([timestamp, -sample.num_done])
        elif (sample.time_start <
              (current_period_start + consolidate_interval)):
            tasks_new[-1][-1] += sample.num_new
            tasks_deleted[-1][-1] -= sample.num_deleted
            tasks_restarted[-1][-1] += sample.num_restarted
            tasks_failed[-1][-1] -= sample.num_failed
            tasks_done[-1][-1] -= sample.num_done
        else:
            # Emit empty buckets until the current sample falls into the
            # active bucket.
            while (sample.time_start >=
                   (current_period_start + consolidate_interval)):
                current_period_start += consolidate_interval
                timestamp = timegm(current_period_start.utctimetuple())
                tasks_new.append([timestamp, 0])
                tasks_deleted.append([timestamp, 0])
                tasks_restarted.append([timestamp, 0])
                tasks_failed.append([timestamp, 0])
                tasks_done.append([timestamp, 0])
            tasks_new[-1][-1] += sample.num_new
            tasks_deleted[-1][-1] -= sample.num_deleted
            tasks_restarted[-1][-1] += sample.num_restarted
            tasks_failed[-1][-1] -= sample.num_failed
            tasks_done[-1][-1] -= sample.num_done

    # Average the task totals over the same bucket width.
    total_queued = []
    total_running = []
    total_done = []
    total_failed = []
    current_average = None
    for sample in task_count_query:
        if not current_average:
            current_average = TotalsAverage(sample)
        elif (sample.counted_time <
              (current_average.time_start + consolidate_interval)):
            current_average.add_sample(sample)
        else:
            timestamp = timegm(current_average.time_start.utctimetuple())
            total_queued.append([timestamp, current_average.avg_queued()])
            total_running.append([timestamp, current_average.avg_running()])
            total_done.append([timestamp, current_average.avg_done()])
            total_failed.append([timestamp, current_average.avg_failed()])
            current_average = TotalsAverage(sample, current_average)
    if current_average:
        timestamp = timegm(current_average.time_start.utctimetuple())
        total_queued.append([timestamp, current_average.avg_queued()])
        total_running.append([timestamp, current_average.avg_running()])
        total_done.append([timestamp, current_average.avg_done()])
        total_failed.append([timestamp, current_average.avg_failed()])

    jobqueues = JobQueue.query.order_by(JobQueue.fullpath).all()

    return render_template(
        "pyfarm/statistics/task_events.html",
        tasks_new_json=json.dumps(tasks_new),
        tasks_deleted_json=json.dumps(tasks_deleted),
        tasks_restarted_json=json.dumps(tasks_restarted),
        tasks_failed_json=json.dumps(tasks_failed),
        tasks_done_json=json.dumps(tasks_done),
        total_queued_json=json.dumps(total_queued),
        total_running_json=json.dumps(total_running),
        total_done_json=json.dumps(total_done),
        total_failed_json=json.dumps(total_failed),
        no_queue=no_queue,
        jobqueue_ids=jobqueue_ids,
        jobqueues=jobqueues,
        minutes_resolution=minutes_resolution,
        days_back=days_back)
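
# The view is wired up elsewhere in pyfarm.master; a minimal sketch of
# registering it with a Flask application follows. The URL rule and the
# assumption that ``pyfarm.master.application`` exposes an ``app`` object
# are illustrative, not taken from PyFarm's actual setup code:
#
#     from pyfarm.master.application import app
#
#     app.add_url_rule(
#         "/statistics/task_events/", "task_events", task_events,
#         methods=("GET", ))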