From 3507ea44e17b79e0a50356abfa83dc88f906a919 Mon Sep 17 00:00:00 2001 From: Lucas Van Mol <16979353+lucasvanmol@users.noreply.github.com> Date: Mon, 5 May 2025 17:49:26 +0200 Subject: [PATCH 1/3] Add navigation service prefetching experiment --- deathstar_movie_review/demo.py | 10 +- deathstar_movie_review/entities/entities.py | 24 +++-- .../start_prefetch_experiment.py | 32 ++++-- .../test_movie_review_demo.py | 21 +++- experiments/dynamic_prefetching/entities.py | 68 ++++++++++++ .../dynamic_prefetching/run_experiments.py | 14 +-- .../dynamic_prefetching/run_prefetcher.py | 64 ++++++++--- experiments/dynamic_prefetching/submit_job.py | 17 +-- .../test_prefetch_experiment.py | 102 ++++++++++++++++++ run_prefetch_exp.py | 8 +- src/cascade/core.py | 2 +- src/cascade/dataflow/dataflow.py | 2 +- .../dataflow/optimization/parallelization.py | 13 ++- src/cascade/frontend/cfg/cfg_builder.py | 21 ++-- .../frontend/generator/dataflow_builder.py | 23 ++-- src/cascade/frontend/generator/unparser.py | 19 ++++ src/cascade/runtime/flink_runtime.py | 14 +-- .../dataflow_analysis/test_branches.py | 58 ++++++++-- .../test_dataflow_graph_builder.py | 8 +- .../dataflow_analysis/test_entities.py | 65 ++++++++--- .../dataflow_analysis/test_split_functions.py | 34 +++--- .../flink/test_collect_operator.py | 2 +- 22 files changed, 486 insertions(+), 135 deletions(-) create mode 100644 experiments/dynamic_prefetching/test_prefetch_experiment.py diff --git a/deathstar_movie_review/demo.py b/deathstar_movie_review/demo.py index 6cadb8e..d70ec43 100644 --- a/deathstar_movie_review/demo.py +++ b/deathstar_movie_review/demo.py @@ -2,7 +2,7 @@ import cascade from cascade.dataflow.dataflow import DataflowRef from cascade.dataflow.optimization.dead_node_elim import dead_node_elimination -from cascade.dataflow.optimization.parallelization import parallelize_until_if +from cascade.dataflow.optimization.parallelization import parallelize, parallelize_until_if from cascade.runtime.flink_runtime import FlinkRuntime from tests.integration.flink.utils import create_topics, init_flink_runtime @@ -39,6 +39,14 @@ def main(): cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_prefetch_parallel")] = df_parallel runtime.add_dataflow(df_parallel) + # for prefetch experiment + df_baseline_no = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_no_prefetch")] + df_parallel_no = parallelize(df_baseline_no) + df_parallel_no.name = "upload_movie_no_prefetch_parallel" + cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_no_prefetch_parallel")] = df_parallel_no + runtime.add_dataflow(df_parallel_no) + + print(cascade.core.dataflows.keys()) runtime.run() diff --git a/deathstar_movie_review/entities/entities.py b/deathstar_movie_review/entities/entities.py index 6d05439..5235466 100644 --- a/deathstar_movie_review/entities/entities.py +++ b/deathstar_movie_review/entities/entities.py @@ -46,15 +46,13 @@ def __init__(self, title: str, movie_id: str): def upload_movie(self, review: ComposeReview, rating: int): cond = rating is not None + movie_id = self.movie_id + review.upload_movie_id(movie_id) if cond: review.upload_rating(rating) - movie_id = self.movie_id - review.upload_movie_id(movie_id) - return True + return cond else: - movie_id = self.movie_id - review.upload_movie_id(movie_id) - return False + return cond # if without else isn't invented yet, otherwise this would be # cond = rating is not None @@ -63,12 +61,24 @@ def upload_movie(self, review: ComposeReview, rating: int): # movie_id = self.movie_id # 
review.upload_movie_id(movie_id) + def upload_movie_no_prefetch(self, review: ComposeReview, rating: int): + cond = rating is not None + movie_id = self.movie_id + review.upload_movie_id(movie_id) + review.upload_rating(rating) + if cond: + review.upload_rating(rating) + return cond + else: + return cond + def upload_movie_prefetch(self, review: ComposeReview, rating: int): cond = rating is not None movie_id = self.movie_id - review.upload_rating(rating) review.upload_movie_id(movie_id) + review.upload_rating(rating) + review.upload_rating(rating) return cond diff --git a/deathstar_movie_review/start_prefetch_experiment.py b/deathstar_movie_review/start_prefetch_experiment.py index 96d2b6f..fc8d79b 100644 --- a/deathstar_movie_review/start_prefetch_experiment.py +++ b/deathstar_movie_review/start_prefetch_experiment.py @@ -20,7 +20,7 @@ from tests.integration.flink.utils import init_cascade_from_module, init_flink_runtime import cascade -from cascade.dataflow.optimization.parallelization import parallelize_until_if +from cascade.dataflow.optimization.parallelization import parallelize from cascade.dataflow.dataflow import DataflowRef,EventResult from cascade.runtime.flink_runtime import FlinkClientSync @@ -74,7 +74,7 @@ def upload_movie(rating_chance: float, prefetch=False): if prefetch: movie_id = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_prefetch_parallel")] else: - movie_id = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie")] + movie_id = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_no_prefetch_parallel")] return movie_id.generate_event({ "review_0": str(req_id), @@ -160,13 +160,13 @@ def write_dict_to_pkl(futures_dict, filename): ret: EventResult = event_data.get("ret") row = { "event_id": event_id, - "sent": str(event_data.get("sent")), - "sent_t": event_data.get("sent_t"), - "ret": str(event_data.get("ret")), - "ret_t": event_data.get("ret_t"), - "roundtrip": ret.metadata["roundtrip"] if ret else None, + # "sent": str(event_data.get("sent")), + # "sent_t": event_data.get("sent_t"), + # "ret": str(event_data.get("ret")), + # "ret_t": event_data.get("ret_t"), + # "roundtrip": ret.metadata["roundtrip"] if ret else None, "flink_time": ret.metadata["flink_time"] if ret else None, - "deser_times": ret.metadata["deser_times"] if ret else None, + # "deser_times": ret.metadata["deser_times"] if ret else None, "loops": ret.metadata["loops"] if ret else None, "latency": event_data["ret_t"][1] - event_data["sent_t"][1] if ret else None } @@ -207,9 +207,15 @@ def main(): init_client = FlinkClientSync(IN_TOPIC, OUT_TOPIC) + # for prefetch experiment + df_baseline_no = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_no_prefetch")] + df_parallel_no = parallelize(df_baseline_no) + df_parallel_no.name = "upload_movie_no_prefetch_parallel" + cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_no_prefetch_parallel")] = df_parallel_no + # for prefetch experiment df_baseline = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_prefetch")] - df_parallel, _ = parallelize_until_if(df_baseline) + df_parallel = parallelize(df_baseline) df_parallel.name = "upload_movie_prefetch_parallel" cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_prefetch_parallel")] = df_parallel @@ -259,8 +265,12 @@ def main(): print(f"Median Flink time : {flink_time:.2f} ms ({flink_prct:.2f}%)") init_client.close() - df = preprocess(args.output, df) - df.to_pickle(args.output) + # Ignore the first warmup_time seconds of events + warmup_time_s = 3 + 
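# With the 500 requests/second configured in run_prefetch_exp.py, a 3 s warmup
# discards roughly the first 1500 events from the latency statistics.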
warmup_events = int(warmup_time_s * args.requests_per_second) + df = df.iloc[warmup_events:] + # df = preprocess(args.output, df) + df.to_csv(args.output) import re diff --git a/deathstar_movie_review/test_movie_review_demo.py b/deathstar_movie_review/test_movie_review_demo.py index f11b59b..2ea9ba0 100644 --- a/deathstar_movie_review/test_movie_review_demo.py +++ b/deathstar_movie_review/test_movie_review_demo.py @@ -8,7 +8,7 @@ from cascade.runtime.flink_runtime import FlinkClientSync from cascade.dataflow.dataflow import DataflowRef -from cascade.dataflow.optimization.parallelization import parallelize_until_if +from cascade.dataflow.optimization.parallelization import parallelize, parallelize_until_if from cascade.dataflow.operator import StatefulOperator, StatelessOperator from cascade.runtime.python_runtime import PythonClientSync, PythonRuntime @@ -194,15 +194,22 @@ def test_deathstar_movie_demo_prefetch_flink(): runtime = utils.init_flink_runtime("deathstar_movie_review.entities.entities") + # for prefetch experiment + df_baseline_no = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_no_prefetch")] + df_parallel_no = parallelize(df_baseline_no) + df_parallel_no.name = "upload_movie_no_prefetch_parallel" + cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_no_prefetch_parallel")] = df_parallel_no + runtime.add_dataflow(df_parallel_no) + + # for prefetch experiment df_baseline = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_prefetch")] df_parallel, _ = parallelize_until_if(df_baseline) df_parallel.name = "upload_movie_prefetch_parallel" cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_prefetch_parallel")] = df_parallel - runtime.add_dataflow(df_parallel) print(df_parallel.to_dot()) - assert len(df_parallel.entry) == 2 + assert len(df_parallel.entry) == 3 client = FlinkClientSync() @@ -225,11 +232,15 @@ def deathstar_prefetch(client): print("review made") - event = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie")].generate_event({"review_0": "100", "rating_0": 3}, "cars") + event = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_no_prefetch")].generate_event({"review_0": "100", "rating_0": 3}, "cars") result = client.send(event, block=True) print("movie uploaded") event = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_prefetch_parallel")].generate_event({"review_0": "100", "rating_0": 3}, "cars") result = client.send(event, block=True) print("movie uploaded w/ prefetch") - print(result) \ No newline at end of file + print(result) + + event = cascade.core.dataflows[DataflowRef("MovieId", "upload_movie_no_prefetch_parallel")].generate_event({"review_0": "100", "rating_0": 3}, "cars") + result = client.send(event, block=True) + print("movie uploaded") \ No newline at end of file diff --git a/experiments/dynamic_prefetching/entities.py b/experiments/dynamic_prefetching/entities.py index 2bd4e73..c38b467 100644 --- a/experiments/dynamic_prefetching/entities.py +++ b/experiments/dynamic_prefetching/entities.py @@ -32,3 +32,71 @@ def baseline(branch_chance: float): else: return -42 +@cascade +class User(): + def __init__(self, key: str, is_premium: bool, preferences: list[str]): + self.key = key + self.is_premium = is_premium + self.preferences = preferences + + def is_ad_free(self) -> bool: + return self.is_premium + + def get_preferences(self) -> list[str]: + return self.preferences + +@cascade +class NavigationService: + @staticmethod + def get_directions_baseline(origin: int, dest: int, user: User): + 
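# Baseline variant: the Recommender call sits inside the branch, after the
# user's ad-free status is known, so MapService.get_route and
# Recommender.get_recommendations run sequentially. get_directions_prefetch
# below hoists the recommendation call above the branch so that parallelize()
# can schedule it alongside the route lookup, at the cost of a wasted call
# for ad-free users.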
directions = MapService.get_route(origin, dest) + cond = not user.is_ad_free() + if cond: + recc = Recommender.get_recommendations(dest, user) + return (directions, recc) + else: + return (directions, None) + + @staticmethod + def get_directions_prefetch(origin: int, dest: int, user: User): + directions = MapService.get_route(origin, dest) + recc = Recommender.get_recommendations(dest, user) + cond = not user.is_ad_free() + if cond: + return (directions, recc) + else: + return (directions, None) + +@cascade(globals={'time': time}) +class MapService: + @staticmethod + def get_route(origin: int, dest: int): + time.sleep(0.01) + return "left right left" + +@cascade +class Recommender: + @staticmethod + def get_recommendations(dest: int, user: User) -> str: + user_preferences = user.get_preferences() + recs = [] + + cond = "restaurants" in user_preferences + if cond: + recs_0.extend(["McDonalds", "Starbucks"]) + else: + pass + + # cond = "museums" in user_preferences + # if cond: + # recs_0.extend(["Louvre"]) + # else: + # pass + + cond = "attractions" in user_preferences + if cond: + recs_0.extend(["Eiffel Tower", "Arc de Triomphe"]) + else: + pass + + return ", ".join(recs) \ No newline at end of file diff --git a/experiments/dynamic_prefetching/run_experiments.py b/experiments/dynamic_prefetching/run_experiments.py index 3e64229..6f73e4c 100644 --- a/experiments/dynamic_prefetching/run_experiments.py +++ b/experiments/dynamic_prefetching/run_experiments.py @@ -1,15 +1,17 @@ import subprocess import time +rps = 500 +sec = 30 # Define experiment parameters as a list of dictionaries experiments = [ - {"parallelism": 4, "benchmark_args": {"requests_per_second": 1000, "seconds": 30, "threads": 20, "experiment": "baseline", "chance": 0.9}}, - {"parallelism": 4, "benchmark_args": {"requests_per_second": 1000, "seconds": 30, "threads": 20, "experiment": "prefetch", "chance": 0.9}}, - {"parallelism": 4, "benchmark_args": {"requests_per_second": 1000, "seconds": 30, "threads": 20, "experiment": "baseline", "chance": 0.5}}, - {"parallelism": 4, "benchmark_args": {"requests_per_second": 1000, "seconds": 30, "threads": 20, "experiment": "prefetch", "chance": 0.5}}, - {"parallelism": 4, "benchmark_args": {"requests_per_second": 1000, "seconds": 30, "threads": 20, "experiment": "baseline", "chance": 0.1}}, - {"parallelism": 4, "benchmark_args": {"requests_per_second": 1000, "seconds": 30, "threads": 20, "experiment": "prefetch", "chance": 0.1}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "baseline", "chance": 0.9, "num_users": 100}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "prefetch", "chance": 0.9, "num_users": 100}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "baseline", "chance": 0.5, "num_users": 100}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "prefetch", "chance": 0.5, "num_users": 100}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "baseline", "chance": 0.1, "num_users": 100}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "prefetch", "chance": 0.1, "num_users": 100}}, ] diff --git a/experiments/dynamic_prefetching/run_prefetcher.py b/experiments/dynamic_prefetching/run_prefetcher.py 
index 91adad0..2ae3bac 100644 --- a/experiments/dynamic_prefetching/run_prefetcher.py +++ b/experiments/dynamic_prefetching/run_prefetcher.py @@ -35,18 +35,20 @@ def main(): print(cascade.core.dataflows.keys()) - baseline = cascade.core.dataflows[DataflowRef("Prefetcher", "baseline")] - prefetch = cascade.core.dataflows[DataflowRef("Prefetcher", "prefetch")] + baseline = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_baseline")] + prefetch = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_prefetch")] pre_par = parallelize(prefetch) - cascade.core.dataflows[DataflowRef("Prefetcher", "prefetch_parallel")] = pre_par + cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_prefetch_parallel")] = pre_par base_par = parallelize(baseline) - cascade.core.dataflows[DataflowRef("Prefetcher", "baseline_parallel")] = base_par + cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_baseline_parallel")] = base_par - print(base_par.to_dot()) - print(pre_par.to_dot()) + for df in cascade.core.dataflows.values(): + print(df.to_dot()) + for block in df.blocks.values(): + print(block.function_string) run_test() @@ -66,15 +68,43 @@ def wait_for_futures(client: FlinkClientSync): futures = client._futures return futures -def generate_event(exp: Literal["baseline", "prefetch"], chance: float): - baseline = cascade.core.dataflows[DataflowRef("Prefetcher", "baseline_parallel")] - prefetch = cascade.core.dataflows[DataflowRef("Prefetcher", "prefetch_parallel")] +import random +def seed_users(n: int, percent_premium: float): + assert 0 <= percent_premium <= 1 + client = FlinkClientSync(IN_TOPIC, OUT_TOPIC) + + num_premium = int(n * percent_premium) + df = cascade.core.dataflows[DataflowRef("User", "__init__")] + for i in range(n): + + user_preferences = [] + if random.random() < 0.5: + user_preferences.append("museums") + if random.random() < 0.5: + user_preferences.append("restaurants") + if random.random() < 0.5: + user_preferences.append("attractions") + + is_premium = i < num_premium + event = df.generate_event({"key": str(i), "is_premium": is_premium, "preferences": user_preferences}, key=str(i)) + client.send(event) + wait_for_futures(client) + client.close() + + + + +def generate_event(exp: Literal["baseline", "prefetch"], n: int): + baseline = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_baseline_parallel")] + prefetch = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_prefetch_parallel")] df = prefetch if exp == "prefetch" else baseline - return df.generate_event({"branch_chance_0": chance}) + key = str(random.randint(0, n-1)) + + return df.generate_event({"origin_0": 0, "dest_0": 0, "user_0": key}) def runner(args): - chance, bursts, requests_per_second, exp = args + bursts, requests_per_second, exp, num_users = args client = FlinkClientSync(IN_TOPIC, OUT_TOPIC) sleep_time = 0.95 / requests_per_second @@ -88,7 +118,7 @@ def runner(args): # sleep sometimes between messages # if i % (messages_per_burst // sleeps_per_burst) == 0: time.sleep(sleep_time) - event = generate_event(exp, chance) + event = generate_event(exp, num_users) client.send(event) client.flush() @@ -110,7 +140,7 @@ def runner(args): def run_test(): logger = logging.getLogger("cascade") - logger.setLevel("INFO") + logger.setLevel("DEBUG") @@ -120,6 +150,7 @@ def run_test(): parser.add_argument("--threads", type=int, default=1, help="Number of concurrent threads") parser.add_argument("--chance", type=float, 
default=0.5, help="Chance") parser.add_argument("--experiment", type=str, default="baseline", help="Experiment type") + parser.add_argument("--num_users", type=int, default=1000, help="Number of users") args = parser.parse_args() assert args.experiment in ["baseline", "prefetch"] @@ -128,7 +159,11 @@ def run_test(): print(f"Actual requests per second is {int(rps_per_thread * args.threads)} (due to rounding)") - func_args = [(args.chance, args.seconds,rps_per_thread,args.experiment)] + print("seeding..") + seed_users(args.num_users, args.chance) + print("done.") + + func_args = [(args.seconds,rps_per_thread,args.experiment, args.num_users)] with Pool(args.threads) as p: results = p.map(runner, func_args) @@ -138,7 +173,6 @@ def run_test(): print(count) df = to_pandas(results) df.to_csv(f"{args.experiment}_{args.chance}_{args.requests_per_second}.csv") - def to_pandas(futures_dict): diff --git a/experiments/dynamic_prefetching/submit_job.py b/experiments/dynamic_prefetching/submit_job.py index 44a5982..a48980b 100644 --- a/experiments/dynamic_prefetching/submit_job.py +++ b/experiments/dynamic_prefetching/submit_job.py @@ -1,7 +1,6 @@ import cascade from cascade.dataflow.dataflow import DataflowRef -from cascade.dataflow.optimization.parallelization import parallelize_until_if -from experiments.dynamic_prefetching.run_prefetcher import gen_parallel +from cascade.dataflow.optimization.parallelization import parallelize, parallelize_until_if from tests.integration.flink.utils import create_topics, init_flink_runtime @@ -22,15 +21,17 @@ def main(): print(cascade.core.dataflows.keys()) - baseline = cascade.core.dataflows[DataflowRef("Prefetcher", "baseline")] - prefetch = cascade.core.dataflows[DataflowRef("Prefetcher", "prefetch")] - pre_par = gen_parallel(prefetch) - cascade.core.dataflows[DataflowRef("Prefetcher", "prefetch_parallel")] = pre_par + baseline = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_baseline")] + prefetch = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_prefetch")] + + + pre_par = parallelize(prefetch) + cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_prefetch_parallel")] = pre_par runtime.add_dataflow(pre_par) - base_par = gen_parallel(baseline) - cascade.core.dataflows[DataflowRef("Prefetcher", "baseline_parallel")] = base_par + base_par = parallelize(baseline) + cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_baseline_parallel")] = base_par runtime.add_dataflow(base_par) runtime.run() diff --git a/experiments/dynamic_prefetching/test_prefetch_experiment.py b/experiments/dynamic_prefetching/test_prefetch_experiment.py new file mode 100644 index 0000000..84258bf --- /dev/null +++ b/experiments/dynamic_prefetching/test_prefetch_experiment.py @@ -0,0 +1,102 @@ +import logging +import sys +import os + + + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) + +from cascade.runtime.flink_runtime import FlinkClientSync +from cascade.dataflow.dataflow import DataflowRef +from cascade.dataflow.optimization.parallelization import parallelize, parallelize_until_if +from cascade.dataflow.operator import StatefulOperator, StatelessOperator +from cascade.runtime.python_runtime import PythonClientSync, PythonRuntime + +import cascade +import pytest +import tests.integration.flink.utils as utils + +def init_python_runtime() -> tuple[PythonRuntime, PythonClientSync]: + runtime = PythonRuntime() + for op in cascade.core.operators.values(): + if 
isinstance(op, StatefulOperator): + runtime.add_operator(op) + elif isinstance(op, StatelessOperator): + runtime.add_stateless_operator(op) + + runtime.run() + return runtime, PythonClientSync(runtime) + + +def test_prefetching_exp_test_python(): + print("starting") + cascade.core.clear() + exec(f'import experiments.dynamic_prefetching.entities') + cascade.core.init() + + + baseline = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_baseline")] + prefetch = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_prefetch")] + + + pre_par = parallelize(prefetch) + cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_prefetch_parallel")] = pre_par + + base_par = parallelize(baseline) + cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_baseline_parallel")] = base_par + + + runtime, client = init_python_runtime() + prefetching_exp_test(client) + +@pytest.mark.integration +def test_prefetching_exp_flink(): + print("starting") + logger = logging.getLogger("cascade") + logger.setLevel("DEBUG") + + utils.create_topics() + + runtime = utils.init_flink_runtime("experiments.dynamic_prefetching.entities") + + baseline = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_baseline")] + prefetch = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_prefetch")] + + + pre_par = parallelize(prefetch) + cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_prefetch_parallel")] = pre_par + runtime.add_dataflow(pre_par) + + base_par = parallelize(baseline) + cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_baseline_parallel")] = base_par + runtime.add_dataflow(base_par) + + + client = FlinkClientSync() + runtime.run(run_async=True) + + try: + prefetching_exp_test(client) + finally: + client.close() + +def prefetching_exp_test(client): + for df in cascade.core.dataflows.values(): + print(df.to_dot()) + + username = "premium" + + print("testing user create") + + event = cascade.core.dataflows[DataflowRef("User", "__init__")].generate_event({"key": username, "is_premium": False, "preferences": ["museums"]}, username) + result = client.send(event, block=True) + print(result) + assert result['key'] == username + + event = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_baseline_parallel")].generate_event({"origin_0": 0, "dest_0": 0, "user_0": username}) + result = client.send(event, block=True) + print(result) + + event = cascade.core.dataflows[DataflowRef("NavigationService", "get_directions_prefetch_parallel")].generate_event({"origin_0": 0, "dest_0": 0, "user_0": username}) + result = client.send(event, block=True) + print(result) diff --git a/run_prefetch_exp.py b/run_prefetch_exp.py index b2879e8..5e06e1b 100755 --- a/run_prefetch_exp.py +++ b/run_prefetch_exp.py @@ -13,9 +13,9 @@ def rps(num, branch_chance, producer_threads=1): # Define experiment parameters as a list of dictionaries experiments = [ - {"parallelism": 4, "benchmark_args": {**rps(500, 0.1, producer_threads=10)}}, - {"parallelism": 4, "benchmark_args": {**rps(500, 0.5, producer_threads=10)}}, - {"parallelism": 4, "benchmark_args": {**rps(500, 0.9, producer_threads=10)}}, + {"parallelism": 8, "benchmark_args": {**rps(500, 0.1, producer_threads=10)}}, + {"parallelism": 8, "benchmark_args": {**rps(500, 0.5, producer_threads=10)}}, + {"parallelism": 8, "benchmark_args": {**rps(500, 0.9, producer_threads=10)}}, ] @@ -46,7 +46,7 @@ def rps(num, branch_chance, 
producer_threads=1): subprocess.run(flink_cmd, check=True, env=env) # Start benchmark - filename = f"{e}_p-{exp['parallelism']}_rps-{exp['benchmark_args']['requests_per_second']}_chance-{exp['benchmark_args']['branch_chance']}.pkl" + filename = f"{e}_{exp['benchmark_args']['branch_chance']}_{exp['benchmark_args']['requests_per_second']}_{exp['parallelism']}.csv" benchmark_cmd = [ "python", "-u", "-m", "deathstar_movie_review.start_prefetch_experiment", "--output", filename, "--experiment", e ] diff --git a/src/cascade/core.py b/src/cascade/core.py index 9f20e36..40774cc 100644 --- a/src/cascade/core.py +++ b/src/cascade/core.py @@ -88,7 +88,7 @@ def init(): df.entry = [n0] blocks = [] else: - df = DataflowBuilder(method.method_node, cls.class_desc.globals).build(dataflows, op_name) + df = DataflowBuilder(method.method_node, dataflows, cls.class_desc.globals).build(op_name) dataflows[df.ref()] = df op.dataflows[df.ref()] = df diff --git a/src/cascade/dataflow/dataflow.py b/src/cascade/dataflow/dataflow.py index 1d49674..a22bd5b 100644 --- a/src/cascade/dataflow/dataflow.py +++ b/src/cascade/dataflow/dataflow.py @@ -74,7 +74,7 @@ def propogate(self, event: 'Event', targets: List[Node], df_map: dict['DataflowR if_cond = event.variable_map[self.predicate_var] targets = [] for edge in event.target.outgoing_edges: - assert edge.if_conditional is not None + assert edge.if_conditional is not None, f"The edge did not have an if_conditional set: {edge}" if edge.if_conditional == if_cond: targets.append(edge.to_node) diff --git a/src/cascade/dataflow/optimization/parallelization.py b/src/cascade/dataflow/optimization/parallelization.py index 95a7bf5..c968a48 100644 --- a/src/cascade/dataflow/optimization/parallelization.py +++ b/src/cascade/dataflow/optimization/parallelization.py @@ -79,22 +79,27 @@ def parallelize_until_if(df: DataFlow) -> Tuple[DataFlow, DataFlow]: rest = copy.deepcopy(df) collectors = {} - finishers = set() + terminal_nodes = set() for u in graph.nodes: + # Add the node to the updated graph updated.add_node(n_map[u]) rest.remove_node_by_id(u) + + # Add collect nodes if a node has multiple in flows if graph.in_degree(u) > 1: c = CollectNode(0) updated.add_node(c) collectors[u] = c.id updated.add_edge_refs(c.id, u) + # Terminal nodes have no outgoing edges elif graph.out_degree(u) == 0: - finishers.add(u) + terminal_nodes.add(u) - if len(finishers) > 1: + # Add a collect node at the end if the dependency graph is split + if len(terminal_nodes) > 1: c = CollectNode(0) updated.add_node(c) - for f in finishers: + for f in terminal_nodes: c.num_events += 1 updated.add_edge_refs(f, c.id) diff --git a/src/cascade/frontend/cfg/cfg_builder.py b/src/cascade/frontend/cfg/cfg_builder.py index 29aa63d..df90352 100644 --- a/src/cascade/frontend/cfg/cfg_builder.py +++ b/src/cascade/frontend/cfg/cfg_builder.py @@ -8,7 +8,9 @@ class ControlFlowGraphBuilder: - def __init__(self, block_list: list, globals: list[str]): + def __init__(self, block_list: list, globals: list[str], operators: list[str]): + self.operators = operators + self.remote_entities: list = [] self.block_list: list = block_list self.globals = globals @@ -22,6 +24,10 @@ def make_cfg(self, blocks: list, i = 0) -> tuple[ControlFlowGraph, int]: i += 1 args = b.args function_vars = [f'{a.arg}_0' for a in args.args] + + # detect arguments that are entities e.g. `item1: Item` means item1 is an entity. 
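# e.g. in get_directions_baseline(origin: int, dest: int, user: User), only
# `user` is recorded as a remote entity, since `User` is a registered operator
# while the plain `int` arguments stay local.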
+ self.remote_entities.extend([f'{a.arg}' for a in args.args if str(a.annotation).strip("'") in self.operators]) + statement.extend_targets(function_vars) statement.extend_values(function_vars) graph.append_statement(statement) @@ -53,10 +59,11 @@ def make_cfg(self, blocks: list, i = 0) -> tuple[ControlFlowGraph, int]: statement.values = [v.__repr__() for v in values] contains_attribute, attribute = ContainsAttributeVisitor.check_return_attribute(b) if contains_attribute: - if attribute.value.id in self.globals: - statement.values.remove(attribute.value.id) - elif attribute.value.id != 'self': - statement.set_remote() + if hasattr(attribute.value, "id"): + if attribute.value.id in self.globals: + statement.values.remove(attribute.value.id) + elif attribute.value.id in self.remote_entities or attribute.value.id in self.operators: + statement.set_remote() statement.set_attribute(attribute) @@ -67,6 +74,6 @@ def construct_dataflow_graph(self) -> ControlFlowGraph: return graph @classmethod - def build(cls, block_list: list, globals: list[str]) -> ControlFlowGraph: - dataflow_graph_builder = cls(block_list, globals) + def build(cls, block_list: list, globals: list[str], operators: list[str]) -> ControlFlowGraph: + dataflow_graph_builder = cls(block_list, globals, operators) return dataflow_graph_builder.construct_dataflow_graph() diff --git a/src/cascade/frontend/generator/dataflow_builder.py b/src/cascade/frontend/generator/dataflow_builder.py index 437d542..07de9e2 100644 --- a/src/cascade/frontend/generator/dataflow_builder.py +++ b/src/cascade/frontend/generator/dataflow_builder.py @@ -157,8 +157,8 @@ def blocked_cfg(statement_graph: nx.DiGraph, entry: Statement) -> nx.DiGraph: # add then and orelse blocks - graph.add_edges_from(then_blocked_graph.edges()) - graph.add_edges_from(orelse_blocked_graph.edges()) + graph.add_edges_from(then_blocked_graph.edges(data=True)) + graph.add_edges_from(orelse_blocked_graph.edges(data=True)) # connect them to this node first_then = list(then_blocked_graph.nodes)[0] @@ -173,7 +173,7 @@ def blocked_cfg(statement_graph: nx.DiGraph, entry: Statement) -> nx.DiGraph: except IndexError: first_finally = succ_then[0] finally_graph = blocked_cfg(statement_graph, first_finally) - graph.add_edges_from(finally_graph.edges()) + graph.add_edges_from(finally_graph.edges(data=True)) first_finally = list(finally_graph.nodes)[0] graph.add_edge(last_then, first_finally) @@ -186,15 +186,18 @@ def blocked_cfg(statement_graph: nx.DiGraph, entry: Statement) -> nx.DiGraph: class DataflowBuilder: - def __init__(self, function_def: nodes.FunctionDef, globals: Optional[dict[str, Any]] = None): + def __init__(self, function_def: nodes.FunctionDef, dataflows: dict[DataflowRef, DataFlow], globals: Optional[dict[str, Any]] = None): self.function_def = function_def self.name = self.function_def.name self.globals = globals + self.dataflows = dataflows def build_cfg(self): + operators = [d.operator_name for d in self.dataflows.keys()] + global_names = list(self.globals.keys()) if self.globals else [] - cfg: ControlFlowGraph = ControlFlowGraphBuilder.build([self.function_def] + self.function_def.body, global_names) + cfg: ControlFlowGraph = ControlFlowGraphBuilder.build([self.function_def] + self.function_def.body, global_names, operators) self.type_map = ExtractTypeVisitor.extract(self.function_def) cfg.name = self.function_def.name @@ -205,9 +208,9 @@ def build_cfg(self): self.blocked_cfg = split_cfg(blocked_cfg(cfg.graph, cfg.get_single_source())) - def build_df(self, dataflows: 
dict[DataflowRef, DataFlow], op_name: str) -> DataFlow: + def build_df(self, op_name: str) -> DataFlow: df_ref = DataflowRef(op_name, self.name) - df = dataflows[df_ref] + df = self.dataflows[df_ref] node_id_map = {} @@ -215,7 +218,7 @@ def build_df(self, dataflows: dict[DataflowRef, DataFlow], op_name: str) -> Data is_entry = True for statement_block in self.blocked_cfg.nodes: if len(statement_block) == 1 and statement_block[0].is_remote(): - node = to_entity_call(statement_block[0], self.type_map, dataflows) + node = to_entity_call(statement_block[0], self.type_map, self.dataflows) elif len(statement_block) == 1 and statement_block[0].is_predicate: rawblock = statement_block[0].block assert isinstance(rawblock, nodes.Bool), type(rawblock) @@ -244,8 +247,8 @@ def build_df(self, dataflows: dict[DataflowRef, DataFlow], op_name: str) -> Data return df - def build(self, dataflows: dict[DataflowRef, DataFlow], op_name: str) -> DataFlow: + def build(self, op_name: str) -> DataFlow: self.build_cfg() - return self.build_df(dataflows, op_name) + return self.build_df(op_name) diff --git a/src/cascade/frontend/generator/unparser.py b/src/cascade/frontend/generator/unparser.py index 1e677e6..3e635b5 100644 --- a/src/cascade/frontend/generator/unparser.py +++ b/src/cascade/frontend/generator/unparser.py @@ -58,5 +58,24 @@ def unparse(block: RawBasicBlock): for v in block.values[1:]: res += " {} {}".format(block.op, unparse(v)) return res + case nodes.Tuple: + vals = [unparse(v) for v in block.elts] + + if len(vals) > 1: + res = f"({','.join(vals)})" + elif len(vals) == 1: + res = f"({vals[0]},)" + else: + res = "tuple()" + return res + + case nodes.List: + vals = [unparse(v) for v in block.elts] + + res = f"[{','.join(vals)}]" + return res + case nodes.Pass: + return "pass" + case _: raise NotImplementedError(f"{type(block)}: {block}") diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index 66faba6..8bc4bc0 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -19,7 +19,7 @@ import logging logger = logging.getLogger("cascade") -logger.setLevel("INFO") +logger.setLevel("DEBUG") console_handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') console_handler.setFormatter(formatter) @@ -109,7 +109,7 @@ def process_element(self, event_result: tuple[Event, Any], ctx: ProcessFunction. 
else: yield event else: - assert isinstance(event, EventResult) + assert isinstance(event, EventResult), f"Expected EventResult, got: {event}" yield self.out_tag, event class FlinkOperator(KeyedProcessFunction): @@ -128,12 +128,12 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): event = profile_event(event, "STATEFUL OP INNER ENTRY") # should be handled by filters on this FlinkOperator - assert(isinstance(event.target, CallLocal)) + assert isinstance(event.target, CallLocal), f"Expected CallLocal target, got: {event.target}" logger.debug(f"FlinkOperator {self.operator.name()}[{ctx.get_current_key()}]: Processing: {event.target.method}") assert(event.dataflow.operator_name == self.operator.name()) key = ctx.get_current_key() - assert(key is not None) + assert key is not None, f"Expected key for a Stateful Operator" if isinstance(event.target.method, InitClass): result = self.operator.handle_init_class(**event.variable_map).__dict__ @@ -270,7 +270,7 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): var_map_num_items = self.var_map.value() logger.debug(f"FlinkCollectOp [{ctx.get_current_key()}]: Processing: {event}") - assert isinstance(event.target, CollectNode) + assert isinstance(event.target, CollectNode), f"Expected CollectNode target, got: {event.target}" total_events = event.target.num_events @@ -646,7 +646,7 @@ def run(self, run_async=False, output: Literal["collect", "kafka", "stdout"]="ka raise RuntimeError("No operators found, were they added to the flink runtime with .add_*_operator()") - op_routed = operator_streams.process(RouterOperator(self.dataflows, collect_tag, result_tag)).name("ROUTER (OP)") + op_routed = operator_streams.process(RouterOperator(self.dataflows, collect_tag, result_tag)).name("POST OP ROUTER") collect_stream = ( op_routed @@ -654,7 +654,7 @@ def run(self, run_async=False, output: Literal["collect", "kafka", "stdout"]="ka .key_by(lambda e: str(e._id) + "_" + str(e.target.id)) # might not work in the future if we have multiple merges in one dataflow? 
.process(FlinkCollectOperator()) .name("Collect") - .process(RouterOperator(self.dataflows, collect_tag, result_tag)) + .process(RouterOperator(self.dataflows, collect_tag, result_tag)).name("POST COLLECT ROUTER") ) """Stream that ingests events with an `cascade.dataflow.dataflow.CollectNode` target""" diff --git a/tests/frontend/dataflow_analysis/test_branches.py b/tests/frontend/dataflow_analysis/test_branches.py index f2bc08e..d54ca79 100644 --- a/tests/frontend/dataflow_analysis/test_branches.py +++ b/tests/frontend/dataflow_analysis/test_branches.py @@ -22,15 +22,15 @@ def buy_item(self, item: 'Item') -> int: test_class = blocks[2] get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(get_total) dataflows = { DataflowRef("User", "buy_item"): DataFlow("buy_item", "User", ["item"]), DataflowRef("User", "__init__"): DataFlow("__init__", "User", ["username", "balance"]), DataflowRef("Item", "get_price"): DataFlow("get_price", "Item", []), } + sf = DataflowBuilder(get_total, dataflows) + df = sf.build("User") - df = sf.build(dataflows, "User") print(df.to_dot()) assert len(df.nodes) == 7 ifnode = None @@ -58,15 +58,15 @@ def buy_item(self, item: 'Item') -> int: test_class = blocks[2] get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(get_total) dataflows = { DataflowRef("User", "buy_item"): DataFlow("buy_item", "User", ["item"]), DataflowRef("User", "__init__"): DataFlow("__init__", "User", ["username", "balance"]), DataflowRef("Item", "get_price"): DataFlow("get_price", "Item", []), } + sf = DataflowBuilder(get_total, dataflows) + df = sf.build("User") - df = sf.build(dataflows, "User") print(df.to_dot()) assert len(df.nodes) == 6, "complex predicate should create a temp variable assignment" @@ -90,15 +90,15 @@ def buy_item(self, item: 'Item') -> int: test_class = blocks[2] get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(get_total) dataflows = { DataflowRef("User", "buy_item"): DataFlow("buy_item", "User", ["item"]), DataflowRef("User", "__init__"): DataFlow("__init__", "User", ["username", "balance"]), DataflowRef("Item", "get_price"): DataFlow("get_price", "Item", []), } + sf = DataflowBuilder(get_total, dataflows) + df = sf.build("User") - df = sf.build(dataflows, "User") print(df.to_dot()) def test_no_else(): @@ -117,15 +117,15 @@ def buy_item(self, item: 'Item') -> int: test_class = blocks[2] get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(get_total) dataflows = { DataflowRef("User", "buy_item"): DataFlow("buy_item", "User", ["item"]), DataflowRef("User", "__init__"): DataFlow("__init__", "User", ["username", "balance"]), DataflowRef("Item", "get_price"): DataFlow("get_price", "Item", []), } + sf = DataflowBuilder(get_total, dataflows) + df = sf.build("User") - df = sf.build(dataflows, "User") print(df.to_dot()) assert len(df.nodes) == 6 @@ -139,6 +139,8 @@ def buy_item(self, item: 'Item') -> int: item_price = item.get_price() if True: x = 20 + else: + x = 30 self.balance = self.balance - item_price return "item bought" else: @@ -154,14 +156,48 @@ def buy_item(self, item: 'Item') -> int: test_class = blocks[2] get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(get_total) dataflows = { DataflowRef("User", "buy_item"): DataFlow("buy_item", "User", ["item"]), DataflowRef("User", "__init__"): DataFlow("__init__", "User", ["username", "balance"]), 
DataflowRef("Item", "get_price"): DataFlow("get_price", "Item", []), } + sf = DataflowBuilder(get_total, dataflows) + df = sf.build("User") + + print(df.to_dot()) + conditional_edges = 0 + for edge in df.edges: + if edge.if_conditional is not None: + conditional_edges += 1 + assert conditional_edges == 6 + + +def test_local_methods(): + program: str = dedent(""" + class Test: + + def get_total(item: Item): + q1 = item.get_quantity() + lst = [] + lst.append(q1) + lst.append(23) + return lst""") + + cfg, _ = setup_cfg(program) + blocks = cfg.block_list + test_class: nodes.Block = blocks[2] + get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] + + + dataflows = { + DataflowRef("Test", "get_total"): DataFlow("get_total", "Test", ["item"]), + DataflowRef("Item", "get_quantity"): DataFlow("get_quantity", "Item", []), + } + sf = DataflowBuilder(get_total, dataflows) + df = sf.build("Test") - df = sf.build(dataflows, "User") print(df.to_dot()) - assert len(df.nodes) == 12 \ No newline at end of file + assert len(df.nodes) == 3 + for block in df.blocks.values(): + print(block.function_string) \ No newline at end of file diff --git a/tests/frontend/dataflow_analysis/test_dataflow_graph_builder.py b/tests/frontend/dataflow_analysis/test_dataflow_graph_builder.py index 68c5a9b..0d3bc08 100644 --- a/tests/frontend/dataflow_analysis/test_dataflow_graph_builder.py +++ b/tests/frontend/dataflow_analysis/test_dataflow_graph_builder.py @@ -26,14 +26,14 @@ def get_total(item1: Stock, item2: Stock): # TODO: check that the produced ssa code made variables for # - item1.get_quantity() # - item2.get_quantity() - df: ControlFlowGraph = ControlFlowGraphBuilder.build([get_total] + get_total.body, globals=[]) + df: ControlFlowGraph = ControlFlowGraphBuilder.build([get_total] + get_total.body, globals=[], operators=["Stock", "Adder"]) for n in df.graph.nodes: print(n) for u, v in df.graph.edges: print(u.block_num, v.block_num) # print(df.graph.edges) -def test_ssa(): +def test_remote_call_assignment(): program: str = dedent(""" class Test: @@ -49,7 +49,7 @@ def get_total(item1: Stock, item2: Stock): # TODO: check that the produced ssa code made variables for # - item1.get_quantity() # - item2.get_quantity() - df: ControlFlowGraph = ControlFlowGraphBuilder.build([get_total] + get_total.body, globals=[]) + df: ControlFlowGraph = ControlFlowGraphBuilder.build([get_total] + get_total.body, globals=[], operators=["Stock", "Adder"]) print(df.graph.nodes) print(df.graph.edges) @@ -76,6 +76,6 @@ def test_branches(item1: Stock, item2: Stock): # TODO: check that the produced ssa code made variables for # - item1.get_quantity() # - item2.get_quantity() - df: ControlFlowGraph = ControlFlowGraphBuilder.build([test] + test.body, globals=[]) + df: ControlFlowGraph = ControlFlowGraphBuilder.build([test] + test.body, globals=[], operators=["Stock"]) # print(df.graph.nodes) # print(df.graph.edges) \ No newline at end of file diff --git a/tests/frontend/dataflow_analysis/test_entities.py b/tests/frontend/dataflow_analysis/test_entities.py index 6a3ffca..4b9a8bb 100644 --- a/tests/frontend/dataflow_analysis/test_entities.py +++ b/tests/frontend/dataflow_analysis/test_entities.py @@ -23,15 +23,15 @@ def get_total(item1: Stock, item2: Stock): test_class = blocks[2] get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(get_total) - sf.build_cfg() dataflows = { DataflowRef("Test", "get_total"): DataFlow("get_total", "Test", ["item1", "item2"]), DataflowRef("Stock", 
"get_quantity"): DataFlow("get_quantity", "Stock", []) } - df = sf.build(dataflows, "Test") + sf = DataflowBuilder(get_total, dataflows) + sf.build_cfg() + df = sf.build("Test") ## TODO: check blocks/df assert len(df.nodes) == 4 @@ -47,6 +47,40 @@ def get_total(item1: Stock, item2: Stock): next = next[0] assert isinstance(next, CallLocal) +def test_ssa(): + program: str = dedent(""" + class Test: + + def get_total(inc: bool): + t = 1 + if inc: + t = 2 + else: + t = 3 + return t""") + + cfg, _ = setup_cfg(program) + blocks = cfg.block_list + test_class: nodes.Block = blocks[2] + get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] + + + dataflows = { + DataflowRef("Test", "get_total"): DataFlow("get_total", "Test", ["item1", "item2"]), + DataflowRef("Stock", "get_quantity"): DataFlow("get_quantity", "Stock", []) + } + sf = DataflowBuilder(get_total, dataflows) + sf.build_cfg() + + df = sf.build("Test") + + for block in df.blocks.values(): + print(block.function_string) + for node in df.nodes.values(): + print(node) + + raise NotImplementedError("there should be a phi node here!") + def test_simple_block(): program: str = dedent(""" @@ -59,13 +93,13 @@ def add(x: int, y: int): test_class = blocks[2] get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(get_total) dataflows = { DataflowRef("Test", "add"): DataFlow("get_total", "Test", ["x", "y"]), } + sf = DataflowBuilder(get_total, dataflows) - df = sf.build(dataflows, "Test") + df = sf.build( "Test") assert len(df.blocks) == 1 block = list(df.blocks.values())[0] @@ -89,14 +123,14 @@ def buy_item(self, item: 'Item') -> bool: user_class = blocks[2] buy_item: nodes.FunctionDef = user_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(buy_item) dataflows = { DataflowRef("User", "buy_item"): DataFlow("buy_item", "User", ["item"]), DataflowRef("Item", "get_price"): DataFlow("get_price", "Item", []), } + sf = DataflowBuilder(buy_item, dataflows) - df = sf.build(dataflows, "User") + df = sf.build("User") blocks = list(df.blocks.values()) @@ -128,14 +162,15 @@ def upload_unique_id(self, review_id: int): user_class = blocks[2] upload_unique: nodes.FunctionDef = user_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(upload_unique) dataflows = { DataflowRef("ComposeReview", "upload_unique_id"): DataFlow("upload_unique_id", "ComposeReview", ["review_id"]), DataflowRef("ComposeReview", "__init__"): DataFlow("__init__", "ComposeReview", ["req_id"]), } - df = sf.build(dataflows, "ComposeReview") + sf = DataflowBuilder(upload_unique, dataflows) + + df = sf.build("ComposeReview") blocks = list(df.blocks.values()) @@ -171,16 +206,18 @@ def rand(): upload_unique: nodes.FunctionDef = user_class.blocks[1].ssa_code.code_list[0] import random - sf = DataflowBuilder(upload_unique, {'random': random}) + dataflows = { + DataflowRef("Randomer", "rand"): DataFlow("rand", "Randomer", []), + } + + sf = DataflowBuilder(upload_unique, dataflows, {'random': random}) sf.build_cfg() for node in sf.cfg.get_nodes(): print(node) - dataflows = { - DataflowRef("Randomer", "rand"): DataFlow("rand", "Randomer", []), - } + - df = sf.build(dataflows, "Randomer") + df = sf.build("Randomer") for block in df.blocks.values(): print(block.function_string) diff --git a/tests/frontend/dataflow_analysis/test_split_functions.py b/tests/frontend/dataflow_analysis/test_split_functions.py index 8f25dfb..9acdbc1 100644 --- a/tests/frontend/dataflow_analysis/test_split_functions.py +++ 
b/tests/frontend/dataflow_analysis/test_split_functions.py @@ -29,18 +29,15 @@ def get_total(item1: Stock, item2: Stock, y: int): get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(get_total) - sf.build_cfg() - dataflows = { DataflowRef("Adder", "add"): DataFlow("add", "Adder", ["a", "b"]), DataflowRef("Stock", "get_quantity"): DataFlow("get_quantity", "Item", []), DataflowRef("Test", "get_total"): DataFlow("get_total", "Test", []) } + sf = DataflowBuilder(get_total, dataflows) + sf.build_cfg() + df = sf.build_df("Test") - - - df = sf.build_df(dataflows, "Test") print(df.to_dot()) for block in df.blocks.values(): print(block.function_string) @@ -67,7 +64,11 @@ def test_branching(self) -> int: test_class: nodes.Block = blocks[2] get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(get_total) + dataflows = { + DataflowRef("Test", "test_branching"): DataFlow("test_branching", "Test", []) + } + + sf = DataflowBuilder(get_total, dataflows) sf.build_cfg() print(sf.cfg.to_dot()) new = blocked_cfg(sf.cfg.graph, sf.cfg.get_single_source()) @@ -77,13 +78,9 @@ def test_branching(self) -> int: print_digraph(split_cfg(new)) assert len(new.nodes) == 5 - - dataflows = { - DataflowRef("Test", "test_branching"): DataFlow("test_branching", "Test", []) - } + df = sf.build_df("Test") - df = sf.build_df(dataflows, "Test") print(df.to_dot()) for block in df.blocks.values(): print(block.function_string) @@ -124,7 +121,11 @@ def test_branching(self) -> int: test_class: nodes.Block = blocks[2] get_total: nodes.FunctionDef = test_class.blocks[1].ssa_code.code_list[0] - sf = DataflowBuilder(get_total) + dataflows = { + DataflowRef("Test", "test_branching"): DataFlow("test_branching", "Test", []), + DataflowRef("Entity", "call"): DataFlow("call", "Entity", []) + } + sf = DataflowBuilder(get_total, dataflows) sf.build_cfg() print(sf.cfg.to_dot()) new = blocked_cfg(sf.cfg.graph, sf.cfg.get_single_source()) @@ -135,15 +136,12 @@ def test_branching(self) -> int: print(new_split.nodes) assert len(list(new_split.nodes)) == 8 - dataflows = { - DataflowRef("Test", "test_branching"): DataFlow("test_branching", "Test", []), - DataflowRef("Entity", "call"): DataFlow("call", "Entity", []) - } + # TODO: Check # entity calls, # of blocks, # of local calls - df = sf.build_df(dataflows, "Test") + df = sf.build_df("Test") print(df.to_dot()) for block in df.blocks.values(): print(block.function_string) diff --git a/tests/integration/flink/test_collect_operator.py b/tests/integration/flink/test_collect_operator.py index df9b089..ba80e1e 100644 --- a/tests/integration/flink/test_collect_operator.py +++ b/tests/integration/flink/test_collect_operator.py @@ -2,7 +2,7 @@ from pyflink.datastream.data_stream import CloseableIterator from cascade.dataflow.dataflow import DataflowRef -from cascade.dataflow.optimization.parallelization import parallelize, parallelize_until_if +from cascade.dataflow.optimization.parallelization import parallelize from cascade.runtime.flink_runtime import FlinkClientSync import tests.integration.flink.utils as utils From ade0dbc23cac3ac8ab6ff3aa123406b2d08e0b3e Mon Sep 17 00:00:00 2001 From: Lucas Van Mol <16979353+lucasvanmol@users.noreply.github.com> Date: Tue, 13 May 2025 11:17:22 +0200 Subject: [PATCH 2/3] Modify experiments --- experiments/dynamic_prefetching/run_experiments.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/experiments/dynamic_prefetching/run_experiments.py 
b/experiments/dynamic_prefetching/run_experiments.py index 6f73e4c..9267b28 100644 --- a/experiments/dynamic_prefetching/run_experiments.py +++ b/experiments/dynamic_prefetching/run_experiments.py @@ -6,12 +6,12 @@ # Define experiment parameters as a list of dictionaries experiments = [ - {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "baseline", "chance": 0.9, "num_users": 100}}, - {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "prefetch", "chance": 0.9, "num_users": 100}}, - {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "baseline", "chance": 0.5, "num_users": 100}}, - {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "prefetch", "chance": 0.5, "num_users": 100}}, - {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "baseline", "chance": 0.1, "num_users": 100}}, - {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "prefetch", "chance": 0.1, "num_users": 100}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "baseline", "chance": 0.9, "num_users": 1000}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "prefetch", "chance": 0.9, "num_users": 1000}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "baseline", "chance": 0.5, "num_users": 1000}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "prefetch", "chance": 0.5, "num_users": 1000}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "baseline", "chance": 0.1, "num_users": 1000}}, + {"parallelism": 8, "benchmark_args": {"requests_per_second": rps, "seconds": sec, "threads": 10, "experiment": "prefetch", "chance": 0.1, "num_users": 1000}}, ] From 5258158d609dfbe6ae5e633c191dddda830b3cfa Mon Sep 17 00:00:00 2001 From: Lucas Van Mol <16979353+lucasvanmol@users.noreply.github.com> Date: Thu, 5 Jun 2025 01:01:36 +0200 Subject: [PATCH 3/3] Start on writing hotel reservation in new IR --- deathstar_hotel_reservation/demo.py | 385 +----------------- deathstar_hotel_reservation/demo_process.py | 24 ++ deathstar_hotel_reservation/demo_python.py | 109 ----- .../entities/reservation.py | 56 +++ deathstar_hotel_reservation/entities/user.py | 104 +---- .../start_benchmark.py | 351 ++++++++++++++++ deathstar_hotel_reservation/test_demo.py | 100 ----- .../test_hotel_demo.py | 73 ++++ deathstar_movie_review/demo.py | 2 - run_experiments.py | 108 ----- run_hotel_exp.py | 76 ++++ ..._workaround copy.py => run_parallel_exp.py | 0 .../dataflow/optimization/parallelization.py | 11 +- src/cascade/runtime/flink_runtime.py | 2 +- tests/integration/flink/utils.py | 11 +- 15 files changed, 618 insertions(+), 794 deletions(-) create mode 100644 deathstar_hotel_reservation/demo_process.py delete mode 100644 deathstar_hotel_reservation/demo_python.py create mode 100644 deathstar_hotel_reservation/entities/reservation.py create mode 100644 deathstar_hotel_reservation/start_benchmark.py delete mode 100644 deathstar_hotel_reservation/test_demo.py create mode 100644 
deathstar_hotel_reservation/test_hotel_demo.py delete mode 100755 run_experiments.py create mode 100755 run_hotel_exp.py rename run_experiments_gil_workaround copy.py => run_parallel_exp.py (100%) diff --git a/deathstar_hotel_reservation/demo.py b/deathstar_hotel_reservation/demo.py index 63a6024..5af3870 100644 --- a/deathstar_hotel_reservation/demo.py +++ b/deathstar_hotel_reservation/demo.py @@ -1,375 +1,24 @@ -import random -import sys -import os -import time -import csv -from timeit import default_timer as timer -from multiprocessing import Pool +import cascade +from tests.integration.flink.utils import create_topics, init_flink_runtime -# import cascade -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) +KAFKA_BROKER = "localhost:9092" +KAFKA_FLINK_BROKER = "kafka:9093" # If running a flink cluster and kafka inside docker, the broker url might be different -from cascade.dataflow.dataflow import Event, EventResult, InitClass, InvokeMethod, OpNode -from cascade.runtime.flink_runtime import FlinkClientSync, FlinkOperator, FlinkRuntime, FlinkStatelessOperator -from deathstar_hotel_reservation.entities.flight import Flight, flight_op -from deathstar_hotel_reservation.entities.hotel import Geo, Hotel, Rate, hotel_op -from deathstar_hotel_reservation.entities.recommendation import Recommendation, recommend_op -from deathstar_hotel_reservation.entities.search import Search, search_op -from deathstar_hotel_reservation.entities.user import User, user_op -import pandas as pd - - -class DeathstarDemo(): - def __init__(self): - self.init_user = OpNode(User, InitClass(), read_key_from="user_id") - self.init_hotel = OpNode(Hotel, InitClass(), read_key_from="key") - self.init_flight = OpNode(Flight, InitClass(), read_key_from="id") - - def init_runtime(self, runtime, **kwargs): - self.runtime = runtime - self.runtime.init(**kwargs) - self.runtime.add_operator(hotel_op) - self.runtime.add_operator(flight_op) - self.runtime.add_operator(user_op) - # self.runtime.add_stateless_operator(search_op) - # self.runtime.add_stateless_operator(recommend_op) - - - def populate(self): - # Create locations & rates for hotels - geos = [] - geos.append(Geo(37.7867, 0)) - geos.append(Geo(37.7854, -122.4005)) - geos.append(Geo(37.7867, -122.4071)) - geos.append(Geo(37.7936, -122.3930)) - geos.append(Geo(37.7831, -122.4181)) - geos.append(Geo(37.7863, -122.4015)) - - for i in range(6, 100): - lat: float = 37.7835 + i / 500.0 * 3 - lon: float = -122.41 + i / 500.0 * 4 - geos.append(Geo(lat, lon)) - - rates = {} - rates[1] = Rate(1, "RACK", - "2015-04-09", - "2015-04-10", - { "BookableRate": 190.0, - "Code": "KNG", - "RoomDescription": "King sized bed", - "TotalRate": 109.0, - "TotalRateInclusive": 123.17}) - - rates[2] = Rate(2, "RACK", - "2015-04-09", - "2015-04-10", - { "BookableRate": 139.0, - "Code": "QN", - "RoomDescription": "Queen sized bed", - "TotalRate": 139.0, - "TotalRateInclusive": 153.09}) - - rates[3] = Rate(3, "RACK", - "2015-04-09", - "2015-04-10", - { "BookableRate": 109.0, - "Code": "KNG", - "RoomDescription": "King sized bed", - "TotalRate": 109.0, - "TotalRateInclusive": 123.17}) - - for i in range(4, 80): - if i % 3 == 0: - hotel_id = i - end_date = "2015-04-" - rate = 109.0 - rate_inc = 123.17 - if i % 2 == 0: - end_date += '17' - else: - end_date += '24' - if i % 5 == 1: - rate = 120.0 - rate_inc = 140.0 - elif i % 5 == 2: - rate = 124.0 - rate_inc = 144.0 - elif i % 5 == 3: - rate = 132.0 - rate_inc = 158.0 - elif i % 5 == 4: - rate = 232.0 - rate_inc = 258.0 - 
- rates[hotel_id] = Rate(i, "RACK", - "2015-04-09", - end_date, - { "BookableRate": rate, - "Code": "KNG", - "RoomDescription": "King sized bed", - "TotalRate": rate, - "TotalRateInclusive": rate_inc}) - - # we don't create recommendations, because it doesn't really - # correspond to an entity - prices = [] - - prices.append(150.00) - prices.append(120.00) - prices.append(190.00) - prices.append(160.00) - prices.append(140.00) - prices.append(200.00) - - for i in range(6, 100): - price = 179.00 - if i % 3 == 0: - if i % 5 == 0: - price = 123.17 - elif i % 5 == 1: - price = 140.00 - elif i % 5 == 2: - price = 144.00 - elif i % 5 == 3: - price = 158.00 - elif i % 5 == 4: - price = 258.00 - - prices.append(price) - - # populate users - self.users = [User(f"Cornell_{i}", str(i) * 10) for i in range(501)] - for user in self.users: - event = Event(self.init_user, {"user_id": user.id, "password": user.password}, None) - self.runtime.send(event) - - # populate hotels - self.hotels: list[Hotel] = [] - for i in range(100): - geo = geos[i] - rate = rates[i] if i in rates else [] - price = prices[i] - hotel = Hotel(str(i), 10, geo, rate, price) - self.hotels.append(hotel) - event = Event(self.init_hotel, - { - "key": hotel.key, - "cap": hotel.cap, - "geo": hotel.geo, - "rates": hotel.rates, - "price": hotel.price - }, None) - self.runtime.send(event) - - # populate flights - self.flights = [Flight(str(i), 10) for i in range(100)] - for flight in self.flights[:-1]: - event = Event(self.init_flight, { - "id": flight.id, - "cap": flight.cap - }, None) - self.runtime.send(event) - flight = self.flights[-1] - event = Event(self.init_flight, { - "id": flight.id, - "cap": flight.cap - }, None) - self.runtime.send(event, flush=True) - -def search_hotel(): - in_date = random.randint(9, 23) - out_date = random.randint(in_date + 1, 24) - - if in_date < 10: - in_date_str = f"2015-04-0{in_date}" - else: - in_date_str = f"2015-04-{in_date}" - if out_date < 10: - out_date_str = f"2015-04-0{out_date}" - else: - out_date_str = f"2015-04-{out_date}" - - lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0 - lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0 - - # We don't really use the in_date, out_date information - return Event(search_op.dataflow.entry, {"lat": lat, "lon": lon}, search_op.dataflow) - -def recommend(req_param=None): - if req_param is None: - coin = random.random() - if coin < 0.5: - req_param = "distance" - else: - req_param = "price" - - lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0 - lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0 - - - return Event(recommend_op.dataflow.entry, {"requirement": req_param, "lat": lat, "lon": lon}, recommend_op.dataflow) - -def user_login(succesfull=True): - user_id = random.randint(0, 500) - username = f"Cornell_{user_id}" - password = str(user_id) * 10 if succesfull else "" - return Event(OpNode(User, InvokeMethod("login"), read_key_from="user_key"), {"user_key": username, "password": password}, None) - - -def reserve(): - hotel_id = random.randint(0, 99) - flight_id = random.randint(0, 99) - - user_id = "Cornell_" + str(random.randint(0, 500)) - - return Event( - user_op.dataflows["order"].entry, - { - "user_key": user_id, - "flight_key": str(flight_id), - "hotel_key": str(hotel_id) - }, - user_op.dataflows["order"]) - -def deathstar_workload_generator(): - search_ratio = 0.6 - recommend_ratio = 0.39 - user_ratio = 0.005 - reserve_ratio = 0.005 - c = 0 - while True: - coin = random.random() - if coin < search_ratio: - yield 
search_hotel() - elif coin < search_ratio + recommend_ratio: - yield recommend() - elif coin < search_ratio + recommend_ratio + user_ratio: - yield user_login() - else: - yield reserve() - c += 1 - -def reserve_workload_generator(): - while True: - yield reserve() - -def user_login_workload_generator(): - while True: - yield user_login() - -threads = 1 -messages_per_burst = 10 -sleeps_per_burst = 1 -sleep_time = 0.0085 -seconds_per_burst = 1 -bursts = 50 - - -def benchmark_runner(proc_num) -> dict[int, dict]: - print(f'Generator: {proc_num} starting') - client = FlinkClientSync("deathstar", "ds-out", "localhost:9092", True) - deathstar_generator = deathstar_workload_generator() - start = timer() - - for _ in range(bursts): - sec_start = timer() - - # send burst of messages - for i in range(messages_per_burst): - - # sleep sometimes between messages - if i % (messages_per_burst // sleeps_per_burst) == 0: - time.sleep(sleep_time) - event = next(deathstar_generator) - client.send(event) - - client.flush() - sec_end = timer() - - # wait out the second - lps = sec_end - sec_start - if lps < seconds_per_burst: - time.sleep(1 - lps) - sec_end2 = timer() - print(f'Latency per burst: {sec_end2 - sec_start} ({seconds_per_burst})') - - end = timer() - print(f'Average latency per burst: {(end - start) / bursts} ({seconds_per_burst})') - - done = False - while not done: - done = True - for event_id, fut in client._futures.items(): - result = fut["ret"] - if result is None: - done = False - time.sleep(0.5) - break - futures = client._futures - client.close() - return futures - - -def write_dict_to_pkl(futures_dict, filename): - """ - Writes a dictionary of event data to a pickle file. - - Args: - futures_dict (dict): A dictionary where each key is an event ID and the value is another dict. - filename (str): The name of the pickle file to write to. 
- """ - - # Prepare the data for the DataFrame - data = [] - for event_id, event_data in futures_dict.items(): - ret: EventResult = event_data.get("ret") - row = { - "event_id": event_id, - "sent": str(event_data.get("sent")), - "sent_t": event_data.get("sent_t"), - "ret": str(event_data.get("ret")), - "ret_t": event_data.get("ret_t"), - "roundtrip": ret.metadata["roundtrip"] if ret else None, - "flink_time": ret.metadata["flink_time"] if ret else None, - "deser_times": ret.metadata["deser_times"] if ret else None, - "loops": ret.metadata["loops"] if ret else None, - "latency": event_data["ret_t"][1] - event_data["sent_t"][1] if ret else None - } - data.append(row) - - # Create a DataFrame and save it as a pickle file - df = pd.DataFrame(data) - df.to_pickle(filename) +IN_TOPIC = "ds-hotel-in" +OUT_TOPIC = "ds-hotel-out" +INTERNAL_TOPIC = "ds-hotel-internal" def main(): - ds = DeathstarDemo() - ds.init_runtime(FlinkRuntime("deathstar", "ds-out", ui_port=8081), bundle_time=5, bundle_size=10) - ds.runtime.run(run_async=True) - ds.populate() - - - time.sleep(1) - input() - - # with Pool(threads) as p: - # results = p.map(benchmark_runner, range(threads)) - - # results = {k: v for d in results for k, v in d.items()} - results = benchmark_runner(0) - - # pd.DataFrame({"request_id": list(results.keys()), - # "timestamp": [res["timestamp"] for res in results.values()], - # "op": [res["op"] for res in results.values()] - # }).sort_values("timestamp").to_csv(f'{SAVE_DIR}/client_requests.csv', index=False) - print(results) - t = len(results) - r = 0 - for result in results.values(): - if result["ret"] is not None: - print(result) - r += 1 - print(f"{r}/{t} results recieved.") - write_dict_to_pkl(results, "test2.pkl") + thread_mode = True + + create_topics(IN_TOPIC, OUT_TOPIC, INTERNAL_TOPIC) + runtime = init_flink_runtime([ + "deathstar_hotel_reservation.entities.user", + # "deathstar_hotel_reservation.entities.reservation" + ], IN_TOPIC, OUT_TOPIC, INTERNAL_TOPIC, kafka_broker=KAFKA_FLINK_BROKER, bundle_time=5, bundle_size=10, thread_mode=thread_mode, parallelism=None) + + print(cascade.core.dataflows.keys()) + runtime.run() if __name__ == "__main__": main() \ No newline at end of file diff --git a/deathstar_hotel_reservation/demo_process.py b/deathstar_hotel_reservation/demo_process.py new file mode 100644 index 0000000..4692b20 --- /dev/null +++ b/deathstar_hotel_reservation/demo_process.py @@ -0,0 +1,24 @@ +import cascade +from tests.integration.flink.utils import create_topics, init_flink_runtime + +KAFKA_BROKER = "localhost:9092" +KAFKA_FLINK_BROKER = "kafka:9093" # If running a flink cluster and kafka inside docker, the broker url might be different + +IN_TOPIC = "ds-hotel-in" +OUT_TOPIC = "ds-hotel-out" +INTERNAL_TOPIC = "ds-hotel-internal" + +def main(): + thread_mode = False + + create_topics(IN_TOPIC, OUT_TOPIC, INTERNAL_TOPIC) + runtime = init_flink_runtime([ + "deathstar_hotel_reservation.entities.user", + # "deathstar_hotel_reservation.entities.reservation" + ], IN_TOPIC, OUT_TOPIC, INTERNAL_TOPIC, kafka_broker=KAFKA_FLINK_BROKER, bundle_time=5, bundle_size=10, thread_mode=thread_mode, parallelism=None) + + print(cascade.core.dataflows.keys()) + runtime.run() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/deathstar_hotel_reservation/demo_python.py b/deathstar_hotel_reservation/demo_python.py deleted file mode 100644 index 1d44ac4..0000000 --- a/deathstar_hotel_reservation/demo_python.py +++ /dev/null @@ -1,109 +0,0 @@ -import time -import sys -import os 
- -# import cascade -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) - - -from cascade.runtime.python_runtime import PythonRuntime -from deathstar_hotel_reservation.demo import DeathstarDemo, deathstar_workload_generator -from timeit import default_timer as timer -import csv - -messages_per_second = 10 -sleeps_per_second = 10 -sleep_time = 0.0085 -seconds = 10 - -def benchmark_runner(runtime) -> dict[int, dict]: - - deathstar_generator = deathstar_workload_generator() - # futures: dict[int, dict] = {} - start = timer() - for _ in range(seconds): - sec_start = timer() - for i in range(messages_per_second): - if i % (messages_per_second // sleeps_per_second) == 0: - time.sleep(sleep_time) - event = next(deathstar_generator) - # func_name = event.dataflow.name if event.dataflow is not None else "login" # only login has no dataflow - key = event.key_stack[0] - # params = event.variable_map - runtime.send(event) - # futures[event._id] = {"event": f'{func_name} {key}->{params}'} - - sec_end = timer() - lps = sec_end - sec_start - if lps < 1: - time.sleep(1 - lps) - sec_end2 = timer() - print(f'Latency per second: {sec_end2 - sec_start}') - end = timer() - print(f'Average latency per second: {(end - start) / seconds}') - - # done = False - # while not done: - # done = True - # for event_id, fut in client._futures.items(): - # result = fut["ret"] - # if result is None: - # done = False - # time.sleep(0.5) - # break - # futures = client._futures - # client.close() - # return futures - - -def write_dict_to_csv(futures_dict, filename): - """ - Writes a dictionary of event data to a CSV file. - - Args: - futures_dict (dict): A dictionary where each key is an event ID and the value is another dict. - filename (str): The name of the CSV file to write to. 
- """ - # Define the column headers - headers = ["event_id", "sent", "sent_t", "ret", "ret_t", "latency"] - - # Open the file for writing - with open(filename, mode='w', newline='', encoding='utf-8') as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=headers) - - # Write the headers - writer.writeheader() - - # Write the data rows - for event_id, event_data in futures_dict.items(): - # Prepare a row where the 'event_id' is the first column - row = { - "event_id": event_id, - "sent": event_data.get("sent"), - "sent_t": event_data.get("sent_t"), - "ret": event_data.get("ret"), - "ret_t": event_data.get("ret_t"), - "latency": event_data["ret_t"][1] - event_data["sent_t"][1] - } - writer.writerow(row) - -def test_python_runtime(): - ds = DeathstarDemo() - ds.init_runtime(PythonRuntime()) - ds.populate() - - - time.sleep(1) - input() - - results = benchmark_runner(ds.runtime) - - print(results) - t = len(results) - r = 0 - for result in results.values(): - if result["ret"] is not None: - print(result) - r += 1 - print(f"{r}/{t} results recieved.") - write_dict_to_csv(results, "test2.csv") diff --git a/deathstar_hotel_reservation/entities/reservation.py b/deathstar_hotel_reservation/entities/reservation.py new file mode 100644 index 0000000..f7ae58e --- /dev/null +++ b/deathstar_hotel_reservation/entities/reservation.py @@ -0,0 +1,56 @@ +from typing import List +import cascade +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class HotelReservation: + customerId: str + inDate: datetime + outDate: datetime + numberOfRooms: int + + def has_date_conflict(self, in_date: datetime, out_date: datetime) -> bool: + return in_date <= self.outDate and out_date >= self.inDate + +@cascade.cascade(globals={"datetime": datetime}) +class Reservation: + def __init__(self, hotel_id: str, max_capacity: int): + self.hotel_id = hotel_id + self.max_capacity = max_capacity + self.reservations: List[HotelReservation] = [] + + def add_reservation( + self, customer_name: str, in_date: str, out_date: str, number_of_rooms: int + ) -> bool: + if not self.check_availability(in_date, out_date, number_of_rooms): + return False + else: + + in_d = datetime.strptime(in_date, "%Y-%m-%d") + out_d = datetime.strptime(out_date, "%Y-%m-%d") + + self.reservations = self.reservations + [ + HotelReservation(customer_name, in_d, out_d, number_of_rooms) + ] + + return True + + def check_availability( + self, in_date: str, out_date: str, number_of_rooms: int + ) -> bool: + in_d = datetime.strptime(in_date, "%Y-%m-%d") + out_d = datetime.strptime(out_date, "%Y-%m-%d") + + current_capacity: int = sum( + [ + reserve.numberOfRooms + for reserve in self.reservations + if reserve.has_date_conflict(in_d, out_d) + ] + ) + return not (current_capacity + number_of_rooms > self.max_capacity) + + def __key__(self): + return self.hotel_id \ No newline at end of file diff --git a/deathstar_hotel_reservation/entities/user.py b/deathstar_hotel_reservation/entities/user.py index 112ed64..33c9edc 100644 --- a/deathstar_hotel_reservation/entities/user.py +++ b/deathstar_hotel_reservation/entities/user.py @@ -1,105 +1,11 @@ -from typing import Any -from cascade.dataflow.dataflow import CollectNode, CollectTarget, DataFlow, Edge, InvokeMethod, OpNode -from cascade.dataflow.operator import StatefulOperator -from deathstar_hotel_reservation.entities.flight import Flight, flight_op -from deathstar_hotel_reservation.entities.hotel import Hotel, hotel_op - +import cascade +@cascade.cascade class User(): - def __init__(self, 
user_id: str, password: str): - self.id = user_id + def __init__(self, username: str, password: str): + self.username = username self.password = password - def check(self, password): + def login(self, password: str) -> bool: return self.password == password - def order(self, flight: Flight, hotel: Hotel): - if hotel.reserve() and flight.reserve(): - return True - else: - return False - -#### COMPILED FUNCTIONS (ORACLE) ##### - -def check_compiled(variable_map: dict[str, Any], state: User) -> Any: - return state.password == variable_map["password"] - -def order_compiled_entry_0(variable_map: dict[str, Any], state: User) -> Any: - pass - -def order_compiled_entry_1(variable_map: dict[str, Any], state: User) -> Any: - pass - -def order_compiled_if_cond(variable_map: dict[str, Any], state: User) -> Any: - # parallel - if "reserves" in variable_map: - return variable_map["reserves"][0] and variable_map["reserves"][1] - else: - return variable_map["hotel_reserve"] and variable_map["flight_reserve"] - -def order_compiled_if_body(variable_map: dict[str, Any], state: User) -> Any: - return True - -def order_compiled_else_body(variable_map: dict[str, Any], state: User) -> Any: - return False - -user_op = StatefulOperator( - User, - { - "login": check_compiled, - "order_compiled_entry_0": order_compiled_entry_0, - "order_compiled_entry_1": order_compiled_entry_1, - "order_compiled_if_cond": order_compiled_if_cond, - "order_compiled_if_body": order_compiled_if_body, - "order_compiled_else_body": order_compiled_else_body - }, - {} -) - -# For now, the dataflow will be serial instead of parallel. Future optimizations -# will try to automatically parallelize this. -# There is also no user entry (this could also be an optimization) -def df_serial(): - df = DataFlow("user_order") - n0 = OpNode(User, InvokeMethod("order_compiled_entry_0"), read_key_from="user_key") - n1 = OpNode(Hotel, InvokeMethod("reserve"), assign_result_to="hotel_reserve", read_key_from="hotel_key") - n2 = OpNode(User, InvokeMethod("order_compiled_entry_1"), read_key_from="user_key") - n3 = OpNode(Flight, InvokeMethod("reserve"), assign_result_to="flight_reserve", read_key_from="flight_key") - n4 = OpNode(User, InvokeMethod("order_compiled_if_cond"), is_conditional=True, read_key_from="user_key") - n5 = OpNode(User, InvokeMethod("order_compiled_if_body"), read_key_from="user_key") - n6 = OpNode(User, InvokeMethod("order_compiled_else_body"), read_key_from="user_key") - - df.add_edge(Edge(n0, n1)) - df.add_edge(Edge(n1, n2)) - df.add_edge(Edge(n2, n3)) - df.add_edge(Edge(n3, n4)) - df.add_edge(Edge(n4, n5, if_conditional=True)) - df.add_edge(Edge(n4, n6, if_conditional=False)) - - df.entry = n0 - return df - - -# PARALLEL DATAFLOW -def df_parallel(): - df = DataFlow("user_order") - n0 = OpNode(User, InvokeMethod("order_compiled_entry_0"), read_key_from="user_key") - ct = CollectNode(assign_result_to="reserves", read_results_from="reserve") - n1 = OpNode(Hotel, InvokeMethod("reserve"), assign_result_to="reserve", read_key_from="hotel_key", collect_target=CollectTarget(ct, 2, 0)) - n3 = OpNode(Flight, InvokeMethod("reserve"), assign_result_to="reserve", read_key_from="flight_key", collect_target=CollectTarget(ct, 2, 1)) - n4 = OpNode(User, InvokeMethod("order_compiled_if_cond"), is_conditional=True, read_key_from="user_key") - n5 = OpNode(User, InvokeMethod("order_compiled_if_body"), read_key_from="user_key") - n6 = OpNode(User, InvokeMethod("order_compiled_else_body"), read_key_from="user_key") - - df.add_edge(Edge(n0, n1)) - 
df.add_edge(Edge(n0, n3)) - df.add_edge(Edge(n1, ct)) - df.add_edge(Edge(n3, ct)) - df.add_edge(Edge(ct, n4)) - df.add_edge(Edge(n4, n5, if_conditional=True)) - df.add_edge(Edge(n4, n6, if_conditional=False)) - - df.entry = n0 - return df - -user_op.dataflows["order"] = df_serial() diff --git a/deathstar_hotel_reservation/start_benchmark.py b/deathstar_hotel_reservation/start_benchmark.py new file mode 100644 index 0000000..2766117 --- /dev/null +++ b/deathstar_hotel_reservation/start_benchmark.py @@ -0,0 +1,351 @@ +import argparse +from multiprocessing import Pool +import random +from timeit import default_timer as timer +import time +import pandas as pd +import sys +import os + +# import cascade +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) + +from cascade.runtime.flink_runtime import FlinkClientSync +from cascade.dataflow.dataflow import DataflowRef, EventResult +import cascade + +from tests.integration.flink.utils import init_cascade_from_module + +IN_TOPIC = "ds-hotel-in" +OUT_TOPIC = "ds-hotel-out" + + +AMOUNT_OF_USERS = 500 +AMOUNT_OF_HOTELS = 80 + +def populate_users(client: FlinkClientSync): + # populate users + user_init = cascade.core.dataflows[DataflowRef("User", "__init__")] + for i in range(AMOUNT_OF_USERS): + username = f"Delft_{i}" + password = str(i) * 10 + client.send(user_init.generate_event({"username": username, "password": password}, key=username)) + +def populate_hotels(client: FlinkClientSync): + # populate hotels + reservation_init = cascade.core.dataflows[DataflowRef("Reservation", "__init__")] + add_reservation = cascade.core.dataflows[DataflowRef("Reservation", "add_reservation")] + client.send(reservation_init.generate_event({"hotel_id": "1", "capacity": 200}, key="1")) + client.send(reservation_init.generate_event({"hotel_id": "2", "capacity": 200}, key="2")) + client.send(reservation_init.generate_event({"hotel_id": "3", "capacity": 200}, key="3")) + client.send(reservation_init.generate_event({"hotel_id": "5", "capacity": 200}, key="5")) + client.send(reservation_init.generate_event({"hotel_id": "6", "capacity": 200}, key="6")) + client.send(reservation_init.generate_event({"hotel_id": "4", "capacity": 200}, key="4"), block=True) + + client.send(add_reservation.generate_event({ + "customer_name": "Alice", + "in_date": "2015-04-09", + "out_date": "2015-04-10", + "number_of_rooms": 1 + }, key="4")) + + for i in range(7, AMOUNT_OF_HOTELS + 1): + max_capacity = 200 + + if i % 3 == 1: + max_capacity = 300 + elif i % 3 == 2: + max_capacity = 250 + + client.send(reservation_init.generate_event({"hotel_id": str(i), "capacity": max_capacity}, key=str(i))) + + +def user_login(client: FlinkClientSync): + username, password = get_user() + user_login = cascade.core.dataflows[DataflowRef("User", "login")] + event = user_login.generate_event({"password_0": password}, key=username) + client.send(event) + +def reserve( + in_date: str, + out_date: str, + hotel_id: str, + customer_name: str, + amount_of_rooms: int, + username: str, + password: str, + client: FlinkClientSync): + + # login first + df = cascade.core.dataflows[DataflowRef("User", "login")] + login_success = client.send(df.generate_event({"password_0": password}, key=username), block=True) + + if not login_success: + raise ValueError("Login failed") + + # reserve + df_reserve = cascade.core.dataflows[DataflowRef("Reservation", "add_reservation")] + event = df_reserve.generate_event({ + "customer_name_0": customer_name, + "in_date_0": in_date, + "out_date_0": out_date, + 
"amount_of_rooms_0": amount_of_rooms + }, key=hotel_id) + reserve_success = client.send(event) + + if reserve_success: + return "Success" + else: + return "Failed reservation" + +def get_user(): + i = random.randint(0, AMOUNT_OF_USERS-1) + return f"Delft_{i}", str(i) * 10 + +def do_reserve(client): + in_date = random.randint(9, 23) + out_date = in_date + random.randint(1, 5) + + if in_date <= 9: + in_date_str = "2015-04-0" + str(in_date) + else: + in_date_str = "2015-04-" + str(in_date) + + if out_date <= 9: + out_date_str = "2015-04-0" + str(out_date) + else: + out_date_str = "2015-04-" + str(out_date) + + hotel_id = str(random.randint(1, AMOUNT_OF_HOTELS)) + user_id, password = get_user() + customer_name = user_id + num_rooms = 1 + + reserve(in_date_str, out_date_str, hotel_id, customer_name, num_rooms, user_id, password, client) + +# def deathstar_workload_generator(): +# search_ratio = 0.6 +# recommend_ratio = 0.39 +# user_ratio = 0.005 +# reserve_ratio = 0.005 +# c = 0 +# while True: +# coin = random.random() +# if coin < search_ratio: +# yield search_hotel() +# elif coin < search_ratio + recommend_ratio: +# yield recommend() +# elif coin < search_ratio + recommend_ratio + user_ratio: +# yield user_login() +# else: +# yield reserve() +# c += 1 + +def reserve_workload_generator(client): + do_reserve(client) + +def user_login_workload_generator(client): + user_login(client) + + +def benchmark_runner(args) -> dict[int, dict]: + proc_num, requests_per_second, sleep_time, bursts = args + print(f'Generator: {proc_num} starting') + client = FlinkClientSync(IN_TOPIC, OUT_TOPIC) + deathstar_generator = user_login_workload_generator + start = timer() + + for b in range(bursts): + sec_start = timer() + + # send burst of messages + for i in range(requests_per_second): + + # sleep sometimes between messages + # if i % (messages_per_burst // sleeps_per_burst) == 0: + time.sleep(sleep_time) + deathstar_generator(client) + + client.flush() + sec_end = timer() + + # wait out the second + lps = sec_end - sec_start + if lps < 1: + time.sleep(1 - lps) + sec_end2 = timer() + print(f'Latency per burst: {sec_end2 - sec_start} ({b+1}/{bursts})') + + end = timer() + avg_send_latency = (end - start) / bursts + print(f'Average send latency per burst for generator {proc_num} was: {avg_send_latency}') + if avg_send_latency > 1.1: + print(f'This is higher than expected (1). Maybe increase the number of threads?') + futures = wait_for_futures(client) + client.close() + return futures + +def wait_for_futures(client: FlinkClientSync): + done = False + while not done: + done = True + for event_id, fut in client._futures.items(): + result = fut["ret"] + if result is None: + done = False + time.sleep(0.5) + break + futures = client._futures + return futures + + +def write_dict_to_pkl(futures_dict, filename): + """ + Writes a dictionary of event data to a pickle file. + + Args: + futures_dict (dict): A dictionary where each key is an event ID and the value is another dict. + filename (str): The name of the pickle file to write to. 
+ """ + + # Prepare the data for the DataFrame + data = [] + for event_id, event_data in futures_dict.items(): + ret: EventResult = event_data.get("ret") + row = { + "event_id": event_id, + "sent": str(event_data.get("sent")), + "sent_t": event_data.get("sent_t"), + "ret": str(event_data.get("ret")), + "ret_t": event_data.get("ret_t"), + "roundtrip": ret.metadata["roundtrip"] if ret else None, + "flink_time": ret.metadata["flink_time"] if ret else None, + "deser_times": ret.metadata["deser_times"] if ret else None, + "loops": ret.metadata["loops"] if ret else None, + "latency": event_data["ret_t"][1] - event_data["sent_t"][1] if ret else None + } + data.append(row) + + # Create a DataFrame and save it as a pickle file + df = pd.DataFrame(data) + + # Multiply flink_time by 1000 to convert to milliseconds + df['flink_time'] = df['flink_time'] * 1000 + + return df + + +def main(): + parser = argparse.ArgumentParser(description="Run the benchmark and save results.") + parser.add_argument("-o", "--output", type=str, default="benchmark_results.pkl", help="Output file name for the results") + parser.add_argument("--rps", type=int, default=10, help="Number of messages per burst") + parser.add_argument("--seconds", type=int, default=100, help="Number of seconds to benchmark for") + parser.add_argument("--threads", type=int, default=1, help="Number of concurrent threads") + parser.add_argument("--no_init", action="store_true", help="Don't populate") + args = parser.parse_args() + + + rps_per_thread = int(args.rps / args.threads) + sleep_time = 0.95 / rps_per_thread + + + print(f"Starting with args:\n{args}") + print(f"Actual requests per second is {int(rps_per_thread * args.threads)} (due to rounding)") + + init_cascade_from_module([ + "deathstar_hotel_reservation.entities.user", + # "deathstar_hotel_reservation.entities.reservation", + ]) + + init_client = FlinkClientSync(IN_TOPIC, OUT_TOPIC) + + print(cascade.core.dataflows.keys()) + + for df in cascade.core.dataflows.values(): + print(df.to_dot()) + for block in df.blocks.values(): + print(block.function_string) + + if not args.no_init: + print("Populating...") + populate_users(init_client) + # populate_hotels(init_client) + init_client.producer.flush() + wait_for_futures(init_client) + print("Done.") + time.sleep(1) + + print("Starting benchmark") + + func_args = [(t, rps_per_thread, sleep_time, args.seconds) for t in range(args.threads)] + with Pool(args.threads) as p: + results = p.map(benchmark_runner, func_args) + + results = {k: v for d in results for k, v in d.items()} + + print("last result:") + print(list(results.values())[-1]) + t = len(results) + r = 0 + for result in results.values(): + if result["ret"] is not None: + r += 1 + + print(f"{r}/{t} results recieved.") + print(f"Writing results to {args.output}") + + df = write_dict_to_pkl(results, args.output) + + flink_time = df['flink_time'].median() + latency = df['latency'].median() + flink_prct = float(flink_time) * 100 / latency + print(f"Median latency : {latency:.2f} ms") + print(f"Median Flink time : {flink_time:.2f} ms ({flink_prct:.2f}%)") + init_client.close() + + df = preprocess(args.output, df) + df.to_csv(args.output) + +import re + +def preprocess(name, df, warmup_time_s=3) -> pd.DataFrame: + # Extract parallelism and mps from the name using regex + match = re.search(r'(\d+)x(\d+)_rps-(\d+)', name) + if match: + tm = int(match.group(1)) + slots = int(match.group(2)) + mps = int(match.group(3)) + else: + raise Exception() + + # Ignore the first warmup_time seconds of events 
+ warmup_events = int(warmup_time_s * mps) + df = df.iloc[warmup_events:] + + # Calculate the additional Kafka overhead + # df['kafka_overhead'] = df['latency'] - df['flink_time'] + + # Extract median values from df + flink_time_median = df['flink_time'].median() + latency_median = df['latency'].median() + flink_time_99_percentile = df['flink_time'].quantile(0.99) + latency_99_percentile = df['latency'].quantile(0.99) + flink_time_95_percentile = df['flink_time'].quantile(0.95) + latency_95_percentile = df['latency'].quantile(0.95) + + data = { + 'taskmanagers': tm, + 'slots': slots, + 'mps': mps, + 'flink_time_median': flink_time_median, + 'latency_median': latency_median, + 'latency_99_percentile': latency_99_percentile, + 'latency_95_percentile': latency_95_percentile, + 'flink_time_99_percentile': flink_time_99_percentile, + 'flink_time_95_percentile': flink_time_95_percentile + } + data = {k:[v] for k,v in data.items()} + return pd.DataFrame(data) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/deathstar_hotel_reservation/test_demo.py b/deathstar_hotel_reservation/test_demo.py deleted file mode 100644 index 05302ce..0000000 --- a/deathstar_hotel_reservation/test_demo.py +++ /dev/null @@ -1,100 +0,0 @@ - -# import os -# import sys - -# # import cascade -# sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) - -# from cascade.runtime.python_runtime import PythonClientSync, PythonRuntime -# from cascade.runtime.flink_runtime import FlinkClientSync, FlinkRuntime -# from deathstar_hotel_reservation.demo import DeathstarDemo, recommend, reserve, search_hotel, user_login -# import time -# import pytest - -# @pytest.mark.integration -# def test_deathstar_demo(): -# ds = DeathstarDemo() -# ds.init_runtime(FlinkRuntime("deathstardemo-test", "dsd-out")) -# ds.runtime.run(run_async=True) -# print("Populating, press enter to go to the next step when done") -# ds.populate() - -# client = FlinkClientSync("deathstardemo-test", "dsd-out") -# input() -# print("testing user login") -# event = user_login() -# client.send(event) - -# input() -# print("testing reserve") -# event = reserve() -# client.send(event) - -# input() -# print("testing search") -# event = search_hotel() -# client.send(event) - -# input() -# print("testing recommend (distance)") -# time.sleep(0.5) -# event = recommend(req_param="distance") -# client.send(event) - -# input() -# print("testing recommend (price)") -# time.sleep(0.5) -# event = recommend(req_param="price") -# client.send(event) - -# print(client._futures) -# input() -# print("done!") -# print(client._futures) - -# def test_deathstar_demo_python(): -# ds = DeathstarDemo() -# ds.init_runtime(PythonRuntime()) -# ds.runtime.run() -# print("Populating, press enter to go to the next step when done") -# ds.populate() - -# time.sleep(0.1) - -# client = PythonClientSync(ds.runtime) -# print("testing user login") -# event = user_login() -# result = client.send(event) -# assert result == True -# event = user_login(succesfull=False) -# result = client.send(event) -# assert result == False - -# print("testing reserve") -# event = reserve() -# result = client.send(event) -# assert result == True - -# return -# print("testing search") -# event = search_hotel() -# result = client.send(event) -# print(result) - -# print("testing recommend (distance)") -# time.sleep(0.5) -# event = recommend(req_param="distance") -# result = client.send(event) -# print(result) - -# print("testing recommend (price)") -# time.sleep(0.5) -# event 
= recommend(req_param="price") -# result = client.send(event) -# print(result) - -# print("done!") - - -# if __name__ == "__main__": -# test_deathstar_demo() \ No newline at end of file diff --git a/deathstar_hotel_reservation/test_hotel_demo.py b/deathstar_hotel_reservation/test_hotel_demo.py new file mode 100644 index 0000000..6fc3964 --- /dev/null +++ b/deathstar_hotel_reservation/test_hotel_demo.py @@ -0,0 +1,73 @@ +import logging +import sys +import os + + + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) + +from cascade.runtime.flink_runtime import FlinkClientSync +from cascade.dataflow.dataflow import DataflowRef +from cascade.dataflow.optimization.parallelization import parallelize, parallelize_until_if +from cascade.dataflow.operator import StatefulOperator, StatelessOperator +from cascade.runtime.python_runtime import PythonClientSync, PythonRuntime + +import cascade +import pytest +import tests.integration.flink.utils as utils + +def init_python_runtime() -> tuple[PythonRuntime, PythonClientSync]: + runtime = PythonRuntime() + for op in cascade.core.operators.values(): + if isinstance(op, StatefulOperator): + runtime.add_operator(op) + elif isinstance(op, StatelessOperator): + runtime.add_stateless_operator(op) + + runtime.run() + return runtime, PythonClientSync(runtime) + + +def test_deathstar_hotel_demo_python(): + print("starting") + cascade.core.clear() + exec(f'import deathstar_hotel_reservation.entities.user') + cascade.core.init() + + runtime, client = init_python_runtime() + deathstar_hotel_demo(client) + +@pytest.mark.integration +def test_deathstar_hotel_demo_flink(): + print("starting") + logger = logging.getLogger("cascade") + logger.setLevel("DEBUG") + + utils.create_topics() + + runtime = utils.init_flink_runtime("deathstar_hotel_reservation.entities.user") + + client = FlinkClientSync() + runtime.run(run_async=True) + + try: + deathstar_hotel_demo(client) + finally: + client.close() + +def deathstar_hotel_demo(client): + user_init = cascade.core.dataflows[DataflowRef("User", "__init__")] + + for df in cascade.core.dataflows.values(): + print(df.to_dot()) + + print("testing user create") + + username = f"Delft_1" + password = str(1) * 10 + event = user_init.generate_event({"username": username, "password": password}, key=username) + result = client.send(event, block = True) + assert result['username'] == username + assert result['password'] == password + + print("testing user login") diff --git a/deathstar_movie_review/demo.py b/deathstar_movie_review/demo.py index d70ec43..50e2f90 100644 --- a/deathstar_movie_review/demo.py +++ b/deathstar_movie_review/demo.py @@ -1,9 +1,7 @@ from typing import Literal import cascade from cascade.dataflow.dataflow import DataflowRef -from cascade.dataflow.optimization.dead_node_elim import dead_node_elimination from cascade.dataflow.optimization.parallelization import parallelize, parallelize_until_if -from cascade.runtime.flink_runtime import FlinkRuntime from tests.integration.flink.utils import create_topics, init_flink_runtime import os diff --git a/run_experiments.py b/run_experiments.py deleted file mode 100755 index 58934c0..0000000 --- a/run_experiments.py +++ /dev/null @@ -1,108 +0,0 @@ -import os -import subprocess -import time - -args = { - "messages_per_burst": 10, - "sleeps_per_burst": 10, - "sleep_time": 0.09, - "seconds_per_burst": 1, - "bursts": 100 -} - -mps_1 = { - **args, - "messages_per_burst": 1, - "sleeps_per_burst": 1, - "sleep_time": 0.9, -} - -mps_20 = { - **args, - 
"messages_per_burst": 20, - "sleeps_per_burst": 20, - "sleep_time": 0.09/2, -} - -mps_30 = { - **args, - "messages_per_burst": 30, - "sleeps_per_burst": 30, - "sleep_time": 0.09/3, -} - -mps_50 = { - **args, - "messages_per_burst": 50, - "sleeps_per_burst": 50, - "sleep_time": 0.09/5, -} - - -# Define experiment parameters as a list of dictionaries -experiments = [ - # {"parallelism": 16, "benchmark_args": {**args}}, - # {"parallelism": 8, "benchmark_args": {**args}}, - # {"parallelism": 4, "benchmark_args": {**args}}, - # {"parallelism": 2, "benchmark_args": {**args}}, - # {"parallelism": 1, "benchmark_args": {**args}}, - - # {"parallelism": 16, "benchmark_args": {**mps_20}}, - # {"parallelism": 8, "benchmark_args": {**mps_20}}, - # {"parallelism": 4, "benchmark_args": {**mps_20}}, - # {"parallelism": 2, "benchmark_args": {**mps_20}}, - # {"parallelism": 1, "benchmark_args": {**mps_20}}, - - {"parallelism": 16, "benchmark_args": {**mps_50}}, - {"parallelism": 8, "benchmark_args": {**mps_50}}, - {"parallelism": 4, "benchmark_args": {**mps_50}}, - {"parallelism": 2, "benchmark_args": {**mps_50}}, - {"parallelism": 1, "benchmark_args": {**mps_50}}, -] - - - - -print("Tearing down docker containers") -subprocess.run(["docker", "compose", "down"], check=True) - -for e in ["pipelined", "parallel", "baseline"]: - for exp in experiments: - print(f"Starting experiment {exp}") - - # Start docker compose - subprocess.run(["docker", "compose", "up", "-d"], check=True) - - time.sleep(10) - - # Run Flink job - - flink_cmd = [ - "flink", "run", "--pyFiles", "/home/lvanmol/cascade/src,/home/lvanmol/cascade", - "--pyModule", "deathstar_movie_review.demo", "-d", "-p", str(exp['parallelism']) - ] - env = os.environ - env["EXPERIMENT"] = e - subprocess.run(flink_cmd, check=True, env=env) - - # Start benchmark - filename = f"{e}_p-{exp['parallelism']}_mps-{exp['benchmark_args']['messages_per_burst']}.pkl" - benchmark_cmd = [ - "python", "-u", "-m", "deathstar_movie_review.start_benchmark", "--output", filename, "--experiment", e - ] - - for arg, val in exp['benchmark_args'].items(): - benchmark_cmd.append(f"--{arg}") - benchmark_cmd.append(str(val)) - subprocess.run(benchmark_cmd, check=True) - - # Sleep for experiment duration - # print(f"Sleeping for {exp['sleep']} seconds...") - # time.sleep(exp['sleep']) - - # Stop docker compose - subprocess.run(["docker", "compose", "down"], check=True) - - print(f"Experiment completed.") - -print("All experiments completed.") diff --git a/run_hotel_exp.py b/run_hotel_exp.py new file mode 100755 index 0000000..b4c4427 --- /dev/null +++ b/run_hotel_exp.py @@ -0,0 +1,76 @@ +import os +import subprocess +import time + + + + +# Define experiment parameters as a list of dictionaries +experiments = [ + {"mode": "thread", "taskmanagers": 16, "slots_per_tm": 1, "benchmark_args": {"rps": 1000, "sec": 30, "threads": 10}}, + {"mode": "thread", "taskmanagers": 8, "slots_per_tm": 2, "benchmark_args": {"rps": 1000, "sec": 30, "threads": 10}}, + {"mode": "thread", "taskmanagers": 4, "slots_per_tm": 4, "benchmark_args": {"rps": 1000, "sec": 30, "threads": 10}}, + {"mode": "thread", "taskmanagers": 2, "slots_per_tm": 8, "benchmark_args": {"rps": 1000, "sec": 30, "threads": 10}}, + {"mode": "thread", "taskmanagers": 1, "slots_per_tm": 16, "benchmark_args": {"rps": 1000, "sec": 30, "threads": 10}}, + + {"mode": "process", "taskmanagers": 16, "slots_per_tm": 1, "benchmark_args": {"rps": 1000, "sec": 30, "threads": 10}}, + {"mode": "process", "taskmanagers": 8, "slots_per_tm": 2, 
"benchmark_args": {"rps": 1000, "sec": 30, "threads": 10}}, + {"mode": "process", "taskmanagers": 4, "slots_per_tm": 4, "benchmark_args": {"rps": 1000, "sec": 30, "threads": 10}}, + {"mode": "process", "taskmanagers": 2, "slots_per_tm": 8, "benchmark_args": {"rps": 1000, "sec": 30, "threads": 10}}, + {"mode": "process", "taskmanagers": 1, "slots_per_tm": 16, "benchmark_args": {"rps": 1000, "sec": 30, "threads": 10}}, +] + + + + +print("Tearing down docker containers") +subprocess.run(["docker", "compose", "down"], check=False) + +for exp in experiments: + print(f"Starting experiment {exp}") + + tm = exp["taskmanagers"] + slots = exp["slots_per_tm"] + parallelism = tm * slots + + # Start docker compose + subprocess.run(["docker", "compose", "up", "-d", "--scale", f"taskmanager={tm}", "--force-recreate"], check=True, env={ + "TASK_SLOTS": str(slots) + }) + + time.sleep(10) + + # Run Flink job + + module = "deathstar_hotel_reservation.demo" + if exp["mode"] == "process": + module += "_process" + + flink_cmd = [ + "flink", "run", "--pyFiles", "/home/lvanmol/cascade/src,/home/lvanmol/cascade", + "--pyModule", module, "-d", "-p", str(parallelism) + ] + env = os.environ + subprocess.run(flink_cmd, check=True, env=env) + + # Start benchmark + filename = f"{tm}x{slots}_rps-{exp['benchmark_args']['rps']}_{exp['mode']}.csv" + benchmark_cmd = [ + "python", "-u", "-m", "deathstar_hotel_reservation.start_benchmark", "--output", filename, + ] + + for arg, val in exp['benchmark_args'].items(): + benchmark_cmd.append(f"--{arg}") + benchmark_cmd.append(str(val)) + subprocess.run(benchmark_cmd, check=True) + + # Sleep for experiment duration + # print(f"Sleeping for {exp['sleep']} seconds...") + # time.sleep(exp['sleep']) + + # Stop docker compose + subprocess.run(["docker", "compose", "down"], check=False) + + print(f"Experiment completed.") + +print("All experiments completed.") diff --git a/run_experiments_gil_workaround copy.py b/run_parallel_exp.py similarity index 100% rename from run_experiments_gil_workaround copy.py rename to run_parallel_exp.py diff --git a/src/cascade/dataflow/optimization/parallelization.py b/src/cascade/dataflow/optimization/parallelization.py index c968a48..9a9f8ef 100644 --- a/src/cascade/dataflow/optimization/parallelization.py +++ b/src/cascade/dataflow/optimization/parallelization.py @@ -29,7 +29,8 @@ def parallelize(df): import networkx as nx def parallelize_until_if(df: DataFlow) -> Tuple[DataFlow, DataFlow]: """Parallelize df, stopping at the first if node. - The first dataflow returned is the parallelized dataflow up until the first if node. The second dataflow is the rest of the dataflow""" + The first dataflow returned is the parallelized dataflow up until the first if node. The second dataflow is the rest of the dataflow. 
+ We therefore only parallelize until the first if node.""" # create the dependency graph ans = [] # since we use SSA, every variable has exactly one node that writes it @@ -60,10 +61,12 @@ def parallelize_until_if(df: DataFlow) -> Tuple[DataFlow, DataFlow]: n_map = copy.deepcopy(df.nodes) for node in ans: for read in node.reads: + + # "read" will not be in write nodes if it is part of the dataflow arguments + # + # a more thorough implementation would not need the if check, + # instead add the arguments as writes to some function entry node if read in write_nodes: - # "read" will not be in write nodes if it is part of the arguments - # a more thorough implementation would not need the if check, - # and add the arguments as writes to some function entry node graph.add_edge(write_nodes[read], node.node.id) try: nodes_with_indegree_0.remove(node.node.id) diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index 8bc4bc0..a1f8cc2 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -19,7 +19,7 @@ import logging logger = logging.getLogger("cascade") -logger.setLevel("DEBUG") +logger.setLevel("INFO") console_handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') console_handler.setFormatter(formatter) diff --git a/tests/integration/flink/utils.py b/tests/integration/flink/utils.py index 5f0f5c4..7697a15 100644 --- a/tests/integration/flink/utils.py +++ b/tests/integration/flink/utils.py @@ -1,3 +1,4 @@ +from typing import Union import cascade from cascade.dataflow.dataflow import EventResult from cascade.dataflow.operator import StatefulOperator, StatelessOperator @@ -19,12 +20,16 @@ def wait_for_event_id(id: int, collector: CloseableIterator) -> EventResult: return record -def init_cascade_from_module(import_path: str): +def init_cascade_from_module(import_path: Union[str, list[str]]): cascade.core.clear() - exec(f'import {import_path}') + if isinstance(import_path, list): + for path in import_path: + exec(f'import {path}') + else: + exec(f'import {import_path}') cascade.core.init() -def init_flink_runtime(import_path: str, in_topic=None, out_topic=None, internal_topic=None, parallelism=4, **init_args) -> FlinkRuntime: +def init_flink_runtime(import_path: Union[str, list[str]], in_topic=None, out_topic=None, internal_topic=None, parallelism=4, **init_args) -> FlinkRuntime: init_cascade_from_module(import_path) if in_topic is None: