Skip to content

Commit 71021f9

Browse files
committed
Fixing initialization taking forever
Also optimizing some more stuff, *should* be good to go now. Going to continue testing.
1 parent 8876b2b commit 71021f9

File tree

12 files changed

+282
-134
lines changed

12 files changed

+282
-134
lines changed

QPEMPI

1.3 KB
Binary file not shown.

QPEMPI.c

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -151,21 +151,20 @@ int main(int argc, char *argv[]) {
151151

152152
set_rank(rank);
153153

154-
// Determine data file from CLI args or default
155-
const char *dataFile = DATA_FILE;
156-
if (argc > 1) {
157-
dataFile = argv[1];
158-
}
159-
160154
// Start a timer for total runtime statistics
161155
double totalStart = MPI_Wtime();
162156

157+
const char *data_file = DATA_FILE;
158+
if (argc > 1) {
159+
data_file = argv[1];
160+
}
161+
163162
// Instantiate an engine object to handle the execution of the query
164163
struct engineS *engine = initializeEngineMPI(
165164
numOptimalIndexes, // Number of indexes
166165
optimalIndexes, // Indexes to build B+ trees for
167166
(const int *)optimalIndexTypes, // Index types
168-
dataFile,
167+
data_file,
169168
TABLE_NAME
170169
);
171170

@@ -292,14 +291,10 @@ int main(int argc, char *argv[]) {
292291
execTime = MPI_Wtime() - start;
293292
}
294293

295-
// Synchronize all ranks before printing to ensure ordered output
296-
MPI_Barrier(MPI_COMM_WORLD);
294+
// Print results (Owner only) - No barriers for performance
295+
if (is_owner) {
296+
printf("Executing Query: %s\n", query);
297297

298-
// Print results in rank order
299-
for (int print_rank = 0; print_rank < size; print_rank++) {
300-
if (rank == print_rank && is_owner) {
301-
printf("Executing Query: %s\n", query);
302-
303298
if (parseFailed) {
304299
printf("Tokenization failed.\n");
305300
} else {
@@ -326,8 +321,6 @@ int main(int argc, char *argv[]) {
326321
fprintf(stderr, "Unsupported command.\n");
327322
}
328323
}
329-
}
330-
MPI_Barrier(MPI_COMM_WORLD);
331324
}
332325

333326
// Cleanup
@@ -337,12 +330,12 @@ int main(int argc, char *argv[]) {
337330
// Print total runtime statistics in pretty colors (Rank 0 only)
338331
if (rank == 0) {
339332
double totalTimeTaken = MPI_Wtime() - totalStart;
340-
printf(CYAN "======= Execution Summary =======" RESET "\n");
333+
printf(CYAN "======= MPI Execution Summary =======" RESET "\n");
341334
printf(CYAN "Engine Initialization Time: " RESET YELLOW "%.4f seconds\n" RESET, initTimeTaken);
342335
printf(CYAN "Query Loading Time: " RESET YELLOW "%.4f seconds\n" RESET, loadTimeTaken - initTimeTaken);
343336
printf(CYAN "Query Execution Time: " RESET YELLOW "%.4f seconds\n" RESET, totalTimeTaken - loadTimeTaken);
344337
printf(BOLD CYAN "Total Execution Time: " RESET BOLD YELLOW "%.4f seconds" RESET "\n", totalTimeTaken);
345-
printf(CYAN "=================================" RESET "\n");
338+
printf(CYAN "=====================================" RESET "\n");
346339
}
347340

348341
// printf("Rank %d: Freeing buffer...\n", rank);

QPEOMP.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,12 +339,12 @@ int main(int argc, char *argv[]) {
339339

340340
// Print total runtime statistics in pretty colors
341341
double totalTimeTaken = omp_get_wtime() - totalStart;
342-
printf(CYAN "======= Execution Summary =======" RESET "\n");
342+
printf(CYAN "======= OMP Execution Summary =======" RESET "\n");
343343
printf(CYAN "Engine Initialization Time: " RESET YELLOW "%.4f seconds\n" RESET, initTimeTaken);
344344
printf(CYAN "Query Loading Time: " RESET YELLOW "%.4f seconds\n" RESET, loadTimeTaken - initTimeTaken);
345345
printf(CYAN "Query Execution Time: " RESET YELLOW "%.4f seconds\n" RESET, totalTimeTaken - loadTimeTaken);
346346
printf(BOLD CYAN "Total Execution Time: " RESET BOLD YELLOW "%.4f seconds" RESET "\n", totalTimeTaken);
347-
printf(CYAN "=================================" RESET "\n");
347+
printf(CYAN "=====================================" RESET "\n");
348348

349349
return EXIT_SUCCESS;
350350
}

QPESeq.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,12 @@ int main(int argc, char *argv[]) {
8686

8787
// Print total runtime statistics in pretty colors
8888
double totalTimeTaken = ((double)clock() - totalStart) / CLOCKS_PER_SEC;
89-
printf(CYAN "======= Execution Summary =======" RESET "\n");
89+
printf(CYAN "======= Serial Execution Summary =======" RESET "\n");
9090
printf(CYAN "Engine Initialization Time: " RESET YELLOW "%.4f seconds\n" RESET, initTimeTaken);
9191
printf(CYAN "Query Loading Time: " RESET YELLOW "%.4f seconds\n" RESET, loadTimeTaken - initTimeTaken);
9292
printf(CYAN "Query Execution Time: " RESET YELLOW "%.4f seconds\n" RESET, totalTimeTaken - loadTimeTaken);
9393
printf(BOLD CYAN "Total Execution Time: " RESET BOLD YELLOW "%.4f seconds" RESET "\n", totalTimeTaken);
94-
printf(CYAN "=================================" RESET "\n");
94+
printf(CYAN "========================================" RESET "\n");
9595

9696
return EXIT_SUCCESS;
9797
}

benchmark.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,10 @@ def run_benchmark():
128128

129129
if cores:
130130
count = 0
131-
# Divide cores by 2 to account for threading stuff - quick fix
132131
if cores == "ALL":
133-
count = os.cpu_count()/2 or 4
132+
count = os.cpu_count() or 4
134133
else:
135-
count = int(cores)/2
134+
count = int(cores)
136135

137136
omp_prefix = f"OMP_NUM_THREADS={count} "
138137
mpi_prefix = f"mpirun -np {count} "

engine/mpi/buildEngine-mpi.c

Lines changed: 92 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -69,43 +69,110 @@ node *loadIntoBplusTreeMPI(record **records, int num_records, const char *attrib
6969
* Array of all record structs in the file
7070
*/
7171
record **getAllRecordsFromFileMPI(const char *filepath, int *num_records) {
72-
// Attempt to open the file from the provided path
73-
FILE *file = fopen(filepath, "r");
74-
if (file == NULL) {
75-
fprintf(stderr, "Error opening file: %s\n", filepath);
76-
return NULL;
72+
int rank;
73+
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
74+
75+
char *file_content = NULL;
76+
long file_size = 0;
77+
78+
// Rank 0 reads the file from disk
79+
if (rank == 0) {
80+
FILE *file = fopen(filepath, "r");
81+
if (file == NULL) {
82+
fprintf(stderr, "Error opening file: %s\n", filepath);
83+
file_size = -1; // Signal error
84+
} else {
85+
fseek(file, 0, SEEK_END);
86+
file_size = ftell(file);
87+
fseek(file, 0, SEEK_SET);
88+
89+
file_content = (char *)malloc(file_size + 1);
90+
if (file_content) {
91+
size_t read_size = fread(file_content, 1, file_size, file);
92+
if (read_size != (size_t)file_size) {
93+
fprintf(stderr, "Error reading file content\n");
94+
free(file_content);
95+
file_content = NULL;
96+
file_size = -1;
97+
} else {
98+
file_content[file_size] = '\0';
99+
}
100+
} else {
101+
file_size = -1; // Memory error
102+
}
103+
fclose(file);
104+
}
105+
}
106+
107+
// Broadcast file size to all ranks
108+
MPI_Bcast(&file_size, 1, MPI_LONG, 0, MPI_COMM_WORLD);
109+
110+
if (file_size < 0) {
111+
return NULL; // Propagate error
77112
}
78113

79-
// Initialize the records and count
114+
// Allocate memory on other ranks
115+
if (rank != 0) {
116+
file_content = (char *)malloc(file_size + 1);
117+
if (!file_content) {
118+
fprintf(stderr, "Rank %d failed to allocate memory for file content\n", rank);
119+
// In a real app, we should signal abort here
120+
return NULL;
121+
}
122+
}
123+
124+
// Broadcast file content to all ranks
125+
// Note: MPI_Bcast count is int. If file_size > INT_MAX, this needs chunking.
126+
// For this project, we assume file_size fits in int.
127+
MPI_Bcast(file_content, (int)file_size + 1, MPI_CHAR, 0, MPI_COMM_WORLD);
128+
129+
// Parse records from memory buffer
80130
record **records = NULL;
81-
char line[1024];
82131
int count = 0;
132+
133+
char *cursor = file_content;
134+
135+
// Skip header line
136+
char *eol = strchr(cursor, '\n');
137+
if (eol) {
138+
cursor = eol + 1;
139+
}
83140

84-
// Read each line and populate the records array
85-
bool first_line = true;
86-
while (fgets(line, sizeof(line), file)) {
87-
if (first_line) {
88-
first_line = false;
89-
continue; // Skip header
141+
while (*cursor) {
142+
// Find end of line
143+
eol = strchr(cursor, '\n');
144+
if (!eol) {
145+
// Handle last line without newline
146+
if (*cursor) eol = cursor + strlen(cursor);
147+
else break;
90148
}
91-
record *new_record = getRecordFromLineMPI(line);
92-
records = realloc(records, (count + 1) * sizeof(record *));
93-
if (records == NULL) {
94-
fprintf(stderr, "Memory allocation failed\n");
95-
fclose(file);
96-
return NULL;
149+
150+
if (eol > cursor) {
151+
// Temporarily null-terminate the line to use existing parser
152+
char save = *eol;
153+
*eol = '\0';
154+
155+
record *new_record = getRecordFromLineMPI(cursor);
156+
if (new_record) {
157+
records = realloc(records, (count + 1) * sizeof(record *));
158+
records[count] = new_record;
159+
count++;
160+
}
161+
162+
*eol = save; // Restore (good practice)
97163
}
98-
records[count] = new_record;
99-
count++;
164+
165+
if (*eol == '\0') break;
166+
cursor = eol + 1;
100167
}
101168

102-
// Close the file, set the output count, and return the records array
103-
fclose(file);
104-
if(VERBOSE) {
169+
free(file_content);
170+
171+
if(VERBOSE && rank == 0) {
105172
printf("Loaded %d records from file: %s\n", count, filepath);
106173
}
107174
*num_records = count;
108-
return records; // Return the array of all records
175+
return records;
109176
}
110177

111178
/* Helper to parse a CSV field, handling quotes and commas */

engine/mpi/executeEngine-mpi.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -878,6 +878,7 @@ struct engineS *initializeEngineMPI(
878878
engine->attribute_types = (FieldType *)malloc(num_indexes * sizeof(FieldType));
879879
engine->all_records = NULL; // Initialize to NULL, will be set later
880880
engine->num_records = 0; // Initialize record count to 0
881+
engine->record_block = NULL; // Initialize to NULL
881882
if (engine->bplus_tree_roots == NULL || engine->indexed_attributes == NULL || engine->attribute_types == NULL) {
882883
perror("Failed to allocate memory for engine components");
883884
free(engine);

0 commit comments

Comments
 (0)