Commit a0f3206

Anze committed
Put generic collector behavior into a separate class
1 parent 445daf3 commit a0f3206

File tree

collector.py
grafolean-worker-snmp.py

2 files changed: +247 -62 lines changed

collector.py

Lines changed: 187 additions & 1 deletion
@@ -1,12 +1,174 @@
+import sys
 import requests
 import logging
+import time
+import math
+from datetime import datetime, timedelta
+from pytz import utc
+from abc import abstractmethod
+
+import concurrent.futures
+import traceback
+
+from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.triggers.base import BaseTrigger
+from apscheduler.executors.base import BaseExecutor
+from apscheduler.events import (
+    JobExecutionEvent, EVENT_JOB_MISSED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED)
+
+
+class MultipleIntervalsTrigger(BaseTrigger):
+    """
+    This class extends APScheduler's BaseTrigger:
+    - triggers at multiple intervals
+    - aligns every invocation to a second (to make calculation of intervals easier)
+    - multiple intervals, when aligned, cause only a single job invocation
+    - remembers which intervals have caused the invocation; the list is cleared after
+      `forget_affecting_after` seconds
+    """
+    __slots__ = 'intervals', 'start_ts', 'affecting_intervals', 'forget_affecting_after'
+
+    def __init__(self, intervals, forget_affecting_after=300):
+        if not intervals:
+            raise Exception("At least one interval must be specified")
+        # we only operate in whole seconds, and only care about unique values:
+        self.intervals = list(set([int(i) for i in intervals]))
+        self.forget_affecting_after = forget_affecting_after
+        self.start_ts = int(time.time())
+        self.affecting_intervals = {}
+
+    def get_next_fire_time(self, previous_fire_time, now):
+        # We keep things simple by only dealing with UTC, and only with seconds, so
+        # when travelling at low speeds we can use UNIX timestamps pretty safely.
+        elapsed_time = now.timestamp() - self.start_ts
+        # find the first time one of the intervals should fire:
+        next_fires_for_intervals = [int(math.ceil(elapsed_time / interval) * interval) for interval in self.intervals]
+        min_next_fire = min(next_fires_for_intervals)
+
+        # This is a hack. APScheduler doesn't allow us to pass information about the intervals triggered to the job being executed,
+        # so we remember this information in the trigger object itself, which we then pass as a parameter to the executed job. Not
+        # ideal, but it allows us to pass this information.
+        # Determine which intervals will cause the next fire:
+        next_fire_ts = self.start_ts + min_next_fire
+        self.affecting_intervals[next_fire_ts] = []
+        for i, next_fire_for_interval in enumerate(next_fires_for_intervals):
+            if next_fire_for_interval == min_next_fire:
+                self.affecting_intervals[next_fire_ts].append(self.intervals[i])
+
+        self._cleanup(now.timestamp() - self.forget_affecting_after)
+        return datetime.fromtimestamp(next_fire_ts, tz=utc)
+
+    def _cleanup(self, limit_ts):
+        for ts in list(self.affecting_intervals.keys()):
+            if ts < limit_ts:
+                del self.affecting_intervals[ts]
+
+
+class IntervalsAwareProcessPoolExecutor(BaseExecutor):
+    """
+    This class merges APScheduler's BasePoolExecutor and ProcessPoolExecutor,
+    because we need to use our own version of `run_job` (with a small detail
+    changed - additional parameter passed). Unfortunately there is probably no
+    cleaner way to do this at the moment.
+    """
+    def __init__(self, max_workers=10):
+        super().__init__()
+        self._pool = concurrent.futures.ProcessPoolExecutor(int(max_workers))
+
+    def _do_submit_job(self, job, run_times):
+        """
+        This function is copy-pasted from apscheduler/executors/pool.py
+        (`BasePoolExecutor._do_submit_job()`). The difference is that it calls our own
+        version of `run_job`.
+        """
+        def callback(f):
+            exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else
+                       (f.exception(), getattr(f.exception(), '__traceback__', None)))
+            if exc:
+                self._run_job_error(job.id, exc, tb)
+            else:
+                self._run_job_success(job.id, f.result())
+
+        f = self._pool.submit(IntervalsAwareProcessPoolExecutor.run_job, job, job._jobstore_alias, run_times, self._logger.name)
+        f.add_done_callback(callback)
+
+    def shutdown(self, wait=True):
+        self._pool.shutdown(wait)
+
+    @staticmethod
+    def run_job(job, jobstore_alias, run_times, logger_name):
+        """
+        This function is copy-pasted from apscheduler/executors/base.py (`run_job()`). It is defined
+        as a static method here, and only the invocation of the job (`job.func()` call) was changed.
+
+        The reason for this is that we need to pass `affecting_intervals` from the trigger to the job
+        function, so it can decide which parts of the job need to be run. SNMPCollector needs this
+        so it can fetch data either separately, or for all of the tasks at the same time, when their
+        intervals align.
+
+        The changes are in a single block and are marked with a comment.
+
+        ---
+        Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
+        scheduler.
+        """
+        events = []
+        logger = logging.getLogger(logger_name)
+        for run_time in run_times:
+            # See if the job missed its run time window, and handle
+            # possible misfires accordingly
+            if job.misfire_grace_time is not None:
+                difference = datetime.now(utc) - run_time
+                grace_time = timedelta(seconds=job.misfire_grace_time)
+                if difference > grace_time:
+                    events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias,
+                                                    run_time))
+                    logger.warning('Run time of job "%s" was missed by %s', job, difference)
+                    continue
+
+            logger.info('Running job "%s" (scheduled at %s)', job, run_time)
+            try:
+                ##########################
+                ### changes
+                ##########################
+                # retval = job.func(*job.args, **job.kwargs)
+                affecting_intervals = job.trigger.affecting_intervals[run_time.timestamp()]
+                retval = job.func(affecting_intervals, **job.kwargs)
+                ##########################
+                ### /changes
+                ##########################
+            except BaseException:
+                exc, tb = sys.exc_info()[1:]
+                formatted_tb = ''.join(traceback.format_tb(tb))
+                events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
+                                                exception=exc, traceback=formatted_tb))
+                logger.exception('Job "%s" raised an exception', job)
+
+                # This is to prevent cyclic references that would lead to memory leaks
+                traceback.clear_frames(tb)
+                del tb
+            else:
+                events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
+                                                retval=retval))
+                logger.info('Job "%s" executed successfully', job)
+
+        return events
 
 
 class Collector(object):
+    __slots__ = 'backend_url', 'bot_token'
+
     def __init__(self, backend_url, bot_token):
         self.backend_url = backend_url
         self.bot_token = bot_token
 
+    @abstractmethod
+    def jobs(self):
+        """
+        Returns a list of (intervals, job_func, job_data) tuples. Usually calls
+        `fetch_job_configs` to get input data.
+        """
+
     def fetch_job_configs(self, protocol):
         """
         Returns pairs (account_id, entity_info), where entity_info is everything needed for collecting data
@@ -78,5 +240,29 @@ def fetch_job_configs(self, protocol):
                 entity_info["sensors"] = sensors
                 del entity_info["protocols"]
 
-                yield account_id, entity_info
+                entity_info["account_id"] = account_id
+
+                yield entity_info
+
+    def execute(self):
+        """
+        Calls self.jobs() to get the list of jobs, and executes them by using
+        `MultipleIntervalsTrigger`. Blocking.
+        """
+        # initialize APScheduler:
+        job_defaults = {
+            'coalesce': True,  # if multiple jobs "misfire", re-run only one instance of a missed job
+            'max_instances': 1,
+        }
+        scheduler = BlockingScheduler(job_defaults=job_defaults, timezone=utc)
+        executor = IntervalsAwareProcessPoolExecutor(10)
+        scheduler.add_executor(executor, 'iaexecutor')
+
+        for intervals, job_func, job_data in self.jobs():
+            trigger = MultipleIntervalsTrigger(intervals)
+            scheduler.add_job(job_func, trigger=trigger, executor='iaexecutor', kwargs=job_data)
 
+        try:
+            scheduler.start()
+        except KeyboardInterrupt:
+            logging.info("Got exit signal, exiting.")

grafolean-worker-snmp.py

Lines changed: 60 additions & 61 deletions
@@ -2,6 +2,7 @@
 import os
 import dotenv
 import logging
+import json
 from pytz import utc
 from colors import color
 
@@ -19,71 +20,69 @@
 
 
 class SNMPCollector(Collector):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-        self.jobs_configs = []
-        for account_id, entity_info in self.fetch_job_configs('snmp'):
-            # convert entity_info into easy-to-use task definitions:
-            for sensor_info in entity_info["sensors"]:
-                self.jobs_configs.append({
-                    "account_id": account_id,
-                    "entity": entity_info["details"],
-                    "sensor": sensor_info["sensor_details"],
-                    "interval": sensor_info["interval"],
-                    "credential": entity_info["credential_details"],
-                })
-
-        # >>> import json
-        # >>> print(json.dumps(self.jobs_configs))
-        # [
-        #     {
-        #         "account_id": 1,
-        #         "entity": {
-        #             "ipv4": "127.0.0.1"
-        #         },
-        #         "sensor": {
-        #             "oids": [
-        #                 {
-        #                     "oid": "1.3.6.1.4.1.2021.13.16.2.1.3",
-        #                     "fetch_method": "get"
-        #                 }
-        #             ],
-        #             "expression": "$1",
-        #             "output_path": "lm-sensors"
-        #         },
-        #         "interval": 30,
-        #         "credential": {
-        #             "version": "snmpv1",
-        #             "snmpv12_community": "public"
-        #         }
-        #     }
-        # ]
 
     @staticmethod
-    def do_snmp(account_id, entity, sensor, interval, credential):
-        log.info("Running job for account [{account_id}], IP [{ipv4}], OIDS: {oids}".format(
-            account_id=account_id,
-            ipv4=entity["ipv4"],
-            oids=["SNMP{} {}".format(o["fetch_method"].upper(), o["oid"]) for o in sensor["oids"]],
-        ))
-
-    def execute(self):
-        # initialize a scheduler:
-        job_defaults = {
-            'coalesce': True,  # if multiple jobs "misfire", re-run only one instance of a missed job
-            'max_instances': 1,
-        }
-        self.scheduler = BlockingScheduler(job_defaults=job_defaults, timezone=utc)
+    def do_snmp(*args, **entity_info):
+        """
+        {
+            "id": 1348300224,
+            "name": "localhost",
+            "entity_type": "device",
+            "details": {
+                "ipv4": "127.0.0.1"
+            },
+            "credential_details": {
+                "version": "snmpv1",
+                "snmpv12_community": "public"
+            },
+            "sensors": [
+                {
+                    "sensor_details": {
+                        "oids": [
+                            {
+                                "oid": "1.3.6.1.4.1.2021.13.16.2.1.3",
+                                "fetch_method": "walk"
+                            }
+                        ],
+                        "expression": "$1",
+                        "output_path": "lm-sensors"
+                    },
+                    "interval": 30
+                },
+                {
+                    "sensor_details": {
+                        "oids": [
+                            {
+                                "oid": "1.3.6.1.4.1.2021.13.16.2.1.3.5",
+                                "fetch_method": "get"
+                            }
+                        ],
+                        "expression": "$1",
+                        "output_path": "lmsensorscore3"
+                    },
+                    "interval": 20
+                }
+            ],
+            "account_id": 1
+        }
+        """
+        # log.info("Running job for account [{account_id}], IP [{ipv4}], OIDS: {oids}".format(
+        #     account_id=account_id,
+        #     ipv4=entity["ipv4"],
+        #     oids=["SNMP{} {}".format(o["fetch_method"].upper(), o["oid"]) for o in sensor["oids"]],
+        # ))
+        affecting_intervals, = args
+        log.info("Running: {} {}".format(affecting_intervals, json.dumps(entity_info)))
 
-        # apply config to scheduler:
-        for job_config in self.jobs_configs:
-            self.scheduler.add_job(SNMPCollector.do_snmp, 'interval', seconds=job_config["interval"], kwargs=job_config)
+    def jobs(self):
+        """
+        Each entity (device) is a single job, no matter how many sensors it has. The reason is
+        that when the intervals align, we can then issue a single SNMP Bulk GET/WALK.
+        """
+        for entity_info in self.fetch_job_configs('snmp'):
+            intervals = list(set([sensor_info["interval"] for sensor_info in entity_info["sensors"]]))
+            yield intervals, SNMPCollector.do_snmp, entity_info
 
-        try:
-            self.scheduler.start()
-        except KeyboardInterrupt:
-            log.info("Got exit signal, exiting.")
 
 if __name__ == "__main__":
     dotenv.load_dotenv()
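
For completeness, a hedged sketch of how do_snmp could eventually use affecting_intervals to decide which sensors to query in a single pass. The commit itself only logs its arguments; the sensors_due helper below is an assumption about where the code is heading, not part of the diff.

# Hypothetical helper, not part of this commit: pick only the sensors whose
# interval caused the current invocation, so one SNMP GET/WALK pass can cover
# exactly the sensors that are due right now.
def sensors_due(entity_info, affecting_intervals):
    return [s for s in entity_info["sensors"] if s["interval"] in affecting_intervals]

# With the entity_info documented in do_snmp's docstring (intervals 30 and 20),
# a run triggered only by the 20 s interval would return just the second sensor,
# while a run where both intervals align would return both, letting the collector
# issue a single bulk request for all of them.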
