Skip to content

Commit c511931

Browse files
authored
Update executeEngine-omp.c
first attempt at parallelizing executeEngine w/ omp
1 parent 74570b8 commit c511931

File tree

1 file changed

+21
-9
lines changed

1 file changed

+21
-9
lines changed

engine/omp/executeEngine-omp.c

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,8 @@ struct resultSetS *executeQuerySelectSerial(
358358
// Get all indexed attributes in the WHERE clause, using the B+ tree indexes where possible
359359
struct whereClauseS *wc = whereClause;
360360
while (wc != NULL) {
361+
// Parallelized
362+
#pragma omp parallel for
361363
for (int i = 0; i < engine->num_indexes; i++) {
362364
if (strcmp(wc->attribute, engine->indexed_attributes[i]) == 0) {
363365
anyIndexExists = true;
@@ -366,15 +368,15 @@ struct resultSetS *executeQuerySelectSerial(
366368
// Use B+ tree index for this attribute
367369
node *cur_root = engine->bplus_tree_roots[i]; // B+ tree root for this indexed attribute
368370
FieldType type = engine->attribute_types[i];
369-
371+
370372
KEY_T key_start, key_end;
371373
bool typeSupported = true;
372374

373375
if (type == FIELD_UINT64) {
374376
unsigned long long val = strtoull(wc->value, NULL, 10);
375377
key_start.type = KEY_UINT64;
376378
key_end.type = KEY_UINT64;
377-
379+
378380
if (strcmp(wc->operator, "=") == 0) {
379381
key_start.v.u64 = val;
380382
key_end.v.u64 = val;
@@ -398,7 +400,7 @@ struct resultSetS *executeQuerySelectSerial(
398400
int val = atoi(wc->value);
399401
key_start.type = KEY_INT;
400402
key_end.type = KEY_INT;
401-
403+
402404
if (strcmp(wc->operator, "=") == 0) {
403405
key_start.v.i32 = val;
404406
key_end.v.i32 = val;
@@ -431,17 +433,27 @@ struct resultSetS *executeQuerySelectSerial(
431433
// Allocating for keys, using num_records as upper bound.
432434
KEY_T *returned_keys = malloc(engine->num_records * sizeof(KEY_T));
433435
ROW_PTR *returned_pointers = malloc(engine->num_records * sizeof(ROW_PTR));
434-
436+
435437
int num_found = findRange(cur_root, key_start, key_end, false, returned_keys, returned_pointers);
436-
438+
437439
// Add found records to matchingRecords
438440
if (num_found > 0) {
439-
// Reallocate matchingRecords if needed (though we alloc'd max size initially)
440-
for (int k = 0; k < num_found; k++) {
441-
matchingRecords[matchCount++] = (record *)returned_pointers[k];
441+
int numthreads = omp_get_thread_num();
442+
if (num_found > numthreads * 3) { // Parallelized
443+
// Reallocate matchingRecords if needed (though we alloc'd max size initially)
444+
#pragma for
445+
for (int k = 0; k < num_found; k++) {
446+
#pragma omp critical
447+
matchingRecords[matchCount++] = (record *)returned_pointers[k];
448+
}
449+
} else { // Unparallelized
450+
// Reallocate matchingRecords if needed (though we alloc'd max size initially)
451+
for (int k = 0; k < num_found; k++) {
452+
matchingRecords[matchCount++] = (record *)returned_pointers[k];
453+
}
442454
}
443455
}
444-
456+
445457
free(returned_keys);
446458
free(returned_pointers);
447459
}

0 commit comments

Comments
 (0)