Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/calculators/qm_xn_lib_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
import itertools
import numpy as np
import pandas as pd
import os


class QMXNLibCalculator(QMCalculator):
def __init__(self, edges_file_path:str, output_file_path:str, polygon_file_path:str=None):
def __init__(self, edges_file_path:str, output_file_path:str, polygon_file_path:str=None, partition_count:int = os.cpu_count()):
"""
Initializes the QMXNLibCalculator class.

Expand All @@ -31,6 +32,7 @@ def __init__(self, edges_file_path:str, output_file_path:str, polygon_file_path:
self.default_projection = 'epsg:26910'
self.output_projection = 'epsg:4326'
self.precision = 1e-5
self.partition_count = partition_count

def add_edges_from_linestring(self, graph, linestring, edge_attrs):
points = list(linestring.coords)
Expand Down Expand Up @@ -147,8 +149,8 @@ def calculate_quality_metric(self):
gdf = gdf.to_crs(self.default_projection)
tile_gdf = tile_gdf.to_crs(self.default_projection)
tile_gdf = tile_gdf[['geometry']]

df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=64)
no_of_cores = min(self.partition_count, os.cpu_count())
df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=no_of_cores)

output = df_dask.apply(self.qm_func,axis=1, meta=[
('geometry', 'geometry'),
Expand Down
3 changes: 2 additions & 1 deletion src/calculators/xn_qm_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ def calculate_xn_qm(osw_edge_file_path: str, qm_file_path: str, xn_polygon_path:
tile_gdf = tile_gdf[['geometry']]

# Compute local stats using dask-geopandas
df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=64)
no_of_cores = os.cpu_count()
df_dask = dask_geopandas.from_geopandas(tile_gdf, npartitions=no_of_cores)

# print('computing stats...')
output = df_dask.apply(qm_func, axis=1, meta=[
Expand Down
1 change: 1 addition & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Config(BaseSettings):
storage_container_name: str = os.environ.get('CONTAINER_NAME', 'osw')
algorithm_dictionary: dict = {"fixed":QMFixedCalculator,"ixn":QMXNLibCalculator}
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1)
partition_count:int = os.environ.get('PARTITION_COUNT', 2)

def get_download_folder(self) -> str:
root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
Expand Down
13 changes: 11 additions & 2 deletions src/services/osw_qm_calculator_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ class OswQmCalculator:

"""

def __init__(self, cores_to_use:int):
"""
Initializes the OswQmCalculator class.

Args:
cores_to_use (int): The number of cores to use for calculating quality metrics.

"""
self.cores_to_use = cores_to_use

def calculate_quality_metric(self, input_file, algorithm_names, output_path, ixn_file=None):
"""
Calculates quality metrics for input files using specified algorithms.
Expand Down Expand Up @@ -93,7 +103,7 @@ def get_osw_qm_calculator(self, algorithm_name:str, ixn_file:str=None, edges_fil

"""
if algorithm_name == 'ixn':
return QMXNLibCalculator(edges_file, output_file, ixn_file)
return QMXNLibCalculator(edges_file, output_file, ixn_file, self.cores_to_use)
else:
return QMFixedCalculator(edges_file, output_file)

Expand Down Expand Up @@ -164,7 +174,6 @@ def parse_and_calculate_quality_metric(self, input_file, algorithm_names, ixn_fi

if 'ixn' in algorithm_names:
# get the edges file from the input

ixn_calculator = QMXNLibCalculator()
input_json = ixn_calculator.calculate_quality_metric(input_file, ixn_file)

Expand Down
3 changes: 2 additions & 1 deletion src/services/servicebus_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def process_message(self, msg: QueueMessage):
output_folder = os.path.join(download_folder,'qm')
os.makedirs(output_folder,exist_ok=True)
output_file_local_path = os.path.join(output_folder,'qm-output.zip')
qm_calculator = OswQmCalculator()
cores_to_use = self.config.partition_count
qm_calculator = OswQmCalculator(cores_to_use=cores_to_use)
algorithm_names = quality_request.data.algorithm.split(',')
qm_calculator.calculate_quality_metric(download_path, algorithm_names,output_file_local_path,ixn_file_path)
# Upload the file
Expand Down
Loading