Skip to content

Commit 90a9a49

Browse files
authored
Update executeEngine-mpi.c
First attempt at parallelizing executeEngine w/ MPI (note get error when compiling but still creates object file)
1 parent 30a0e53 commit 90a9a49

File tree

1 file changed

+85
-41
lines changed

1 file changed

+85
-41
lines changed

engine/mpi/executeEngine-mpi.c

Lines changed: 85 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -357,30 +357,54 @@ struct resultSetS *executeQuerySelectSerial(
357357

358358
// Start the timer
359359
clock_t start = clock(); // Start a timer
360-
360+
// Replaced whil-loop with parallelized MPI version of code
361361
// Get all indexed attributes in the WHERE clause, using the B+ tree indexes where possible
362-
struct whereClauseS *wc = whereClause;
363-
while (wc != NULL) {
362+
/* --- Parallel WHERE clause index processing --- */
363+
364+
int rank, size;
365+
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
366+
MPI_Comm_size(MPI_COMM_WORLD, &size);
367+
368+
// Convert linked list into array for easier slicing
369+
int wc_count = 0;
370+
struct whereClauseS *tmp = whereClause;
371+
while (tmp) {
372+
wc_count++;
373+
tmp = tmp->next;
374+
}
375+
376+
struct whereClauseS **wc_array = malloc(wc_count * sizeof(struct whereClauseS *));
377+
tmp = whereClause;
378+
for (int i = 0; i < wc_count; i++) {
379+
wc_array[i] = tmp;
380+
tmp = tmp->next;
381+
}
382+
383+
// Determine each process’s work range
384+
int chunk = (wc_count + size - 1) / size;
385+
int st = rank * chunk;
386+
int ed = (start + chunk < wc_count) ? start + chunk : wc_count;
387+
388+
// Local result buffer
389+
record **local_matches = malloc(engine->num_records * sizeof(record *));
390+
int local_match_count = 0;
391+
392+
// Each rank processes its assigned WHERE clauses
393+
for (int idx = start; idx < ed; idx++) {
394+
struct whereClauseS *wc = wc_array[idx];
364395
for (int i = 0; i < engine->num_indexes; i++) {
365396
if (strcmp(wc->attribute, engine->indexed_attributes[i]) == 0) {
366-
anyIndexExists = true;
367-
indexExists[i] = true;
368-
369-
// Use B+ tree index for this attribute
370-
node *cur_root = engine->bplus_tree_roots[i]; // B+ tree root for this indexed attribute
397+
node *cur_root = engine->bplus_tree_roots[i];
371398
FieldType type = engine->attribute_types[i];
372-
399+
373400
KEY_T key_start, key_end;
374401
bool typeSupported = true;
375-
402+
376403
if (type == FIELD_UINT64) {
377404
unsigned long long val = strtoull(wc->value, NULL, 10);
378-
key_start.type = KEY_UINT64;
379-
key_end.type = KEY_UINT64;
380-
405+
key_start.type = key_end.type = KEY_UINT64;
381406
if (strcmp(wc->operator, "=") == 0) {
382-
key_start.v.u64 = val;
383-
key_end.v.u64 = val;
407+
key_start.v.u64 = key_end.v.u64 = val;
384408
} else if (strcmp(wc->operator, ">") == 0) {
385409
key_start.v.u64 = val + 1;
386410
key_end.v.u64 = UINT64_MAX;
@@ -399,12 +423,9 @@ struct resultSetS *executeQuerySelectSerial(
399423
}
400424
} else if (type == FIELD_INT) {
401425
int val = atoi(wc->value);
402-
key_start.type = KEY_INT;
403-
key_end.type = KEY_INT;
404-
426+
key_start.type = key_end.type = KEY_INT;
405427
if (strcmp(wc->operator, "=") == 0) {
406-
key_start.v.i32 = val;
407-
key_end.v.i32 = val;
428+
key_start.v.i32 = key_end.v.i32 = val;
408429
} else if (strcmp(wc->operator, ">") == 0) {
409430
key_start.v.i32 = val + 1;
410431
key_end.v.i32 = INT_MAX;
@@ -422,38 +443,61 @@ struct resultSetS *executeQuerySelectSerial(
422443
key_end.v.i32 = INT_MAX;
423444
}
424445
} else {
425-
// Fallback for unsupported types in index search
426446
typeSupported = false;
427-
indexExists[i] = false;
428-
}
429-
430-
if (!typeSupported) {
431-
continue;
432447
}
433-
434-
// Allocating for keys, using num_records as upper bound.
448+
449+
if (!typeSupported) continue;
450+
435451
KEY_T *returned_keys = malloc(engine->num_records * sizeof(KEY_T));
436452
ROW_PTR *returned_pointers = malloc(engine->num_records * sizeof(ROW_PTR));
437-
438453
int num_found = findRange(cur_root, key_start, key_end, false, returned_keys, returned_pointers);
439-
440-
// Add found records to matchingRecords
441-
if (num_found > 0) {
442-
// Reallocate matchingRecords if needed (though we alloc'd max size initially)
443-
for (int k = 0; k < num_found; k++) {
444-
matchingRecords[matchCount++] = (record *)returned_pointers[k];
445-
}
454+
455+
for (int k = 0; k < num_found; k++) {
456+
local_matches[local_match_count++] = (record *)returned_pointers[k];
446457
}
447-
458+
448459
free(returned_keys);
449460
free(returned_pointers);
450461
}
451-
else {
452-
indexExists[i] = false;
453-
}
454462
}
455-
wc = wc->next;
456463
}
464+
465+
// Gather results from all ranks
466+
int *recvcounts = NULL, *displs = NULL;
467+
int local_count = local_match_count;
468+
if (rank == 0) {
469+
recvcounts = malloc(size * sizeof(int));
470+
}
471+
MPI_Gather(&local_count, 1, MPI_INT, recvcounts, 1, MPI_INT, 0, MPI_COMM_WORLD);
472+
473+
record **global_matches = NULL;
474+
int total = 0;
475+
if (rank == 0) {
476+
displs = malloc(size * sizeof(int));
477+
displs[0] = 0;
478+
for (int i = 0; i < size; i++) {
479+
total += recvcounts[i];
480+
if (i > 0) displs[i] = displs[i - 1] + recvcounts[i - 1];
481+
}
482+
global_matches = malloc(total * sizeof(record *));
483+
}
484+
485+
MPI_Gatherv(local_matches, local_count, MPI_AINT, global_matches, recvcounts, displs, MPI_AINT, 0, MPI_COMM_WORLD);
486+
487+
if (rank == 0) {
488+
// merge deduplicate
489+
matchingRecords = global_matches;
490+
matchCount = 0;
491+
for (int i = 0; i < total; i++) {
492+
matchingRecords[matchCount++] = global_matches[i];
493+
}
494+
}
495+
496+
free(local_matches);
497+
if (rank == 0) { free(recvcounts); free(displs); }
498+
free(wc_array);
499+
500+
/* --- END PARALLEL SECTION --- */
457501

458502

459503
// Perform linear search if on all non-indexed attributes or no indexes matched

0 commit comments

Comments
 (0)