Skip to content

Commit 74570b8

Browse files
author
Sam Dickerson
committed
Merge branch 'main' of github.com:Jairik/Parallel-Query-Processing-System
merge commit 12/2
2 parents 1fdcd81 + e11d4c1 commit 74570b8

File tree

8 files changed

+982
-4
lines changed

8 files changed

+982
-4
lines changed

engine/mpi/buildEngine-mpi.c

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/* Skeleton for the Serial Implementation - uses the bplus serial engine and tokenizer to execute a provided SQL query */
2+
3+
#define _POSIX_C_SOURCE 200809L // Enable strdup
4+
#include "../../include/buildEngine-mpi.h"
5+
#include <string.h>
6+
#include <strings.h>
7+
#include <mpi.h>
8+
#define VERBOSE 0 // Essentially testing mode
9+
10+
// Forward declaration
11+
FieldType mapAttributeType(int attributeType);
12+
13+
// Creates a serial B+ tree from data file, returns the tree root
14+
bool makeIndexSerial(struct engineS *engine, const char *indexName, int attributeType) {
15+
// Load all records from the engine's data source
16+
record **records = engine->all_records;
17+
int numRecords = engine->num_records;
18+
19+
// Build the B+ tree from the records array
20+
node *root = loadIntoBplusTree(records, numRecords, indexName);
21+
if (VERBOSE && root == NULL) {
22+
fprintf(stderr, "Failed to load data into B+ tree\n");
23+
}
24+
25+
// Add to the engine's known tree roots
26+
engine->bplus_tree_roots[engine->num_indexes] = root;
27+
engine->indexed_attributes[engine->num_indexes] = strdup(indexName);
28+
engine->num_indexes += 1;
29+
engine->attribute_types[engine->num_indexes-1] = mapAttributeType(attributeType);
30+
31+
return (engine->bplus_tree_roots[engine->num_indexes-1]) != NULL; // Return success status
32+
}
33+
34+
/* Loads in all data from the array of records into a B+ tree
35+
* Parameters:
36+
* records - array of record pointers
37+
* num_records - number of records in the array
38+
* attributeName - name of the attribute to index
39+
* Returns:
40+
* root of the B+ tree
41+
*/
42+
node *loadIntoBplusTree(record **records, int num_records, const char *attributeName) {
43+
// Instantiate the B+ tree root
44+
node *root = NULL;
45+
46+
// Iterate through each record and insert into the B+ tree
47+
for (int i = 0; i < num_records; i++) {
48+
49+
// Extract the current record as a key
50+
record *currentRecord = records[i];
51+
KEY_T key = extract_key_from_record(currentRecord, attributeName);
52+
53+
// Insert the record into the B+ tree using the key
54+
root = insert(root, key, (ROW_PTR)currentRecord);
55+
56+
// Validate insertion via printing (only in verbose mode)
57+
if (VERBOSE) {
58+
printTree(root);
59+
}
60+
}
61+
62+
return root; // Return the root of the constructed B+ tree
63+
}
64+
65+
/* Load the full CSV file into memory as an array of record structs
66+
* Parameters:
67+
* filepath - path to the CSV data file
68+
* Returns:
69+
* Array of all record structs in the file
70+
*/
71+
record **getAllRecordsFromFile(const char *filepath, int *num_records) {
72+
// Attempt to open the file from the provided path
73+
FILE *file = fopen(filepath, "r");
74+
if (file == NULL) {
75+
fprintf(stderr, "Error opening file: %s\n", filepath);
76+
return NULL;
77+
}
78+
79+
// Initialize the records and count
80+
record **records = NULL;
81+
char line[1024];
82+
int count = 0;
83+
84+
// Read each line and populate the records array
85+
bool first_line = true;
86+
while (fgets(line, sizeof(line), file)) {
87+
if (first_line) {
88+
first_line = false;
89+
continue; // Skip header
90+
}
91+
record *new_record = getRecordFromLine(line);
92+
records = realloc(records, (count + 1) * sizeof(record *));
93+
if (records == NULL) {
94+
fprintf(stderr, "Memory allocation failed\n");
95+
fclose(file);
96+
return NULL;
97+
}
98+
records[count] = new_record;
99+
count++;
100+
}
101+
102+
// Close the file, set the output count, and return the records array
103+
fclose(file);
104+
if(VERBOSE) {
105+
printf("Loaded %d records from file: %s\n", count, filepath);
106+
}
107+
*num_records = count;
108+
return records; // Return the array of all records
109+
}
110+
111+
/* Helper to parse a CSV field, handling quotes and commas */
112+
char *parseCSVField(char **cursor) {
113+
char *start = *cursor;
114+
if (*start == '\0' || *start == '\n' || *start == '\r') return NULL;
115+
116+
char *field = malloc(1024); // Allocate buffer for field
117+
int i = 0;
118+
bool in_quotes = false;
119+
120+
if (*start == '"') {
121+
in_quotes = true;
122+
start++; // Skip opening quote
123+
}
124+
125+
while (*start != '\0' && *start != '\n' && *start != '\r') {
126+
if (in_quotes) {
127+
if (*start == '"') {
128+
if (*(start + 1) == '"') {
129+
// Escaped quote
130+
field[i++] = '"';
131+
start += 2;
132+
} else {
133+
// End of quoted field
134+
in_quotes = false;
135+
start++;
136+
}
137+
} else {
138+
field[i++] = *start++;
139+
}
140+
} else {
141+
if (*start == ',') {
142+
start++; // Skip comma
143+
break;
144+
}
145+
field[i++] = *start++;
146+
}
147+
}
148+
149+
field[i] = '\0';
150+
*cursor = start;
151+
return field;
152+
}
153+
154+
/* Get a record struct from a line of CSV data
155+
* Parameters:
156+
* line - character array containing a line of CSV data
157+
* Returns:
158+
* record struct populated with data from the line
159+
*/
160+
record *getRecordFromLine(char *line){
161+
record *new_record = (record *)calloc(1, sizeof(record));
162+
if (new_record == NULL) {
163+
fprintf(stderr, "Memory allocation failed\n");
164+
return NULL;
165+
}
166+
167+
char *cursor = line;
168+
char *token;
169+
170+
// command_id
171+
token = parseCSVField(&cursor);
172+
if (token) { new_record->command_id = strtoull(token, NULL, 10); free(token); }
173+
174+
// raw_command
175+
token = parseCSVField(&cursor);
176+
if (token) { strncpy(new_record->raw_command, token, sizeof(new_record->raw_command)); free(token); }
177+
178+
// base_command
179+
token = parseCSVField(&cursor);
180+
if (token) { strncpy(new_record->base_command, token, sizeof(new_record->base_command)); free(token); }
181+
182+
// shell_type
183+
token = parseCSVField(&cursor);
184+
if (token) { strncpy(new_record->shell_type, token, sizeof(new_record->shell_type)); free(token); }
185+
186+
// exit_code
187+
token = parseCSVField(&cursor);
188+
if (token) { new_record->exit_code = atoi(token); free(token); }
189+
190+
// timestamp
191+
token = parseCSVField(&cursor);
192+
if (token) { strncpy(new_record->timestamp, token, sizeof(new_record->timestamp)); free(token); }
193+
194+
// sudo_used
195+
token = parseCSVField(&cursor);
196+
if (token) {
197+
new_record->sudo_used = (strcasecmp(token, "true") == 0 || strcmp(token, "1") == 0);
198+
free(token);
199+
}
200+
201+
// working_directory
202+
token = parseCSVField(&cursor);
203+
if (token) { strncpy(new_record->working_directory, token, sizeof(new_record->working_directory)); free(token); }
204+
205+
// user_id
206+
token = parseCSVField(&cursor);
207+
if (token) { new_record->user_id = atoi(token); free(token); }
208+
209+
// user_name
210+
token = parseCSVField(&cursor);
211+
if (token) { strncpy(new_record->user_name, token, sizeof(new_record->user_name)); free(token); }
212+
213+
// host_name
214+
token = parseCSVField(&cursor);
215+
if (token) { strncpy(new_record->host_name, token, sizeof(new_record->host_name)); free(token); }
216+
217+
// risk_level
218+
token = parseCSVField(&cursor);
219+
if (token) { new_record->risk_level = atoi(token); free(token); }
220+
221+
return new_record; // Return the fully populated record
222+
}
223+
224+
/* Helper to map an int representation (0, 1, 2, 3) to a FieldType object for storing */
225+
FieldType mapAttributeType(int attributeType) {
226+
switch (attributeType) {
227+
case 0:
228+
return FIELD_UINT64;
229+
case 1:
230+
return FIELD_INT;
231+
case 2:
232+
return FIELD_STRING;
233+
case 3:
234+
return FIELD_BOOL;
235+
default:
236+
return -1; // Invalid type
237+
}
238+
}

engine/mpi/executeEngine-mpi.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
#include <strings.h> // For strcasecmp
55
#include <time.h> // For clock_t, clock(), CLOCKS_PER_SEC
66
#include <limits.h> // For INT_MAX, INT_MIN, UINT64_MAX
7-
#include "../../include/buildEngine-serial.h"
8-
#include "../../include/executeEngine-serial.h"
7+
#include "../../include/buildEngine-mpi.h"
8+
#include "../../include/executeEngine-mpi.h"
99
#include <mpi.h>
1010
#define VERBOSE 0
1111

0 commit comments

Comments
 (0)