@@ -28,68 +28,89 @@ def __new__(cls):
2828
2929 def __init__ (self ) -> None :
3030 self .config = Config ()
31- self .core = Core ()
32- self .incoming_topic = self .core .get_topic (self .config .incoming_topic_name ,
33- max_concurrent_messages = self .config .max_concurrent_messages )
34- self .storage_service = StorageService (core = self .core )
31+ core = Core ()
32+ self .incoming_topic = core .get_topic (self .config .incoming_topic_name )
33+ self .outgoing_topic = core .get_topic (self .config .outgoing_topic_name )
34+ self .storage_service = StorageService (core )
35+ self .listening_thread = threading .Thread (target = self .incoming_topic .subscribe , args = [self .config .incoming_topic_subscription , self .process_message ])
3536 # Start listening to the things
36- self .listening_thread = threading . Thread ( target = self .listen )
37+ # self.incoming_topic.subscribe( self.config.incoming_topic_subscription, self.handle_message )
3738 self .listening_thread .start ()
3839 pass
39-
40- def listen (self ):
41- self .incoming_topic .subscribe (self .config .incoming_topic_subscription , self .handle_message )
42- pass
4340
44- def handle_message (self , msg : QueueMessage ):
45- # Logs and creates a thread for processing
46- self .process_message (msg = msg )
41+ # def handle_message(self, msg: QueueMessage):
42+ # # Logs and creates a thread for processing
43+ # process_thread = threading.Thread(target=self.process_message, args=[msg])
44+ # process_thread.start()
4745
4846 def process_message (self , msg : QueueMessage ):
49- logger .info (f"Processing message { msg .messageId } " )
50- # Parse the message
51- quality_request = QualityRequest (messageType = msg .messageType , messageId = msg .messageId , data = msg .data )
52- # Download the file
53- input_file_url = quality_request .data .data_file
54- parsed_url = urlparse (input_file_url )
55- file_name = os .path .basename (parsed_url .path )
56- input_dir_path = parsed_url .path
57- download_folder = os .path .join (self .config .get_download_folder (),msg .messageId )
58- os .makedirs (download_folder ,exist_ok = True )
59- download_path = os .path .join (download_folder ,file_name )
60- self .storage_service .download_remote_file (input_file_url , download_path )
47+ try :
48+ logging .info (f"Processing message { msg .messageId } " )
49+ # Parse the message
50+ quality_request = QualityRequest (messageType = msg .messageType ,messageId = msg .messageId ,data = msg .data )
51+ # Download the file
52+ input_file_url = quality_request .data .data_file
53+ parsed_url = urlparse (input_file_url )
54+ file_name = os .path .basename (parsed_url .path )
55+ input_dir_path = parsed_url .path
56+ download_folder = os .path .join (self .config .get_download_folder (),msg .messageId )
57+ os .makedirs (download_folder ,exist_ok = True )
58+ download_path = os .path .join (download_folder ,file_name )
59+ self .storage_service .download_remote_file (input_file_url , download_path )
6160
62- # intersection file
63- ixn_file_url = quality_request .data .intersectionFile
64- ixn_file_path = None
65- if ixn_file_url :
66- ixn_file_name = os .path .basename (ixn_file_url )
67- ixn_file_path = os .path .join (download_folder ,ixn_file_name )
68- self .storage_service .download_remote_file (ixn_file_url , ixn_file_path )
69- # quality_request.data.intersectionFile = ixn_file_path
61+ # intersection file
62+ ixn_file_url = quality_request .data .sub_regions_file
63+ ixn_file_path = None
64+ if ixn_file_url or ixn_file_url != '' :
65+ ixn_file_name = os .path .basename (ixn_file_url )
66+ ixn_file_path = os .path .join (download_folder ,ixn_file_name )
67+ self .storage_service .download_remote_file (ixn_file_url , ixn_file_path )
68+ # quality_request.data.intersectionFile = ixn_file_path
7069
71- # Process the file
72- output_folder = os .path .join (download_folder ,'qm' )
73- os .makedirs (output_folder ,exist_ok = True )
74- output_file_local_path = os .path .join (output_folder ,'qm-output.zip' )
75- qm_calculator = OswQmCalculator ()
76- algorithm_names = quality_request .data .algorithms .split (',' )
77- qm_calculator .calculate_quality_metric (download_path , algorithm_names ,output_file_local_path ,ixn_file_path )
78- # Upload the file
79- output_file_remote_path = f'{ self .get_directory_path (input_file_url )} /qm-{ quality_request .data .jobId } -output.zip'
80- output_file_url = self .storage_service .upload_local_file (output_file_local_path ,output_file_remote_path )
81- logging .info (f'Uploaded file to { output_file_url } ' )
70+ # Process the file
71+ output_folder = os .path .join (download_folder ,'qm' )
72+ os .makedirs (output_folder ,exist_ok = True )
73+ output_file_local_path = os .path .join (output_folder ,'qm-output.zip' )
74+ qm_calculator = OswQmCalculator ()
75+ algorithm_names = quality_request .data .algorithm .split (',' )
76+ qm_calculator .calculate_quality_metric (download_path , algorithm_names ,output_file_local_path ,ixn_file_path )
77+ # Upload the file
78+ output_file_remote_path = f'{ self .get_directory_path (input_file_url )} /qm-{ quality_request .data .jobId } -output.zip'
79+ output_file_url = self .storage_service .upload_local_file (output_file_local_path ,output_file_remote_path )
80+ logging .info (f'Uploaded file to { output_file_url } ' )
8281
83- response = QualityMetricResponse (
84- messageType = msg .messageType ,
85- messageId = msg .messageId ,
86- data = response_data
87- )
88- self .send_response (msg = response )
89- # Process the message
90- # Clean up the download_folder
91- logger .info ('Cleaning up download folder' )
92- shutil .rmtree (download_folder )
82+ response_data = {
83+ 'status' :'success' ,
84+ 'message' :'Quality metrics calculated successfully' ,
85+ 'success' :True ,
86+ 'dataset_url' :input_file_url ,
87+ 'qm_dataset_url' :output_file_url
88+ }
89+ response = QualityMetricResponse (
90+ messageType = msg .messageType ,
91+ messageId = msg .messageId ,
92+ data = response_data
93+ )
94+ self .send_response (response )
95+ # Process the message
96+ # Clean up the download_folder
97+ logging .info ('Cleaning up download folder' )
98+ shutil .rmtree (download_folder )
99+ except Exception as e :
100+ logging .error (f'Error processing message { msg .messageId } : { e } ' )
101+ response_data = {
102+ 'status' :'failed' ,
103+ 'message' :str (e ),
104+ 'success' :False ,
105+ 'dataset_url' :input_file_url ,
106+ 'qm_dataset_url' :None
107+ }
108+ response = QualityMetricResponse (
109+ messageType = msg .messageType ,
110+ messageId = msg .messageId ,
111+ data = response_data
112+ )
113+ self .send_response (response )
93114 pass
94115
95116 def send_response (self , msg : QueueMessage ):
0 commit comments