
Proposal: concurrency with Go 1.23 iterators #137

@amikai

Proposal

I propose the following four functions:

package iter
// unordered results
func SeqMap[In, Out any](seq iter.Seq[In], f func(In) Out) iter.Seq[Out]
func Seq2Map[In1, In2, Out1, Out2 any](seq iter.Seq2[In1, In2], f func(In1, In2) (Out1, Out2)) iter.Seq2[Out1, Out2]

package stream
// ordered results
func SeqMap[In, Out any](seq iter.Seq[In], f func(In) Out) iter.Seq[Out]
func Seq2Map[In1, In2, Out1, Out2 any](seq iter.Seq2[In1, In2], f func(In1, In2) (Out1, Out2)) iter.Seq2[Out1, Out2]

SeqMap under conc iter:

  • On the user side, users should be able to consume concurrently processed results through an ordinary sequential for loop. When the user breaks out of the loop, the running goroutines should be canceled to avoid wasting resources.
  • Input values are received in order and passed to the function. However, because they are processed concurrently, there is no guarantee which one finishes first, so the output order is not defined.
  • SeqMap should limit the number of goroutines running at once, like iter.Map.
  • Users can choose whether or not to map to a different type.
for v := range iter.SeqMap(slices.Values([]int{1, 2, 3, 4, 5, 6, 7}), func(x int) int { return x * x }) {
	if v == 25 {
		break
	}
	fmt.Println(v)
}
// Output order is not deterministic, for example:
// 4
// 1
// 9
// 16
// 36

Seq2Map under conc iter: similar to iter.SeqMap, but it takes and returns pairs of values, which allows it to be used for error handling.

// Assume that seq2WithError produces the following pairs:
// 1, nil
// 2, nil
// 3, errors.New("123")
// 4, nil
// 5, nil
var seq2WithError iter.Seq2[int, error] = ...

for v, err := range iter.Seq2Map(seq2WithError, func(x int, err error) (int, error) { return x * x, err }) {
	if err != nil {
		break
	}
	fmt.Println(v)
}
// Output order is not deterministic, for example:
// 4
// 1
// 16
SeqMap under conc stream: similar to iter.SeqMap, but results are yielded in input order.

for v := range stream.SeqMap(slices.Values([]int{1, 2, 3, 4, 5, 6, 7}), func(x int) int { return x * x }) {
	if v == 25 {
		break
	}
	fmt.Println(v)
}
// Output is in input order:
// 1
// 4
// 9
// 16
Seq2Map under conc stream: similar to stream.SeqMap, but it takes and returns pairs of values, which allows it to be used for error handling.

// Assume that seq2WithError produces the following pairs:
// 1, nil
// 2, nil
// 3, errors.New("123")
// 4, nil
// 5, nil
var seq2WithError iter.Seq2[int, error] = ...

for v, err := range stream.Seq2Map(seq2WithError, func(x int, err error) (int, error) { return x * x, err }) {
	if err != nil {
		break
	}
	fmt.Println(v)
}
// Output is in input order:
// 1
// 4

Some thoughts

I believe the most valuable properties of a concurrent iterator are lazy evaluation and resource savings. Functions like iter.Map and iter.ForEach process every element of their input before returning. With an iterator, however, once the consumer of the output iterator stops, the input iterator should stop producing elements and all goroutines should exit.
