Skip to content
Merged
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ linters:
# note(rdr): doesn't allow the use of optional embedded fields in structs. I am not sure
# if it is beneficial
- "-QF1008"
# note: this disables the deprecation warning (SA1019) here because it is enabled separately in .golangci_diff.yaml
- "-SA1019"
exclusions:
generated: lax
presets:
Expand Down
4 changes: 4 additions & 0 deletions .golangci_diff.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ linters:
enable:
- lll
- nolintlint
- staticcheck
settings:
lll:
line-length: 100
nolintlint:
allow-unused: false
require-explanation: true
require-specific: true
staticcheck:
checks:
- "SA1019"
run:
timeout: 10m
2 changes: 1 addition & 1 deletion consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func initNode(

mockServices := consensus.InitMockServices(0, 0, index, cfg.nodeCount)

logger = &utils.ZapLogger{SugaredLogger: logger.Named(fmt.Sprint(index))}
logger = logger.Named(fmt.Sprint(index))
consensusDB := memory.New()
bc := getBlockchain(t, genesisDiff, genesisClasses)

Expand Down
173 changes: 98 additions & 75 deletions consensus/p2p/buffered/buffered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/starknet-io/starknet-p2pspecs/p2p/proto/consensus/consensus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/proto"
)
Expand All @@ -40,96 +41,118 @@ type origin struct {
}

func TestBufferedTopicSubscriptionAndProtoBroadcaster(t *testing.T) {
t.Run(fmt.Sprintf("%d nodes, each sending %d messages", nodeCount, messageCount), func(t *testing.T) {
logger, err := utils.NewZapLogger(utils.NewLogLevel(logLevel), true)
require.NoError(t, err)
t.Run(
fmt.Sprintf("%d nodes, each sending %d messages", nodeCount, messageCount),
func(t *testing.T) {
logger, err := utils.NewZapLogger(utils.NewLogLevel(logLevel), true)
require.NoError(t, err)

nodes := testutils.BuildNetworks(t, testutils.LineNetworkConfig(nodeCount))
topics := nodes.JoinTopic(t, chainID, protocolID, topicName)
nodes := testutils.BuildNetworks(t, testutils.LineNetworkConfig(nodeCount))
topics := nodes.JoinTopic(t, chainID, protocolID, topicName)

messages := make([][]*TestMessage, nodeCount)
allMessages := make(map[string]origin)
messages := make([][]*TestMessage, nodeCount)
allMessages := make(map[string]origin)

for i := range messages {
messages[i] = make([]*TestMessage, messageCount)
for j := range messages[i] {
msg := getTestMessage(i, j)
messages[i][j] = msg
for i := range messages {
messages[i] = make([]*TestMessage, messageCount)
for j := range messages[i] {
msg := getTestMessage(i, j)
messages[i][j] = msg

msgBytes, err := proto.Marshal(msg)
require.NoError(t, err)
msgBytes, err := proto.Marshal(msg)
require.NoError(t, err)

allMessages[string(msgBytes)] = origin{Source: i, Index: j}
allMessages[string(msgBytes)] = origin{Source: i, Index: j}
}
}
}

iterator := iter.Iterator[*pubsub.Topic]{MaxGoroutines: nodeCount}
finished := make(chan struct{}, nodeCount)
liveness := make(chan struct{}, 1)
iterator := iter.Iterator[*pubsub.Topic]{MaxGoroutines: nodeCount}
finished := make(chan struct{}, nodeCount)
liveness := make(chan struct{}, 1)

go func() {
iterator.ForEachIdx(topics, func(i int, destination **pubsub.Topic) {
logger := &utils.ZapLogger{SugaredLogger: logger.Named(fmt.Sprintf("destination-%d", i))}
pending := maps.Clone(allMessages)
go func() {
iterator.ForEachIdx(topics, func(i int, destination **pubsub.Topic) {
logger := logger.Named(fmt.Sprintf("destination-%d", i))
pending := maps.Clone(allMessages)

// Ignore the messages we are broadcasting
for _, message := range messages[i] {
msgBytes, err := proto.Marshal(message)
require.NoError(t, err)
delete(pending, string(msgBytes))
}

subscription := buffered.NewTopicSubscription(logger, nodeCount*messageCount, func(ctx context.Context, msg *pubsub.Message) {
msgStr := string(msg.Message.Data)
if _, ok := pending[msgStr]; !ok {
return
// Ignore the messages we are broadcasting
for _, message := range messages[i] {
msgBytes, err := proto.Marshal(message)
require.NoError(t, err)
delete(pending, string(msgBytes))
}

select {
case liveness <- struct{}{}:
default:
}

delete(pending, msgStr)

if len(pending) == 0 {
finished <- struct{}{}
logger.Info("all messages received")
subscription := buffered.NewTopicSubscription(
logger,
nodeCount*messageCount,
func(ctx context.Context, msg *pubsub.Message) {
msgStr := string(msg.Message.Data)
if _, ok := pending[msgStr]; !ok {
return
}

select {
case liveness <- struct{}{}:
default:
}

delete(pending, msgStr)

if len(pending) == 0 {
finished <- struct{}{}
logger.Info("all messages received")
}
logger.Debug(
"received",
zap.String("message", string(msg.Message.Data)),
zap.Int("pending", len(pending)),
)
})

subscription.Loop(t.Context(), *destination)
if len(pending) > 0 {
logger.Info(
"missing messages",
zap.Any("pending", slices.Collect(maps.Values(pending))),
)
}
logger.Debugw("received", "message", string(msg.Message.Data), "pending", len(pending))
})

subscription.Loop(t.Context(), *destination)
if len(pending) > 0 {
logger.Infow("missing messages", "pending", slices.Collect(maps.Values(pending)))
}
})
}()

go func() {
iterator.ForEachIdx(topics, func(i int, source **pubsub.Topic) {
logger := &utils.ZapLogger{SugaredLogger: logger.Named(fmt.Sprintf("source-%d", i))}
rebroadcastInterval := config.DefaultBufferSizes.RebroadcastInterval

var rebroadcastStrategy buffered.RebroadcastStrategy[*TestMessage]
if i%2 == 0 {
rebroadcastStrategy = buffered.NewRebroadcastStrategy(rebroadcastInterval, func(msg *TestMessage) uint64 {
return msg.BlockNumber
}()

go func() {
iterator.ForEachIdx(
topics,
func(i int, source **pubsub.Topic) {
logger := logger.Named(fmt.Sprintf("source-%d", i))
rebroadcastInterval := config.DefaultBufferSizes.RebroadcastInterval

var rebroadcastStrategy buffered.RebroadcastStrategy[*TestMessage]
if i%2 == 0 {
rebroadcastStrategy = buffered.NewRebroadcastStrategy(
rebroadcastInterval,
func(msg *TestMessage) uint64 {
return msg.BlockNumber
},
)
}
broadcaster := buffered.NewProtoBroadcaster(
logger, messageCount, rebroadcastInterval, rebroadcastStrategy,
)
go broadcaster.Loop(t.Context(), *source)
for _, message := range messages[i] {
logger.Debug(
"publishing",
zap.Any("message", message),
)
broadcaster.Broadcast(t.Context(), message)
}
})
}
broadcaster := buffered.NewProtoBroadcaster(logger, messageCount, rebroadcastInterval, rebroadcastStrategy)
go broadcaster.Loop(t.Context(), *source)
for _, message := range messages[i] {
logger.Debugw("publishing", "message", message)
broadcaster.Broadcast(t.Context(), message)
}
})
}()
}()

for range nodeCount {
wait(t, liveness, finished)
}
})
for range nodeCount {
wait(t, liveness, finished)
}
})

t.Run("canceled context", func(t *testing.T) {
logger, err := utils.NewZapLogger(utils.NewLogLevel(logLevel), true)
Expand Down
2 changes: 1 addition & 1 deletion mempool/p2p/p2p_broadcasters_listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestMempoolBroadcastersAndListeners(t *testing.T) {
transactionWait := conc.NewWaitGroup()
peerWait := conc.NewWaitGroup()
for index, node := range nodes {
logger := &utils.ZapLogger{SugaredLogger: logger.Named(fmt.Sprint(index))}
logger := logger.Named(fmt.Sprint(index))

received := make(chan *mempool.BroadcastedTransaction, txCount)
pool := mockMempool(received)
Expand Down
3 changes: 1 addition & 2 deletions p2p/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ func (s *Service) logError(msg string, err error) {
if !errors.Is(err, context.Canceled) {
var log utils.SimpleLogger
if v, ok := s.log.(*utils.ZapLogger); ok {
enhancedLogger := v.SugaredLogger.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar()
log = &utils.ZapLogger{SugaredLogger: enhancedLogger}
log = v.WithOptions(zap.AddCallerSkip(1))
} else {
log = s.log
}
Expand Down
Loading
Loading