-
Notifications
You must be signed in to change notification settings - Fork 215
Open
Description
我在测试我代码发现OpenStream并行性能存在问题,甚至没有串行好。不知道是不是运行平台或设备的原因,我的电脑运行下面代码可以复现这个问题(并行用时12.8s,串行用时1.2s)。下面代码里会运行一个回显服务器,Test_Serial和Test_Parallel作为客户端,会先创建一个smux.session,并开1000个协程创建stream,然后发送和接受数据。唯一的区别就是Test_Serial在OpenStream时使用了锁:
package openstream_test
import (
"github.com/xtaci/kcp-go/v5"
"github.com/xtaci/smux"
"log"
"net"
"sync"
"testing"
"time"
)
var listener net.Listener
func Test_Serial(t *testing.T) {
// 测试串行性能
if listener == nil {
go runServer()
time.Sleep(time.Second)
}
conn, err := kcp.DialWithOptions("127.0.0.1:8000", nil, 0, 0)
if err != nil {
t.Error(err)
}
sess, err := smux.Client(conn, smux.DefaultConfig())
if err != nil {
t.Error(err)
}
// 使用同一个smux.session,串行创建1000个stream发送数据
data := []byte("this is s test message")
wg := sync.WaitGroup{}
mux := sync.Mutex{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mux.Lock()
stream, err := sess.OpenStream()
mux.Unlock()
if err != nil {
return
}
stream.Write(data)
buffer := make([]byte, 1024)
stream.Read(buffer)
}()
}
wg.Wait()
}
func Test_Parallel(t *testing.T) {
// 测试并行性能
if listener == nil {
go runServer()
time.Sleep(time.Second)
}
conn, err := kcp.DialWithOptions("127.0.0.1:8000", nil, 0, 0)
if err != nil {
t.Error(err)
}
sess, err := smux.Client(conn, smux.DefaultConfig())
if err != nil {
t.Error(err)
}
// 使用同一个smux.session,并行创建1000个stream发送数据
data := []byte("this is s test message")
wg := sync.WaitGroup{}
//mux := sync.Mutex{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
//mux.Lock()
stream, err := sess.OpenStream()
//mux.Unlock()
if err != nil {
return
}
stream.Write(data)
buffer := make([]byte, 1024)
stream.Read(buffer)
}()
}
wg.Wait()
}
func runServer() {
// 回显服务器
lis, err := kcp.ListenWithOptions(":8000", nil, 0, 0)
if err != nil {
log.Fatal(err)
}
listener = lis
for {
conn, err := lis.Accept()
if err != nil {
log.Fatal(err)
}
go func() {
defer conn.Close()
sess, err := smux.Server(conn, smux.DefaultConfig())
if err != nil {
log.Fatal(err)
}
for {
stream, err := sess.AcceptStream()
if err != nil {
log.Fatal(err)
}
go func() {
buffer := make([]byte, 1024)
n, err := stream.Read(buffer)
if err != nil {
log.Fatal(err)
}
stream.Write(buffer[:n])
time.Sleep(time.Minute)
}()
}
}()
}
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels