Skip to content

Commit

Permalink
feat(datastore): Adding BeginLater and transaction state (#8984)
Browse files Browse the repository at this point in the history
* feat(datastore): Adding BeginLater and transaction state

* feat(datastore): Use parseReadOptions instead of validateReadOptions

* feat(datastore): Revert validate method

* feat(datastore): Correct TODO formatting
  • Loading branch information
bhshkh committed Feb 5, 2024
1 parent 63673f3 commit 5f8e21f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 12 deletions.
4 changes: 2 additions & 2 deletions datastore/query_test.go
Expand Up @@ -763,8 +763,8 @@ func TestReadOptions(t *testing.T) {
}
// Test errors.
for _, q := range []*Query{
NewQuery("").Transaction(&Transaction{id: nil}),
NewQuery("").Transaction(&Transaction{id: tid}).EventualConsistency(),
NewQuery("").Transaction(&Transaction{id: nil, state: transactionStateExpired}),
NewQuery("").Transaction(&Transaction{id: tid, state: transactionStateInProgress}).EventualConsistency(),
} {
req := &pb.RunQueryRequest{}
if err := q.toRunQueryRequest(req); err == nil {
Expand Down
51 changes: 41 additions & 10 deletions datastore/transaction.go
Expand Up @@ -37,6 +37,14 @@ type transactionSettings struct {
readOnly bool
prevID []byte // ID of the transaction to retry
readTime *timestamppb.Timestamp

// When set, skips the initial BeginTransaction RPC call to obtain txn id and
// uses the piggybacked txn id from first read rpc call.
// If there are no read operations on transaction, BeginTransaction RPC call is made
// before rollback or commit
// Currently, this setting is set but unused
// TODO: b/291258189 - Use this setting
beginLater bool
}

// newTransactionSettings creates a transactionSettings with a given TransactionOption slice.
Expand Down Expand Up @@ -91,6 +99,7 @@ var ReadOnly TransactionOption

func init() {
ReadOnly = readOnly{}
BeginLater = beginLater{}
}

type readOnly struct{}
Expand All @@ -99,6 +108,25 @@ func (readOnly) apply(s *transactionSettings) {
s.readOnly = true
}

// BeginLater is a TransactionOption that can be used to improve transaction performance
// Currently, it is a no-op
// TODO: b/291258189 - Add implementation
var BeginLater TransactionOption

type beginLater struct{}

func (beginLater) apply(s *transactionSettings) {
s.beginLater = true
}

type transactionState int

const (
transactionStateNotStarted transactionState = iota // Currently unused
transactionStateInProgress
transactionStateExpired
)

// Transaction represents a set of datastore operations to be committed atomically.
//
// Operations are enqueued by calling the Put and Delete methods on Transaction
Expand All @@ -114,6 +142,8 @@ type Transaction struct {
ctx context.Context
mutations []*pb.Mutation // The mutations to apply.
pending map[int]*PendingKey // Map from mutation index to incomplete keys pending transaction completion.
settings *transactionSettings
state transactionState
}

// NewTransaction starts a new transaction.
Expand Down Expand Up @@ -167,6 +197,8 @@ func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_
client: c,
mutations: nil,
pending: make(map[int]*PendingKey),
state: transactionStateInProgress,
settings: s,
}, nil
}

Expand Down Expand Up @@ -223,7 +255,7 @@ func (t *Transaction) Commit() (c *Commit, err error) {
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Commit")
defer func() { trace.EndSpan(t.ctx, err) }()

if t.id == nil {
if t.state == transactionStateExpired {
return nil, errExpiredTransaction
}
req := &pb.CommitRequest{
Expand All @@ -237,7 +269,7 @@ func (t *Transaction) Commit() (c *Commit, err error) {
if status.Code(err) == codes.Aborted {
return nil, ErrConcurrentTransaction
}
t.id = nil // mark the transaction as expired
t.state = transactionStateExpired // mark the transaction as expired
if err != nil {
return nil, err
}
Expand All @@ -264,15 +296,14 @@ func (t *Transaction) Rollback() (err error) {
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Rollback")
defer func() { trace.EndSpan(t.ctx, err) }()

if t.id == nil {
if t.state == transactionStateExpired {
return errExpiredTransaction
}
id := t.id
t.id = nil
t.state = transactionStateExpired
_, err = t.client.client.Rollback(t.ctx, &pb.RollbackRequest{
ProjectId: t.client.dataset,
DatabaseId: t.client.databaseID,
Transaction: id,
Transaction: t.id,
})
return err
}
Expand Down Expand Up @@ -303,7 +334,7 @@ func (t *Transaction) GetMulti(keys []*Key, dst interface{}) (err error) {
t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.GetMulti")
defer func() { trace.EndSpan(t.ctx, err) }()

if t.id == nil {
if t.state == transactionStateExpired {
return errExpiredTransaction
}
opts := &pb.ReadOptions{
Expand Down Expand Up @@ -336,7 +367,7 @@ func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) {
// element of src in the same order.
// TODO(jba): rewrite in terms of Mutate.
func (t *Transaction) PutMulti(keys []*Key, src interface{}) (ret []*PendingKey, err error) {
if t.id == nil {
if t.state == transactionStateExpired {
return nil, errExpiredTransaction
}
mutations, err := putMutations(keys, src)
Expand Down Expand Up @@ -376,7 +407,7 @@ func (t *Transaction) Delete(key *Key) error {
// DeleteMulti is a batch version of Delete.
// TODO(jba): rewrite in terms of Mutate.
func (t *Transaction) DeleteMulti(keys []*Key) (err error) {
if t.id == nil {
if t.state == transactionStateExpired {
return errExpiredTransaction
}
mutations, err := deleteMutations(keys)
Expand All @@ -396,7 +427,7 @@ func (t *Transaction) DeleteMulti(keys []*Key) (err error) {
//
// For an example, see Client.Mutate.
func (t *Transaction) Mutate(muts ...*Mutation) ([]*PendingKey, error) {
if t.id == nil {
if t.state == transactionStateExpired {
return nil, errExpiredTransaction
}
pmuts, err := mutationProtos(muts)
Expand Down

0 comments on commit 5f8e21f

Please sign in to comment.