Skip to content

Commit 2bc0d3a

Browse files
authored
Merge pull request #20 from TaskarCenterAtUW/feature-1063
Fixed the issues with messages
2 parents a1c162f + 61e3c28 commit 2bc0d3a

File tree

2 files changed

+19
-14
lines changed

2 files changed

+19
-14
lines changed

src/services/osw_qm_calculator_service.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def calculate_quality_metric(self, input_file, algorithm_names, output_path, ixn
4747
# logging.info(f"Extracted input files: {file_list}")
4848
# input_files_path = [os.path.join(input_unzip_folder.name, file) for file in file_list]
4949
output_unzip_folder = tempfile.TemporaryDirectory()
50-
logging.info(f"Started calculating quality metrics for input files: {input_files_path}")
50+
logger.info(f"Started calculating quality metrics for input files: {input_files_path}")
5151
# Get only the edges file out of the input files
5252
edges_file_path = [file_path for file_path in input_files_path if 'edges' in file_path]
5353
if len(edges_file_path) == 0:
@@ -58,7 +58,10 @@ def calculate_quality_metric(self, input_file, algorithm_names, output_path, ixn
5858
for algorithm_name in algorithm_names:
5959
qm_edges_output_path = os.path.join(output_unzip_folder.name, f'{algorithm_name}_{edges_file_without_extension}.geojson')
6060
qm_calculator = self.get_osw_qm_calculator(algorithm_name, ixn_file, edges_file_path, qm_edges_output_path)
61+
start_time = time.time()
6162
qm_calculator.calculate_quality_metric()
63+
end_time = time.time()
64+
logger.info(f"Time taken to calculate quality metrics for {algorithm_name}: {end_time - start_time} seconds")
6265
# Copy the rest of the files from input to output
6366
for file_path in input_files_path:
6467
if 'edges' in file_path:
@@ -67,10 +70,10 @@ def calculate_quality_metric(self, input_file, algorithm_names, output_path, ixn
6770
output_file_path = os.path.join(output_unzip_folder.name, file_basename)
6871
os.rename(file_path, output_file_path)
6972

70-
logging.info(f"Finished calculating quality metrics for input files: {input_files_path}")
71-
logging.info(f'Zipping output files to {output_path}')
73+
logger.info(f"Finished calculating quality metrics for input files: {input_files_path}")
74+
logger.info(f'Zipping output files to {output_path}')
7275
self.zip_folder(output_unzip_folder.name, output_path)
73-
logging.info(f'Cleaning up temporary folders.')
76+
logger.info(f'Cleaning up temporary folders.')
7477
input_unzip_folder.cleanup()
7578
output_unzip_folder.cleanup()
7679
except Exception as e:

src/services/servicebus_service.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ def __new__(cls):
2828

2929
def __init__(self) -> None:
3030
self.config = Config()
31-
core = Core()
32-
self.incoming_topic = core.get_topic(self.config.incoming_topic_name)
33-
self.outgoing_topic = core.get_topic(self.config.outgoing_topic_name)
34-
self.storage_service = StorageService(core)
31+
self.core = Core()
32+
self.incoming_topic = self.core.get_topic(self.config.incoming_topic_name)
33+
self.outgoing_topic = self.core.get_topic(self.config.outgoing_topic_name)
34+
self.storage_service = StorageService(self.core)
3535
self.listening_thread = threading.Thread(target=self.incoming_topic.subscribe, args=[self.config.incoming_topic_subscription, self.process_message])
3636
# Start listening to the things
3737
# self.incoming_topic.subscribe(self.config.incoming_topic_subscription, self.handle_message)
@@ -44,8 +44,9 @@ def __init__(self) -> None:
4444
# process_thread.start()
4545

4646
def process_message(self, msg: QueueMessage):
47+
logger.info(f"Processing message {msg}")
4748
try:
48-
logging.info(f"Processing message {msg.messageId}")
49+
logger.info(f"Processing message {msg.messageId}")
4950
# Parse the message
5051
quality_request = QualityRequest(messageType=msg.messageType,messageId=msg.messageId,data=msg.data)
5152
# Download the file
@@ -57,11 +58,12 @@ def process_message(self, msg: QueueMessage):
5758
os.makedirs(download_folder,exist_ok=True)
5859
download_path = os.path.join(download_folder,file_name)
5960
self.storage_service.download_remote_file(input_file_url, download_path)
60-
61+
logger.info(f'Downloaded file to {download_path}')
6162
# intersection file
6263
ixn_file_url = quality_request.data.sub_regions_file
6364
ixn_file_path = None
64-
if ixn_file_url or ixn_file_url != '':
65+
if ixn_file_url is not None:
66+
logger.info(f'Downloading intersection file {ixn_file_url}')
6567
ixn_file_name = os.path.basename(ixn_file_url)
6668
ixn_file_path = os.path.join(download_folder,ixn_file_name)
6769
self.storage_service.download_remote_file(ixn_file_url, ixn_file_path)
@@ -77,7 +79,7 @@ def process_message(self, msg: QueueMessage):
7779
# Upload the file
7880
output_file_remote_path = f'{self.get_directory_path(input_file_url)}/qm-{quality_request.data.jobId}-output.zip'
7981
output_file_url = self.storage_service.upload_local_file(output_file_local_path,output_file_remote_path)
80-
logging.info(f'Uploaded file to {output_file_url}')
82+
logger.info(f'Uploaded file to {output_file_url}')
8183

8284
response_data = {
8385
'status':'success',
@@ -94,10 +96,10 @@ def process_message(self, msg: QueueMessage):
9496
self.send_response(response)
9597
# Process the message
9698
# Clean up the download_folder
97-
logging.info('Cleaning up download folder')
99+
logger.info('Cleaning up download folder')
98100
shutil.rmtree(download_folder)
99101
except Exception as e:
100-
logging.error(f'Error processing message {msg.messageId} : {e}')
102+
logger.error(f'Error processing message {msg.messageId} : {e}')
101103
response_data = {
102104
'status':'failed',
103105
'message':str(e),

0 commit comments

Comments
 (0)