Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gevent_pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .closablequeue import ClosableQueue # noqa
from .closablequeue import ClosableQueue, ClosablePriorityQueue # noqa
from .pipeline import Pipeline, worker, forward_input # noqa
16 changes: 16 additions & 0 deletions gevent_pipeline/closablequeue.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,19 @@ def get(self, *args, **kwargs):
return super().get(block=False)
except Exception:
return StopIteration

class ClosablePriorityQueue(queue.PriorityQueue, ClosableQueue):
"""
Mixes gevent's PriorityQueue with the ClosableQueue

This can be useful for ordering output of a pipeline stage.

Example:
>>> from gevent_pipeline import Pipeline
>>> cpq = ClosablePriorityQueue()
>>> random_array = [random.randint(1,50) for _ in range(10)]
>>> output = list(Pipeline().from_iter(random_array, q_out=cpq))
>>> sorted(random_array) == output
True
"""
pass
4 changes: 2 additions & 2 deletions gevent_pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def g(q_in, q_out, q_done):
raise RuntimeError("Unexpected data on fold output channel")
return result

def join(self):
def joinall(self):
"""
Wait for the greenlets to finish
Wrapper around gevent.joinall
Expand All @@ -306,4 +306,4 @@ def join(self):
# TODO pass argumetns to .joinall and remove greenlets in done from _greenlets
self._greenlets = []

return done
return self
14 changes: 13 additions & 1 deletion tests/test_closablequeue.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from gevent_pipeline import ClosableQueue
from gevent_pipeline import ClosableQueue, ClosablePriorityQueue

import gevent
from gevent import queue
Expand Down Expand Up @@ -224,3 +224,15 @@ def putter():
# No items lost
n_ok_get = n_left_on_queue + n_got
assert n_ok_get == n_ok_put

def test_cpq_order_matches():
# Order of this list matches comparator order for strings.
# Will be randomized and then reordered by the Queue
ordered = list('abcdefghijklmnopqrstuvwxyz')
randomized = ordered.copy()
random.shuffle(randomized)
cpq = ClosablePriorityQueue(fuzz=0.01)
for letter in randomized:
cpq.put(letter)
for orig_el, queued in zip(ordered, cpq):
assert orig_el == queued
15 changes: 13 additions & 2 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from gevent_pipeline import Pipeline, ClosableQueue, worker, forward_input
from gevent_pipeline import Pipeline, ClosableQueue, ClosablePriorityQueue, worker, forward_input

import gevent
from gevent import queue
Expand Down Expand Up @@ -125,7 +125,7 @@ def doubler(x):
l = sorted(p)
assert l == [i*i for i in range(10)]

p.join()
p.joinall()


def test_pipeline_sloppy_map():
Expand All @@ -143,3 +143,14 @@ def f(x):
s_odd = sum(range(1, 100, 2))
s_even = sum(2*i for i in range(0, 100, 2))
assert sum(p) == s_odd + s_even

def test_cpq_out_join_matches_order():
cpq = ClosablePriorityQueue()
original = [random.randint(0,1000) for _ in range(100)]
p = Pipeline()\
.from_iter(enumerate(original),n_workers=5)\
.map(lambda x: (x[0], x[1]*2), n_workers=10, q_out=cpq)\
.joinall()\
.map(lambda x: x[1], n_workers=1)
result = list(p)
assert result == [x*2 for x in original]