aboutsummaryrefslogtreecommitdiff
path: root/jobmanager.py
blob: 05481f6cdf1af94c8b12b1da1ec3d45b21c54e69 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env python

import os
import time
import logging
import datetime
import requests

from jobs import JobFinder, JobBase
from jobstate import JobState

class JobManager:
    def __init__(self, config):
        jobsFinder = JobFinder(config)
        self.jobs = jobsFinder.get_jobs()
        self.config = config
        self.statefile = config.get('general', 'statefile')
        self._load_state()

    def _load_state(self):
        self.state = {}
        if not os.path.isfile(self.statefile):
            logging.warn("Could not find statefile at " + self.statefile + "; creating a new one.")
        else:
            f = open(self.statefile, "r")
            lines = f.readlines()
            for l in lines:
                s = JobState.Parse(l)
                self.state[s.name] = s
            f.close()

    def _save_state(self):
        f = open(self.statefile, "w")
        for i in self.state:
            f.write(self.state[i].serialize())
        f.close()

    def list_jobs(self):
        return self.jobs
    
    def execute_jobs(self, cronmode):
        logging.info("Executing jobs...")
        emailWorks = True
        for thisJob in self.jobs:
            if thisJob.shouldExecute(cronmode):
                logging.info("Executing " + thisJob.getName() + "(" + thisJob.getStateName() + ")")
                try:
                    lastRunStatus = self.state[thisJob.getStateName()]
                except:
                    logging.warn("No state was found for " + thisJob.getStateName() + \
                                 ", making up a dummy state for it.")
                    lastRunStatus = self.state[thisJob.getStateName()] = JobState.Empty(thisJob.getStateName(), thisJob.getName())

                if not thisJob.execute():
                    #Unsuccessful run
                    logging.info("Execution of " + thisJob.getName() + " failed")
                    if thisJob.shouldNotifyFailure(lastRunStatus):
                        lastRunStatus.markFailedAndNotify()
                        logging.info("Notifying of failure for " + thisJob.getName())
                        if not thisJob.onFailure():
                            emailWorks = False
                    else:
                        lastRunStatus.markFailedNoNotify()
                else:
                    #Successful Run
                    logging.info("Execution of " + thisJob.getName() + " succeeded")
                    if lastRunStatus.CurrentStateSuccess == False and thisJob.notifyOnFailureEvery() == JobBase.JobFailureNotificationFrequency.ONSTATECHANGE:
                        if not thisJob.onStateChangeSuccess():
                            emailWorks = False
                    lastRunStatus.markSuccessful()
        self._save_state()
        return emailWorks
        
    def mark_jobs_ran(self):
        logging.debug("Marking jobs as run successfully.")
        try:
            requests.post("http://localhost:5001/", data="True")
        except:
            pass
            #Nothing we can do except hope our peers save us

    def mark_jobs_ran_with_error(self):
        logging.warning("Marking jobs as run unsuccessfully.")
        try:
            requests.post("http://localhost:5001/", data="False")
        except:
            pass
            #Nothing we can do except hope our peers save us