Skip to content

Commit

Permalink
feat(datastore): Adding BeginLater transaction option (#8972)
Browse files Browse the repository at this point in the history
* feat(datastore): new_transaction consistency
  • Loading branch information
bhshkh committed Apr 23, 2024
1 parent 9a69fb4 commit 4067f4e
Show file tree
Hide file tree
Showing 7 changed files with 876 additions and 75 deletions.
6 changes: 4 additions & 2 deletions datastore/datastore.go
Expand Up @@ -398,7 +398,8 @@ func (c *Client) Get(ctx context.Context, key *Key, dst interface{}) (err error)
}
}

// TODO: Use transaction ID returned by get
// Since opts does not contain Transaction option, 'get' call below will return nil
// as transaction id which can be ignored
_, err = c.get(ctx, []*Key{key}, []interface{}{dst}, opts)
if me, ok := err.(MultiError); ok {
return me[0]
Expand Down Expand Up @@ -432,7 +433,8 @@ func (c *Client) GetMulti(ctx context.Context, keys []*Key, dst interface{}) (er
}
}

// TODO: Use transaction ID returned by get
// Since opts does not contain Transaction option, 'get' call below will return nil
// as transaction id which can be ignored
_, err = c.get(ctx, keys, dst, opts)
return err
}
Expand Down
250 changes: 250 additions & 0 deletions datastore/integration_test.go
Expand Up @@ -753,6 +753,256 @@ func TestIntegration_Filters(t *testing.T) {
})
}

func populateData(t *testing.T, client *Client, childrenCount int, time int64, testKey string) ([]*Key, *Key, func()) {
ctx := context.Background()
parent := NameKey("SQParent", keyPrefix+testKey+suffix, nil)

children := []*SQChild{}

for i := 0; i < childrenCount; i++ {
children = append(children, &SQChild{I: i, T: time, U: time, V: 1.5, W: "str"})
}
keys := make([]*Key, childrenCount)
for i := range keys {
keys[i] = NameKey("SQChild", "sqChild"+fmt.Sprint(i), parent)
}
keys, err := client.PutMulti(ctx, keys, children)
if err != nil {
t.Fatalf("client.PutMulti: %v", err)
}

cleanup := func() {
err := client.DeleteMulti(ctx, keys)
if err != nil {
t.Errorf("client.DeleteMulti: %v", err)
}
}
return keys, parent, cleanup
}

type RunTransactionResult struct {
runTime float64
err error
}

func TestIntegration_BeginLaterPerf(t *testing.T) {
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
}
runOptions := []bool{true, false} // whether BeginLater transaction option is used
var avgRunTimes [2]float64 // In seconds
numRepetitions := 10
numKeys := 10

res := make(chan RunTransactionResult)
for i, runOption := range runOptions {
sumRunTime := float64(0)

// Create client
ctx := context.Background()
client := newTestClient(ctx, t)
defer client.Close()

// Populate data
now := timeNow.Truncate(time.Millisecond).Unix()
keys, _, cleanupData := populateData(t, client, numKeys, now, "BeginLaterPerf"+fmt.Sprint(runOption)+fmt.Sprint(now))
defer cleanupData()

for rep := 0; rep < numRepetitions; rep++ {
go runTransaction(ctx, client, keys, res, runOption, t)
}
for rep := 0; rep < numRepetitions; rep++ {
runTransactionResult := <-res
if runTransactionResult.err != nil {
t.Fatal(runTransactionResult.err)
}
sumRunTime += runTransactionResult.runTime
}

avgRunTimes[i] = sumRunTime / float64(numRepetitions)
}
improvement := ((avgRunTimes[1] - avgRunTimes[0]) / avgRunTimes[1]) * 100
if improvement < 0 {
t.Logf("Run times:: with BeginLater: %.3fs, without BeginLater: %.3fs. improvement: %.2f%%", avgRunTimes[0], avgRunTimes[1], improvement)
t.Fatal("No perf improvement because of new transaction consistency type.")
}
}

func runTransaction(ctx context.Context, client *Client, keys []*Key, res chan RunTransactionResult, beginLater bool, t *testing.T) {

numKeys := len(keys)
txOpts := []TransactionOption{}
if beginLater {
txOpts = append(txOpts, BeginLater)
}

start := time.Now()
// Create transaction
tx, err := client.NewTransaction(ctx, txOpts...)
if err != nil {
runTransactionResult := RunTransactionResult{
err: fmt.Errorf("Failed to create transaction: %v", err),
}
res <- runTransactionResult
return
}

// Perform operations in transaction
dst := make([]*SQChild, numKeys)
if err := tx.GetMulti(keys, dst); err != nil {
runTransactionResult := RunTransactionResult{
err: fmt.Errorf("GetMulti got: %v, want: nil", err),
}
res <- runTransactionResult
return
}
if _, err := tx.PutMulti(keys, dst); err != nil {
runTransactionResult := RunTransactionResult{
err: fmt.Errorf("PutMulti got: %v, want: nil", err),
}
res <- runTransactionResult
return
}

// Commit the transaction
if _, err := tx.Commit(); err != nil {
runTransactionResult := RunTransactionResult{
err: fmt.Errorf("Commit got: %v, want: nil", err),
}
res <- runTransactionResult
return
}

runTransactionResult := RunTransactionResult{
runTime: time.Since(start).Seconds(),
}
res <- runTransactionResult
}

func TestIntegration_BeginLater(t *testing.T) {
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
}
ctx := context.Background()
client := newTestClient(ctx, t)
defer client.Close()

wantAggResult := AggregationResult(map[string]interface{}{
"count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 3}},
"sum": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 3}},
"avg": &pb.Value{ValueType: &pb.Value_DoubleValue{DoubleValue: 1}},
})

mockErr := errors.New("Mock error")
testcases := []struct {
desc string
options []TransactionOption
hasReadOnlyOption bool
failTransaction bool
}{
{
desc: "Failed transaction with BeginLater, MaxAttempts(2), ReadOnly options",
options: []TransactionOption{BeginLater, MaxAttempts(2), ReadOnly},
hasReadOnlyOption: true,
failTransaction: true,
},
{
desc: "BeginLater, MaxAttempts(2), ReadOnly",
options: []TransactionOption{BeginLater, MaxAttempts(2), ReadOnly},
hasReadOnlyOption: true,
failTransaction: false,
},
{
desc: "BeginLater, MaxAttempts(2)",
options: []TransactionOption{BeginLater, MaxAttempts(2)},
hasReadOnlyOption: false,
},
{
desc: "BeginLater, ReadOnly",
options: []TransactionOption{BeginLater, ReadOnly},
hasReadOnlyOption: true,
},
}

for _, testcase := range testcases {
// Populate data
now := timeNow.Truncate(time.Millisecond).Unix()
keys, parent, cleanupData := populateData(t, client, 3, now, "BeginLater")

testutil.Retry(t, 5, 10*time.Second, func(r *testutil.R) {
_, err := client.RunInTransaction(ctx, func(tx *Transaction) error {
query := NewQuery("SQChild").Ancestor(parent).FilterField("T", "=", now).Transaction(tx)
dst := []*SQChild{}
if _, err := client.GetAll(ctx, query, &dst); err != nil {
return err
}

aggQuery := query.NewAggregationQuery().
WithCount("count").
WithSum("I", "sum").
WithAvg("I", "avg")
gotAggResult, err := client.RunAggregationQuery(ctx, aggQuery)
if err != nil {
return err
}
if !reflect.DeepEqual(gotAggResult, wantAggResult) {
return fmt.Errorf("Mismatch in aggregation result got: %+v, want: %+v", gotAggResult, wantAggResult)
}

if !testcase.hasReadOnlyOption {
v := &SQChild{I: 22, T: now, U: now, V: 1.5, W: "str"}
if _, err := tx.Put(keys[0], v); err != nil {
return err
}

if err := tx.Delete(keys[1]); err != nil {
return err
}
}
if testcase.failTransaction {
// Deliberately, fail the transaction to rollback it
return mockErr
}
return nil
}, testcase.options...)

if !testcase.failTransaction {
if err != nil {
r.Errorf("%v got: %v, want: nil", testcase.desc, err)
}
if !testcase.hasReadOnlyOption {
// Transactions are atomic. Check if Put and Delete succeeded ensuring they were run as transaction
verifyBeginLater(r, testcase.desc+" Committed Put", client, parent, now, 22, 1)
verifyBeginLater(r, testcase.desc+" Committed Delete", client, parent, now, 1, 0)
}
} else {
if err == nil {
r.Errorf("%v got: nil, want: %v", testcase.desc, mockErr)
}
if !testcase.hasReadOnlyOption {
// Transactions are atomic. Check if Put and Delete rollbacked ensuring they were run as transaction
verifyBeginLater(r, testcase.desc+" Rollbacked Put", client, parent, now, 22, 0)
verifyBeginLater(r, testcase.desc+" Rollbacked Delete", client, parent, now, 1, 1)
}
}
})
cleanupData()
}
}

func verifyBeginLater(r *testutil.R, errPrefix string, client *Client, parent *Key, tvalue int64, ivalue, wantDstLen int) {
ctx := context.Background()
query := NewQuery("SQChild").Ancestor(parent).FilterField("T", "=", tvalue).FilterField("I", "=", ivalue)
dst := []*SQChild{}
_, err := client.GetAll(ctx, query, &dst)
if err != nil {
r.Errorf("%v GetAll got: %v, want: nil", errPrefix, err)
}
if len(dst) != wantDstLen {
r.Errorf("%v len(dst) got: %v, want: %v", errPrefix, len(dst), wantDstLen)
}
}

func TestIntegration_AggregationQueriesInTransaction(t *testing.T) {
ctx := context.Background()
client := newTestClient(ctx, t)
Expand Down
30 changes: 29 additions & 1 deletion datastore/mock_test.go
Expand Up @@ -49,6 +49,10 @@ type reqItem struct {
adjust func(gotReq proto.Message)
}

const (
mockProjectID = "projectID"
)

func newMock(t *testing.T) (_ *Client, _ *mockServer, _ func()) {
srv, cleanup, err := newMockServer()
if err != nil {
Expand All @@ -59,7 +63,7 @@ func newMock(t *testing.T) (_ *Client, _ *mockServer, _ func()) {
if err != nil {
t.Fatal(err)
}
client, err := NewClient(context.Background(), "projectID", option.WithGRPCConn(conn))
client, err := NewClient(context.Background(), mockProjectID, option.WithGRPCConn(conn))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -153,3 +157,27 @@ func (s *mockServer) Commit(_ context.Context, in *pb.CommitRequest) (*pb.Commit
}
return res.(*pb.CommitResponse), nil
}

func (s *mockServer) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest) (*pb.BeginTransactionResponse, error) {
res, err := s.popRPC(in)
if err != nil {
return nil, err
}
return res.(*pb.BeginTransactionResponse), nil
}

func (s *mockServer) RunQuery(ctx context.Context, in *pb.RunQueryRequest) (*pb.RunQueryResponse, error) {
res, err := s.popRPC(in)
if err != nil {
return nil, err
}
return res.(*pb.RunQueryResponse), nil
}

func (s *mockServer) RunAggregationQuery(ctx context.Context, in *pb.RunAggregationQueryRequest) (*pb.RunAggregationQueryResponse, error) {
res, err := s.popRPC(in)
if err != nil {
return nil, err
}
return res.(*pb.RunAggregationQueryResponse), nil
}

0 comments on commit 4067f4e

Please sign in to comment.