conc/stream.Stream runs sequentially if no max goroutines is set #153

@pascalmail

If we do not set a concurrency limit with WithMaxGoroutines, Stream does its work almost sequentially: a new task only starts once an earlier callback has executed.

I think this is because of https://github.com/sourcegraph/conc/blob/v0.3.0/stream/stream.go#L112, where conc sets the size of the internal queue to 1 instead of leaving it unlimited. Is this expected?

Example:

package stream_test

import (
	"fmt"
	"testing"
	"time"

	cstream "github.com/sourcegraph/conc/stream"
)

func TestParallelWork(t *testing.T) {
	times := []int{20, 52, 16, 45, 4, 80}

	// Without a concurrency limit, scheduling a new goroutine blocks until the previous callbacks have executed.
	//0 queueing 20ms
	//0 queueing 52ms
	//20 20ms
	//20 queueing 16ms
	//52 52ms
	//52 16ms
	//32 queueing 45ms
	//0 queueing 4ms
	//78 45ms
	//46 4ms
	//46 queueing 80ms
	//127 80ms
	t.Run("stream without concurrency limit", func(t *testing.T) {
		stream := cstream.New()
		for _, millis := range times {
			dur := time.Duration(millis) * time.Millisecond
			start := time.Now() // named start so it does not shadow t *testing.T
			stream.Go(func() cstream.Callback {
				fmt.Println(time.Since(start).Milliseconds(), "queueing", dur)
				time.Sleep(dur)
				// This will print in the order the tasks were submitted
				return func() { fmt.Println(time.Since(start).Milliseconds(), dur) }
			})
		}
		stream.Wait()
	})

	// With a concurrency limit, the tasks run concurrently up to the limit.
	//0 queueing 20ms
	//0 queueing 52ms
	//0 queueing 16ms
	//0 queueing 45ms
	//0 queueing 4ms
	//0 queueing 80ms
	//20 20ms
	//52 52ms
	//52 16ms
	//52 45ms
	//52 4ms
	//81 80ms
	t.Run("stream with concurrency limit", func(t *testing.T) {
		stream := cstream.New().WithMaxGoroutines(len(times))
		for _, millis := range times {
			dur := time.Duration(millis) * time.Millisecond
			start := time.Now() // named start so it does not shadow t *testing.T
			stream.Go(func() cstream.Callback {
				fmt.Println(time.Since(start).Milliseconds(), "queueing", dur)
				time.Sleep(dur)
				// This will print in the order the tasks were submitted
				return func() { fmt.Println(time.Since(start).Milliseconds(), dur) }
			})
		}
		stream.Wait()
	})
}
