Skip to content

Commit 58f1171

Browse files
committed
CANDIDATE UNCHECKED PARALLEL FUNCTIONALITY ADDED ON LOGANS PART TO OMP MPI. TWO FILES MAY NOT OPERATE!!
1 parent c511931 commit 58f1171

File tree

1 file changed

+164
-78
lines changed

1 file changed

+164
-78
lines changed

engine/mpi/executeEngine-mpi.c

Lines changed: 164 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
#include "../../include/buildEngine-mpi.h"
88
#include "../../include/executeEngine-mpi.h"
99
#include <mpi.h>
10+
#include <stdlib.h>
11+
#include <stdio.h>
12+
#include <stdbool.h>
1013
#define VERBOSE 0
1114

1215
// Function pointer type for WHERE condition evaluation
@@ -628,101 +631,184 @@ bool executeQueryInsertSerial(
628631
return success;
629632
}
630633

631-
/* Main functionality for DELETE logic
632-
* Parameters:
633-
* engine - constant engine object
634-
* tableName - name of the table
635-
* whereClause - WHERE clause (NULL for all rows)
636-
* Returns:
637-
* ResultSet containing number of deleted records
638-
*/
639-
struct resultSetS *executeQueryDeleteSerial(
640-
struct engineS *engine, // Constant engine object
641-
const char *tableName, // Table to delete from
642-
struct whereClauseS *whereClause // WHERE clause (NULL for all rows)
634+
// assumes same external declarations as in the OpenMP version
635+
636+
struct resultSetS *executeQueryDeleteMPI(
637+
struct engineS *engine,
638+
const char *tableName,
639+
struct whereClauseS *whereClause,
640+
MPI_Comm comm
643641
) {
642+
int rank, size;
643+
MPI_Comm_rank(comm, &rank);
644+
MPI_Comm_size(comm, &size);
645+
644646
struct resultSetS *result = (struct resultSetS *)malloc(sizeof(struct resultSetS));
645-
result->numRecords = 0;
646-
result->numColumns = 0;
647-
result->columnNames = NULL;
648-
result->columnTypes = NULL;
649-
result->data = NULL;
650-
result->queryTime = 0.0;
651-
result->success = false;
652-
653-
clock_t start = clock();
654-
int deletedCount = 0;
655-
int writeIndex = 0;
656-
657-
// Iterate through all records in the engine
658-
for (int i = 0; i < engine->num_records; i++) {
659-
record *currentRecord = engine->all_records[i];
660-
bool shouldDelete = false;
661-
662-
// Check if the record matches the WHERE clause
647+
if (!result) {
648+
return NULL;
649+
}
650+
651+
result->numRecords = 0;
652+
result->numColumns = 0;
653+
result->columnNames = NULL;
654+
result->columnTypes = NULL;
655+
result->data = NULL;
656+
result->queryTime = 0.0;
657+
result->success = false;
658+
659+
double start = MPI_Wtime();
660+
661+
// We assume num_records is the same on all ranks (e.g., broadcasted beforehand if needed)
662+
int num_records = engine->num_records;
663+
664+
// Block partition of records across ranks
665+
int base = num_records / size;
666+
int rem = num_records % size;
667+
668+
int local_n;
669+
int local_start;
670+
671+
if (rank < rem) {
672+
local_n = base + 1;
673+
local_start = rank * (base + 1);
674+
} else {
675+
local_n = base;
676+
local_start = rem * (base + 1) + (rank - rem) * base;
677+
}
678+
679+
// Local flags for this rank's chunk
680+
int *localFlags = (local_n > 0)
681+
? (int *)calloc(local_n, sizeof(int))
682+
: NULL;
683+
684+
int localDeleted = 0;
685+
686+
// Local WHERE evaluation
687+
for (int i = 0; i < local_n; i++) {
688+
int globalIdx = local_start + i;
689+
record *currentRecord = engine->all_records[globalIdx];
690+
bool shouldDelete;
691+
663692
if (whereClause == NULL) {
664-
shouldDelete = true; // Delete all if no WHERE clause
693+
shouldDelete = true;
665694
} else {
666695
shouldDelete = evaluateWhereClause(currentRecord, whereClause);
667696
}
668697

669698
if (shouldDelete) {
670-
// Delete the matched record
671-
672-
// Remove from B+ Tree Indexes
673-
for (int j = 0; j < engine->num_indexes; j++) {
674-
const char *indexed_attr = engine->indexed_attributes[j];
675-
KEY_T key = extract_key_from_record(currentRecord, indexed_attr);
676-
engine->bplus_tree_roots[j] = delete(engine->bplus_tree_roots[j], key, (ROW_PTR)currentRecord);
699+
localFlags[i] = 1;
700+
localDeleted++;
701+
}
702+
}
703+
704+
// Total deleted count across all ranks
705+
int globalDeleted = 0;
706+
MPI_Reduce(&localDeleted, &globalDeleted, 1, MPI_INT, MPI_SUM, 0, comm);
707+
708+
// Gather all flags on rank 0
709+
int *recvCounts = NULL;
710+
int *displs = NULL;
711+
int *globalFlags = NULL;
712+
713+
if (rank == 0) {
714+
recvCounts = (int *)malloc(size * sizeof(int));
715+
displs = (int *)malloc(size * sizeof(int));
716+
}
717+
718+
// Each rank sends its local_n
719+
MPI_Gather(&local_n, 1, MPI_INT,
720+
recvCounts, 1, MPI_INT,
721+
0, comm);
722+
723+
if (rank == 0) {
724+
int offset = 0;
725+
for (int r = 0; r < size; r++) {
726+
displs[r] = offset;
727+
offset += recvCounts[r];
728+
}
729+
730+
globalFlags = (int *)calloc(num_records, sizeof(int));
731+
}
732+
733+
MPI_Gatherv(localFlags, local_n, MPI_INT,
734+
globalFlags, recvCounts, displs, MPI_INT,
735+
0, comm);
736+
737+
if (localFlags) {
738+
free(localFlags);
739+
}
740+
741+
// Rank 0 applies deletions, updates indexes, compacts array, writes CSV
742+
if (rank == 0) {
743+
int writeIndex = 0;
744+
745+
for (int i = 0; i < num_records; i++) {
746+
record *currentRecord = engine->all_records[i];
747+
748+
if (globalFlags[i]) {
749+
// Remove from B+ Tree Indexes
750+
for (int j = 0; j < engine->num_indexes; j++) {
751+
const char *indexed_attr = engine->indexed_attributes[j];
752+
KEY_T key = extract_key_from_record(currentRecord, indexed_attr);
753+
engine->bplus_tree_roots[j] =
754+
delete(engine->bplus_tree_roots[j], key, (ROW_PTR)currentRecord);
755+
}
756+
757+
// Free record memory
758+
free(currentRecord);
759+
} else {
760+
// Keep record and compact
761+
if (writeIndex != i) {
762+
engine->all_records[writeIndex] = currentRecord;
763+
}
764+
writeIndex++;
677765
}
766+
}
767+
768+
engine->num_records = writeIndex;
678769

679-
// Free the record memory
680-
free(currentRecord);
681-
deletedCount++;
770+
// Rewrite CSV with remaining records
771+
FILE *file = fopen(engine->datafile, "w");
772+
if (file != NULL) {
773+
for (int i = 0; i < engine->num_records; i++) {
774+
record *r = engine->all_records[i];
775+
fprintf(file, "%llu,%s,%s,%s,%d,%s,%d,%s,%d,%s,%s,%d\n",
776+
r->command_id,
777+
r->raw_command,
778+
r->base_command,
779+
r->shell_type,
780+
r->exit_code,
781+
r->timestamp,
782+
r->sudo_used,
783+
r->working_directory,
784+
r->user_id,
785+
r->user_name,
786+
r->host_name,
787+
r->risk_level);
788+
}
789+
fclose(file);
682790
} else {
683-
// Keep the record, move it to the current write position if needed
684-
if (writeIndex != i) {
685-
engine->all_records[writeIndex] = currentRecord;
791+
if (VERBOSE) {
792+
fprintf(stderr,
793+
"Failed to open data file for rewriting: %s\n",
794+
engine->datafile);
686795
}
687-
writeIndex++;
688796
}
689-
}
690797

691-
// Update the record count in the engine
692-
engine->num_records = writeIndex;
693-
694-
// Rewrite the CSV file with the remaining records
695-
FILE *file = fopen(engine->datafile, "w");
696-
if (file != NULL) {
697-
for (int i = 0; i < engine->num_records; i++) {
698-
record *r = engine->all_records[i];
699-
fprintf(file, "%llu,%s,%s,%s,%d,%s,%d,%s,%d,%s,%s,%d\n",
700-
r->command_id,
701-
r->raw_command,
702-
r->base_command,
703-
r->shell_type,
704-
r->exit_code,
705-
r->timestamp,
706-
r->sudo_used,
707-
r->working_directory,
708-
r->user_id,
709-
r->user_name,
710-
r->host_name,
711-
r->risk_level);
712-
}
713-
fclose(file);
798+
double time_taken = MPI_Wtime() - start;
799+
800+
result->numRecords = globalDeleted;
801+
result->queryTime = time_taken;
802+
result->success = true;
803+
804+
if (globalFlags) free(globalFlags);
805+
if (recvCounts) free(recvCounts);
806+
if (displs) free(displs);
714807
} else {
715-
if (VERBOSE) {
716-
fprintf(stderr, "Failed to open data file for rewriting: %s\n", engine->datafile);
717-
}
808+
// Non-root ranks: result is not meaningful; optionally set success=false explicitly
809+
result->success = false;
718810
}
719811

720-
double time_taken = ((double)clock() - start) / CLOCKS_PER_SEC;
721-
722-
result->numRecords = deletedCount;
723-
result->queryTime = time_taken;
724-
result->success = true;
725-
726812
return result;
727813
}
728814

0 commit comments

Comments
 (0)