Skip to content

Commit 9078cbe

Browse files
committed
Add Port object
1 parent ad9a4e3 commit 9078cbe

File tree

3 files changed

+17
-14
lines changed

3 files changed

+17
-14
lines changed

examples/string_processing_pipeline.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
import flowbase
32

43

@@ -37,17 +36,17 @@ def main():
3736

3837

3938
class HiSayer:
40-
out_lines = asyncio.Queue()
39+
out_lines = flowbase.Port()
4140

4241
async def run(self):
4342
for i in range(20):
4443
await self.out_lines.put(f"Hi hi for the {i+1}:th time...")
4544

4645

4746
class StringSplitter:
48-
in_lines = asyncio.Queue()
49-
out_leftpart = asyncio.Queue()
50-
out_rightpart = asyncio.Queue()
47+
in_lines = flowbase.Port()
48+
out_leftpart = flowbase.Port()
49+
out_rightpart = flowbase.Port()
5150

5251
async def run(self):
5352
while not self.in_lines.empty():
@@ -57,8 +56,8 @@ async def run(self):
5756

5857

5958
class LowerCaser:
60-
in_lines = asyncio.Queue()
61-
out_lines = asyncio.Queue()
59+
in_lines = flowbase.Port()
60+
out_lines = flowbase.Port()
6261

6362
async def run(self):
6463
while not self.in_lines.empty():
@@ -67,8 +66,8 @@ async def run(self):
6766

6867

6968
class UpperCaser:
70-
in_lines = asyncio.Queue()
71-
out_lines = asyncio.Queue()
69+
in_lines = flowbase.Port()
70+
out_lines = flowbase.Port()
7271

7372
async def run(self):
7473
while not self.in_lines.empty():
@@ -77,9 +76,9 @@ async def run(self):
7776

7877

7978
class StringJoiner:
80-
in_leftpart = asyncio.Queue()
81-
in_rightpart = asyncio.Queue()
82-
out_lines = asyncio.Queue()
79+
in_leftpart = flowbase.Port()
80+
in_rightpart = flowbase.Port()
81+
out_lines = flowbase.Port()
8382

8483
async def run(self):
8584
while not self.in_leftpart.empty() or not self.in_rightpart.empty():
@@ -89,7 +88,7 @@ async def run(self):
8988

9089

9190
class Printer:
92-
in_lines = asyncio.Queue()
91+
in_lines = flowbase.Port()
9392

9493
async def run(self):
9594
while not self.in_lines.empty():

flowbase/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
from flowbase.flowbase import Process, Network
1+
from flowbase.flowbase import Process, Network, Port

flowbase/flowbase.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,7 @@ def add_process(self, name: str, process: Process):
2727

2828
def run(self):
2929
self._loop.run_until_complete(self._driver_process.run())
30+
31+
32+
class Port(asyncio.Queue):
33+
pass

0 commit comments

Comments
 (0)