Skip to content

Commit 2cba13b

Browse files
authored
feat(spanner): add option for how to call BeginTransaction (#12436)
* feat(spanner): add option for how to call BeginTransaction Adds an option to the Spanner client for how BeginTransaction should be called. The default for read/write transactions that are executed in a transaction runner, is to use inlined-begin. That is; the BeginTransaction is inlined with the first statement in the transaction. This reduces the number of round-trips to Spanner by 1. The default for statement-based read/write transactions and read-only transactions is to use an explicit BeginTransaction RPC invocation. This requires one additional round-trip to Spanner, but can be more efficient if the application wants to execute multiple queries in parallel as the first statements in the transaction. This change introduces an explicit option that can be used to change the above defaults for a single transaction. This is especially useful for statement-based read/write transactions, as these are used by the database/sql driver, and currently requires an additional round-trip for every transaction. * chore: address review comments
1 parent 72225a5 commit 2cba13b

File tree

6 files changed

+829
-106
lines changed

6 files changed

+829
-106
lines changed

spanner/client.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,13 +1114,34 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
11141114
// Some operations (for ex BatchUpdate) can be long-running. For such operations set the isLongRunningTransaction flag to be true
11151115
t.setSessionEligibilityForLongRunning(sh)
11161116
}
1117-
if t.shouldExplicitBegin(attempt) {
1117+
initTx := func(t *ReadWriteTransaction) {
1118+
t.txReadOnly.sp = c.idleSessions
1119+
t.txReadOnly.txReadEnv = t
1120+
t.txReadOnly.qo = c.qo
1121+
t.txReadOnly.ro = c.ro
1122+
t.txReadOnly.disableRouteToLeader = c.disableRouteToLeader
1123+
t.wb = []*Mutation{}
1124+
t.txOpts = c.txo.merge(options)
1125+
t.ct = c.ct
1126+
t.otConfig = c.otConfig
1127+
}
1128+
if t.shouldExplicitBegin(attempt, options) {
1129+
if t == nil {
1130+
t = &ReadWriteTransaction{
1131+
txReadyOrClosed: make(chan struct{}),
1132+
}
1133+
}
1134+
initTx(t)
11181135
// Make sure we set the current session handle before calling BeginTransaction.
11191136
// Note that the t.begin(ctx) call could change the session that is being used by the transaction, as the
11201137
// BeginTransaction RPC invocation will be retried on a new session if it returns SessionNotFound.
11211138
t.txReadOnly.sh = sh
11221139
if err = t.begin(ctx, nil); err != nil {
1123-
trace.TracePrintf(ctx, nil, "Error while BeginTransaction during retrying a ReadWrite transaction: %v", ToSpannerError(err))
1140+
if attempt > 0 {
1141+
trace.TracePrintf(ctx, nil, "Error while BeginTransaction during retrying a ReadWrite transaction: %v", ToSpannerError(err))
1142+
} else {
1143+
trace.TracePrintf(ctx, nil, "Error during the initial BeginTransaction for a ReadWrite transaction: %v", ToSpannerError(err))
1144+
}
11241145
return ToSpannerError(err)
11251146
}
11261147
} else {
@@ -1133,17 +1154,9 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
11331154
previousTx: previousTx,
11341155
}
11351156
t.txReadOnly.sh = sh
1157+
initTx(t)
11361158
}
11371159
attempt++
1138-
t.txReadOnly.sp = c.idleSessions
1139-
t.txReadOnly.txReadEnv = t
1140-
t.txReadOnly.qo = c.qo
1141-
t.txReadOnly.ro = c.ro
1142-
t.txReadOnly.disableRouteToLeader = c.disableRouteToLeader
1143-
t.wb = []*Mutation{}
1144-
t.txOpts = c.txo.merge(options)
1145-
t.ct = c.ct
1146-
t.otConfig = c.otConfig
11471160

11481161
trace.TracePrintf(ctx, map[string]interface{}{"transactionSelector": t.getTransactionSelector().String()},
11491162
"Starting transaction attempt")

spanner/client_test.go

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1801,18 +1801,24 @@ func TestClient_ReadWriteTransaction(t *testing.T) {
18011801
}
18021802
}
18031803

1804-
func validateIsolationLevelForRWTransactions(t *testing.T, server *MockedSpannerInMemTestServer, expected sppb.TransactionOptions_IsolationLevel) {
1804+
func validateIsolationLevelForRWTransactions(t *testing.T, server *MockedSpannerInMemTestServer, expected sppb.TransactionOptions_IsolationLevel, beginTransactionOption BeginTransactionOption) {
18051805
found := false
18061806
requests := drainRequestsFromServer(server.TestSpanner)
18071807
for _, req := range requests {
18081808
switch sqlReq := req.(type) {
18091809
case *sppb.ExecuteSqlRequest:
1810+
if beginTransactionOption == ExplicitBeginTransaction {
1811+
t.Fatalf("got TransactionOptions on ExecuteSqlRequest in combination with ExplicitBeginTransaction")
1812+
}
18101813
found = true
18111814
if sqlReq.GetTransaction().GetBegin().GetIsolationLevel() != expected {
18121815
t.Fatalf("Invalid IsolationLevel\n Expected: %v\n Got: %v\n", expected, sqlReq.GetTransaction().GetBegin().GetIsolationLevel())
18131816
}
18141817
break
18151818
case *sppb.BeginTransactionRequest:
1819+
if beginTransactionOption == InlinedBeginTransaction {
1820+
t.Fatalf("got BeginTransaction RPC in combination with InlinedBeginTransaction")
1821+
}
18161822
found = true
18171823
if sqlReq.GetOptions().GetIsolationLevel() != expected {
18181824
t.Fatalf("Invalid IsolationLevel\n Expected: %v\n Got: %v\n", expected, sqlReq.GetOptions().GetIsolationLevel())
@@ -1843,7 +1849,7 @@ func TestClient_ReadWriteTransactionWithNoIsolationLevelForRWTransactionAtClient
18431849
t.Fatal(err)
18441850
}
18451851
defer teardown()
1846-
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED)
1852+
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED, InlinedBeginTransaction)
18471853
}
18481854

18491855
func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtClientConfig(t *testing.T) {
@@ -1853,7 +1859,7 @@ func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtClientCo
18531859
t.Fatal(err)
18541860
}
18551861
defer teardown()
1856-
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ)
1862+
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ, InlinedBeginTransaction)
18571863
}
18581864

18591865
func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtTransactionLevel(t *testing.T) {
@@ -1863,7 +1869,7 @@ func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtTransact
18631869
t.Fatal(err)
18641870
}
18651871
defer teardown()
1866-
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ)
1872+
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ, InlinedBeginTransaction)
18671873
}
18681874

18691875
func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtTransactionLevelWithAbort(t *testing.T) {
@@ -1882,7 +1888,7 @@ func TestClient_ReadWriteTransactionWithIsolationLevelForRWTransactionAtTransact
18821888
if err != nil {
18831889
t.Fatal(err)
18841890
}
1885-
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ)
1891+
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ, InlinedBeginTransaction)
18861892
}
18871893

18881894
func TestClient_ApplyMutationsWithAtLeastOnceIsolationLevel(t *testing.T) {
@@ -1897,7 +1903,7 @@ func TestClient_ApplyMutationsWithAtLeastOnceIsolationLevel(t *testing.T) {
18971903
if err != nil {
18981904
t.Fatal(err)
18991905
}
1900-
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ)
1906+
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ, ExplicitBeginTransaction)
19011907
}
19021908

19031909
func TestClient_ApplyMutationsWithIsolationLevel(t *testing.T) {
@@ -1912,61 +1918,111 @@ func TestClient_ApplyMutationsWithIsolationLevel(t *testing.T) {
19121918
if err != nil {
19131919
t.Fatal(err)
19141920
}
1915-
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_SERIALIZABLE)
1921+
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_SERIALIZABLE, ExplicitBeginTransaction)
19161922
}
19171923

1918-
func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevel(t *testing.T) {
1924+
func consumeIterator(iter *RowIterator) error {
1925+
defer iter.Stop()
1926+
for {
1927+
_, err := iter.Next()
1928+
if errors.Is(err, iterator.Done) {
1929+
break
1930+
}
1931+
if err != nil {
1932+
return err
1933+
}
1934+
}
1935+
return nil
1936+
}
1937+
1938+
func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevelWithExplicitBegin(t *testing.T) {
1939+
t.Parallel()
1940+
testClientReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevel(t, ExplicitBeginTransaction)
1941+
}
1942+
1943+
func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevelWithInlineBegin(t *testing.T) {
1944+
t.Parallel()
1945+
testClientReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevel(t, InlinedBeginTransaction)
1946+
}
1947+
1948+
func testClientReadWriteStmtBasedTransactionWithIsolationLevelAtTransactionLevel(t *testing.T, beginTransactionOption BeginTransactionOption) {
19191949
server, client, teardown := setupMockedTestServer(t)
19201950
defer teardown()
19211951
ctx := context.Background()
19221952
tx, err := NewReadWriteStmtBasedTransactionWithOptions(
19231953
ctx,
19241954
client,
1925-
TransactionOptions{IsolationLevel: sppb.TransactionOptions_REPEATABLE_READ})
1955+
TransactionOptions{IsolationLevel: sppb.TransactionOptions_REPEATABLE_READ, BeginTransactionOption: beginTransactionOption})
19261956
if err != nil {
19271957
t.Fatalf("Unexpected error when creating transaction: %v", err)
19281958
}
19291959

19301960
iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
1931-
defer iter.Stop()
1961+
if err := consumeIterator(iter); err != nil {
1962+
t.Fatal(err)
1963+
}
1964+
1965+
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ, beginTransactionOption)
1966+
}
1967+
1968+
func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevelWithExplicitBegin(t *testing.T) {
1969+
t.Parallel()
1970+
testClientReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevel(t, ExplicitBeginTransaction)
1971+
}
19321972

1933-
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_REPEATABLE_READ)
1973+
func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevelWithInlineBegin(t *testing.T) {
1974+
t.Parallel()
1975+
testClientReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevel(t, InlinedBeginTransaction)
19341976
}
19351977

1936-
func TestClient_ReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevel(t *testing.T) {
1978+
func testClientReadWriteStmtBasedTransactionWithIsolationLevelAtClientConfigLevel(t *testing.T, beginTransactionOption BeginTransactionOption) {
19371979
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{TransactionOptions: TransactionOptions{IsolationLevel: sppb.TransactionOptions_SERIALIZABLE}})
19381980
defer teardown()
19391981
ctx := context.Background()
19401982
tx, err := NewReadWriteStmtBasedTransactionWithOptions(
19411983
ctx,
19421984
client,
1943-
TransactionOptions{})
1985+
TransactionOptions{BeginTransactionOption: beginTransactionOption})
19441986
if err != nil {
19451987
t.Fatalf("Unexpected error when creating transaction: %v", err)
19461988
}
19471989

19481990
iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
1949-
defer iter.Stop()
1991+
if err := consumeIterator(iter); err != nil {
1992+
t.Fatal(err)
1993+
}
1994+
1995+
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_SERIALIZABLE, beginTransactionOption)
1996+
}
1997+
1998+
func TestClient_ReadWriteStmtBasedTransactionWithNoIsolationLevelWithExplicitBegin(t *testing.T) {
1999+
t.Parallel()
2000+
testClientReadWriteStmtBasedTransactionWithNoIsolationLevel(t, ExplicitBeginTransaction)
2001+
}
19502002

1951-
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_SERIALIZABLE)
2003+
func TestClient_ReadWriteStmtBasedTransactionWithNoIsolationLevelWithInlineBegin(t *testing.T) {
2004+
t.Parallel()
2005+
testClientReadWriteStmtBasedTransactionWithNoIsolationLevel(t, InlinedBeginTransaction)
19522006
}
19532007

1954-
func TestClient_ReadWriteStmtBasedTransactionWithNoIsolationLevel(t *testing.T) {
2008+
func testClientReadWriteStmtBasedTransactionWithNoIsolationLevel(t *testing.T, beginTransactionOption BeginTransactionOption) {
19552009
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{TransactionOptions: TransactionOptions{}})
19562010
defer teardown()
19572011
ctx := context.Background()
19582012
tx, err := NewReadWriteStmtBasedTransactionWithOptions(
19592013
ctx,
19602014
client,
1961-
TransactionOptions{})
2015+
TransactionOptions{BeginTransactionOption: beginTransactionOption})
19622016
if err != nil {
19632017
t.Fatalf("Unexpected error when creating transaction: %v", err)
19642018
}
19652019

19662020
iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
1967-
defer iter.Stop()
2021+
if err := consumeIterator(iter); err != nil {
2022+
t.Fatal(err)
2023+
}
19682024

1969-
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED)
2025+
validateIsolationLevelForRWTransactions(t, server, sppb.TransactionOptions_ISOLATION_LEVEL_UNSPECIFIED, beginTransactionOption)
19702026
}
19712027

19722028
func TestClient_ReadWriteTransactionCommitAborted(t *testing.T) {

spanner/read.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (r *RowIterator) Next() (*Row, error) {
191191
// if request contains TransactionSelector::Begin option, this is here as fallback to retry with
192192
// explicit transactionID after a retry.
193193
r.setTransactionID(nil)
194-
r.err = errInlineBeginTransactionFailed()
194+
r.err = r.updateTxState(errInlineBeginTransactionFailed(nil))
195195
return nil, r.err
196196
}
197197
r.setTransactionID = nil

spanner/session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2006,7 +2006,7 @@ func isFailedInlineBeginTransaction(err error) bool {
20062006
if err == nil {
20072007
return false
20082008
}
2009-
return ErrCode(err) == codes.Internal && strings.Contains(err.Error(), errInlineBeginTransactionFailed().Error())
2009+
return ErrCode(err) == codes.Internal && strings.Contains(err.Error(), errInlineBeginTransactionFailedMsg)
20102010
}
20112011

20122012
// isClientClosing returns true if the given error is a

0 commit comments

Comments
 (0)