Skip to content

OpenStream似乎有并行性能问题 #97

@WSitong

Description

@WSitong

我在测试我代码发现OpenStream并行性能存在问题,甚至没有串行好。不知道是不是运行平台或设备的原因,我的电脑运行下面代码可以复现这个问题(并行用时12.8s,串行用时1.2s)。下面代码里会运行一个回显服务器,Test_Serial和Test_Parallel作为客户端,会先创建一个smux.session,并开1000个协程创建stream,然后发送和接受数据。唯一的区别就是Test_Serial在OpenStream时使用了锁:

package openstream_test

import (
	"github.com/xtaci/kcp-go/v5"
	"github.com/xtaci/smux"
	"log"
	"net"
	"sync"
	"testing"
	"time"
)

var listener net.Listener

func Test_Serial(t *testing.T) {
	// 测试串行性能
	if listener == nil {
		go runServer()
		time.Sleep(time.Second)
	}
	conn, err := kcp.DialWithOptions("127.0.0.1:8000", nil, 0, 0)
	if err != nil {
		t.Error(err)
	}
	sess, err := smux.Client(conn, smux.DefaultConfig())
	if err != nil {
		t.Error(err)
	}
	// 使用同一个smux.session,串行创建1000个stream发送数据
	data := []byte("this is s test message")
	wg := sync.WaitGroup{}
	mux := sync.Mutex{}
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mux.Lock()
			stream, err := sess.OpenStream()
			mux.Unlock()
			if err != nil {
				return
			}
			stream.Write(data)
			buffer := make([]byte, 1024)
			stream.Read(buffer)
		}()
	}
	wg.Wait()
}

func Test_Parallel(t *testing.T) {
	// 测试并行性能
	if listener == nil {
		go runServer()
		time.Sleep(time.Second)
	}
	conn, err := kcp.DialWithOptions("127.0.0.1:8000", nil, 0, 0)
	if err != nil {
		t.Error(err)
	}
	sess, err := smux.Client(conn, smux.DefaultConfig())
	if err != nil {
		t.Error(err)
	}
	// 使用同一个smux.session,并行创建1000个stream发送数据
	data := []byte("this is s test message")
	wg := sync.WaitGroup{}
	//mux := sync.Mutex{}
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			//mux.Lock()
			stream, err := sess.OpenStream()
			//mux.Unlock()
			if err != nil {
				return
			}
			stream.Write(data)
			buffer := make([]byte, 1024)
			stream.Read(buffer)
		}()
	}
	wg.Wait()
}

func runServer() {
	// 回显服务器
	lis, err := kcp.ListenWithOptions(":8000", nil, 0, 0)
	if err != nil {
		log.Fatal(err)
	}
	listener = lis
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Fatal(err)
		}
		go func() {
			defer conn.Close()
			sess, err := smux.Server(conn, smux.DefaultConfig())
			if err != nil {
				log.Fatal(err)
			}
			for {
				stream, err := sess.AcceptStream()
				if err != nil {
					log.Fatal(err)
				}
				go func() {
					buffer := make([]byte, 1024)
					n, err := stream.Read(buffer)
					if err != nil {
						log.Fatal(err)
					}
					stream.Write(buffer[:n])
					time.Sleep(time.Minute)
				}()
			}
		}()
	}
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions