diff --git a/src/calculators/qm_xn_lib_calculator.py b/src/calculators/qm_xn_lib_calculator.py index 382d44c..0a5803e 100644 --- a/src/calculators/qm_xn_lib_calculator.py +++ b/src/calculators/qm_xn_lib_calculator.py @@ -12,10 +12,11 @@ import itertools import numpy as np import pandas as pd +import os class QMXNLibCalculator(QMCalculator): - def __init__(self, edges_file_path:str, output_file_path:str, polygon_file_path:str=None): + def __init__(self, edges_file_path:str, output_file_path:str, polygon_file_path:str=None, partition_count:int = os.cpu_count()): """ Initializes the QMXNLibCalculator class. @@ -31,6 +32,7 @@ def __init__(self, edges_file_path:str, output_file_path:str, polygon_file_path: self.default_projection = 'epsg:26910' self.output_projection = 'epsg:4326' self.precision = 1e-5 + self.partition_count = partition_count def add_edges_from_linestring(self, graph, linestring, edge_attrs): points = list(linestring.coords) @@ -147,8 +149,8 @@ def calculate_quality_metric(self): gdf = gdf.to_crs(self.default_projection) tile_gdf = tile_gdf.to_crs(self.default_projection) tile_gdf = tile_gdf[['geometry']] - - df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=64) + no_of_cores = min(self.partition_count, os.cpu_count()) + df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=no_of_cores) output = df_dask.apply(self.qm_func,axis=1, meta=[ ('geometry', 'geometry'), diff --git a/src/calculators/xn_qm_lib.py b/src/calculators/xn_qm_lib.py index 2d72b24..411c4dc 100644 --- a/src/calculators/xn_qm_lib.py +++ b/src/calculators/xn_qm_lib.py @@ -170,7 +170,8 @@ def calculate_xn_qm(osw_edge_file_path: str, qm_file_path: str, xn_polygon_path: tile_gdf = tile_gdf[['geometry']] # Compute local stats using dask-geopandas - df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=64) + no_of_cores = os.cpu_count() + df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=no_of_cores) # print('computing stats...') output = df_dask.apply(qm_func, axis=1, meta=[ diff --git a/src/config.py b/src/config.py index 1658626..5822c7e 100644 --- a/src/config.py +++ b/src/config.py @@ -14,6 +14,7 @@ class Config(BaseSettings): storage_container_name: str = os.environ.get('CONTAINER_NAME', 'osw') algorithm_dictionary: dict = {"fixed":QMFixedCalculator,"ixn":QMXNLibCalculator} max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1) + partition_count:int = os.environ.get('PARTITION_COUNT', 2) def get_download_folder(self) -> str: root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) diff --git a/src/services/osw_qm_calculator_service.py b/src/services/osw_qm_calculator_service.py index d91d883..8e4adb2 100644 --- a/src/services/osw_qm_calculator_service.py +++ b/src/services/osw_qm_calculator_service.py @@ -27,6 +27,16 @@ class OswQmCalculator: """ + def __init__(self, cores_to_use:int): + """ + Initializes the OswQmCalculator class. + + Args: + cores_to_use (int): The number of cores to use for calculating quality metrics. + + """ + self.cores_to_use = cores_to_use + def calculate_quality_metric(self, input_file, algorithm_names, output_path, ixn_file=None): """ Calculates quality metrics for input files using specified algorithms. @@ -93,7 +103,7 @@ def get_osw_qm_calculator(self, algorithm_name:str, ixn_file:str=None, edges_fil """ if algorithm_name == 'ixn': - return QMXNLibCalculator(edges_file, output_file, ixn_file) + return QMXNLibCalculator(edges_file, output_file, ixn_file, self.cores_to_use) else: return QMFixedCalculator(edges_file, output_file) @@ -164,7 +174,6 @@ def parse_and_calculate_quality_metric(self, input_file, algorithm_names, ixn_fi if 'ixn' in algorithm_names: # get the edges file from the input - ixn_calculator = QMXNLibCalculator() input_json = ixn_calculator.calculate_quality_metric(input_file, ixn_file) diff --git a/src/services/servicebus_service.py b/src/services/servicebus_service.py index 6831f79..3de184a 100644 --- a/src/services/servicebus_service.py +++ b/src/services/servicebus_service.py @@ -73,7 +73,8 @@ def process_message(self, msg: QueueMessage): output_folder = os.path.join(download_folder,'qm') os.makedirs(output_folder,exist_ok=True) output_file_local_path = os.path.join(output_folder,'qm-output.zip') - qm_calculator = OswQmCalculator() + cores_to_use = self.config.partition_count + qm_calculator = OswQmCalculator(cores_to_use=cores_to_use) algorithm_names = quality_request.data.algorithm.split(',') qm_calculator.calculate_quality_metric(download_path, algorithm_names,output_file_local_path,ixn_file_path) # Upload the file