1010import concurrent .futures
1111import traceback
1212
13- from apscheduler .schedulers .blocking import BlockingScheduler
13+ from apscheduler .schedulers .background import BackgroundScheduler
1414from apscheduler .triggers .base import BaseTrigger
1515from apscheduler .executors .base import BaseExecutor
1616from apscheduler .events import (
@@ -156,11 +156,12 @@ def run_job(job, jobstore_alias, run_times, logger_name):
156156
157157
158158class Collector (object ):
159- __slots__ = 'backend_url' , 'bot_token'
159+ __slots__ = 'backend_url' , 'bot_token' , 'scheduler' , 'known_jobs'
160160
161161 def __init__ (self , backend_url , bot_token ):
162162 self .backend_url = backend_url
163163 self .bot_token = bot_token
164+ self .known_jobs = {}
164165
165166 @abstractmethod
166167 def jobs (self ):
@@ -246,6 +247,17 @@ def fetch_job_configs(self, protocol):
246247
247248 yield entity_info
248249
250+ def refresh_jobs (self ):
251+ for job_id , intervals , job_func , job_data in self .jobs ():
252+ # if the existing job's configuration is the same, leave it alone, otherwise the trigger will be reset:
253+ if self .known_jobs .get (job_id ) == job_data :
254+ continue
255+ self .known_jobs [job_id ] = job_data
256+
257+ trigger = MultipleIntervalsTrigger (intervals )
258+ logging .info (f"Adding job: { job_id } " )
259+ self .scheduler .add_job (job_func , id = job_id , trigger = trigger , executor = 'iaexecutor' , kwargs = job_data , replace_existing = True )
260+
249261 def execute (self ):
250262 """
251263 Calls self.jobs() to get the list of the jobs, and executes them by using
@@ -256,15 +268,19 @@ def execute(self):
256268 'coalesce' : True , # if multiple jobs "misfire", re-run only one instance of a missed job
257269 'max_instances' : 1 ,
258270 }
259- scheduler = BlockingScheduler (job_defaults = job_defaults , timezone = utc )
260- executor = IntervalsAwareProcessPoolExecutor (10 )
261- scheduler .add_executor (executor , 'iaexecutor' )
262-
263- for job_id , intervals , job_func , job_data in self .jobs ():
264- trigger = MultipleIntervalsTrigger (intervals )
265- scheduler .add_job (job_func , id = job_id , trigger = trigger , executor = 'iaexecutor' , kwargs = job_data )
271+ self .scheduler = BackgroundScheduler (job_defaults = job_defaults , timezone = utc )
272+ self .scheduler .add_executor (IntervalsAwareProcessPoolExecutor (10 ), 'iaexecutor' )
266273
267274 try :
268- scheduler .start ()
275+ self .scheduler .start ()
276+ while True :
277+ try :
278+ self .refresh_jobs ()
279+ except :
280+ logging .exception ("Error refreshing jobs." )
281+ time .sleep (120 )
282+
269283 except KeyboardInterrupt :
270284 logging .info ("Got exit signal, exiting." )
285+ finally :
286+ self .scheduler .shutdown ()
0 commit comments