Skip to content

Commit 8433105

Browse files
authored
update offset bugfix (#1968)
1 parent cb82c65 commit 8433105

File tree

7 files changed

+883
-83
lines changed

7 files changed

+883
-83
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
* Added support for `Timestamp64` type in `value.Any` converter
22
* Masked the sensitive credential data in the connection string (DSN, data source name) from error messages for security reasons
3+
* Fixed issue with topic offsets update in transactions
34

45
## v3.121.0
56
* Changed internal pprof label to pyroscope supported format
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package topicreaderinternal
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicreadercommon"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
16+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
17+
)
18+
19+
// errNoBatches is returned when there are no batches to process for a transaction.
20+
var errNoBatches = errors.New("no batches for transaction")
21+
22+
// transactionBatches stores batches for a single transaction.
23+
// It is not thread-safe and should be accessed only through batchTxStorage methods.
24+
type transactionBatches struct {
25+
batches []*topicreadercommon.PublicBatch
26+
}
27+
28+
// AddBatch adds a batch to the transaction.
29+
func (tb *transactionBatches) AddBatch(batch *topicreadercommon.PublicBatch) {
30+
tb.batches = append(tb.batches, batch)
31+
}
32+
33+
// GetBatches returns all batches stored for this transaction.
34+
func (tb *transactionBatches) GetBatches() []*topicreadercommon.PublicBatch {
35+
return tb.batches
36+
}
37+
38+
// batchTxStorage stores batches associated with transactions for commit within transaction.
39+
// It is thread-safe and allows multiple transactions to be managed concurrently.
40+
type batchTxStorage struct {
41+
transactions map[string]*transactionBatches
42+
consumer string
43+
m xsync.Mutex
44+
}
45+
46+
// newBatchTxStorage creates a new batch transaction storage with the given consumer name.
47+
// The consumer name is used when building UpdateOffsetsInTransactionRequest.
48+
func newBatchTxStorage(consumer string) *batchTxStorage {
49+
return &batchTxStorage{
50+
transactions: make(map[string]*transactionBatches),
51+
consumer: consumer,
52+
}
53+
}
54+
55+
// GetOrCreateTransactionBatches gets or creates a transaction batches handler for the given transaction.
56+
// This method is thread-safe.
57+
func (s *batchTxStorage) GetOrCreateTransactionBatches(
58+
transaction tx.Transaction,
59+
) (batches *transactionBatches, created bool) {
60+
s.m.Lock()
61+
defer s.m.Unlock()
62+
63+
txID := transaction.ID()
64+
txBatches, exists := s.transactions[txID]
65+
if !exists {
66+
txBatches = &transactionBatches{
67+
batches: make([]*topicreadercommon.PublicBatch, 0),
68+
}
69+
s.transactions[txID] = txBatches
70+
created = true
71+
}
72+
73+
return txBatches, created
74+
}
75+
76+
// GetBatches returns all batches stored for the given transaction.
77+
// Returns an empty slice (nil) if no batches are stored for the transaction.
78+
// This method is thread-safe.
79+
func (s *batchTxStorage) GetBatches(transaction tx.Transaction) []*topicreadercommon.PublicBatch {
80+
s.m.Lock()
81+
defer s.m.Unlock()
82+
83+
txBatches, ok := s.transactions[transaction.ID()]
84+
if !ok {
85+
return nil
86+
}
87+
88+
return txBatches.GetBatches()
89+
}
90+
91+
// GetUpdateOffsetsInTransactionRequest builds an UpdateOffsetsInTransactionRequest
92+
// from all batches stored for the given transaction.
93+
// The batches are converted to commit ranges, optimized (adjacent ranges are merged),
94+
// and grouped by topic and partition.
95+
// Returns nil, nil if no batches are stored for the transaction.
96+
// Returns an error if session info is missing for any partition offset.
97+
// This method is thread-safe.
98+
func (s *batchTxStorage) GetUpdateOffsetsInTransactionRequest(
99+
transaction tx.Transaction,
100+
) (*rawtopic.UpdateOffsetsInTransactionRequest, error) {
101+
s.m.Lock()
102+
txBatches, ok := s.transactions[transaction.ID()]
103+
s.m.Unlock()
104+
105+
if !ok {
106+
return nil, xerrors.WithStackTrace(errNoBatches)
107+
}
108+
109+
batches := txBatches.GetBatches()
110+
if len(batches) == 0 {
111+
return nil, xerrors.WithStackTrace(errNoBatches)
112+
}
113+
114+
// Convert batches to CommitRanges
115+
commitRanges := topicreadercommon.NewCommitRangesWithCapacity(len(batches))
116+
for _, batch := range batches {
117+
commitRange := topicreadercommon.GetCommitRange(batch)
118+
commitRanges.AppendCommitRange(commitRange)
119+
}
120+
121+
// Optimize ranges (merge adjacent ranges)
122+
commitRanges.Optimize()
123+
124+
// Build sessionID -> (topic, partitionID) map for efficient lookup
125+
sessionInfoMap := s.buildSessionInfoMap(batches)
126+
127+
// Convert to partition offsets
128+
partitionOffsets := commitRanges.ToPartitionsOffsets()
129+
if len(partitionOffsets) == 0 {
130+
return nil, xerrors.WithStackTrace(errNoBatches)
131+
}
132+
133+
// Group partition offsets by topic
134+
topicMap, err := s.buildPartitionOffsetsMap(partitionOffsets, sessionInfoMap)
135+
if err != nil {
136+
return nil, err
137+
}
138+
if len(topicMap) == 0 {
139+
return nil, xerrors.WithStackTrace(errNoBatches)
140+
}
141+
142+
// Build request
143+
return s.buildUpdateOffsetsRequest(transaction, topicMap), nil
144+
}
145+
146+
type sessionInfo struct {
147+
topic string
148+
partitionID int64
149+
}
150+
151+
// buildSessionInfoMap creates a map from partition session ID to topic and partition ID.
152+
func (s *batchTxStorage) buildSessionInfoMap(
153+
batches []*topicreadercommon.PublicBatch,
154+
) map[rawtopicreader.PartitionSessionID]sessionInfo {
155+
sessionInfoMap := make(map[rawtopicreader.PartitionSessionID]sessionInfo)
156+
for _, batch := range batches {
157+
commitRange := topicreadercommon.GetCommitRange(batch)
158+
sessionID := commitRange.PartitionSession.StreamPartitionSessionID
159+
if _, exists := sessionInfoMap[sessionID]; !exists {
160+
sessionInfoMap[sessionID] = sessionInfo{
161+
topic: commitRange.PartitionSession.Topic,
162+
partitionID: commitRange.PartitionSession.PartitionID,
163+
}
164+
}
165+
}
166+
167+
return sessionInfoMap
168+
}
169+
170+
// buildPartitionOffsetsMap groups partition offsets by topic.
171+
func (s *batchTxStorage) buildPartitionOffsetsMap(
172+
partitionOffsets []rawtopicreader.PartitionCommitOffset,
173+
sessionInfoMap map[rawtopicreader.PartitionSessionID]sessionInfo,
174+
) (map[string][]rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets, error) {
175+
topicMap := make(map[string][]rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets)
176+
for i := range partitionOffsets {
177+
po := &partitionOffsets[i]
178+
info, ok := sessionInfoMap[po.PartitionSessionID]
179+
if !ok {
180+
return nil, xerrors.WithStackTrace(
181+
fmt.Errorf("session info not found for partition session ID %d", po.PartitionSessionID),
182+
)
183+
}
184+
185+
topicMap[info.topic] = append(topicMap[info.topic], rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets{
186+
PartitionID: info.partitionID,
187+
PartitionOffsets: po.Offsets,
188+
})
189+
}
190+
191+
return topicMap, nil
192+
}
193+
194+
// buildUpdateOffsetsRequest creates the final UpdateOffsetsInTransactionRequest.
195+
func (s *batchTxStorage) buildUpdateOffsetsRequest(
196+
transaction tx.Transaction,
197+
topicMap map[string][]rawtopic.UpdateOffsetsInTransactionRequest_PartitionOffsets,
198+
) *rawtopic.UpdateOffsetsInTransactionRequest {
199+
req := &rawtopic.UpdateOffsetsInTransactionRequest{
200+
OperationParams: rawydb.NewRawOperationParamsFromProto(
201+
operation.Params(context.Background(), 0, 0, operation.ModeSync),
202+
),
203+
Tx: rawtopiccommon.TransactionIdentity{
204+
ID: transaction.ID(),
205+
Session: transaction.SessionID(),
206+
},
207+
Consumer: s.consumer,
208+
Topics: make([]rawtopic.UpdateOffsetsInTransactionRequest_TopicOffsets, 0, len(topicMap)),
209+
}
210+
211+
for path, partitions := range topicMap {
212+
req.Topics = append(req.Topics, rawtopic.UpdateOffsetsInTransactionRequest_TopicOffsets{
213+
Path: path,
214+
Partitions: partitions,
215+
})
216+
}
217+
218+
return req
219+
}
220+
221+
// Clear removes all batches stored for the given transaction.
222+
// After calling Clear, GetBatches and GetUpdateOffsetsInTransactionRequest
223+
// will return empty results for this transaction.
224+
// This method is thread-safe.
225+
func (s *batchTxStorage) Clear(transaction tx.Transaction) {
226+
s.m.Lock()
227+
defer s.m.Unlock()
228+
229+
delete(s.transactions, transaction.ID())
230+
}

0 commit comments

Comments
 (0)