Skip to content

Commit 51c179f

Browse files
committed
Add string processing pipeline, which demonstrate the FlowBase idea in Python, using asyncio
1 parent eb13b9b commit 51c179f

File tree

1 file changed

+95
-0
lines changed

1 file changed

+95
-0
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import asyncio
2+
3+
class HiSayer:
4+
out_lines = asyncio.Queue()
5+
6+
async def run(self):
7+
for i in range(20):
8+
self.out_lines.put_nowait(f'Hi hi for the {i+1}:th time...')
9+
self.out_lines.task_done()
10+
11+
12+
class StringSplitter:
13+
in_lines = asyncio.Queue()
14+
out_leftpart = asyncio.Queue()
15+
out_rightpart = asyncio.Queue()
16+
17+
async def run(self):
18+
while not self.in_lines.empty():
19+
s = await self.in_lines.get()
20+
self.out_leftpart.put_nowait(s[:int(len(s)/2)])
21+
self.out_rightpart.put_nowait(s[int(len(s)/2):])
22+
23+
class LowerCaser:
24+
in_lines = asyncio.Queue()
25+
out_lines = asyncio.Queue()
26+
27+
async def run(self):
28+
while not self.in_lines.empty():
29+
s = await self.in_lines.get()
30+
self.out_lines.put_nowait(s.lower())
31+
32+
class UpperCaser:
33+
in_lines = asyncio.Queue()
34+
out_lines = asyncio.Queue()
35+
36+
async def run(self):
37+
while not self.in_lines.empty():
38+
s = await self.in_lines.get()
39+
self.out_lines.put_nowait(s.upper())
40+
41+
class StringJoiner:
42+
in_leftpart = asyncio.Queue()
43+
in_rightpart = asyncio.Queue()
44+
out_lines = asyncio.Queue()
45+
46+
async def run(self):
47+
while not self.in_leftpart.empty() or not self.in_rightpart.empty():
48+
leftpart = await self.in_leftpart.get()
49+
rightpart = await self.in_rightpart.get()
50+
self.out_lines.put_nowait(f'{leftpart}{rightpart}')
51+
52+
class Printer:
53+
in_lines = asyncio.Queue()
54+
55+
async def run(self):
56+
while not self.in_lines.empty():
57+
s = await self.in_lines.get()
58+
print(f'Printer got line: {s}')
59+
60+
def main():
61+
loop = asyncio.get_event_loop()
62+
63+
# Initialize components
64+
hisayer = HiSayer()
65+
loop.create_task(hisayer.run())
66+
67+
splitter = StringSplitter()
68+
loop.create_task(splitter.run())
69+
70+
lowercaser = LowerCaser()
71+
loop.create_task(lowercaser.run())
72+
73+
uppercaser = UpperCaser()
74+
loop.create_task(uppercaser.run())
75+
76+
stringjoiner = StringJoiner()
77+
loop.create_task(stringjoiner.run())
78+
79+
printer = Printer()
80+
loop.create_task(printer.run())
81+
82+
# Connect network
83+
splitter.in_lines = hisayer.out_lines
84+
lowercaser.in_lines = splitter.out_leftpart
85+
uppercaser.in_lines = splitter.out_rightpart
86+
stringjoiner.in_leftpart = lowercaser.out_lines
87+
stringjoiner.in_rightpart = uppercaser.out_lines
88+
printer.in_lines = stringjoiner.out_lines
89+
90+
# Run the full event loop
91+
loop.run_until_complete(printer.run())
92+
93+
94+
if __name__ == '__main__':
95+
main()

0 commit comments

Comments
 (0)