diff --git a/CHANGELOG.md b/CHANGELOG.md index a2b75c64e..16514939a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ * Masked the sensitive credential data in the connection string (DSN, data source name) from error messages for security reasons +* Fixed issue with topic offsets update in transactions ## v3.121.0 * Changed internal pprof label to pyroscope supported format diff --git a/internal/topic/topicreaderinternal/batch_tx_storage.go b/internal/topic/topicreaderinternal/batch_tx_storage.go new file mode 100644 index 000000000..021937108 --- /dev/null +++ b/internal/topic/topicreaderinternal/batch_tx_storage.go @@ -0,0 +1,230 @@ +package topicreaderinternal + +import ( + "context" + "errors" + "fmt" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreadercommon" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/tx" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" +) + +// errNoBatches is returned when there are no batches to process for a transaction. +var errNoBatches = errors.New("no batches for transaction") + +// transactionBatches stores batches for a single transaction. +// It is not thread-safe and should be accessed only through batchTxStorage methods. +type transactionBatches struct { + batches []*topicreadercommon.PublicBatch +} + +// AddBatch adds a batch to the transaction. +func (tb *transactionBatches) AddBatch(batch *topicreadercommon.PublicBatch) { + tb.batches = append(tb.batches, batch) +} + +// GetBatches returns all batches stored for this transaction. 
+func (tb *transactionBatches) GetBatches() []*topicreadercommon.PublicBatch { + return tb.batches +} + +// batchTxStorage stores batches associated with transactions for commit within transaction. +// It is thread-safe and allows multiple transactions to be managed concurrently. +type batchTxStorage struct { + transactions map[string]*transactionBatches + consumer string + m xsync.Mutex +} + +// newBatchTxStorage creates a new batch transaction storage with the given consumer name. +// The consumer name is used when building UpdateOffsetsInTransactionRequest. +func newBatchTxStorage(consumer string) *batchTxStorage { + return &batchTxStorage{ + transactions: make(map[string]*transactionBatches), + consumer: consumer, + } +} + +// GetOrCreateTransactionBatches gets or creates a transaction batches handler for the given transaction. +// This method is thread-safe. +func (s *batchTxStorage) GetOrCreateTransactionBatches( + transaction tx.Transaction, +) (batches *transactionBatches, created bool) { + s.m.Lock() + defer s.m.Unlock() + + txID := transaction.ID() + txBatches, exists := s.transactions[txID] + if !exists { + txBatches = &transactionBatches{ + batches: make([]*topicreadercommon.PublicBatch, 0), + } + s.transactions[txID] = txBatches + created = true + } + + return txBatches, created +} + +// GetBatches returns all batches stored for the given transaction. +// Returns an empty slice (nil) if no batches are stored for the transaction. +// This method is thread-safe. +func (s *batchTxStorage) GetBatches(transaction tx.Transaction) []*topicreadercommon.PublicBatch { + s.m.Lock() + defer s.m.Unlock() + + txBatches, ok := s.transactions[transaction.ID()] + if !ok { + return nil + } + + return txBatches.GetBatches() +} + +// GetUpdateOffsetsInTransactionRequest builds an UpdateOffsetsInTransactionRequest +// from all batches stored for the given transaction. 
+// The batches are converted to commit ranges, optimized (adjacent ranges are merged), +// and grouped by topic and partition. +// Returns an error matching errNoBatches (via errors.Is) if no batches are stored for the transaction. +// Returns an error if session info is missing for any partition offset. +// Only the map lookup is done under the mutex; the batch list is read after the lock is released. +func (s *batchTxStorage) GetUpdateOffsetsInTransactionRequest( + transaction tx.Transaction, +) (*rawtopic.UpdateOffsetsInTransactionRequest, error) { + s.m.Lock() + txBatches, ok := s.transactions[transaction.ID()] + s.m.Unlock() + + if !ok { + return nil, xerrors.WithStackTrace(errNoBatches) + } + + batches := txBatches.GetBatches() + if len(batches) == 0 { + return nil, xerrors.WithStackTrace(errNoBatches) + } + + // Convert batches to CommitRanges + commitRanges := topicreadercommon.NewCommitRangesWithCapacity(len(batches)) + for _, batch := range batches { + commitRange := topicreadercommon.GetCommitRange(batch) + commitRanges.AppendCommitRange(commitRange) + } + + // Optimize ranges (merge adjacent ranges) + commitRanges.Optimize() + + // Build sessionID -> (topic, partitionID) map for efficient lookup + sessionInfoMap := s.buildSessionInfoMap(batches) + + // Convert to partition offsets + partitionOffsets := commitRanges.ToPartitionsOffsets() + if len(partitionOffsets) == 0 { + return nil, xerrors.WithStackTrace(errNoBatches) + } + + // Group partition offsets by topic + topicMap, err := s.buildPartitionOffsetsMap(partitionOffsets, sessionInfoMap) + if err != nil { + return nil, err + } + if len(topicMap) == 0 { + return nil, xerrors.WithStackTrace(errNoBatches) + } + + // Build request + return s.buildUpdateOffsetsRequest(transaction, topicMap), nil +} + +type sessionInfo struct { + topic string + partitionID int64 +} + +// buildSessionInfoMap creates a map from partition session ID to topic and partition ID. 
+func (s *batchTxStorage) buildSessionInfoMap( + batches []*topicreadercommon.PublicBatch, +) map[rawtopicreader.PartitionSessionID]sessionInfo { + sessionInfoMap := make(map[rawtopicreader.PartitionSessionID]sessionInfo) + for _, batch := range batches { + commitRange := topicreadercommon.GetCommitRange(batch) + sessionID := commitRange.PartitionSession.StreamPartitionSessionID + if _, exists := sessionInfoMap[sessionID]; !exists { + sessionInfoMap[sessionID] = sessionInfo{ + topic: commitRange.PartitionSession.Topic, + partitionID: commitRange.PartitionSession.PartitionID, + } + } + } + + return sessionInfoMap +} + +// buildPartitionOffsetsMap groups partition offsets by topic. +func (s *batchTxStorage) buildPartitionOffsetsMap( + partitionOffsets []rawtopicreader.PartitionCommitOffset, + sessionInfoMap map[rawtopicreader.PartitionSessionID]sessionInfo, +) (map[string][]rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets, error) { + topicMap := make(map[string][]rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets) + for i := range partitionOffsets { + po := &partitionOffsets[i] + info, ok := sessionInfoMap[po.PartitionSessionID] + if !ok { + return nil, xerrors.WithStackTrace( + fmt.Errorf("session info not found for partition session ID %d", po.PartitionSessionID), + ) + } + + topicMap[info.topic] = append(topicMap[info.topic], rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets{ + PartitionID: info.partitionID, + PartitionOffsets: po.Offsets, + }) + } + + return topicMap, nil +} + +// buildUpdateOffsetsRequest creates the final UpdateOffsetsInTransactionRequest. 
+func (s *batchTxStorage) buildUpdateOffsetsRequest( + transaction tx.Transaction, + topicMap map[string][]rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets, +) *rawtopic.UpdateOffsetsInTransactionRequest { + req := &rawtopic.UpdateOffsetsInTransactionRequest{ + OperationParams: rawydb.NewRawOperationParamsFromProto( + operation.Params(context.Background(), 0, 0, operation.ModeSync), + ), + Tx: rawtopiccommon.TransactionIdentity{ + ID: transaction.ID(), + Session: transaction.SessionID(), + }, + Consumer: s.consumer, + Topics: make([]rawtopic.UpdateOffsetsInTransactionRequest_TopicOffsets, 0, len(topicMap)), + } + + for path, partitions := range topicMap { + req.Topics = append(req.Topics, rawtopic.UpdateOffsetsInTransactionRequest_TopicOffsets{ + Path: path, + Partitions: partitions, + }) + } + + return req +} + +// Clear removes all batches stored for the given transaction. +// After calling Clear, GetBatches returns nil and GetUpdateOffsetsInTransactionRequest +// returns an error matching errNoBatches for this transaction. +// This method is thread-safe. 
+func (s *batchTxStorage) Clear(transaction tx.Transaction) { + s.m.Lock() + defer s.m.Unlock() + + delete(s.transactions, transaction.ID()) +} diff --git a/internal/topic/topicreaderinternal/batch_tx_storage_test.go b/internal/topic/topicreaderinternal/batch_tx_storage_test.go new file mode 100644 index 000000000..66265de65 --- /dev/null +++ b/internal/topic/topicreaderinternal/batch_tx_storage_test.go @@ -0,0 +1,419 @@ +package topicreaderinternal + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreadercommon" +) + +func TestBatchTxStorageAdd_NewTransaction(t *testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + batch := createTestBatch("topic-1", 1, 10, 20, 1) + + storage := newBatchTxStorage("test-consumer") + txBatches, isNew := storage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch) + + assert.True(t, isNew) +} + +func TestBatchTxStorageAdd_ExistingTransaction(t *testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + batch1 := createTestBatch("topic-1", 1, 10, 20, 1) + batch2 := createTestBatch("topic-1", 1, 20, 30, 1) + + storage := newBatchTxStorage("test-consumer") + + txBatches1, _ := storage.GetOrCreateTransactionBatches(tx) + txBatches1.AddBatch(batch1) + _, isNew := storage.GetOrCreateTransactionBatches(tx) + txBatches1.AddBatch(batch2) + + assert.False(t, isNew) +} + +func TestBatchTxStorageGetBatches_Empty(t *testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + + batches := newBatchTxStorage("test-consumer").GetBatches(tx) + + assert.Empty(t, batches) +} + +func TestBatchTxStorageGetBatches_WithBatches(t 
*testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + batch1 := createTestBatch("topic-1", 1, 10, 20, 1) + batch2 := createTestBatch("topic-1", 1, 20, 30, 1) + + storage := newBatchTxStorage("test-consumer") + txBatches, _ := storage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch1) + txBatches.AddBatch(batch2) + + batches := storage.GetBatches(tx) + + require.Len(t, batches, 2) + assert.Equal(t, batch1, batches[0]) + assert.Equal(t, batch2, batches[1]) +} + +func TestBatchTxStorageGetUpdateOffsetsInTransactionRequest_Empty(t *testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + + req, err := newBatchTxStorage("test-consumer").GetUpdateOffsetsInTransactionRequest(tx) + + assert.Error(t, err) + assert.True(t, errors.Is(err, errNoBatches)) + assert.Nil(t, req) +} + +func TestBatchTxStorageGetUpdateOffsetsInTransactionRequest_SingleBatch(t *testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + _ = tx.UnLazy(context.TODO()) + batch := createTestBatch("topic-1", 1, 10, 20, 1) + + storage := newBatchTxStorage("test-consumer") + + txBatches, _ := storage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch) + + req, err := storage.GetUpdateOffsetsInTransactionRequest(tx) + + require.NoError(t, err) + require.NotNil(t, req) + assert.Equal(t, "test-consumer", req.Consumer) + assert.Equal(t, "tx-1", req.Tx.ID) + assert.Equal(t, "session-1", req.Tx.Session) + require.Len(t, req.Topics, 1) + assert.Equal(t, "topic-1", req.Topics[0].Path) + require.Len(t, req.Topics[0].Partitions, 1) + assert.Equal(t, int64(1), req.Topics[0].Partitions[0].PartitionID) + require.Len(t, req.Topics[0].Partitions[0].PartitionOffsets, 1) + assert.Equal(t, rawtopiccommon.Offset(10), req.Topics[0].Partitions[0].PartitionOffsets[0].Start) + assert.Equal(t, rawtopiccommon.Offset(20), req.Topics[0].Partitions[0].PartitionOffsets[0].End) +} + +func TestBatchTxStorageGetUpdateOffsetsInTransactionRequest_MultipleBatches(t *testing.T) { + 
tx := newMockTransactionWrapper("session-1", "tx-1") + batch1 := createTestBatch("topic-1", 1, 10, 20, 1) + batch2 := createTestBatch("topic-1", 1, 20, 30, 1) + + storage := newBatchTxStorage("test-consumer") + txBatches, _ := storage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch1) + txBatches.AddBatch(batch2) + + req, err := storage.GetUpdateOffsetsInTransactionRequest(tx) + + require.NoError(t, err) + require.NotNil(t, req) + require.Len(t, req.Topics, 1) + require.Len(t, req.Topics[0].Partitions, 1) + require.Len(t, req.Topics[0].Partitions[0].PartitionOffsets, 1) + assert.Equal(t, rawtopiccommon.Offset(10), req.Topics[0].Partitions[0].PartitionOffsets[0].Start) + assert.Equal(t, rawtopiccommon.Offset(30), req.Topics[0].Partitions[0].PartitionOffsets[0].End) +} + +func TestBatchTxStorageGetUpdateOffsetsInTransactionRequest_MultipleTopics(t *testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + batch1 := createTestBatch("topic-1", 1, 10, 20, 1) + batch2 := createTestBatch("topic-2", 2, 30, 40, 2) + + storage := newBatchTxStorage("test-consumer") + txBatches, _ := storage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch1) + txBatches.AddBatch(batch2) + + req, err := storage.GetUpdateOffsetsInTransactionRequest(tx) + + require.NoError(t, err) + require.NotNil(t, req) + require.Len(t, req.Topics, 2) + topicMap := buildTopicMap(req.Topics) + assert.Contains(t, topicMap, "topic-1") + assert.Contains(t, topicMap, "topic-2") +} + +func TestBatchTxStorageGetUpdateOffsetsInTransactionRequest_MultiplePartitionsSameTopic(t *testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + batch1 := createTestBatch("topic-1", 1, 10, 20, 1) + batch2 := createTestBatch("topic-1", 2, 30, 40, 2) + + storage := newBatchTxStorage("test-consumer") + txBatches, _ := storage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch1) + txBatches.AddBatch(batch2) + + req, err := storage.GetUpdateOffsetsInTransactionRequest(tx) + + 
require.NoError(t, err) + require.NotNil(t, req) + require.Len(t, req.Topics, 1) + assert.Equal(t, "topic-1", req.Topics[0].Path) + require.Len(t, req.Topics[0].Partitions, 2) + + partitionMap := buildPartitionMap(req.Topics[0].Partitions) + assert.Contains(t, partitionMap, int64(1)) + assert.Contains(t, partitionMap, int64(2)) + require.Len(t, partitionMap[1].PartitionOffsets, 1) + require.Len(t, partitionMap[2].PartitionOffsets, 1) + assert.Equal(t, rawtopiccommon.Offset(10), partitionMap[1].PartitionOffsets[0].Start) + assert.Equal(t, rawtopiccommon.Offset(20), partitionMap[1].PartitionOffsets[0].End) + assert.Equal(t, rawtopiccommon.Offset(30), partitionMap[2].PartitionOffsets[0].Start) + assert.Equal(t, rawtopiccommon.Offset(40), partitionMap[2].PartitionOffsets[0].End) +} + +func TestBatchTxStorageGetUpdateOffsetsInTransactionRequest_NonAdjacentBatches(t *testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + batch1 := createTestBatch("topic-1", 1, 10, 20, 1) + batch2 := createTestBatch("topic-1", 1, 30, 40, 1) + + storage := newBatchTxStorage("test-consumer") + txBatches, _ := storage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch1) + txBatches.AddBatch(batch2) + + req, err := storage.GetUpdateOffsetsInTransactionRequest(tx) + + require.NoError(t, err) + require.NotNil(t, req) + require.Len(t, req.Topics, 1) + require.Len(t, req.Topics[0].Partitions, 1) + require.Len(t, req.Topics[0].Partitions[0].PartitionOffsets, 2) + assert.Equal(t, rawtopiccommon.Offset(10), req.Topics[0].Partitions[0].PartitionOffsets[0].Start) + assert.Equal(t, rawtopiccommon.Offset(20), req.Topics[0].Partitions[0].PartitionOffsets[0].End) + assert.Equal(t, rawtopiccommon.Offset(30), req.Topics[0].Partitions[0].PartitionOffsets[1].Start) + assert.Equal(t, rawtopiccommon.Offset(40), req.Topics[0].Partitions[0].PartitionOffsets[1].End) +} + +func TestBatchTxStorageGetUpdateOffsetsInTransactionRequest_MultiplePartitionsMultipleTopics(t *testing.T) { + tx := 
newMockTransactionWrapper("session-1", "tx-1") + batch1 := createTestBatch("topic-1", 1, 10, 20, 1) + batch2 := createTestBatch("topic-1", 2, 30, 40, 2) + batch3 := createTestBatch("topic-2", 1, 50, 60, 3) + batch4 := createTestBatch("topic-2", 2, 70, 80, 4) + + storage := newBatchTxStorage("test-consumer") + txBatches, _ := storage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch1) + txBatches.AddBatch(batch2) + txBatches.AddBatch(batch3) + txBatches.AddBatch(batch4) + + req, err := storage.GetUpdateOffsetsInTransactionRequest(tx) + + require.NoError(t, err) + require.NotNil(t, req) + require.Len(t, req.Topics, 2) + + topicMap := buildTopicOffsetsMap(req.Topics) + assert.Contains(t, topicMap, "topic-1") + assert.Contains(t, topicMap, "topic-2") + require.Len(t, topicMap["topic-1"].Partitions, 2) + require.Len(t, topicMap["topic-2"].Partitions, 2) + + partition1Map := buildPartitionMap(topicMap["topic-1"].Partitions) + assert.Contains(t, partition1Map, int64(1)) + assert.Contains(t, partition1Map, int64(2)) + + partition2Map := buildPartitionMap(topicMap["topic-2"].Partitions) + assert.Contains(t, partition2Map, int64(1)) + assert.Contains(t, partition2Map, int64(2)) +} + +func TestBatchTxStorageGetUpdateOffsetsInTransactionRequest_ComplexOptimization(t *testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + batch1 := createTestBatch("topic-1", 1, 10, 20, 1) + batch2 := createTestBatch("topic-1", 1, 20, 30, 1) + batch3 := createTestBatch("topic-1", 1, 30, 40, 1) + batch4 := createTestBatch("topic-1", 1, 50, 60, 1) + + storage := newBatchTxStorage("test-consumer") + txBatches, _ := storage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch1) + txBatches.AddBatch(batch2) + txBatches.AddBatch(batch3) + txBatches.AddBatch(batch4) + + req, err := storage.GetUpdateOffsetsInTransactionRequest(tx) + + require.NoError(t, err) + require.NotNil(t, req) + require.Len(t, req.Topics, 1) + require.Len(t, req.Topics[0].Partitions, 1) + 
require.Len(t, req.Topics[0].Partitions[0].PartitionOffsets, 2) + assert.Equal(t, rawtopiccommon.Offset(10), req.Topics[0].Partitions[0].PartitionOffsets[0].Start) + assert.Equal(t, rawtopiccommon.Offset(40), req.Topics[0].Partitions[0].PartitionOffsets[0].End) + assert.Equal(t, rawtopiccommon.Offset(50), req.Topics[0].Partitions[0].PartitionOffsets[1].Start) + assert.Equal(t, rawtopiccommon.Offset(60), req.Topics[0].Partitions[0].PartitionOffsets[1].End) +} + +func TestBatchTxStorageGetUpdateOffsetsInTransactionRequest_MixedPartitionsAndTopics(t *testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + batch1 := createTestBatch("topic-1", 1, 10, 20, 1) + batch2 := createTestBatch("topic-1", 1, 20, 30, 1) + batch3 := createTestBatch("topic-1", 2, 40, 50, 2) + batch4 := createTestBatch("topic-1", 2, 60, 70, 2) + batch5 := createTestBatch("topic-2", 1, 80, 90, 3) + + storage := newBatchTxStorage("test-consumer") + txBatches, _ := storage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch1) + txBatches.AddBatch(batch2) + txBatches.AddBatch(batch3) + txBatches.AddBatch(batch4) + txBatches.AddBatch(batch5) + + req, err := storage.GetUpdateOffsetsInTransactionRequest(tx) + + require.NoError(t, err) + require.NotNil(t, req) + require.Len(t, req.Topics, 2) + + topicMap := buildTopicOffsetsMap(req.Topics) + assert.Contains(t, topicMap, "topic-1") + require.Len(t, topicMap["topic-1"].Partitions, 2) + assert.Contains(t, topicMap, "topic-2") + require.Len(t, topicMap["topic-2"].Partitions, 1) + + partition1Map := buildPartitionMap(topicMap["topic-1"].Partitions) + require.Len(t, partition1Map[1].PartitionOffsets, 1) // Merged + assert.Equal(t, rawtopiccommon.Offset(10), partition1Map[1].PartitionOffsets[0].Start) + assert.Equal(t, rawtopiccommon.Offset(30), partition1Map[1].PartitionOffsets[0].End) + + require.Len(t, partition1Map[2].PartitionOffsets, 2) // Not merged + assert.Equal(t, rawtopiccommon.Offset(40), 
partition1Map[2].PartitionOffsets[0].Start) + assert.Equal(t, rawtopiccommon.Offset(50), partition1Map[2].PartitionOffsets[0].End) + assert.Equal(t, rawtopiccommon.Offset(60), partition1Map[2].PartitionOffsets[1].Start) + assert.Equal(t, rawtopiccommon.Offset(70), partition1Map[2].PartitionOffsets[1].End) + + require.Len(t, topicMap["topic-2"].Partitions[0].PartitionOffsets, 1) + assert.Equal(t, rawtopiccommon.Offset(80), topicMap["topic-2"].Partitions[0].PartitionOffsets[0].Start) + assert.Equal(t, rawtopiccommon.Offset(90), topicMap["topic-2"].Partitions[0].PartitionOffsets[0].End) +} + +func TestBatchTxStorageClear(t *testing.T) { + tx := newMockTransactionWrapper("session-1", "tx-1") + batch := createTestBatch("topic-1", 1, 10, 20, 1) + + storage := newBatchTxStorage("test-consumer") + txBatches, _ := storage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch) + + storage.Clear(tx) + + batches := storage.GetBatches(tx) + assert.Empty(t, batches) + + req, err := storage.GetUpdateOffsetsInTransactionRequest(tx) + assert.Error(t, err) + assert.True(t, errors.Is(err, errNoBatches)) + assert.Nil(t, req) +} + +func TestBatchTxStorageMultipleTransactions(t *testing.T) { + tx1 := newMockTransactionWrapper("session-1", "tx-1") + _ = tx1.UnLazy(context.TODO()) + tx2 := newMockTransactionWrapper("session-2", "tx-2") + _ = tx2.UnLazy(context.TODO()) + batch1 := createTestBatch("topic-1", 1, 10, 20, 1) + batch2 := createTestBatch("topic-2", 2, 30, 40, 2) + + storage := newBatchTxStorage("test-consumer") + txBatches1, _ := storage.GetOrCreateTransactionBatches(tx1) + txBatches1.AddBatch(batch1) + txBatches2, _ := storage.GetOrCreateTransactionBatches(tx2) + txBatches2.AddBatch(batch2) + + batches1 := storage.GetBatches(tx1) + batches2 := storage.GetBatches(tx2) + + require.Len(t, batches1, 1) + assert.Equal(t, batch1, batches1[0]) + require.Len(t, batches2, 1) + assert.Equal(t, batch2, batches2[0]) + + storage.Clear(tx1) + + batches1 = storage.GetBatches(tx1) + 
assert.Empty(t, batches1) + batches2 = storage.GetBatches(tx2) + require.Len(t, batches2, 1) + assert.Equal(t, batch2, batches2[0]) +} + +// Helper methods for assertions + +func buildTopicMap( + topics []rawtopic.UpdateOffsetsInTransactionRequest_TopicOffsets, +) map[string]bool { + topicMap := make(map[string]bool) + for _, topic := range topics { + topicMap[topic.Path] = true + } + + return topicMap +} + +func buildTopicOffsetsMap( + topics []rawtopic.UpdateOffsetsInTransactionRequest_TopicOffsets, +) map[string]rawtopic.UpdateOffsetsInTransactionRequest_TopicOffsets { + topicMap := make(map[string]rawtopic.UpdateOffsetsInTransactionRequest_TopicOffsets) + for _, topic := range topics { + topicMap[topic.Path] = topic + } + + return topicMap +} + +func buildPartitionMap( + partitions []rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets, +) map[int64]rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets { + partitionMap := make(map[int64]rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets) + for _, partition := range partitions { + partitionMap[partition.PartitionID] = partition + } + + return partitionMap +} + +// Helper methods for test data creation + +func createTestBatch( + topic string, + partitionID int64, + startOffset, endOffset int64, + sessionID int, +) *topicreadercommon.PublicBatch { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + session := topicreadercommon.NewPartitionSession( + ctx, + topic, + partitionID, + 1, + "connection-1", + rawtopicreader.PartitionSessionID(sessionID), + int64(sessionID), + rawtopiccommon.Offset(0), + ) + + batch, _ := topicreadercommon.NewBatch(session, nil) + + commitRange := topicreadercommon.CommitRange{ + CommitOffsetStart: rawtopiccommon.Offset(startOffset), + CommitOffsetEnd: rawtopiccommon.Offset(endOffset), + PartitionSession: session, + } + + return topicreadercommon.BatchSetCommitRangeForTest(batch, commitRange) +} diff --git 
a/internal/topic/topicreaderinternal/stream_reader_impl.go b/internal/topic/topicreaderinternal/stream_reader_impl.go index e379849db..6f7de9d7c 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl.go @@ -15,17 +15,13 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/credentials" "github.com/ydb-platform/ydb-go-sdk/v3/internal/background" "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/operation" "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreadercommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/tx" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" - "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -65,6 +61,8 @@ type topicStreamReaderImpl struct { err error started bool closed bool + + batchTxStorage *batchTxStorage } type topicStreamReaderConfig struct { @@ -177,6 +175,8 @@ func newTopicStreamReaderStopped( res.committer.BufferCountTrigger = cfg.CommitterBatchCounterTrigger res.freeBytes <- cfg.BufferSizeProtoBytes + res.batchTxStorage = newBatchTxStorage(cfg.Consumer) + return res } @@ -223,6 +223,25 @@ func (r *topicStreamReaderImpl) PopMessagesBatchTx( return nil, err } +// commitWithTransaction prepares a batch of messages to be committed within a YDB transaction. +// +// This method does not immediately commit the batch. Instead, it registers hooks that will be +// executed when the transaction is committed: +// 1. 
Materializes the transaction +// 2. Registers a pre-commit hook to update topic offsets +// 3. Registers a completion hook to update local offsets or close the reader on failure +// +// The actual batch commit happens when the transaction is committed and the registered hooks are executed. +// The offset update is performed on the same node where the transaction was initiated to avoid +// coordination errors. A failed transaction result is marked retryable when the completion hook closes the reader. +// +// Parameters: +// - ctx: context for the operation +// - tx: the YDB transaction to associate the batch commit with +// - batch: the batch of messages to be committed when the transaction commits +// +// Returns an error if transaction materialization fails. Offset update errors are handled +// within the transaction's pre-commit hook and will cause the transaction to fail. func (r *topicStreamReaderImpl) commitWithTransaction( ctx context.Context, tx tx.Transaction, @@ -232,8 +251,21 @@ func (r *topicStreamReaderImpl) commitWithTransaction( return fmt.Errorf("ydb: failed to materialize transaction: %w", err) } - req := r.createUpdateOffsetRequest(ctx, batch, tx) - updateOffsetInTransactionErr := retry.Retry(ctx, func(ctx context.Context) (err error) { + txBatches, isNew := r.batchTxStorage.GetOrCreateTransactionBatches(tx) + txBatches.AddBatch(batch) + if !isNew { + // tx hooks already configured - exiting + return nil + } + + tx.OnBeforeCommit(r.txBeforeCommitFn(tx)) + tx.OnCompleted(r.txOnCompletedFn(ctx, tx)) + + return nil +} + +func (r *topicStreamReaderImpl) txBeforeCommitFn(tx tx.Transaction) tx.OnTransactionBeforeCommit { + return func(ctx context.Context) (err error) { logCtx := r.cfg.BaseContext onDone := trace.TopicOnReaderUpdateOffsetsInTransaction( r.cfg.Trace, @@ -251,47 +283,26 @@ func (r *topicStreamReaderImpl) commitWithTransaction( // Otherwise, errors such as `Database coordinators are unavailable` may occur. 
ctx = endpoint.WithNodeID(ctx, tx.NodeID()) - err = r.topicClient.UpdateOffsetsInTransaction(ctx, req) - - return err - }) - if updateOffsetInTransactionErr == nil { - r.addOnTransactionCompletedHandler(ctx, tx, batch) - } else { - _ = retry.Retry(ctx, func(ctx context.Context) (err error) { - logCtx := r.cfg.BaseContext - onDone := trace.TopicOnReaderTransactionRollback( - r.cfg.Trace, - &logCtx, - r.readerID, - r.readConnectionID, - tx.SessionID(), - tx, - ) - defer func() { - onDone(err) - }() + req, err := r.batchTxStorage.GetUpdateOffsetsInTransactionRequest(tx) + if err != nil { + if errors.Is(err, errNoBatches) { + return nil + } - return tx.Rollback(ctx) - }) + return xerrors.WithStackTrace(fmt.Errorf("building update offsets request: %w", err)) + } - _ = r.CloseWithError(xcontext.ValueOnly(ctx), xerrors.WithStackTrace(xerrors.Retryable( - fmt.Errorf("ydb: failed add topic offsets in transaction: %w", updateOffsetInTransactionErr), - ))) + err = r.topicClient.UpdateOffsetsInTransaction(ctx, req) + if err != nil { + return xerrors.WithStackTrace(fmt.Errorf("updating offsets in transaction: %w", err)) + } - return updateOffsetInTransactionErr + return nil } - - return nil } -func (r *topicStreamReaderImpl) addOnTransactionCompletedHandler( - ctx context.Context, - tx tx.Transaction, - batch *topicreadercommon.PublicBatch, -) { - commitRange := topicreadercommon.GetCommitRange(batch) - tx.OnCompleted(func(transactionResult error) { +func (r *topicStreamReaderImpl) txOnCompletedFn(ctx context.Context, tx tx.Transaction) tx.OnTransactionCompletedFunc { + return func(err error) { logCtx := r.cfg.BaseContext onDone := trace.TopicOnReaderTransactionCompleted( r.cfg.Trace, @@ -300,50 +311,25 @@ func (r *topicStreamReaderImpl) addOnTransactionCompletedHandler( r.readConnectionID, tx.SessionID(), tx, - transactionResult, + err, ) defer onDone() - if transactionResult == nil { - 
topicreadercommon.BatchGetPartitionSession(batch).SetCommittedOffsetForward(commitRange.CommitOffsetEnd) - } else { - _ = r.CloseWithError(xcontext.ValueOnly(ctx), xerrors.WithStackTrace(xerrors.Retryable( - fmt.Errorf("ydb: failed batch commit because transaction doesn't committed: %w", transactionResult), - ))) + defer r.batchTxStorage.Clear(tx) + + if err != nil { + // mark error as retryable - for proper reconnector working + err = xerrors.Retryable(err) + + _ = r.CloseWithError(xcontext.ValueOnly(ctx), fmt.Errorf("transaction failed: %w", err)) + + return } - }) -} -func (r *topicStreamReaderImpl) createUpdateOffsetRequest( - ctx context.Context, - batch *topicreadercommon.PublicBatch, - tx tx.Transaction, -) *rawtopic.UpdateOffsetsInTransactionRequest { - commitRange := topicreadercommon.GetCommitRange(batch) - - return &rawtopic.UpdateOffsetsInTransactionRequest{ - OperationParams: rawydb.NewRawOperationParamsFromProto(operation.Params(ctx, 0, 0, operation.ModeSync)), - Tx: rawtopiccommon.TransactionIdentity{ - ID: tx.ID(), - Session: tx.SessionID(), - }, - Topics: []rawtopic.UpdateOffsetsInTransactionRequest_TopicOffsets{ - { - Path: batch.Topic(), - Partitions: []rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets{ - { - PartitionID: batch.PartitionID(), - PartitionOffsets: []rawtopiccommon.OffsetRange{ - { - Start: commitRange.CommitOffsetStart, - End: commitRange.CommitOffsetEnd, - }, - }, - }, - }, - }, - }, - Consumer: r.cfg.Consumer, + for _, batch := range r.batchTxStorage.GetBatches(tx) { + commitRange := topicreadercommon.GetCommitRange(batch) + topicreadercommon.BatchGetPartitionSession(batch).SetCommittedOffsetForward(commitRange.CommitOffsetEnd) + } } } diff --git a/internal/topic/topicreaderinternal/stream_reader_impl_test.go b/internal/topic/topicreaderinternal/stream_reader_impl_test.go index ab8ff03ee..f56b4d16b 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl_test.go +++ 
b/internal/topic/topicreaderinternal/stream_reader_impl_test.go @@ -1374,6 +1374,8 @@ func TestUpdateCommitInTransaction(t *testing.T) { require.NoError(t, err) err = e.reader.commitWithTransaction(e.ctx, txMock, batch) require.NoError(t, err) + err = txMock.onBeforeCommit[0](e.ctx) + require.NoError(t, err) require.Len(t, txMock.onCompleted, 1) txMock.onCompleted[0](nil) @@ -1400,14 +1402,17 @@ func TestUpdateCommitInTransaction(t *testing.T) { }) require.NoError(t, err) err = e.reader.commitWithTransaction(e.ctx, txMock, batch) + require.NoError(t, err) + + require.NotEmpty(t, txMock.onBeforeCommit) + err = txMock.onBeforeCommit[0](e.ctx) require.ErrorIs(t, err, testError) - require.NoError(t, xerrors.RetryableError(err)) - require.Empty(t, txMock.onCompleted) + txMock.onCompleted[0](err) require.True(t, e.reader.closed) require.ErrorIs(t, e.reader.err, testError) require.Error(t, xerrors.RetryableError(e.reader.err)) - require.True(t, txMock.RolledBack) + require.False(t, txMock.RolledBack, "Rollback is an responsibility of tx itself") require.True(t, txMock.materialized) }) t.Run("UpdateOffsetsInTransaction must be executed on the tx Node", func(t *testing.T) { @@ -1432,5 +1437,8 @@ func TestUpdateCommitInTransaction(t *testing.T) { err = e.reader.commitWithTransaction(e.ctx, txMock, batch) require.NoError(t, err) + + err = txMock.onBeforeCommit[0](e.ctx) + require.NoError(t, err) }) } diff --git a/internal/topic/topicreaderinternal/transaction_mock_test.go b/internal/topic/topicreaderinternal/transaction_mock_test.go index a4bbe5e3a..687b80440 100644 --- a/internal/topic/topicreaderinternal/transaction_mock_test.go +++ b/internal/topic/topicreaderinternal/transaction_mock_test.go @@ -24,6 +24,7 @@ type mockTransaction struct { sessionID string nodeID uint32 onCompleted []tx.OnTransactionCompletedFunc + onBeforeCommit []tx.OnTransactionBeforeCommit RolledBack bool } @@ -43,6 +44,7 @@ func (m *mockTransaction) NodeID() uint32 { } func (m *mockTransaction) 
OnBeforeCommit(f tx.OnTransactionBeforeCommit) { + m.onBeforeCommit = append(m.onBeforeCommit, f) } func (m *mockTransaction) OnCompleted(f tx.OnTransactionCompletedFunc) { diff --git a/tests/integration/topic_read_update_offsets_test.go b/tests/integration/topic_read_update_offsets_test.go new file mode 100644 index 000000000..a43ddb8e8 --- /dev/null +++ b/tests/integration/topic_read_update_offsets_test.go @@ -0,0 +1,154 @@ +//go:build integration +// +build integration + +package integration + +import ( + "context" + "strings" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/tx" + "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter" +) + +func TestSingleTransaction(t *testing.T) { + scope := newScope(t) + writer := scope.TopicWriter() + reader := scope.TopicReader() + driver := scope.Driver() + ctx := scope.Ctx + + writeMessage(t, ctx, writer, "1") + + var batch *topicreader.Batch + + var once sync.Once + err := driver.Query().DoTx(ctx, func(ctx context.Context, tr query.TxActor) error { + once.Do(func() { + deleteTxSession(t, ctx, driver, tr.(tx.Transaction)) + }) + + var err error + batch, err = reader.PopMessagesBatchTx(ctx, tr) + return err + }) + + require.NoError(t, err) + require.Len(t, batch.Messages, 1) + msgEqualString(t, "1", batch.Messages[0]) +} + +func TestSeveralReads(t *testing.T) { + scope := newScope(t) + writer := scope.TopicWriter() + reader := scope.TopicReader() + driver := scope.Driver() + ctx := scope.Ctx + + writeMessage(t, ctx, writer, "1") + + err := driver.Query().DoTx(ctx, func(ctx context.Context, tr query.TxActor) error { + _, err := 
reader.PopMessagesBatchTx(ctx, tr) + require.NoError(t, err) + + writeMessage(t, ctx, writer, "2") + + _, err = reader.PopMessagesBatchTx(ctx, tr) + require.NoError(t, err) + + writeMessage(t, ctx, writer, "3") + + return nil + }) + require.NoError(t, err) + + msg, err := reader.ReadMessage(ctx) + require.NoError(t, err) + msgEqualString(t, "3", msg) +} + +func TestSeveralTransactions(t *testing.T) { + scope := newScope(t) + writer := scope.TopicWriter() + reader := scope.TopicReader() + driver := scope.Driver() + ctx := scope.Ctx + + writeMessage(t, ctx, writer, "1") + + var ( + once1 sync.Once + once2 sync.Once + ) + + err := driver.Query().DoTx(ctx, func(ctx context.Context, tr query.TxActor) error { + once1.Do(func() { + deleteTxSession(t, ctx, driver, tr.(tx.Transaction)) + }) + + _, err := reader.PopMessagesBatchTx(ctx, tr) + return err + }) + require.NoError(t, err) + + writeMessage(t, ctx, writer, "2") + + var batch *topicreader.Batch + err = driver.Query().DoTx(ctx, func(ctx context.Context, tr query.TxActor) error { + once2.Do(func() { + deleteTxSession(t, ctx, driver, tr.(tx.Transaction)) + }) + + var err error + batch, err = reader.PopMessagesBatchTx(ctx, tr) + return err + }) + require.NoError(t, err) + + msgEqualString(t, "2", batch.Messages[0]) +} + +// Helper methods + +func writeMessage(t *testing.T, ctx context.Context, writer *topicwriter.Writer, msg string) { + t.Helper() + err := writer.Write(ctx, topicwriter.Message{Data: strings.NewReader(msg)}) + require.NoError(t, err) +} + +func msgEqualString(t *testing.T, expected string, msg *topicreader.Message) { + t.Helper() + + var actual string + + topicsugar.ReadMessageDataWithCallback(msg, func(data []byte) error { + actual = string(data) + return nil + }) + + require.Equal(t, expected, actual) +} + +func deleteTxSession(t *testing.T, ctx context.Context, driver *ydb.Driver, tx tx.Transaction) { + t.Helper() + deleteSession(t, ctx, driver, tx.SessionID()) +} + +func deleteSession(t *testing.T, 
ctx context.Context, driver *ydb.Driver, sessionID string) { + t.Helper() + _, err := Ydb_Query_V1.NewQueryServiceClient(ydb.GRPCConn(driver)). + DeleteSession(ctx, &Ydb_Query.DeleteSessionRequest{ + SessionId: sessionID, + }) + require.NoError(t, err) +}