Source code for pyfarm.scheduler.tasks

# No shebang line, this module is meant to be imported
#
# Copyright 2014 Ambient Entertainment GmbH & Co. KG
# Copyright 2014 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Tasks
-----

This module contains various asynchronous tasks to be run by Celery.
"""

from datetime import timedelta, datetime
from logging import DEBUG
from json import dumps
from smtplib import SMTP
from email.mime.text import MIMEText
from time import time, sleep
from os.path import isfile, join
from os import remove, listdir
from errno import ENOENT
from gzip import GzipFile
from uuid import UUID

from sqlalchemy import or_, desc
from sqlalchemy.exc import InvalidRequestError

import requests
from requests.exceptions import ConnectionError, Timeout
# Workaround for https://github.com/kennethreitz/requests/issues/2204
from requests.packages.urllib3.exceptions import ProtocolError

from jinja2 import Template

from lockfile import LockFile, AlreadyLocked

from pyfarm.core.logger import getLogger
from pyfarm.core.enums import (
    AgentState, _AgentState, WorkState, _WorkState, UseAgentAddress)
from pyfarm.models.statistics.task_event_count import TaskEventCount
from pyfarm.models.software import (
    Software, SoftwareVersion, JobSoftwareRequirement,
    JobTypeSoftwareRequirement)
from pyfarm.models.tag import Tag
from pyfarm.models.task import Task
from pyfarm.models.tasklog import TaskLog, TaskTaskLogAssociation
from pyfarm.models.job import Job, JobNotifiedUser
from pyfarm.models.jobqueue import JobQueue
from pyfarm.models.jobtype import JobType, JobTypeVersion
from pyfarm.models.gpu import GPU
from pyfarm.models.disk import AgentDisk
from pyfarm.models.agent import Agent, AgentTagAssociation
from pyfarm.models.user import User, Role
from pyfarm.models.jobgroup import JobGroup
from pyfarm.master.application import db
from pyfarm.master.utility import default_json_encoder
from pyfarm.master.config import config

from pyfarm.scheduler.celery_app import celery_app


try:
    range_ = xrange  # pylint: disable=undefined-variable
except NameError:  # pragma: no cover
    range_ = range

logger = getLogger("pf.scheduler.tasks")
# TODO Get logger configuration from pyfarm config
logger.setLevel(DEBUG)

USERAGENT = config.get("master_user_agent")
POLL_BUSY_AGENTS_INTERVAL = timedelta(**config.get("poll_busy_agents_interval"))
POLL_IDLE_AGENTS_INTERVAL = timedelta(**config.get("poll_idle_agents_interval"))
POLL_OFFLINE_AGENTS_INTERVAL = \
    timedelta(**config.get("poll_offline_agents_interval"))
SCHEDULER_LOCKFILE_BASE = config.get("scheduler_lockfile_base")
LOGFILES_DIR = config.get("tasklogs_dir")
TRANSACTION_RETRIES = config.get("transaction_retries")
AGENT_REQUEST_TIMEOUT = config.get("agent_request_timeout")
BASE_URL = config.get("base_url")

# Email settings
SMTP_SERVER = config.get("smtp_server")
SMTP_PORT = config.get("smtp_port")
SMTP_USER, SMTP_PASSWORD = config.get("smtp_login")
FROM_ADDRESS = config.get("from_email")
DEFAULT_SUCCESS_SUBJECT = Template(config.get("success_subject"))
DEFAULT_SUCCESS_BODY = Template(config.get("success_body"))
DEFAULT_FAIL_SUBJECT = Template(config.get("failed_subject"))
DEFAULT_FAIL_BODY = Template(config.get("failed_body"))
DEFAULT_DELETE_SUBJECT = Template(config.get("deleted_subject"))
DEFAULT_DELETE_BODY = Template(config.get("deleted_body"))
OUR_FARM_NAME = config.get("farm_name")
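
# Note: ``smtp_login`` is expected to unpack into a (user, password) pair;
# for example (illustrative values only), a config entry equivalent to
# ``["mailuser", "secret"]`` yields SMTP_USER = "mailuser" and
# SMTP_PASSWORD = "secret". SMTP_USER may also be None when the server
# requires no authentication, in which case send_email() below skips the
# login step.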

def send_email(to, message):
    """
    Configures an instance of :class:`SMTP` and sends a message to the
    given address.
    """
    smtp = SMTP(SMTP_SERVER, port=SMTP_PORT)
    # Password could be blank in some cases
    if SMTP_USER is not None:
        smtp.login(SMTP_USER, SMTP_PASSWORD)
    try:
        smtp.sendmail(FROM_ADDRESS, to, message)
    finally:
        smtp.quit()
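
# Example use (illustrative only), mirroring how the mail tasks below build
# their messages; the address is a placeholder:
#
#     from email.mime.text import MIMEText
#     message = MIMEText("The job has finished.")
#     message["Subject"] = "Job finished"
#     message["From"] = FROM_ADDRESS
#     message["To"] = "user@example.com"
#     send_email(["user@example.com"], message.as_string())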
@celery_app.task(ignore_result=True, bind=True)
def send_tasks_to_agent(self, agent_id):
    db.session.rollback()
    agent = Agent.query.filter(Agent.id == agent_id).first()
    if not agent:
        raise KeyError("agent not found")

    logger.debug("Sending assigned batches to agent %s (id %s)",
                 agent.hostname, agent_id)
    if agent.state in ["offline", "disabled"]:
        raise ValueError("agent not available")
    if agent.use_address == UseAgentAddress.PASSIVE:
        logger.debug("Agent's use address mode is PASSIVE, "
                     "not sending anything")
        return

    tasks_query = Task.query.filter(
        Task.agent == agent,
        or_(Task.state == None,
            ~Task.state.in_([WorkState.DONE, WorkState.FAILED]))
        ).order_by(Task.frame.asc())

    tasks_in_jobs = {}
    for task in tasks_query:
        job_tasks = tasks_in_jobs.setdefault(task.job_id, [])
        job_tasks.append(task)

    if not tasks_in_jobs:
        logger.debug("No tasks for agent %s (id %s)",
                     agent.hostname, agent.id)
        return

    for job_id, tasks in tasks_in_jobs.items():
        job = Job.query.filter_by(id=job_id).first()
        message = {"job": {
                       "id": job.id,
                       "title": job.title,
                       "data": job.data if job.data else {},
                       "environ": job.environ if job.environ else {},
                       "by": job.by,
                       "batch": job.batch,
                       "ram": job.ram,
                       "ram_warning": job.ram_warning,
                       "ram_max": job.ram_max,
                       "cpus": job.cpus,
                       "notified_users": [],
                       "priority": job.priority,
                       "notes": job.notes,
                       "tags": [],
                       "num_tiles": job.num_tiles},
                   "jobtype": {
                       "name": job.jobtype_version.jobtype.name,
                       "version": job.jobtype_version.version},
                   "tasks": []}
        if job.user:
            message["job"]["user"] = job.user.username
        for notified_user in job.notified_users:
            message["job"]["notified_users"].append(
                {"username": notified_user.user.username,
                 "on_success": notified_user.on_success,
                 "on_failure": notified_user.on_failure,
                 "on_deletion": notified_user.on_deletion})
        for tag in job.tags:
            message["job"]["tags"].append(tag.tag)
        for task in tasks:
            message["tasks"].append({"id": task.id,
                                     "frame": task.frame,
                                     "attempt": task.attempts,
                                     "tile": task.tile})

        logger.info("Sending a batch of %s tasks for job %s (%s) to agent %s",
                    len(tasks), job.title, job.id, agent.hostname)
        try:
            response = requests.post(
                agent.api_url() + "/assign",
                data=dumps(message, default=default_json_encoder),
                headers={"Content-Type": "application/json",
                         "User-Agent": USERAGENT},
                timeout=AGENT_REQUEST_TIMEOUT)
            logger.debug("Return code after sending batch to agent: %s",
                         response.status_code)

            if response.status_code == requests.codes.service_unavailable:
                if self.request.retries < self.max_retries:
                    logger.warning(
                        "Agent %s, (id %s), answered SERVICE_UNAVAILABLE, "
                        "retrying the request later",
                        agent.hostname, agent.id)
                    self.retry(exc=ValueError("Got return code "
                                              "SERVICE_UNAVAILABLE"))
                else:
                    logger.error(
                        "Agent %s, (id %s), answered SERVICE_UNAVAILABLE, "
                        "marking it as offline", agent.hostname, agent.id)
                    agent.state = AgentState.OFFLINE
                    db.session.add(agent)
                    for task in tasks:
                        task.agent = None
                        task.attempts -= 1
                        db.session.add(task)
                    db.session.commit()
            elif response.status_code == requests.codes.bad_request:
                logger.error("Agent %s, (id %s), answered BAD_REQUEST, "
                             "removing assignment", agent.hostname, agent.id)
                for task in tasks:
                    task.agent = None
                    task.attempts -= 1
                    db.session.add(task)
                db.session.commit()
            elif response.status_code == requests.codes.conflict:
                logger.error("Agent %s, (id %s), answered CONFLICT, removing "
                             "conflicting assignments",
                             agent.hostname, agent.id)
                response_data = response.json()
                if "rejected_task_ids" in response_data:
                    for task_id in response_data["rejected_task_ids"]:
                        task = Task.query.filter_by(id=task_id).first()
                        if task:
                            logger.error("Removing assignment for task %s "
                                         "(Frame %s from job %s) from agent "
                                         "%s (id %s)", task_id, task.frame,
                                         task.job.title, agent.hostname,
                                         agent.id)
                            task.agent = None
                            task.attempts -= 1
                            db.session.add(task)
                    db.session.flush()
                    job.update_state()
                    db.session.commit()
                else:
                    logger.error("CONFLICT response from agent %s (id %s) "
                                 "did not contain a list of rejected task "
                                 "ids. Please update the agent to 0.8.4 or "
                                 "higher.", agent.hostname, agent.id)
            elif response.status_code not in [requests.codes.accepted,
                                              requests.codes.ok,
                                              requests.codes.created]:
                raise ValueError("Unexpected return code on sending batch "
                                 "to agent: %s" % response.status_code)
            else:
                for task in tasks:
                    task.sent_to_agent = True
                    task.last_error = None
                    db.session.add(task)
                db.session.commit()
        except (ConnectionError, Timeout) as e:
            if self.request.retries < self.max_retries:
                logger.warning("Caught %s trying to contact agent "
                               "%s (id %s), retry %s of %s: %s",
                               type(e).__name__, agent.hostname, agent.id,
                               self.request.retries, self.max_retries, e)
                self.retry(exc=e)
            else:
                logger.error("Could not contact agent %s, (id %s), marking "
                             "as offline", agent.hostname, agent.id)
                agent.state = AgentState.OFFLINE
                db.session.add(agent)
                db.session.commit()
                raise


@celery_app.task(ignore_result=True, bind=True)
def restart_agent(self, agent_id):
    db.session.rollback()
    agent = Agent.query.filter(Agent.id == agent_id).first()
    if not agent:
        raise KeyError("agent not found")
    if agent.state in ["offline", "disabled"]:
        raise ValueError("agent not available")
    if agent.use_address == UseAgentAddress.PASSIVE:
        logger.debug("Agent's use address mode is PASSIVE, cannot restart")
        return
    if not agent.restart_requested:
        logger.error("Agent %s (id %s) is not marked for restart, not "
                     "restarting it", agent.hostname, agent.id)
        raise ValueError("agent not marked for restart")

    logger.info("Restarting agent %s (id %s)", agent.hostname, agent.id)
    try:
        response = requests.post(agent.api_url() + "/restart",
                                 data=dumps({}),
                                 headers={"User-Agent": USERAGENT},
                                 timeout=AGENT_REQUEST_TIMEOUT)
        logger.debug("Return code after sending restart to agent: %s",
                     response.status_code)
        if response.status_code not in [requests.codes.accepted,
                                        requests.codes.ok]:
            raise ValueError("Unexpected return code on sending restart to "
                             "agent %s: %s"
                             % (agent.hostname, response.status_code))
        else:
            agent.restart_requested = False
            db.session.add(agent)
            db.session.commit()
    except (ConnectionError, Timeout) as e:
        if self.request.retries < self.max_retries:
            logger.warning("Caught %s trying to restart agent %s (id %s), "
                           "retry %s of %s: %s",
                           type(e).__name__, agent.hostname, agent.id,
                           self.request.retries, self.max_retries, e)
            self.retry(exc=e)
        else:
            logger.error("Could not contact agent %s, (id %s), marking as "
                         "offline", agent.hostname, agent.id)
            agent.state = AgentState.OFFLINE
            db.session.add(agent)
            db.session.commit()
            raise


@celery_app.task(ignore_result=True)
def assign_tasks():
    db.session.rollback()
    idle_agents = Agent.query.filter(
        or_(Agent.state == AgentState.ONLINE,
            Agent.state == AgentState.RUNNING),
        ~Agent.tasks.any(
            or_(Task.state == None,
                ~Task.state.in_([WorkState.DONE, WorkState.FAILED]))))
    for agent in idle_agents:
        assign_tasks_to_agent.delay(agent.id)


@celery_app.task(ignore_result=True)
def assign_tasks_to_agent(agent_id):
    agent_lockfile_name = SCHEDULER_LOCKFILE_BASE + "-" + str(agent_id)
    agent_lock = LockFile(agent_lockfile_name)
    try:
        agent_lock.acquire(timeout=-1)
        with agent_lock:
            with open(agent_lockfile_name, "w") as lockfile:
                lockfile.write(str(time()))
            db.session.rollback()
            agent = Agent.query.filter_by(id=agent_id).first()
            if not agent:
                raise ValueError("No agent with id %s" % agent_id)
            if agent.state == _AgentState.OFFLINE:
                raise ValueError("Agent %s (id %s) is offline"
                                 % (agent.hostname, agent_id))
            if agent.state == _AgentState.DISABLED:
                raise ValueError("Agent %s (id %s) is disabled"
                                 % (agent.hostname, agent_id))

            task_count = Task.query.filter(
                Task.agent == agent,
                or_(Task.state == None,
                    Task.state == WorkState.RUNNING)).\
                order_by(Task.job_id, Task.frame).count()
            if task_count > 0:
                logger.debug("Agent %s already has %s tasks assigned, "
                             "not assigning any more",
                             agent.hostname, task_count)
                return

            queue = JobQueue()
            unwanted_job_ids = []
            assigned_job = False
            while not assigned_job:
                job = queue.get_job_for_agent(agent, unwanted_job_ids)
                db.session.commit()
                if not job:
                    logger.debug("Did not find a job for agent %s",
                                 agent.hostname)
                    return

                job_lockfile_name = (SCHEDULER_LOCKFILE_BASE + "-job-" +
                                     str(job.id))
                job_lock = LockFile(job_lockfile_name)
                try:
                    job_lock.acquire(timeout=-1)
                    with job_lock:
                        with open(job_lockfile_name, "w") as lockfile:
                            lockfile.write(str(time()))
                        batch = job.get_batch(agent)
                        if batch:
                            for task in batch:
                                task.agent = agent
                                task.sent_to_agent = False
                                logger.info("Assigned agent %s (id %s) to "
                                            "task %s (frame %s) from job %s "
                                            "(id %s)", agent.hostname,
                                            agent.id, task.id, task.frame,
                                            job.title, job.id)
                                db.session.add(task)
                            if job.state != _WorkState.RUNNING:
                                job.state = WorkState.RUNNING
                                db.session.add(job)
                            job.clear_assigned_counts()
                            db.session.commit()
                            assigned_job = True
                            send_tasks_to_agent.delay(agent.id)
                        else:
                            unwanted_job_ids.append(job.id)
                except AlreadyLocked:
                    logger.debug("The lockfile for job %s is locked", job.id)
                    try:
                        with open(job_lockfile_name, "r") as lockfile:
                            locktime = float(lockfile.read())
                            if locktime < time() - 60:
                                logger.error("The old lock on job %s was "
                                             "held for more than 60 seconds. "
                                             "Breaking the lock.", job.id)
                                job_lock.break_lock()
                        assign_tasks_to_agent.apply_async(args=[agent_id],
                                                          countdown=1)
                    except (IOError, OSError, ValueError) as e:
                        logger.warning("Could not read a time value from the "
                                       "lockfile for job %s. Waiting 1 second "
                                       "before trying again. Error: %s",
                                       job.id, e)
                        sleep(1)
                        try:
                            with open(job_lockfile_name, "r") as lockfile:
                                locktime = float(lockfile.read())
                                if locktime < time() - 60:
                                    logger.error("The old lock on job %s was "
                                                 "held for more than 60 "
                                                 "seconds. Breaking the "
                                                 "lock.", job.id)
                                    job_lock.break_lock()
                            assign_tasks_to_agent.apply_async(args=[agent_id],
                                                              countdown=1)
                            return
                        except (IOError, OSError, ValueError):
                            logger.error("Could not read a time value from "
                                         "the lockfile even after waiting "
                                         "1s. Breaking the lock.")
                            job_lock.break_lock()
                            assign_tasks_to_agent.delay(agent_id)
                            return
    except AlreadyLocked:
        logger.debug("The scheduler lockfile is locked, the scheduler seems "
                     "to already be running for agent %s", agent_id)
        try:
            with open(agent_lockfile_name, "r") as lockfile:
                locktime = float(lockfile.read())
                if locktime < time() - 60:
                    logger.error("The old lock was held for more than 60 "
                                 "seconds. Breaking the lock.")
                    agent_lock.break_lock()
        except (IOError, OSError, ValueError) as e:
            # It is possible that we tried to read the file in the narrow
            # window between lock acquisition and actually writing the time
            logger.warning("Could not read a time value from the scheduler "
                           "lockfile. Waiting 1 second before trying again. "
                           "Error: %s", e)
            sleep(1)
            try:
                with open(agent_lockfile_name, "r") as lockfile:
                    locktime = float(lockfile.read())
                    if locktime < time() - 60:
                        logger.error("The old lock was held for more than "
                                     "60 seconds. Breaking the lock.")
                        agent_lock.break_lock()
            except (IOError, OSError, ValueError):
                # If we still cannot read a time value from the file after
                # 1s, there was something wrong with the process holding
                # the lock
                logger.error("Could not read a time value from the scheduler "
                             "lockfile even after waiting 1s. Breaking the "
                             "lock")
                agent_lock.break_lock()


@celery_app.task(ignore_result=True, bind=True)
def poll_agent(self, agent_id):
    db.session.rollback()
    agent = Agent.query.filter(Agent.id == agent_id).first()
    running_tasks_count = Task.query.filter(
        Task.agent == agent,
        or_(Task.state == None,
            Task.state == WorkState.RUNNING)).count()
    if (running_tasks_count > 0 and
            agent.last_heard_from is not None and
            agent.last_heard_from + POLL_BUSY_AGENTS_INTERVAL >
            datetime.utcnow() and
            not agent.state == _AgentState.OFFLINE):
        return
    elif (running_tasks_count == 0 and
            agent.last_heard_from is not None and
            agent.last_heard_from + POLL_IDLE_AGENTS_INTERVAL >
            datetime.utcnow() and
            not agent.state == _AgentState.OFFLINE):
        return

    try:
        logger.info("Polling agent %s", agent.hostname)
        status_response = requests.get(
            agent.api_url() + "/status",
            headers={"User-Agent": USERAGENT},
            timeout=AGENT_REQUEST_TIMEOUT)
        if status_response.status_code != requests.codes.ok:
            raise ValueError(
                "Unexpected return code on checking status of agent "
                "%s (id %s): %s" % (
                    agent.hostname, agent.id, status_response.status_code))
        status_json = status_response.json()
        if UUID(status_json["agent_id"]) != agent_id:
            logger.error("Wrong agent reached under %s. Expected id %s, "
                         "got %s", agent.api_url(), agent_id,
                         status_json["agent_id"])
            raise ValueError("Wrong agent_id on polling. Expected: %s. "
                             "Got %s"
                             % (agent_id, status_json["agent_id"]))
        if ("farm_name" in status_json and
                status_json["farm_name"] != OUR_FARM_NAME):
            agent.last_polled = datetime.utcnow()
            db.session.add(agent)
            db.session.commit()
            raise ValueError(
                "Wrong farm_name from agent %s (id %s): %s. "
(Expected: %s) " % (agent.hostname, agent.id, status_json["farm_name"], OUR_FARM_NAME)) agent.state = status_json["state"] agent.free_ram = status_json["free_ram"] tasks_response = requests.get( agent.api_url() + "/tasks/", headers={"User-Agent": USERAGENT}, timeout=AGENT_REQUEST_TIMEOUT) if tasks_response.status_code != requests.codes.ok: raise ValueError( "Unexpected return code on checking tasks in agent " "%s (id %s): %s" % ( agent.hostname, agent.id, tasks_response.status_code)) tasks_json = tasks_response.json() # Catching ProtocolError here is a work around for # https://github.com/kennethreitz/requests/issues/2204 except (ConnectionError, Timeout, ProtocolError) as e: if self.request.retries < self.max_retries: logger.warning("Caught %s trying to contact agent " "%s (id %s), retry %s of %s: %s", type(e).__name__, agent.hostname, agent.id, self.request.retries, self.max_retries, e) agent.last_polled = datetime.utcnow() db.session.add(agent) db.session.commit() self.retry(exc=e) else: logger.error("Could not contact agent %s, (id %s), marking as " "offline", agent.hostname, agent.id) agent.state = AgentState.OFFLINE agent.last_polled = datetime.utcnow() tasks_query = Task.query.filter( Task.agent == agent, Task.state == WorkState.RUNNING) jobs_to_check = set() for task in tasks_query: task.state = None db.session.add(task) jobs_to_check.add(task.job) db.session.add(agent) db.session.commit() for job in jobs_to_check: job.update_state() db.session.add(job) db.session.commit() else: present_task_ids = [x["id"] for x in tasks_json] assigned_task_ids = db.session.query(Task.id).filter( Task.agent == agent, or_(Task.state == None, Task.state == WorkState.RUNNING)).all() assigned_task_ids = [x[0] for x in assigned_task_ids] if set(assigned_task_ids) - set(present_task_ids): logger.debug("Agent %s does not have all the tasks it is supposed " "to have. 
Registering task pusher", agent.hostname) send_tasks_to_agent.delay(agent_id) superfluous_tasks = set(present_task_ids) - set(assigned_task_ids) if superfluous_tasks: for task_id in superfluous_tasks: task = Task.query.filter_by(id=task_id).first() if task: if task.agent_id != agent_id: logger.warning("Task %s belongs to agent %s (id %s), " "but has been found running on %s " "(id %s), stopping it.", task_id, task.agent.hostname, task.agent_id, agent.hostname, agent_id) stop_task.delay(task_id, agent_id, dissociate_agent=False) else: logger.warning("Superfluous task %s not found in db", task_id) agent.last_heard_from = datetime.utcnow() db.session.add(agent) db.session.commit() @celery_app.task(ignore_results=True) def poll_agents(): db.session.rollback() idle_agents_to_poll_query = Agent.query.filter( Agent.state != AgentState.OFFLINE, Agent.state != AgentState.DISABLED, or_(Agent.last_heard_from == None, Agent.last_heard_from + POLL_IDLE_AGENTS_INTERVAL < datetime.utcnow()), ~Agent.tasks.any(or_(Task.state == None, Task.state == WorkState.RUNNING)), Agent.use_address != UseAgentAddress.PASSIVE) for agent in idle_agents_to_poll_query: logger.debug("Polling idle agent %s", agent.hostname) poll_agent.delay(agent.id) busy_agents_to_poll_query = Agent.query.filter( Agent.state != AgentState.OFFLINE, or_(Agent.last_heard_from == None, Agent.last_heard_from + POLL_BUSY_AGENTS_INTERVAL < datetime.utcnow()), Agent.tasks.any(or_(Task.state == None, Task.state == WorkState.RUNNING)), Agent.use_address != UseAgentAddress.PASSIVE) for agent in busy_agents_to_poll_query: logger.debug("Polling busy agent %s", agent.hostname) poll_agent.delay(agent.id) offline_agents_to_poll_query = Agent.query.filter( Agent.state == AgentState.OFFLINE, or_(Agent.last_polled == None, Agent.last_polled + POLL_OFFLINE_AGENTS_INTERVAL < datetime.utcnow()), Agent.use_address != UseAgentAddress.PASSIVE) for agent in offline_agents_to_poll_query: logger.debug("Polling offline agent %s", agent.hostname) poll_agent.delay(agent.id) @celery_app.task(ignore_results=True) def send_job_completion_mail(job_id, successful=True): if not SMTP_SERVER: return job_lockfile_name = SCHEDULER_LOCKFILE_BASE + "-job-" + str(job_id) job_lock = LockFile(job_lockfile_name) try: job_lock.acquire(timeout=-1) with job_lock: with open(job_lockfile_name, "w") as lockfile: lockfile.write(str(time())) db.session.rollback() job = Job.query.filter_by(id=job_id).one() if job.completion_notify_sent: return job.url = BASE_URL if job.url[-1] != "/": job.url += "/" job.url+= "jobs/%s" % job.id failed_tasks = Task.query.filter(Task.job == job, Task.state == WorkState.FAILED).\ order_by(desc(Task.frame)) failed_log_urls = [] for task in failed_tasks: last_log_assoc = TaskTaskLogAssociation.query.filter_by( task=task).order_by(desc( TaskTaskLogAssociation.attempt)).limit(1).first() if last_log_assoc: log = last_log_assoc.log log_url = BASE_URL if not log_url.endswith("/"): log_url += "/" log_url += ("api/v1/jobs/%s/tasks/%s/attempts/%s/" "logs/%s/logfile" % (job.id, task.id, last_log_assoc.attempt, log.identifier)) failed_log_urls.append(log_url) notified_users_query = JobNotifiedUser.query.filter_by(job=job) if successful: notified_users_query = notified_users_query.filter_by( on_success=True) else: notified_users_query = notified_users_query.filter_by( on_failure=True) notified_users = notified_users_query.all() if not notified_users: return body_template = None subject_template = None if successful: if job.jobtype_version.jobtype.success_body: body_template = 
                        job.jobtype_version.jobtype.success_body)
                else:
                    body_template = DEFAULT_SUCCESS_BODY
                if job.jobtype_version.jobtype.success_subject:
                    subject_template = Template(
                        job.jobtype_version.jobtype.success_subject)
                else:
                    subject_template = DEFAULT_SUCCESS_SUBJECT
            else:
                if job.jobtype_version.jobtype.fail_body:
                    body_template = Template(
                        job.jobtype_version.jobtype.fail_body)
                else:
                    body_template = DEFAULT_FAIL_BODY
                if job.jobtype_version.jobtype.fail_subject:
                    subject_template = Template(
                        job.jobtype_version.jobtype.fail_subject)
                else:
                    subject_template = DEFAULT_FAIL_SUBJECT

            message = MIMEText(
                body_template.render(job=job,
                                     failed_log_urls=failed_log_urls))
            message["Subject"] = subject_template.render(job=job)
            message["From"] = FROM_ADDRESS
            to = [x.user.email for x in notified_users if x.user.email]
            message["To"] = ",".join(to)

            if to:
                send_email(to, message.as_string())
                logger.info("Job completion mail for job %s (id %s) sent "
                            "to %s", job.title, job.id, to)
                job.completion_notify_sent = True
                db.session.add(job)
                db.session.commit()
    except AlreadyLocked:
        logger.debug("The job lockfile is locked, something is already "
                     "working on job %s", job_id)
        try:
            with open(job_lockfile_name, "r") as lockfile:
                locktime = float(lockfile.read())
                if locktime < time() - 60:
                    logger.error("The old lock was held for more than 60 "
                                 "seconds. Breaking the lock.")
                    job_lock.break_lock()
        except (IOError, OSError, ValueError) as e:
            # It is possible that we tried to read the file in the narrow
            # window between lock acquisition and actually writing the time
            logger.warning("Could not read a time value from the scheduler "
                           "lockfile. Waiting 1 second before trying again. "
                           "Error: %s", e)
            sleep(1)
            try:
                with open(job_lockfile_name, "r") as lockfile:
                    locktime = float(lockfile.read())
                    if locktime < time() - 60:
                        logger.error("The old lock was held for more than "
                                     "60 seconds. Breaking the lock.")
                        job_lock.break_lock()
            except (IOError, OSError, ValueError):
                # If we still cannot read a time value from the file after
                # 1s, there was something wrong with the process holding
                # the lock
                logger.error("Could not read a time value from the scheduler "
                             "lockfile even after waiting 1s. Breaking the "
                             "lock")
                job_lock.break_lock()


@celery_app.task(ignore_result=True)
def send_job_deletion_mail(job_id, jobtype_name, job_title, to):
    logger.debug("In send_job_deletion_mail(), job_id: %s, jobtype_name: %s, "
                 "job_title: %s, to: %s", job_id, jobtype_name, job_title, to)
    message_text = DEFAULT_DELETE_BODY.render(
        job_title=job_title, jobtype_name=jobtype_name, job_id=job_id)
    message = MIMEText(message_text)
    message["Subject"] = DEFAULT_DELETE_SUBJECT.render(job_title=job_title)
    message["From"] = FROM_ADDRESS
    message["To"] = ",".join(to)

    if to:
        send_email(to, message.as_string())
        logger.info("Job deletion mail for job %s (id %s) sent to %s",
                    job_title, job_id, to)


@celery_app.task(ignore_result=True, bind=True)
def update_agent(self, agent_id):
    db.session.rollback()
    agent = Agent.query.filter_by(id=agent_id).one()
    if agent.version == agent.upgrade_to:
        return True
    try:
        response = requests.post(agent.api_url() + "/update",
                                 dumps({"version": agent.upgrade_to}),
                                 headers={"User-Agent": USERAGENT},
                                 timeout=AGENT_REQUEST_TIMEOUT)
        logger.debug("Return code after sending update request for %s "
                     "to agent: %s", agent.upgrade_to, response.status_code)
        if response.status_code not in [requests.codes.accepted,
                                        requests.codes.ok]:
            raise ValueError("Unexpected return code on sending update "
                             "request for %s to agent %s: %s"
                             % (agent.upgrade_to, agent.hostname,
                                response.status_code))
    except (ConnectionError, Timeout) as e:
        if self.request.retries < self.max_retries:
            logger.warning("Caught %s trying to contact agent "
                           "%s (id %s), retry %s of %s: %s",
                           type(e).__name__, agent.hostname, agent.id,
                           self.request.retries, self.max_retries, e)
            self.retry(exc=e)
        else:
            logger.error("Could not contact agent %s, (id %s), marking as "
                         "offline", agent.hostname, agent.id)
            agent.state = AgentState.OFFLINE
            db.session.add(agent)
            db.session.commit()
            raise


@celery_app.task(ignore_result=True, bind=True)
def delete_task(self, task_id):
    db.session.rollback()
    job_id = None
    task = None
    agent = None
    job = None
    job_group = None
    retries = TRANSACTION_RETRIES
    deleted = False
    while not deleted and retries > 0:
        try:
            task = Task.query.filter_by(id=task_id).one()
            job = task.job
            job_id = task.job_id
            agent = task.agent
            job_group = job.group
            logger.info("Deleting task %s (job %s - \"%s\")",
                        task.id, job.id, job.title)
            db.session.delete(task)
            db.session.commit()
            deleted = True
        except InvalidRequestError:
            if retries > 0:
                logger.debug("Caught an InvalidRequestError trying to delete "
                             "task %s, retrying transaction", task_id)
                retries -= 1
                db.session.rollback()
            else:
                logger.error("While trying to delete task %s, caught an "
                             "InvalidRequestError %s times, giving up",
                             task_id, TRANSACTION_RETRIES)
                raise

    job.update_state()
    db.session.commit()

    if config.get("enable_statistics"):
        task_event_count = TaskEventCount(job_queue_id=job.job_queue_id,
                                          num_deleted=1)
        task_event_count.time_start = datetime.utcnow()
        task_event_count.time_end = datetime.utcnow()
        db.session.add(task_event_count)
        db.session.commit()

    retries = TRANSACTION_RETRIES
    done = False
    job_deleted = False
    while not done and retries > 0:
        try:
            job = Job.query.filter_by(id=job_id).one()
            if job.to_be_deleted:
                num_remaining_tasks = Task.query.filter_by(job=job).count()
                if num_remaining_tasks == 0:
                    logger.info("Job %s (%s) is marked for deletion and has "
                                "no tasks left, deleting it from the "
                                "database now.", job.id, job.title)
                    notified_users = JobNotifiedUser.query.filter(
                        JobNotifiedUser.job == job,
                        JobNotifiedUser.on_deletion == True).all()
                    to = [x.user.email for x in notified_users
                          if x.user.email]
                    send_job_deletion_mail.delay(
                        job.id, job.jobtype_version.jobtype.name,
                        job.title, to)
                    db.session.delete(job)
                    job_deleted = True
                    db.session.commit()
            done = True
        except InvalidRequestError:
            if retries > 0:
                logger.debug("Caught an InvalidRequestError trying to delete "
                             "job %s, retrying transaction", job_id)
                retries -= 1
                db.session.rollback()
            else:
                logger.error("While trying to delete job %s, caught an "
                             "InvalidRequestError %s times, giving up",
                             job_id, TRANSACTION_RETRIES)
                raise

    if job_deleted and job_group:
        if job_group.jobs.count() == 0:
            logger.info("Job group %s (id %s) has no jobs left, deleting",
                        job_group.name, job_group.id)
            db.session.delete(job_group)
            db.session.commit()

    if (agent is not None and
            task.state not in [WorkState.DONE, WorkState.FAILED]):
        try:
            response = requests.delete(
                "%s/tasks/%s" % (agent.api_url(), task.id),
                headers={"User-Agent": USERAGENT},
                timeout=AGENT_REQUEST_TIMEOUT)
            logger.info("Deleting task %s (job %s - %r) from agent %s "
                        "(id %s)", task.id, job.id, job.title,
                        agent.hostname, agent.id)
            if response.status_code not in [requests.codes.accepted,
                                            requests.codes.ok,
                                            requests.codes.no_content,
                                            requests.codes.not_found]:
                raise ValueError("Unexpected return code on deleting task "
                                 "%s on agent %s: %s"
                                 % (task.id, agent.id, response.status_code))
        # Catching ProtocolError here is a work around for
        # https://github.com/kennethreitz/requests/issues/2204
        except (ConnectionError, ProtocolError, Timeout) as e:
            if self.request.retries < self.max_retries:
                logger.warning("Caught %s while trying to delete task %s "
                               "from agent %s (id %s): %s",
                               type(e).__name__, task.id, agent.hostname,
                               agent.id, e)


@celery_app.task(ignore_result=True, bind=True)
def stop_task(self, task_id, agent_id=None, dissociate_agent=True):
    db.session.rollback()
    task = Task.query.filter_by(id=task_id).one()
    job = task.job

    if ((task.agent is not None and
         task.state not in [WorkState.DONE, WorkState.FAILED]) or
            agent_id is not None):
        if agent_id is not None:
            agent = Agent.query.filter_by(id=agent_id).one()
        else:
            agent = task.agent
        try:
            response = requests.delete(
                "%s/tasks/%s" % (agent.api_url(), task.id),
                headers={"User-Agent": USERAGENT},
                timeout=AGENT_REQUEST_TIMEOUT)
            logger.info("Stopping task %s (job %s - \"%s\") on agent %s "
                        "(id %s)", task.id, job.id, job.title,
                        agent.hostname, agent.id)
            if response.status_code not in [requests.codes.accepted,
                                            requests.codes.ok,
                                            requests.codes.no_content,
                                            requests.codes.not_found]:
                raise ValueError("Unexpected return code on stopping task "
                                 "%s on agent %s: %s"
                                 % (task.id, agent.id, response.status_code))
            elif dissociate_agent:
                task.agent = None
                task.state = None
                db.session.add(task)
        # Catching ProtocolError here is a work around for
        # https://github.com/kennethreitz/requests/issues/2204
        except (ConnectionError, ProtocolError, Timeout) as e:
            if self.request.retries < self.max_retries:
                logger.warning("Caught %s while trying to delete task %s "
                               "from agent %s (id %s), retry %s of %s: %s",
                               type(e).__name__, task.id, agent.hostname,
                               agent.id, self.request.retries,
                               self.max_retries, e)
                self.retry(exc=e)

    db.session.commit()


@celery_app.task(ignore_result=True)
def delete_to_be_deleted_jobs():
    db.session.rollback()
    jobs_to_delete_query = Job.query.filter(Job.to_be_deleted == True)
    for job in jobs_to_delete_query:
        delete_job.delay(job.id)
    db.session.commit()


@celery_app.task(ignore_result=True)
def delete_job(job_id):
    db.session.rollback()
    job = Job.query.filter_by(id=job_id).one()
    if not job.to_be_deleted:
        logger.warning("Not deleting job %s, it is not marked for "
                       "deletion.", job.id)
        return

    job_group = job.group
    tasks_query = Task.query.filter_by(job=job)
    async_deletes = 0
    immediate_deletes = 0
    for task in tasks_query:
        if task.agent and task.state not in [_WorkState.DONE,
                                             _WorkState.FAILED]:
            delete_task.delay(task.id)
            async_deletes += 1
        else:
            db.session.delete(task)
            immediate_deletes += 1

    if async_deletes == 0:
        logger.info("Job %s (%s) is marked for deletion and has no tasks "
                    "that require asynchronous deletion. Deleting it now.",
                    job.id, job.title)
        # Notify users about deletion
        notified_users = JobNotifiedUser.query.filter(
            JobNotifiedUser.job == job,
            JobNotifiedUser.on_deletion == True).all()
        to = [x.user.email for x in notified_users if x.user.email]
        send_job_deletion_mail.delay(job.id,
                                     job.jobtype_version.jobtype.name,
                                     job.title, to)
        db.session.delete(job)
    db.session.commit()

    if async_deletes == 0 and job_group:
        if job_group.jobs.count() == 0:
            logger.info("Job group %s (id %s) has no jobs left, deleting",
                        job_group.name, job_group.id)
            db.session.delete(job_group)
            db.session.commit()

    if immediate_deletes and config.get("enable_statistics"):
        task_event_count = TaskEventCount(job_queue_id=job.job_queue_id,
                                          num_deleted=immediate_deletes)
        task_event_count.time_start = datetime.utcnow()
        task_event_count.time_end = datetime.utcnow()
        db.session.add(task_event_count)
        db.session.commit()


@celery_app.task(ignore_result=True)
def clean_up_orphaned_task_logs():
    db.session.rollback()
    orphaned_task_logs = TaskLog.query.filter(
        ~TaskLog.task_associations.any()).all()
    for log in orphaned_task_logs:
        logger.info("Removing orphaned task log %s", log.identifier)
        db.session.delete(log)
    db.session.commit()

    try:
        tasklog_files = [f for f in listdir(LOGFILES_DIR)
                         if isfile(join(LOGFILES_DIR, f))]
        for filepath in tasklog_files:
            uncompressed_name = filepath
            if filepath.endswith(".gz"):
                uncompressed_name = filepath[0:-3]
            referencing_count = TaskLog.query.filter(
                or_(TaskLog.identifier == filepath,
                    TaskLog.identifier == uncompressed_name)).count()
            if not referencing_count:
                logger.info("Deleting log file %s",
                            join(LOGFILES_DIR, filepath))
                try:
                    remove(join(LOGFILES_DIR, filepath))
                except OSError as e:
                    if e.errno != ENOENT:
                        raise
    except OSError as e:
        if e.errno != ENOENT:
            raise
        logger.warning("Log directory %r does not exist", LOGFILES_DIR)


@celery_app.task(ignore_result=True)
def autodelete_old_jobs():
    db.session.rollback()
    finished_jobs_query = Job.query.filter(
        Job.state != None,
        Job.state == WorkState.DONE,
        Job.time_finished != None,
        Job.autodelete_time != None)
    job_ids_to_delete = []
    for job in finished_jobs_query:
        # I haven't figured out yet how to do this test as part of the filter
        if (job.time_finished + timedelta(seconds=job.autodelete_time) <
                datetime.utcnow()):
            logger.info("Deleting job %s (id %s). It was finished on "
                        "%s UTC, which is more than %s ago",
                        job.title, job.id, job.time_finished,
                        timedelta(seconds=job.autodelete_time))
            job.to_be_deleted = True
            db.session.add(job)
            job_ids_to_delete.append(job.id)
    db.session.commit()

    for job_id in job_ids_to_delete:
        delete_job.delay(job_id)


@celery_app.task(ignore_result=True)
def compress_task_logs():
    db.session.rollback()
    try:
        uncompressed_tasklogs = [f for f in listdir(LOGFILES_DIR)
                                 if (isfile(join(LOGFILES_DIR, f)) and
                                     not f.endswith(".gz"))]
        for tasklog in uncompressed_tasklogs:
            compress_task_log.delay(tasklog)
    except OSError as e:
        if e.errno != ENOENT:
            raise
        logger.warning("Log directory %r does not exist", LOGFILES_DIR)


@celery_app.task(ignore_result=True)
def compress_task_log(tasklog_name):
    db.session.rollback()
    try:
        path = join(LOGFILES_DIR, tasklog_name)
        with open(path, "rb") as logfile:
            logger.debug("Compressing tasklog file %s", path)
            # Close the gzip file before removing the original so the
            # compressed data is fully flushed to disk
            with GzipFile("%s.gz" % path, "wb") as compressed_logfile:
                compressed_logfile.write(logfile.read())
        try:
            remove(path)
        except OSError as e:
            if e.errno != ENOENT:
                raise
    except IOError as e:
        logger.error("Could not compress tasklog file %s: %s: %s",
                     tasklog_name, type(e).__name__, e)
        raise


@celery_app.task(ignore_result=True)
def cache_jobqueue_path(jobqueue_id):
    db.session.rollback()
    jobqueue = JobQueue.query.filter_by(id=jobqueue_id).one()
    jobqueue.fullpath = jobqueue.path()
    db.session.add(jobqueue)
    db.session.commit()


@celery_app.task(ignore_result=True, bind=True)
def check_software_version_on_agent(self, agent_id, version_id):
    db.session.rollback()
    agent = Agent.query.filter_by(id=agent_id).one()
    software_version = SoftwareVersion.query.filter_by(id=version_id).one()
    logger.debug("Checking availability of software %s, version %s on agent "
                 "%s (id %s)", software_version.software.software,
                 software_version.version, agent.hostname, agent_id)
    data = {"software": software_version.software.software,
            "version": software_version.version}
    try:
        response = requests.post(
            agent.api_url() + "/check_software",
            data=dumps(data),
            headers={"Content-Type": "application/json",
                     "User-Agent": USERAGENT},
            timeout=AGENT_REQUEST_TIMEOUT)
        if response.status_code == requests.codes.bad_request:
            logger.error("On requesting check for software %s, version %s, "
                         "agent %s, (id %s), answered BAD_REQUEST",
                         software_version.software.software,
                         software_version.version,
                         agent.hostname, agent.id)
        elif response.status_code not in [requests.codes.accepted,
                                          requests.codes.ok]:
            raise ValueError("Unexpected return code on checking software "
                             "on agent: %s" % response.status_code)
        else:
            logger.info("Instructed agent %s (id %s) to check availability "
                        "of software %s, version %s", agent.hostname,
                        agent_id, software_version.software.software,
                        software_version.version)
    except (ConnectionError, Timeout) as e:
        if self.request.retries < self.max_retries:
            logger.warning("Caught %s trying to contact agent "
                           "%s (id %s), retry %s of %s: %s",
                           type(e).__name__, agent.hostname, agent.id,
                           self.request.retries, self.max_retries, e)
            self.retry(exc=e)
        else:
            logger.error("Could not contact agent %s, (id %s), marking as "
                         "offline", agent.hostname, agent.id)
            agent.state = AgentState.OFFLINE
            db.session.add(agent)
            db.session.commit()
            raise


@celery_app.task(ignore_result=True)
def check_all_software_on_agent(agent_id):
    db.session.rollback()
    agent = Agent.query.filter_by(id=agent_id).one()
    software_versions_query = SoftwareVersion.query
    for version in software_versions_query:
        if (version.discovery_code and
                version.discovery_function_name):
            check_software_version_on_agent.delay(agent_id, version.id)