Commit 7ce2884

Merge branch 'feature/update-jobs' into 'master'
Feature/update jobs

See merge request grafolean/grafolean-worker-snmp!3
2 parents 4d27b27 + 6f34a94

File tree

.env.example
collector.py
snmpcollector.py

3 files changed (+35, -15 lines)

.env.example

Lines changed: 1 addition & 0 deletions
@@ -1,2 +1,3 @@
 BACKEND_URL=https://grafolean.com/api
 BOT_TOKEN=
+JOBS_REFRESH_INTERVAL=120
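JOBS_REFRESH_INTERVAL is the period, in seconds, at which the worker re-fetches its job configuration from the backend; 120 is the default shipped in the example file. A minimal sketch of how the process reads it, mirroring the __main__ change in snmpcollector.py further down:

import os

# Seconds between job-list refreshes; fall back to 120 if the variable is unset.
jobs_refresh_interval = int(os.environ.get('JOBS_REFRESH_INTERVAL', 120))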

collector.py

Lines changed: 29 additions & 12 deletions
@@ -10,7 +10,7 @@
 import concurrent.futures
 import traceback
 
-from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.base import BaseTrigger
 from apscheduler.executors.base import BaseExecutor
 from apscheduler.events import (
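The scheduler swap is what makes the rest of this commit possible: BlockingScheduler.start() takes over the calling thread, while BackgroundScheduler runs the scheduling loop in a daemon thread, so start() returns immediately and the main thread stays free to run the new job-refresh loop. A minimal, self-contained sketch of that difference using plain APScheduler (the tick job is illustrative, none of the Grafolean-specific trigger or executor classes are used):

import time

from apscheduler.schedulers.background import BackgroundScheduler

def tick():
    print("tick")

scheduler = BackgroundScheduler()
scheduler.add_job(tick, 'interval', seconds=5)
scheduler.start()  # returns immediately; jobs run via a background thread

try:
    while True:
        time.sleep(1)  # the main thread is free for other work, e.g. refreshing jobs
except KeyboardInterrupt:
    scheduler.shutdown()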
@@ -156,16 +156,18 @@ def run_job(job, jobstore_alias, run_times, logger_name):
 
 
 class Collector(object):
-    __slots__ = 'backend_url', 'bot_token'
+    __slots__ = 'backend_url', 'bot_token', 'scheduler', 'known_jobs', 'jobs_refresh_interval'
 
-    def __init__(self, backend_url, bot_token):
+    def __init__(self, backend_url, bot_token, jobs_refresh_interval):
         self.backend_url = backend_url
         self.bot_token = bot_token
+        self.jobs_refresh_interval = jobs_refresh_interval
+        self.known_jobs = {}
 
     @abstractmethod
     def jobs(self):
         """
-        Returns a list of (intervals, job_func, job_data) tuples. Usually calls
+        Returns a list of (job_id, intervals, job_func, job_data) tuples. Usually calls
         `fetch_job_configs` to get input data.
         """

@@ -246,6 +248,17 @@ def fetch_job_configs(self, protocol):
 
             yield entity_info
 
+    def refresh_jobs(self):
+        for job_id, intervals, job_func, job_data in self.jobs():
+            # if the existing job's configuration is the same, leave it alone, otherwise the trigger will be reset:
+            if self.known_jobs.get(job_id) == job_data:
+                continue
+            self.known_jobs[job_id] = job_data
+
+            trigger = MultipleIntervalsTrigger(intervals)
+            logging.info(f"Adding job: {job_id}")
+            self.scheduler.add_job(job_func, id=job_id, trigger=trigger, executor='iaexecutor', kwargs=job_data, replace_existing=True)
+
     def execute(self):
         """
         Calls self.jobs() to get the list of the jobs, and executes them by using
@@ -256,15 +269,19 @@ def execute(self):
             'coalesce': True,  # if multiple jobs "misfire", re-run only one instance of a missed job
             'max_instances': 1,
         }
-        scheduler = BlockingScheduler(job_defaults=job_defaults, timezone=utc)
-        executor = IntervalsAwareProcessPoolExecutor(10)
-        scheduler.add_executor(executor, 'iaexecutor')
-
-        for intervals, job_func, job_data in self.jobs():
-            trigger = MultipleIntervalsTrigger(intervals)
-            scheduler.add_job(job_func, trigger=trigger, executor='iaexecutor', kwargs=job_data)
+        self.scheduler = BackgroundScheduler(job_defaults=job_defaults, timezone=utc)
+        self.scheduler.add_executor(IntervalsAwareProcessPoolExecutor(10), 'iaexecutor')
 
         try:
-            scheduler.start()
+            self.scheduler.start()
+            while True:
+                try:
+                    self.refresh_jobs()
+                except:
+                    logging.exception("Error refreshing jobs.")
+                time.sleep(self.jobs_refresh_interval)
+
         except KeyboardInterrupt:
             logging.info("Got exit signal, exiting.")
+        finally:
+            self.scheduler.shutdown()
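Taken together, refresh_jobs() and the loop in execute() turn the worker from "schedule once at startup" into "re-read the job list every jobs_refresh_interval seconds": known_jobs remembers the last job_data seen per job_id, unchanged jobs are skipped so their triggers (and next run times) are not reset, and changed jobs are swapped in place via add_job(..., id=job_id, replace_existing=True). A standalone sketch of the same pattern built only from stock APScheduler pieces (IntervalTrigger stands in for the project's MultipleIntervalsTrigger, and jobs(), poll() and the job data are stubs made up for illustration):

import logging
import time

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

logging.basicConfig(level=logging.INFO)

def poll(**job_data):
    logging.info("polling %s", job_data)

def jobs():
    # Stand-in for Collector.jobs(): yields (job_id, interval_seconds, job_func, job_data).
    yield "entity-123", 60, poll, {"host": "192.0.2.1", "community": "public"}

scheduler = BackgroundScheduler()
scheduler.start()
known_jobs = {}

try:
    while True:
        for job_id, interval, job_func, job_data in jobs():
            if known_jobs.get(job_id) == job_data:
                continue  # unchanged config: leave the job (and its trigger) alone
            known_jobs[job_id] = job_data
            scheduler.add_job(job_func, trigger=IntervalTrigger(seconds=interval),
                              id=job_id, kwargs=job_data, replace_existing=True)
        time.sleep(120)  # i.e. JOBS_REFRESH_INTERVAL
except KeyboardInterrupt:
    pass
finally:
    scheduler.shutdown()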

snmpcollector.py

Lines changed: 5 additions & 3 deletions
@@ -77,7 +77,7 @@ def send_results_to_grafolean(backend_url, bot_token, account_id, values):
     try:
         r = requests.post(url, json=values)
         r.raise_for_status()
-        log.info("Results sent.")
+        log.info("Results sent: {}".format(values))
     except:
         log.exception("Error sending data to Grafolean")

@@ -198,7 +198,8 @@ def jobs(self):
         for entity_info in self.fetch_job_configs('snmp'):
             intervals = list(set([sensor_info["interval"] for sensor_info in entity_info["sensors"]]))
             job_info = { **entity_info, "backend_url": self.backend_url, "bot_token": self.bot_token }
-            yield intervals, SNMPCollector.do_snmp, job_info
+            job_id = str(entity_info["entity_id"])
+            yield job_id, intervals, SNMPCollector.do_snmp, job_info
 
 
 if __name__ == "__main__":
@@ -208,6 +209,7 @@ def jobs(self):
     bot_token = os.environ.get('BOT_TOKEN')
     if not backend_url or not bot_token:
         raise Exception("Please specify BACKEND_URL and BOT_TOKEN env vars.")
+    jobs_refresh_interval = int(os.environ.get('JOBS_REFRESH_INTERVAL', 120))
 
-    c = SNMPCollector(backend_url, bot_token)
+    c = SNMPCollector(backend_url, bot_token, jobs_refresh_interval)
     c.execute()
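After this merge, every Collector subclass follows the same contract as SNMPCollector: jobs() yields (job_id, intervals, job_func, job_data) tuples with a stable job_id (here the stringified entity_id, so the same entity keeps the same scheduler job across refreshes), and the refresh interval is passed to the constructor. A toy sketch of that contract; DummyCollector, print_values, the placeholder bot token and the hard-coded job data are invented for illustration and not part of this repository:

import logging

from collector import Collector  # the base class changed in this merge

def print_values(**job_data):
    logging.info("pretend we polled %s", job_data)

class DummyCollector(Collector):
    def jobs(self):
        # One job with a stable id, polled on 30 s and 300 s intervals:
        yield "dummy-1", [30, 300], print_values, {"host": "192.0.2.1"}

if __name__ == "__main__":
    c = DummyCollector("https://grafolean.com/api", "<bot-token>", jobs_refresh_interval=120)
    c.execute()  # starts the BackgroundScheduler, then calls refresh_jobs() every 120 s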
