From 059c4879253f3c65b471b30b6a507cf8c00425ed Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Fri, 8 Nov 2024 08:28:00 +0530 Subject: [PATCH 1/3] partitions are as per cores --- src/calculators/qm_xn_lib_calculator.py | 4 ++-- src/calculators/xn_qm_lib.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/calculators/qm_xn_lib_calculator.py b/src/calculators/qm_xn_lib_calculator.py index 382d44c..350e1a0 100644 --- a/src/calculators/qm_xn_lib_calculator.py +++ b/src/calculators/qm_xn_lib_calculator.py @@ -147,8 +147,8 @@ def calculate_quality_metric(self): gdf = gdf.to_crs(self.default_projection) tile_gdf = tile_gdf.to_crs(self.default_projection) tile_gdf = tile_gdf[['geometry']] - - df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=64) + no_of_cores = os.cpu_count() + df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=no_of_cores) output = df_dask.apply(self.qm_func,axis=1, meta=[ ('geometry', 'geometry'), diff --git a/src/calculators/xn_qm_lib.py b/src/calculators/xn_qm_lib.py index 2d72b24..411c4dc 100644 --- a/src/calculators/xn_qm_lib.py +++ b/src/calculators/xn_qm_lib.py @@ -170,7 +170,8 @@ def calculate_xn_qm(osw_edge_file_path: str, qm_file_path: str, xn_polygon_path: tile_gdf = tile_gdf[['geometry']] # Compute local stats using dask-geopandas - df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=64) + no_of_cores = os.cpu_count() + df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=no_of_cores) # print('computing stats...') output = df_dask.apply(qm_func, axis=1, meta=[ From e9482c6183472832c431ecdf8d98397ba47fcccc Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Fri, 8 Nov 2024 08:35:08 +0530 Subject: [PATCH 2/3] made cores to use configurable Configurable cores to use. --- src/calculators/qm_xn_lib_calculator.py | 5 +++-- src/config.py | 1 + src/services/osw_qm_calculator_service.py | 13 +++++++++++-- src/services/servicebus_service.py | 3 ++- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/calculators/qm_xn_lib_calculator.py b/src/calculators/qm_xn_lib_calculator.py index 350e1a0..8731207 100644 --- a/src/calculators/qm_xn_lib_calculator.py +++ b/src/calculators/qm_xn_lib_calculator.py @@ -15,7 +15,7 @@ class QMXNLibCalculator(QMCalculator): - def __init__(self, edges_file_path:str, output_file_path:str, polygon_file_path:str=None): + def __init__(self, edges_file_path:str, output_file_path:str, polygon_file_path:str=None, partition_count:int = os.cpu_count()): """ Initializes the QMXNLibCalculator class. @@ -31,6 +31,7 @@ def __init__(self, edges_file_path:str, output_file_path:str, polygon_file_path: self.default_projection = 'epsg:26910' self.output_projection = 'epsg:4326' self.precision = 1e-5 + self.partition_count = partition_count def add_edges_from_linestring(self, graph, linestring, edge_attrs): points = list(linestring.coords) @@ -147,7 +148,7 @@ def calculate_quality_metric(self): gdf = gdf.to_crs(self.default_projection) tile_gdf = tile_gdf.to_crs(self.default_projection) tile_gdf = tile_gdf[['geometry']] - no_of_cores = os.cpu_count() + no_of_cores = min(self.partition_count, os.cpu_count()) df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=no_of_cores) output = df_dask.apply(self.qm_func,axis=1, meta=[ diff --git a/src/config.py b/src/config.py index 1658626..5822c7e 100644 --- a/src/config.py +++ b/src/config.py @@ -14,6 +14,7 @@ class Config(BaseSettings): storage_container_name: str = os.environ.get('CONTAINER_NAME', 'osw') algorithm_dictionary: dict = {"fixed":QMFixedCalculator,"ixn":QMXNLibCalculator} max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1) + partition_count:int = os.environ.get('PARTITION_COUNT', 2) def get_download_folder(self) -> str: root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) diff --git a/src/services/osw_qm_calculator_service.py b/src/services/osw_qm_calculator_service.py index d91d883..8e4adb2 100644 --- a/src/services/osw_qm_calculator_service.py +++ b/src/services/osw_qm_calculator_service.py @@ -27,6 +27,16 @@ class OswQmCalculator: """ + def __init__(self, cores_to_use:int): + """ + Initializes the OswQmCalculator class. + + Args: + cores_to_use (int): The number of cores to use for calculating quality metrics. + + """ + self.cores_to_use = cores_to_use + def calculate_quality_metric(self, input_file, algorithm_names, output_path, ixn_file=None): """ Calculates quality metrics for input files using specified algorithms. @@ -93,7 +103,7 @@ def get_osw_qm_calculator(self, algorithm_name:str, ixn_file:str=None, edges_fil """ if algorithm_name == 'ixn': - return QMXNLibCalculator(edges_file, output_file, ixn_file) + return QMXNLibCalculator(edges_file, output_file, ixn_file, self.cores_to_use) else: return QMFixedCalculator(edges_file, output_file) @@ -164,7 +174,6 @@ def parse_and_calculate_quality_metric(self, input_file, algorithm_names, ixn_fi if 'ixn' in algorithm_names: # get the edges file from the input - ixn_calculator = QMXNLibCalculator() input_json = ixn_calculator.calculate_quality_metric(input_file, ixn_file) diff --git a/src/services/servicebus_service.py b/src/services/servicebus_service.py index 6831f79..3de184a 100644 --- a/src/services/servicebus_service.py +++ b/src/services/servicebus_service.py @@ -73,7 +73,8 @@ def process_message(self, msg: QueueMessage): output_folder = os.path.join(download_folder,'qm') os.makedirs(output_folder,exist_ok=True) output_file_local_path = os.path.join(output_folder,'qm-output.zip') - qm_calculator = OswQmCalculator() + cores_to_use = self.config.partition_count + qm_calculator = OswQmCalculator(cores_to_use=cores_to_use) algorithm_names = quality_request.data.algorithm.split(',') qm_calculator.calculate_quality_metric(download_path, algorithm_names,output_file_local_path,ixn_file_path) # Upload the file From 2b4fc664430b40b399bf091e2e71017b88d78edb Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Fri, 8 Nov 2024 08:37:15 +0530 Subject: [PATCH 3/3] Update qm_xn_lib_calculator.py added missing import --- src/calculators/qm_xn_lib_calculator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/calculators/qm_xn_lib_calculator.py b/src/calculators/qm_xn_lib_calculator.py index 8731207..0a5803e 100644 --- a/src/calculators/qm_xn_lib_calculator.py +++ b/src/calculators/qm_xn_lib_calculator.py @@ -12,6 +12,7 @@ import itertools import numpy as np import pandas as pd +import os class QMXNLibCalculator(QMCalculator):