-
Notifications
You must be signed in to change notification settings - Fork 351
Open
Description
If we do not set a concurrency limit (i.e. never call WithMaxGoroutines), Stream does its work sequentially.
I think this is caused by https://github.com/sourcegraph/conc/blob/v0.3.0/stream/stream.go#L112, where conc sets the size of the internal queue to 1 instead of leaving it unlimited. Is this expected?
Example:
func TestParallelWork(t *testing.T) {
times := []int{20, 52, 16, 45, 4, 80}
// Without concurrency limit, the goroutines scheduling blocks until previous callbacks are executed.
//0 queueing 20ms
//0 queueing 52ms
//20 20ms
//20 queueing 16ms
//52 52ms
//52 16ms
//32 queueing 45ms
//0 queueing 4ms
//78 45ms
//46 4ms
//46 queueing 80ms
//127 80ms
t.Run("stream without concurrency limit", func(t *testing.T) {
stream := cstream.New()
for _, millis := range times {
dur := time.Duration(millis) * time.Millisecond
t := time.Now()
stream.Go(func() cstream.Callback {
fmt.Println(time.Now().Sub(t).Milliseconds(), "queueing", dur)
time.Sleep(dur)
// This will print in the order the tasks were submitted
return func() { fmt.Println(time.Now().Sub(t).Milliseconds(), dur) }
})
}
stream.Wait()
})
// With concurrency limit, the goroutines are executed concurrently up to the limit.
//0 queueing 20ms
//0 queueing 52ms
//0 queueing 16ms
//0 queueing 45ms
//0 queueing 4ms
//0 queueing 80ms
//20 20ms
//52 52ms
//52 16ms
//52 45ms
//52 4ms
//81 80ms
t.Run("stream with concurrency limit", func(t *testing.T) {
stream := cstream.New().WithMaxGoroutines(len(times))
for _, millis := range times {
dur := time.Duration(millis) * time.Millisecond
t := time.Now()
stream.Go(func() cstream.Callback {
fmt.Println(time.Now().Sub(t).Milliseconds(), "queueing", dur)
time.Sleep(dur)
// This will print in the order the tasks were submitted
return func() { fmt.Println(time.Now().Sub(t).Milliseconds(), dur) }
})
}
stream.Wait()
})
}Metadata
Metadata
Assignees
Labels
No labels