Skip to content

Commit 5f8e21f

Browse files
authored
feat(datastore): Adding BeginLater and transaction state (#8984)
* feat(datastore): Adding BeginLater and transaction state * feat(datastore): Use parseReadOptions instead of validateReadOptions * feat(datastore): Revert validate method * feat(datastore): Correct TODO formatting
1 parent 63673f3 commit 5f8e21f

File tree

2 files changed

+43
-12
lines changed

2 files changed

+43
-12
lines changed

datastore/query_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -763,8 +763,8 @@ func TestReadOptions(t *testing.T) {
763763
}
764764
// Test errors.
765765
for _, q := range []*Query{
766-
NewQuery("").Transaction(&Transaction{id: nil}),
767-
NewQuery("").Transaction(&Transaction{id: tid}).EventualConsistency(),
766+
NewQuery("").Transaction(&Transaction{id: nil, state: transactionStateExpired}),
767+
NewQuery("").Transaction(&Transaction{id: tid, state: transactionStateInProgress}).EventualConsistency(),
768768
} {
769769
req := &pb.RunQueryRequest{}
770770
if err := q.toRunQueryRequest(req); err == nil {

datastore/transaction.go

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ type transactionSettings struct {
3737
readOnly bool
3838
prevID []byte // ID of the transaction to retry
3939
readTime *timestamppb.Timestamp
40+
41+
// When set, skips the initial BeginTransaction RPC call to obtain txn id and
42+
// uses the piggybacked txn id from first read rpc call.
43+
// If there are no read operations on transaction, BeginTransaction RPC call is made
44+
// before rollback or commit
45+
// Currently, this setting is set but unused
46+
// TODO: b/291258189 - Use this setting
47+
beginLater bool
4048
}
4149

4250
// newTransactionSettings creates a transactionSettings with a given TransactionOption slice.
@@ -91,6 +99,7 @@ var ReadOnly TransactionOption
9199

92100
func init() {
93101
ReadOnly = readOnly{}
102+
BeginLater = beginLater{}
94103
}
95104

96105
type readOnly struct{}
@@ -99,6 +108,25 @@ func (readOnly) apply(s *transactionSettings) {
99108
s.readOnly = true
100109
}
101110

111+
// BeginLater is a TransactionOption that can be used to improve transaction performance
112+
// Currently, it is a no-op
113+
// TODO: b/291258189 - Add implementation
114+
var BeginLater TransactionOption
115+
116+
type beginLater struct{}
117+
118+
func (beginLater) apply(s *transactionSettings) {
119+
s.beginLater = true
120+
}
121+
122+
type transactionState int
123+
124+
const (
125+
transactionStateNotStarted transactionState = iota // Currently unused
126+
transactionStateInProgress
127+
transactionStateExpired
128+
)
129+
102130
// Transaction represents a set of datastore operations to be committed atomically.
103131
//
104132
// Operations are enqueued by calling the Put and Delete methods on Transaction
@@ -114,6 +142,8 @@ type Transaction struct {
114142
ctx context.Context
115143
mutations []*pb.Mutation // The mutations to apply.
116144
pending map[int]*PendingKey // Map from mutation index to incomplete keys pending transaction completion.
145+
settings *transactionSettings
146+
state transactionState
117147
}
118148

119149
// NewTransaction starts a new transaction.
@@ -167,6 +197,8 @@ func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_
167197
client: c,
168198
mutations: nil,
169199
pending: make(map[int]*PendingKey),
200+
state: transactionStateInProgress,
201+
settings: s,
170202
}, nil
171203
}
172204

@@ -223,7 +255,7 @@ func (t *Transaction) Commit() (c *Commit, err error) {
223255
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Commit")
224256
defer func() { trace.EndSpan(t.ctx, err) }()
225257

226-
if t.id == nil {
258+
if t.state == transactionStateExpired {
227259
return nil, errExpiredTransaction
228260
}
229261
req := &pb.CommitRequest{
@@ -237,7 +269,7 @@ func (t *Transaction) Commit() (c *Commit, err error) {
237269
if status.Code(err) == codes.Aborted {
238270
return nil, ErrConcurrentTransaction
239271
}
240-
t.id = nil // mark the transaction as expired
272+
t.state = transactionStateExpired // mark the transaction as expired
241273
if err != nil {
242274
return nil, err
243275
}
@@ -264,15 +296,14 @@ func (t *Transaction) Rollback() (err error) {
264296
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Rollback")
265297
defer func() { trace.EndSpan(t.ctx, err) }()
266298

267-
if t.id == nil {
299+
if t.state == transactionStateExpired {
268300
return errExpiredTransaction
269301
}
270-
id := t.id
271-
t.id = nil
302+
t.state = transactionStateExpired
272303
_, err = t.client.client.Rollback(t.ctx, &pb.RollbackRequest{
273304
ProjectId: t.client.dataset,
274305
DatabaseId: t.client.databaseID,
275-
Transaction: id,
306+
Transaction: t.id,
276307
})
277308
return err
278309
}
@@ -303,7 +334,7 @@ func (t *Transaction) GetMulti(keys []*Key, dst interface{}) (err error) {
303334
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.GetMulti")
304335
defer func() { trace.EndSpan(t.ctx, err) }()
305336

306-
if t.id == nil {
337+
if t.state == transactionStateExpired {
307338
return errExpiredTransaction
308339
}
309340
opts := &pb.ReadOptions{
@@ -336,7 +367,7 @@ func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) {
336367
// element of src in the same order.
337368
// TODO(jba): rewrite in terms of Mutate.
338369
func (t *Transaction) PutMulti(keys []*Key, src interface{}) (ret []*PendingKey, err error) {
339-
if t.id == nil {
370+
if t.state == transactionStateExpired {
340371
return nil, errExpiredTransaction
341372
}
342373
mutations, err := putMutations(keys, src)
@@ -376,7 +407,7 @@ func (t *Transaction) Delete(key *Key) error {
376407
// DeleteMulti is a batch version of Delete.
377408
// TODO(jba): rewrite in terms of Mutate.
378409
func (t *Transaction) DeleteMulti(keys []*Key) (err error) {
379-
if t.id == nil {
410+
if t.state == transactionStateExpired {
380411
return errExpiredTransaction
381412
}
382413
mutations, err := deleteMutations(keys)
@@ -396,7 +427,7 @@ func (t *Transaction) DeleteMulti(keys []*Key) (err error) {
396427
//
397428
// For an example, see Client.Mutate.
398429
func (t *Transaction) Mutate(muts ...*Mutation) ([]*PendingKey, error) {
399-
if t.id == nil {
430+
if t.state == transactionStateExpired {
400431
return nil, errExpiredTransaction
401432
}
402433
pmuts, err := mutationProtos(muts)

0 commit comments

Comments
 (0)