@@ -225,26 +225,6 @@ func (d *TxnDatastore) Batch(ctx context.Context) (ds.Batch, error) {
225225 }, nil
226226}
227227
228- type transformBatch struct {
229- dst ds.Batch
230-
231- f KeyMapping
232- }
233-
234- var _ ds.Batch = (* transformBatch )(nil )
235-
236- func (t * transformBatch ) Put (ctx context.Context , key ds.Key , val []byte ) error {
237- return t .dst .Put (ctx , t .f (key ), val )
238- }
239-
240- func (t * transformBatch ) Delete (ctx context.Context , key ds.Key ) error {
241- return t .dst .Delete (ctx , t .f (key ))
242- }
243-
244- func (t * transformBatch ) Commit (ctx context.Context ) error {
245- return t .dst .Commit (ctx )
246- }
247-
248228func (d * TxnDatastore ) Check (ctx context.Context ) error {
249229 if c , ok := d .child .(ds.CheckedDatastore ); ok {
250230 return c .Check (ctx )
@@ -267,5 +247,169 @@ func (d *TxnDatastore) CollectGarbage(ctx context.Context) error {
267247}
268248
269249func (d * TxnDatastore ) NewTransaction (ctx context.Context , readOnly bool ) (ds.Txn , error ) {
270- panic ("implement me" )
250+ childTxn , err := d .child .NewTransaction (ctx , readOnly )
251+ if err != nil {
252+ return nil , err
253+ }
254+ return & txnWrapper {child : childTxn , KeyTransform : d .KeyTransform }, nil
255+ }
256+
257+ type txnWrapper struct {
258+ child ds.Txn
259+
260+ KeyTransform
261+ }
262+
263+ var _ ds.Txn = (* txnWrapper )(nil )
264+
265+ func (t * txnWrapper ) Get (ctx context.Context , key ds.Key ) (value []byte , err error ) {
266+ return t .child .Get (ctx , t .ConvertKey (key ))
267+ }
268+
269+ func (t * txnWrapper ) Has (ctx context.Context , key ds.Key ) (exists bool , err error ) {
270+ return t .child .Has (ctx , t .ConvertKey (key ))
271+ }
272+
273+ func (t * txnWrapper ) GetSize (ctx context.Context , key ds.Key ) (size int , err error ) {
274+ return t .child .GetSize (ctx , t .ConvertKey (key ))
275+ }
276+
277+ func (t * txnWrapper ) Query (ctx context.Context , q dsq.Query ) (dsq.Results , error ) {
278+ nq , cq := t .prepareQuery (q )
279+
280+ cqr , err := t .child .Query (ctx , cq )
281+ if err != nil {
282+ return nil , err
283+ }
284+
285+ qr := dsq .ResultsFromIterator (q , dsq.Iterator {
286+ Next : func () (dsq.Result , bool ) {
287+ r , ok := cqr .NextSync ()
288+ if ! ok {
289+ return r , false
290+ }
291+ if r .Error == nil {
292+ r .Entry .Key = t .InvertKey (ds .RawKey (r .Entry .Key )).String ()
293+ }
294+ return r , true
295+ },
296+ Close : func () error {
297+ return cqr .Close ()
298+ },
299+ })
300+ return dsq .NaiveQueryApply (nq , qr ), nil
301+ }
302+
303+ // Split the query into a child query and a naive query. That way, we can make
304+ // the child datastore do as much work as possible.
305+ func (t * txnWrapper ) prepareQuery (q dsq.Query ) (naive , child dsq.Query ) {
306+
307+ // First, put everything in the child query. Then, start taking things
308+ // out.
309+ child = q
310+
311+ // Always let the child handle the key prefix.
312+ child .Prefix = t .ConvertKey (ds .NewKey (child .Prefix )).String ()
313+
314+ // Check if the key transform is order-preserving so we can use the
315+ // child datastore's built-in ordering.
316+ orderPreserving := false
317+ switch t .KeyTransform .(type ) {
318+ case PrefixTransform , * PrefixTransform :
319+ orderPreserving = true
320+ }
321+
322+ // Try to let the child handle ordering.
323+ orders:
324+ for i , o := range child .Orders {
325+ switch o .(type ) {
326+ case dsq.OrderByValue , * dsq.OrderByValue ,
327+ dsq.OrderByValueDescending , * dsq.OrderByValueDescending :
328+ // Key doesn't matter.
329+ continue
330+ case dsq.OrderByKey , * dsq.OrderByKey ,
331+ dsq.OrderByKeyDescending , * dsq.OrderByKeyDescending :
332+ // if the key transform preserves order, we can delegate
333+ // to the child datastore.
334+ if orderPreserving {
335+ // When sorting, we compare with the first
336+ // Order, then, if equal, we compare with the
337+ // second Order, etc. However, keys are _unique_
338+ // so we'll never apply any additional orders
339+ // after ordering by key.
340+ child .Orders = child .Orders [:i + 1 ]
341+ break orders
342+ }
343+ }
344+
345+ // Can't handle this order under transform, punt it to a naive
346+ // ordering.
347+ naive .Orders = q .Orders
348+ child .Orders = nil
349+ naive .Offset = q .Offset
350+ child .Offset = 0
351+ naive .Limit = q .Limit
352+ child .Limit = 0
353+ break
354+ }
355+
356+ // Try to let the child handle the filters.
357+
358+ // don't modify the original filters.
359+ child .Filters = append ([]dsq.Filter (nil ), child .Filters ... )
360+
361+ for i , f := range child .Filters {
362+ switch f := f .(type ) {
363+ case dsq.FilterValueCompare , * dsq.FilterValueCompare :
364+ continue
365+ case dsq.FilterKeyCompare :
366+ child .Filters [i ] = dsq.FilterKeyCompare {
367+ Op : f .Op ,
368+ Key : t .ConvertKey (ds .NewKey (f .Key )).String (),
369+ }
370+ continue
371+ case * dsq.FilterKeyCompare :
372+ child .Filters [i ] = & dsq.FilterKeyCompare {
373+ Op : f .Op ,
374+ Key : t .ConvertKey (ds .NewKey (f .Key )).String (),
375+ }
376+ continue
377+ case dsq.FilterKeyPrefix :
378+ child .Filters [i ] = dsq.FilterKeyPrefix {
379+ Prefix : t .ConvertKey (ds .NewKey (f .Prefix )).String (),
380+ }
381+ continue
382+ case * dsq.FilterKeyPrefix :
383+ child .Filters [i ] = & dsq.FilterKeyPrefix {
384+ Prefix : t .ConvertKey (ds .NewKey (f .Prefix )).String (),
385+ }
386+ continue
387+ }
388+
389+ // Not a known filter, defer to the naive implementation.
390+ naive .Filters = q .Filters
391+ child .Filters = nil
392+ naive .Offset = q .Offset
393+ child .Offset = 0
394+ naive .Limit = q .Limit
395+ child .Limit = 0
396+ break
397+ }
398+ return
399+ }
400+
401+ func (t txnWrapper ) Put (ctx context.Context , key ds.Key , value []byte ) error {
402+ return t .child .Put (ctx , t .ConvertKey (key ), value )
403+ }
404+
405+ func (t txnWrapper ) Delete (ctx context.Context , key ds.Key ) error {
406+ return t .child .Delete (ctx , t .ConvertKey (key ))
407+ }
408+
409+ func (t txnWrapper ) Commit (ctx context.Context ) error {
410+ return t .child .Commit (ctx )
411+ }
412+
413+ func (t txnWrapper ) Discard (ctx context.Context ) {
414+ t .child .Discard (ctx )
271415}
0 commit comments