Skip to content

Commit 11a5c28

Browse files
committed
Fixing up some delete logic, adding CLI tool
CLI tool has not yet been fully tested (will do after class), I plan to do this soon with the ability to select from different datasets as well (or generate a new one). This will help with the demo.
1 parent 2c8e38f commit 11a5c28

File tree

2 files changed

+153
-52
lines changed

2 files changed

+153
-52
lines changed

benchmark.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import curses
2+
import subprocess
3+
import sys
4+
import os
5+
import time
6+
7+
def run_command(command, silent=False):
8+
"""Runs a shell command."""
9+
try:
10+
if silent:
11+
subprocess.run(command, shell=True, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
12+
else:
13+
subprocess.run(command, shell=True, check=True)
14+
except subprocess.CalledProcessError as e:
15+
if not silent:
16+
print(f"Error running command: {command}")
17+
print(f"Exit code: {e.returncode}")
18+
# We don't raise here to allow the script to continue to cleanup
19+
20+
def draw_menu(stdscr, selected_row_idx, options):
21+
stdscr.clear()
22+
h, w = stdscr.getmaxyx()
23+
24+
title = "Select Benchmark to Run (Use Arrow Keys + Enter)"
25+
stdscr.addstr(h//2 - len(options)//2 - 2, w//2 - len(title)//2, title)
26+
27+
for idx, row in enumerate(options):
28+
x = w//2 - len(row)//2
29+
y = h//2 - len(options)//2 + idx
30+
if idx == selected_row_idx:
31+
stdscr.attron(curses.color_pair(1))
32+
stdscr.addstr(y, x, row)
33+
stdscr.attroff(curses.color_pair(1))
34+
else:
35+
stdscr.addstr(y, x, row)
36+
37+
stdscr.refresh()
38+
39+
def main(stdscr):
40+
# Setup colors
41+
curses.start_color()
42+
curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_WHITE)
43+
curses.curs_set(0)
44+
45+
options = ["Serial", "OMP", "MPI", "ALL", "Exit"]
46+
selected_row_idx = 0
47+
48+
while True:
49+
draw_menu(stdscr, selected_row_idx, options)
50+
key = stdscr.getch()
51+
52+
if key == curses.KEY_UP and selected_row_idx > 0:
53+
selected_row_idx -= 1
54+
elif key == curses.KEY_DOWN and selected_row_idx < len(options) - 1:
55+
selected_row_idx += 1
56+
elif key == curses.KEY_ENTER or key in [10, 13]:
57+
return options[selected_row_idx]
58+
59+
def run_benchmark():
60+
# Run make silently
61+
print("Building project... (this may take a moment)")
62+
run_command("make", silent=True)
63+
64+
# Show Menu
65+
try:
66+
# curses.wrapper handles initialization and cleanup of curses
67+
selection = curses.wrapper(main)
68+
except Exception as e:
69+
print(f"Error in UI: {e}")
70+
# Ensure cleanup happens even if UI fails
71+
run_command("make clean", silent=True)
72+
return
73+
74+
if selection == "Exit":
75+
print("Exiting...")
76+
run_command("make clean", silent=True)
77+
return
78+
79+
print(f"\nRunning {selection} Benchmark...\n" + "="*40 + "\n")
80+
81+
# Run Selected
82+
start_time = time.time()
83+
84+
if selection == "Serial":
85+
run_command("make run")
86+
elif selection == "OMP":
87+
run_command("make run-omp")
88+
elif selection == "MPI":
89+
run_command("make run-mpi")
90+
elif selection == "ALL":
91+
print("--- Running Serial ---")
92+
run_command("make run")
93+
print("\n--- Running OMP ---")
94+
run_command("make run-omp")
95+
print("\n--- Running MPI ---")
96+
run_command("make run-mpi")
97+
98+
end_time = time.time()
99+
print("\n" + "="*40)
100+
print(f"Total Benchmark Time: {end_time - start_time:.4f} seconds")
101+
102+
# Run make clean silently
103+
print("Cleaning up build artifacts...")
104+
run_command("make clean", silent=True)
105+
print("Done.")
106+
107+
if __name__ == "__main__":
108+
run_benchmark()

engine/mpi/executeEngine-mpi.c

Lines changed: 45 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -742,71 +742,67 @@ struct resultSetS *executeQueryDeleteMPI(
742742

743743
// Total deleted count across all ranks
744744
int globalDeleted = 0;
745-
MPI_Reduce(&localDeleted, &globalDeleted, 1, MPI_INT, MPI_SUM, 0, comm);
745+
MPI_Allreduce(&localDeleted, &globalDeleted, 1, MPI_INT, MPI_SUM, comm);
746746

747-
// Gather all flags on rank 0
748-
int *recvCounts = NULL;
749-
int *displs = NULL;
750-
int *globalFlags = NULL;
747+
// Gather all flags on ALL ranks
748+
int *recvCounts = (int *)malloc(size * sizeof(int));
749+
int *displs = (int *)malloc(size * sizeof(int));
750+
int *globalFlags = (int *)calloc(num_records, sizeof(int));
751751

752-
if (rank == 0) {
753-
recvCounts = (int *)malloc(size * sizeof(int));
754-
displs = (int *)malloc(size * sizeof(int));
755-
}
756-
757-
// Each rank sends its local_n
758-
MPI_Gather(&local_n, 1, MPI_INT,
752+
// Share counts with everyone
753+
MPI_Allgather(&local_n, 1, MPI_INT,
759754
recvCounts, 1, MPI_INT,
760-
0, comm);
761-
762-
if (rank == 0) {
763-
int offset = 0;
764-
for (int r = 0; r < size; r++) {
765-
displs[r] = offset;
766-
offset += recvCounts[r];
767-
}
755+
comm);
768756

769-
globalFlags = (int *)calloc(num_records, sizeof(int));
757+
// Calculate displacements
758+
int offset = 0;
759+
for (int r = 0; r < size; r++) {
760+
displs[r] = offset;
761+
offset += recvCounts[r];
770762
}
771763

772-
MPI_Gatherv(localFlags, local_n, MPI_INT,
764+
// Share flags with everyone
765+
MPI_Allgatherv(localFlags, local_n, MPI_INT,
773766
globalFlags, recvCounts, displs, MPI_INT,
774-
0, comm);
767+
comm);
775768

776769
if (localFlags) {
777770
free(localFlags);
778771
}
779772

780-
// Rank 0 applies deletions, updates indexes, compacts array, writes CSV
781-
if (rank == 0) {
782-
int writeIndex = 0;
773+
// ALL ranks apply deletions to keep memory and indexes in sync
774+
int writeIndex = 0;
783775

784-
for (int i = 0; i < num_records; i++) {
785-
record *currentRecord = engine->all_records[i];
776+
for (int i = 0; i < num_records; i++) {
777+
record *currentRecord = engine->all_records[i];
786778

787-
if (globalFlags[i]) {
788-
// Remove from B+ Tree Indexes
789-
for (int j = 0; j < engine->num_indexes; j++) {
779+
if (globalFlags[i]) {
780+
// Remove from B+ Tree Indexes
781+
// CRITICAL: Only update indexes owned by this rank
782+
for (int j = 0; j < engine->num_indexes; j++) {
783+
if (j % size == rank) {
790784
const char *indexed_attr = engine->indexed_attributes[j];
791785
KEY_T key = extract_key_from_record(currentRecord, indexed_attr);
792786
engine->bplus_tree_roots[j] =
793787
delete(engine->bplus_tree_roots[j], key, (ROW_PTR)currentRecord);
794788
}
789+
}
795790

796-
// Free record memory
797-
free(currentRecord);
798-
} else {
799-
// Keep record and compact
800-
if (writeIndex != i) {
801-
engine->all_records[writeIndex] = currentRecord;
802-
}
803-
writeIndex++;
791+
// Free record memory (All ranks must free their local copy)
792+
free(currentRecord);
793+
} else {
794+
// Keep record and compact
795+
if (writeIndex != i) {
796+
engine->all_records[writeIndex] = currentRecord;
804797
}
798+
writeIndex++;
805799
}
800+
}
806801

807-
engine->num_records = writeIndex;
802+
engine->num_records = writeIndex;
808803

809-
// Rewrite CSV with remaining records
804+
// Only Rank 0 rewrites the file
805+
if (rank == 0) {
810806
FILE *file = fopen(engine->datafile, "w");
811807
if (file != NULL) {
812808
for (int i = 0; i < engine->num_records; i++) {
@@ -833,20 +829,17 @@ struct resultSetS *executeQueryDeleteMPI(
833829
engine->datafile);
834830
}
835831
}
832+
}
836833

837-
double time_taken = MPI_Wtime() - start;
834+
double time_taken = MPI_Wtime() - start;
838835

839-
result->numRecords = globalDeleted;
840-
result->queryTime = time_taken;
841-
result->success = true;
836+
result->numRecords = globalDeleted;
837+
result->queryTime = time_taken;
838+
result->success = true;
842839

843-
if (globalFlags) free(globalFlags);
844-
if (recvCounts) free(recvCounts);
845-
if (displs) free(displs);
846-
} else {
847-
// Non-root ranks: result is not meaningful; optionally set success=false explicitly
848-
result->success = false;
849-
}
840+
if (globalFlags) free(globalFlags);
841+
if (recvCounts) free(recvCounts);
842+
if (displs) free(displs);
850843

851844
return result;
852845
}

0 commit comments

Comments
 (0)