
Conversation

@alexlemann (Author)

This allows ordering in stages of the pipeline. My idea was to use this with Pipeline.join() to in effect create a bottleneck pipeline stage that forces sequential ordering and then releases to further stages.

@simonschmidt self-requested a review May 24, 2017 20:20
@simonschmidt (Owner) left a comment

Thanks for the pull request! :)

randomized = ordered.copy()
random.shuffle(randomized)
cpq = ClosablePriorityQueue(fuzz=0.01)
for letter in ordered:
@simonschmidt (Owner):

This should be for letter in randomized, right?

@simonschmidt (Owner) commented May 24, 2017

Though I think you won't achieve your end goal with this, as the priority queue can at best give you partial ordering: it will probably be read from while it is still being written to.

import random

import gevent

from gevent_pipeline import ClosablePriorityQueue, Pipeline

def test_cpq_pipeline_ordering():
    cpq = ClosablePriorityQueue()
    random_array = [random.randint(1, 50) for _ in range(10)]

    def f(x):
        gevent.sleep(random.uniform(0, 0.01))
        return x

    output = list(
        Pipeline()
            .from_iter(random_array)
            .map(f, q_out=cpq))

    # Can fail: the queue is read while it is still being written to,
    # so at best the output is partially ordered.
    assert sorted(random_array) == output

However, to create a collecting-and-sorting stage you could create a worker like:

def sorter(q_in, q_out, q_done):
    # sorted() drains q_in completely before anything is emitted
    for value in sorted(q_in):
        q_out.put(value)

    # Signal that this worker is out of work
    q_done.put(None)

# Use it something like:
# Pipeline()
#     ...
#     .chain_workers(sorter, n_workers=1)

And make sure to run it with n_workers=1 to avoid a similar issue.
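
For instance, a minimal end-to-end sketch (assuming chain_workers passes each stage's q_in/q_out/q_done to the worker, as the signature above suggests, and that a Pipeline can be consumed with list() as in the earlier example):

result = list(
    Pipeline()
        .from_iter([3, 1, 2])
        .chain_workers(sorter, n_workers=1))
assert result == [1, 2, 3]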

On a related note, I've found it useful to limit the size of the output queue of workers to avoid pile-up in the early, quick parts of the chain. I should probably change the default to something like maxsize=n_workers*2, since unlimited is rarely what you want:

...
    .map(f, q_out=ClosableQueue(maxsize=10), n_workers=5)

@alexlemann (Author)

@simonschmidt I had the same thought. My approach was going to be either to make sure the CPQ was used as the last stage and then start a new Pipeline with that as the initial input, or, as I showed here, to make join (or joinall, whatever) act like the rest of the methods and return self so that it can be embedded as a stage in a pipeline. Thoughts?
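
A rough sketch of the second option (purely illustrative; the attribute name below is an assumption, not the library's actual internals):

import gevent

class Pipeline:
    # ... rest of the class elided ...

    def join(self):
        # '_workers' is a hypothetical attribute standing in for the
        # pipeline's greenlets; the real internals may differ.
        gevent.joinall(self._workers)
        return self  # returning self lets join() sit mid-chain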

@simonschmidt (Owner) commented May 29, 2017

Not sure about that approach; you would also need to restart the Pipeline._queue_closer.

I added a Pipeline.sort method; is that sufficient for your use case?
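
For example, a usage sketch (the exact signature of Pipeline.sort is assumed here):

output = list(
    Pipeline()
        .from_iter(random_array)
        .map(f)
        .sort())
assert output == sorted(random_array)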

I'd still like to have the ClosablePriorityQueue as it can be useful to have the partial sorting properties in many other scenarios.

@alexlemann (Author)

With regard to .join, could you provide a case where this breaks because of the ._queue_closer being canceled, so that I could try to think of a fix?

One thing I like about using the ClosablePriorityQueue here is that a CPQ.insert() by one of many workers sounds quicker than calling sorted() with one worker. That said, I don't have any numbers to back up when this would become a real issue in practice.

I'm all for having Pipeline.sort. My particular use case was reading a data file, doing work on the lines, and writing them out in the same order, which seems like it would be a common use case. Your syntax is certainly an improvement over having a special output queue type, but if you agree that my issue above is worth considering, then maybe it could be rewritten with the CPQ.
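
A sketch of what that rewrite might look like (assuming ClosablePriorityQueue supports close() and iteration the same way the other closable queues do):

def cpq_sorter(q_in, q_out, q_done):
    # Collect everything before emitting anything, so the CPQ is
    # never read while it is still being written to.
    cpq = ClosablePriorityQueue()
    for value in q_in:
        cpq.put(value)
    cpq.close()          # assumed API, mirroring ClosableQueue
    for value in cpq:    # drain in priority order
        q_out.put(value)

    # Signal that this worker is out of work
    q_done.put(None)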
