Commit 11d10b6

Merge branch 'feature/merge-fetching-attempts' into 'master'

Feature/merge fetching attempts

See merge request grafolean/grafolean-worker-snmp!1

2 parents: 445daf3 + 0a3380b

File tree: 2 files changed (+254, -61 lines)


collector.py

Lines changed: 189 additions & 1 deletion
@@ -1,12 +1,174 @@
+import sys
 import requests
 import logging
+import time
+import math
+from datetime import datetime, timedelta
+from pytz import utc
+from abc import abstractmethod
+
+import concurrent.futures
+import traceback
+
+from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.triggers.base import BaseTrigger
+from apscheduler.executors.base import BaseExecutor
+from apscheduler.events import (
+    JobExecutionEvent, EVENT_JOB_MISSED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED)
+
+
+class MultipleIntervalsTrigger(BaseTrigger):
+    """
+    This class extends APScheduler's BaseTrigger:
+    - triggers at multiple intervals
+    - aligns every invocation to a second (to make calculation of intervals easier)
+    - multiple intervals, when aligned, cause only a single job invocation
+    - remembers which intervals have caused the invocation; the list is cleared after
+      `forget_affecting_after` seconds
+    """
+    __slots__ = 'intervals', 'start_ts', 'affecting_intervals', 'forget_affecting_after'
+
+    def __init__(self, intervals, forget_affecting_after=300):
+        if not intervals:
+            raise Exception("At least one interval must be specified")
+        # we only operate in whole seconds, and only care about unique values:
+        self.intervals = list(set([int(i) for i in intervals]))
+        self.forget_affecting_after = forget_affecting_after
+        self.start_ts = int(time.time())
+        self.affecting_intervals = {}
+
+    def get_next_fire_time(self, previous_fire_time, now):
+        # We keep things simple by only dealing with UTC, and only with seconds, so
+        # when travelling at low speeds we can use UNIX timestamps pretty safely.
+        elapsed_time = now.timestamp() - self.start_ts
+        # find the first time one of the intervals should fire:
+        next_fires_for_intervals = [int(math.ceil(elapsed_time / interval) * interval) for interval in self.intervals]
+        min_next_fire = min(next_fires_for_intervals)
+
+        # This is a hack. APScheduler doesn't allow us to pass information about the intervals triggered to the job being executed,
+        # so we remember this information in the trigger object itself, which we then pass as a parameter to the executed job. Not
+        # ideal, but it allows us to pass this information.
+        # Determine which intervals will cause the next fire:
+        next_fire_ts = self.start_ts + min_next_fire
+        self.affecting_intervals[next_fire_ts] = []
+        for i, next_fire_for_interval in enumerate(next_fires_for_intervals):
+            if next_fire_for_interval == min_next_fire:
+                self.affecting_intervals[next_fire_ts].append(self.intervals[i])
+
+        self._cleanup(now.timestamp() - self.forget_affecting_after)
+        return datetime.fromtimestamp(next_fire_ts, tz=utc)
+
+    def _cleanup(self, limit_ts):
+        for ts in list(self.affecting_intervals.keys()):
+            if ts < limit_ts:
+                del self.affecting_intervals[ts]
+
+
+class IntervalsAwareProcessPoolExecutor(BaseExecutor):
+    """
+    This class merges APScheduler's BasePoolExecutor and ProcessPoolExecutor,
+    because we need to use our own version of `run_job` (with a small detail
+    changed - an additional parameter is passed). Unfortunately there is probably
+    no cleaner way to do this at the moment.
+    """
+    def __init__(self, max_workers=10):
+        super().__init__()
+        self._pool = concurrent.futures.ProcessPoolExecutor(int(max_workers))
+
+    def _do_submit_job(self, job, run_times):
+        """
+        This function is copy-pasted from apscheduler/executors/pool.py
+        (`BasePoolExecutor._do_submit_job()`). The difference is that it calls our own
+        version of `run_job`.
+        """
+        def callback(f):
+            exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else
+                       (f.exception(), getattr(f.exception(), '__traceback__', None)))
+            if exc:
+                self._run_job_error(job.id, exc, tb)
+            else:
+                self._run_job_success(job.id, f.result())
+
+        f = self._pool.submit(IntervalsAwareProcessPoolExecutor.run_job, job, job._jobstore_alias, run_times, self._logger.name)
+        f.add_done_callback(callback)
+
+    def shutdown(self, wait=True):
+        self._pool.shutdown(wait)
+
+    @staticmethod
+    def run_job(job, jobstore_alias, run_times, logger_name):
+        """
+        This function is copy-pasted from apscheduler/executors/base.py (`run_job()`). It is defined
+        as a static method here, and only the invocation of the job (the `job.func()` call) was changed.
+
+        The reason for this is that we need to pass `affecting_intervals` from the trigger to the job
+        function, so it can decide which parts of the job need to be run. SNMPCollector needs this
+        so it can fetch data either separately, or for all of the tasks at the same time, when their
+        intervals align.
+
+        The changes are in a single block and are marked with a comment.
+
+        ---
+        Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
+        scheduler.
+        """
+        events = []
+        logger = logging.getLogger(logger_name)
+        for run_time in run_times:
+            # See if the job missed its run time window, and handle
+            # possible misfires accordingly
+            if job.misfire_grace_time is not None:
+                difference = datetime.now(utc) - run_time
+                grace_time = timedelta(seconds=job.misfire_grace_time)
+                if difference > grace_time:
+                    events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias,
+                                                    run_time))
+                    logger.warning('Run time of job "%s" was missed by %s', job, difference)
+                    continue
+
+            logger.info('Running job "%s" (scheduled at %s)', job, run_time)
+            try:
+                ##########################
+                ### changes
+                ##########################
+                # retval = job.func(*job.args, **job.kwargs)
+                affecting_intervals = job.trigger.affecting_intervals[run_time.timestamp()]
+                retval = job.func(affecting_intervals, **job.kwargs)
+                ##########################
+                ### /changes
+                ##########################
+            except BaseException:
+                exc, tb = sys.exc_info()[1:]
+                formatted_tb = ''.join(traceback.format_tb(tb))
+                events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
+                                                exception=exc, traceback=formatted_tb))
+                logger.exception('Job "%s" raised an exception', job)
+
+                # This is to prevent cyclic references that would lead to memory leaks
+                traceback.clear_frames(tb)
+                del tb
+            else:
+                events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
+                                                retval=retval))
+                logger.info('Job "%s" executed successfully', job)
+
+        return events
 
 
 class Collector(object):
+    __slots__ = 'backend_url', 'bot_token'
+
     def __init__(self, backend_url, bot_token):
         self.backend_url = backend_url
         self.bot_token = bot_token
 
+    @abstractmethod
+    def jobs(self):
+        """
+        Returns a list of (intervals, job_func, job_data) tuples. Usually calls
+        `fetch_job_configs` to get input data.
+        """
+
     def fetch_job_configs(self, protocol):
         """
         Returns pairs (account_id, entity_info), where entity_info is everything needed for collecting data
@@ -78,5 +240,31 @@ def fetch_job_configs(self, protocol):
         entity_info["sensors"] = sensors
         del entity_info["protocols"]
 
-        yield account_id, entity_info
+        entity_info["account_id"] = account_id
+        entity_info["entity_id"] = entity_info["id"]
+        del entity_info["id"]
+
+        yield entity_info
+
+    def execute(self):
+        """
+        Calls self.jobs() to get the list of the jobs, and executes them by using
+        `MultipleIntervalsTrigger`. Blocking.
+        """
+        # initialize APScheduler:
+        job_defaults = {
+            'coalesce': True,  # if multiple jobs "misfire", re-run only one instance of a missed job
+            'max_instances': 1,
+        }
+        scheduler = BlockingScheduler(job_defaults=job_defaults, timezone=utc)
+        executor = IntervalsAwareProcessPoolExecutor(10)
+        scheduler.add_executor(executor, 'iaexecutor')
+
+        for intervals, job_func, job_data in self.jobs():
+            trigger = MultipleIntervalsTrigger(intervals)
+            scheduler.add_job(job_func, trigger=trigger, executor='iaexecutor', kwargs=job_data)
 
+        try:
+            scheduler.start()
+        except KeyboardInterrupt:
+            logging.info("Got exit signal, exiting.")

grafolean-worker-snmp.py

Lines changed: 65 additions & 60 deletions
@@ -1,7 +1,7 @@
-from apscheduler.schedulers.blocking import BlockingScheduler
 import os
 import dotenv
 import logging
+import json
 from pytz import utc
 from colors import color
 
@@ -19,71 +19,76 @@
 
 
 class SNMPCollector(Collector):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-        self.jobs_configs = []
-        for account_id, entity_info in self.fetch_job_configs('snmp'):
-            # convert entity_info into easy-to-use task definitions:
-            for sensor_info in entity_info["sensors"]:
-                self.jobs_configs.append({
-                    "account_id": account_id,
-                    "entity": entity_info["details"],
-                    "sensor": sensor_info["sensor_details"],
-                    "interval": sensor_info["interval"],
-                    "credential": entity_info["credential_details"],
-                })
-
-        # >>> import json
-        # >>> print(json.dumps(self.jobs_configs))
-        # [
-        #     {
-        #         "account_id": 1,
-        #         "entity": {
-        #             "ipv4": "127.0.0.1"
-        #         },
-        #         "sensor": {
-        #             "oids": [
-        #                 {
-        #                     "oid": "1.3.6.1.4.1.2021.13.16.2.1.3",
-        #                     "fetch_method": "get"
-        #                 }
-        #             ],
-        #             "expression": "$1",
-        #             "output_path": "lm-sensors"
-        #         },
-        #         "interval": 30,
-        #         "credential": {
-        #             "version": "snmpv1",
-        #             "snmpv12_community": "public"
-        #         }
-        #     }
-        # ]
 
     @staticmethod
-    def do_snmp(account_id, entity, sensor, interval, credential):
-        log.info("Running job for account [{account_id}], IP [{ipv4}], OIDS: {oids}".format(
-            account_id=account_id,
-            ipv4=entity["ipv4"],
-            oids=["SNMP{} {}".format(o["fetch_method"].upper(), o["oid"]) for o in sensor["oids"]],
+    def do_snmp(*args, **entity_info):
+        """
+        Example of the `entity_info` this job receives:
+        {
+            "entity_id": 1348300224,
+            "name": "localhost",
+            "entity_type": "device",
+            "details": {
+                "ipv4": "127.0.0.1"
+            },
+            "credential_details": {
+                "version": "snmpv1",
+                "snmpv12_community": "public"
+            },
+            "sensors": [
+                {
+                    "sensor_details": {
+                        "oids": [
+                            {
+                                "oid": "1.3.6.1.4.1.2021.13.16.2.1.3",
+                                "fetch_method": "walk"
+                            }
+                        ],
+                        "expression": "$1",
+                        "output_path": "lm-sensors"
+                    },
+                    "interval": 30
+                },
+                {
+                    "sensor_details": {
+                        "oids": [
+                            {
+                                "oid": "1.3.6.1.4.1.2021.13.16.2.1.3.5",
+                                "fetch_method": "get"
+                            }
+                        ],
+                        "expression": "$1",
+                        "output_path": "lmsensorscore3"
+                    },
+                    "interval": 20
+                }
+            ],
+            "account_id": 1
+        }
+        """
+        # filter out only those sensors that are supposed to run at this interval:
+        affecting_intervals, = args
+        sensors = [s for s in entity_info["sensors"] if s["interval"] in affecting_intervals]
+        oids = []
+        for sensor in sensors:
+            oids.extend(sensor["sensor_details"]["oids"])
+        log.info("Running job for account [{account_id}], IP [{ipv4}], nsensors: {n_sensors}, oids: {oids}".format(
+            account_id=entity_info["account_id"],
+            ipv4=entity_info["details"]["ipv4"],
+            n_sensors=len(sensors),
+            oids=["SNMP{} {}".format(o["fetch_method"].upper(), o["oid"]) for o in oids],
         ))
 
-    def execute(self):
-        # initialize a scheduler:
-        job_defaults = {
-            'coalesce': True,  # if multiple jobs "misfire", re-run only one instance of a missed job
-            'max_instances': 1,
-        }
-        self.scheduler = BlockingScheduler(job_defaults=job_defaults, timezone=utc)
 
-        # apply config to scheduler:
-        for job_config in self.jobs_configs:
-            self.scheduler.add_job(SNMPCollector.do_snmp, 'interval', seconds=job_config["interval"], kwargs=job_config)
 
-        try:
-            self.scheduler.start()
-        except KeyboardInterrupt:
-            log.info("Got exit signal, exiting.")
+    def jobs(self):
+        """
+        Each entity (device) is a single job, no matter how many sensors it has. The reason is
+        that when the intervals align, we can then issue a single SNMP Bulk GET/WALK.
+        """
+        for entity_info in self.fetch_job_configs('snmp'):
+            intervals = list(set([sensor_info["interval"] for sensor_info in entity_info["sensors"]]))
+            yield intervals, SNMPCollector.do_snmp, entity_info
+
 
 if __name__ == "__main__":
     dotenv.load_dotenv()
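
The `__main__` block is cut off by the diff, but the intended wiring follows from the code above: construct an `SNMPCollector` with the backend URL and bot token, and call the inherited `Collector.execute()`, which schedules `do_snmp` once per entity using a `MultipleIntervalsTrigger`. A minimal sketch, assuming the settings come from environment variables (the variable names below are illustrative, not taken from the commit):

    import os
    import dotenv

    dotenv.load_dotenv()
    backend_url = os.environ.get('BACKEND_URL')   # assumed variable name
    bot_token = os.environ.get('BOT_TOKEN')       # assumed variable name

    collector = SNMPCollector(backend_url, bot_token)
    collector.execute()   # blocking; do_snmp() runs whenever one or more sensor intervals fire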
