diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 11e83add51..e228b76e6b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -53,6 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; /** * {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message @@ -315,6 +316,18 @@ public void close() { */ private final AtomicBoolean unimplemented = new AtomicBoolean(false); + /** + * This flag is set to true if create session RPC is in progress. This flag prevents application + * from firing two requests concurrently + */ + private final AtomicBoolean retryingSessionCreation = new AtomicBoolean(false); + + /** + * This lock is used to prevent two threads from retrying createSession RPC requests in + * concurrently. + */ + private final ReentrantLock sessionCreationLock = new ReentrantLock(); + /** * This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is * executed on a multiplexed session. TODO: Remove once this is guaranteed to be available. @@ -358,11 +371,20 @@ public void close() { SettableApiFuture.create(); this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create(); this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture); + asyncCreateMultiplexedSession(initialSessionReferenceFuture); + maybeWaitForSessionCreation( + sessionClient.getSpanner().getOptions().getSessionPoolOptions(), + initialSessionReferenceFuture); + } + + private void asyncCreateMultiplexedSession( + SettableApiFuture sessionReferenceFuture) { this.sessionClient.asyncCreateMultiplexedSession( new SessionConsumer() { @Override public void onSessionReady(SessionImpl session) { - initialSessionReferenceFuture.set(session.getSessionReference()); + retryingSessionCreation.set(false); + sessionReferenceFuture.set(session.getSessionReference()); // only start the maintainer if we actually managed to create a session in the first // place. maintainer.start(); @@ -394,13 +416,11 @@ public void onSessionReady(SessionImpl session) { public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) { // Mark multiplexes sessions as unimplemented and fall back to regular sessions if // UNIMPLEMENTED is returned. + retryingSessionCreation.set(false); maybeMarkUnimplemented(t); - initialSessionReferenceFuture.setException(t); + sessionReferenceFuture.setException(t); } }); - maybeWaitForSessionCreation( - sessionClient.getSpanner().getOptions().getSessionPoolOptions(), - initialSessionReferenceFuture); } void setPool(SessionPool pool) { @@ -546,10 +566,38 @@ MultiplexedSessionMaintainer getMaintainer() { return this.maintainer; } + ApiFuture getCurrentSessionReferenceFuture() { + System.out.println("Accessing Multiplexed Session " + Instant.now()); + return ApiFutures.catchingAsync( + this.multiplexedSessionReference.get(), + Throwable.class, + (throwable) -> { + maybeRetrySessionCreation(); + return this.multiplexedSessionReference.get(); + }, + MoreExecutors.directExecutor()); + } + + private void maybeRetrySessionCreation() { + sessionCreationLock.lock(); + try { + if (isValid() + && isMultiplexedSessionsSupported() + && retryingSessionCreation.compareAndSet(false, true)) { + System.out.println("Retrying Multiplexed Session " + Instant.now()); + SettableApiFuture settableApiFuture = SettableApiFuture.create(); + asyncCreateMultiplexedSession(settableApiFuture); + multiplexedSessionReference.set(settableApiFuture); + } + } finally { + sessionCreationLock.unlock(); + } + } + @VisibleForTesting SessionReference getCurrentSessionReference() { try { - return this.multiplexedSessionReference.get().get(); + return getCurrentSessionReferenceFuture().get(); } catch (ExecutionException executionException) { throw SpannerExceptionFactory.asSpannerException(executionException.getCause()); } catch (InterruptedException interruptedException) { @@ -587,28 +635,22 @@ private DatabaseClient createMultiplexedSessionTransaction(boolean singleUse) { private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction( boolean singleUse) { - try { - return new MultiplexedSessionTransaction( - this, - tracer.getCurrentSpan(), - // Getting the result of the SettableApiFuture that contains the multiplexed session will - // also automatically propagate any error that happened during the creation of the - // session, such as for example a DatabaseNotFound exception. We therefore do not need - // any special handling of such errors. - multiplexedSessionReference.get().get(), - singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT, - singleUse, - this.pool); - } catch (ExecutionException executionException) { - throw SpannerExceptionFactory.asSpannerException(executionException.getCause()); - } catch (InterruptedException interruptedException) { - throw SpannerExceptionFactory.propagateInterrupt(interruptedException); - } + return new MultiplexedSessionTransaction( + this, + tracer.getCurrentSpan(), + // Getting the result of the SettableApiFuture that contains the multiplexed session will + // also automatically propagate any error that happened during the creation of the + // session, such as for example a DatabaseNotFound exception. We therefore do not need + // any special handling of such errors. + getCurrentSessionReference(), + singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT, + singleUse, + this.pool); } private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction() { return new DelayedMultiplexedSessionTransaction( - this, tracer.getCurrentSpan(), multiplexedSessionReference.get(), this.pool); + this, tracer.getCurrentSpan(), getCurrentSessionReferenceFuture(), this.pool); } private int getSingleUseChannelHint() { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index a1852b9088..d34ff840ae 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import io.opentelemetry.api.common.Attributes; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -271,6 +272,7 @@ void createMultiplexedSession(SessionConsumer consumer) { * GRPC channel. In case of an error during the gRPC calls, an exception will be thrown. */ SessionImpl createMultiplexedSession() { + System.out.println("Creating Multiplexed Session " + Instant.now()); ISpan span = spanner .getTracer() diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 7b85c8e772..4602867572 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -3202,7 +3202,8 @@ public void testDatabaseOrInstanceDoesNotExistOnCreate() { mockSpanner.unfreeze(); assertThrows(ResourceNotFoundException.class, rs::next); // The server should only receive one BatchCreateSessions request. - assertThat(mockSpanner.getRequests()).hasSize(1); + // If multiplexed session used, it will be retried once so 2 CreateSession requests + assertThat(mockSpanner.getRequests()).hasSize(useMultiplexedSession ? 2 : 1); } assertThrows( ResourceNotFoundException.class, @@ -3221,9 +3222,9 @@ public void testDatabaseOrInstanceDoesNotExistOnCreate() { } else { // Note that in case of the use of regular sessions, then we have 1 request: // BatchCreateSessions for the session pool. - // Note that in case of the use of multiplexed sessions for read-write, then we have 1 - // request: CreateSession for the multiplexed session. - assertThat(mockSpanner.getRequests()).hasSize(1); + // Note that in case of the use of multiplexed sessions for read-write, then we have 3 + // requests: CreateSession for the multiplexed session. + assertThat(mockSpanner.getRequests()).hasSize(useMultiplexedSession ? 3 : 1); } } } @@ -3413,7 +3414,9 @@ public void testGetInvalidatedClientMultipleTimes() { if (spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession()) { // We should only receive 1 CreateSession request. The query should never be executed, // as the session creation fails before it gets to executing a query. - assertEquals(1, mockSpanner.countRequestsOfType(CreateSessionRequest.class)); + assertEquals( + 2 + (2 * run) + useClient, + mockSpanner.countRequestsOfType(CreateSessionRequest.class)); assertEquals(0, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); } else { // The server should only receive one BatchCreateSessions request for each run as we diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 0448656475..8f6dc6588b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -54,6 +54,8 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -245,6 +247,175 @@ public void testUnimplementedErrorOnCreation_fallsBackToRegularSessions() { assertEquals(0L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } + @Test + public void testDeadlineExceededErrorWithOneRetry() { + // Setting up two exceptions + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofExceptions( + Arrays.asList( + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException(), + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException()))); + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + assertNotNull(client.multiplexedSessionDatabaseClient); + + // initial fetch call fails with exception + // this call will try to fetch it again which again throws an exception + assertThrows( + SpannerException.class, + () -> { + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + }); + + // When third request comes it should succeed + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + + // Verify that we received one ExecuteSqlRequest, and that it used a multiplexed session. + assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + + Session session = mockSpanner.getSession(requests.get(0).getSession()); + assertNotNull(session); + assertTrue(session.getMultiplexed()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + + @Test + public void testDeadlineExceededErrorWithOneRetryWithParallelRequests() + throws InterruptedException { + System.out.println( + "---------------------------------------------------------------------------"); + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofExceptions( + Arrays.asList( + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException(), + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException()))); + + SpannerOptions spannerOptions = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .setChannelProvider(channelProvider) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setMultiplexedSessionMaintenanceDuration(Duration.ofMinutes(10)) + .build()) + .build(); + + Spanner testSpanner = spannerOptions.getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + assertNotNull(client.multiplexedSessionDatabaseClient); + + ExecutorService executor = Executors.newCachedThreadPool(); + + // First set of request should fail with an error + CountDownLatch failureCountDownLatch = new CountDownLatch(3); + for (int i = 0; i < 3; i++) { + if (i == 0) { + mockSpanner.freeze(); + } + executor.submit( + () -> { + try { + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + } catch (SpannerException e) { + failureCountDownLatch.countDown(); + } + }); + if (i == 2) { + Thread.sleep(1000); + mockSpanner.unfreeze(); + } + } + + failureCountDownLatch.await(2, TimeUnit.SECONDS); + assertEquals(0, failureCountDownLatch.getCount()); + + // Second set of requests should pass + CountDownLatch countDownLatch = new CountDownLatch(3); + for (int i = 0; i < 3; i++) { + if (i == 0) { + mockSpanner.freeze(); + } + executor.submit( + () -> { + try { + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + } catch (SpannerException e) { + countDownLatch.countDown(); + } + }); + if (i == 2) { + Thread.sleep(1000); + mockSpanner.unfreeze(); + } + } + + countDownLatch.await(2, TimeUnit.SECONDS); + assertEquals(3, countDownLatch.getCount()); + + // Verify that we received 3 ExecuteSqlRequest, and that it used a multiplexed session. + assertEquals(3, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + + Session session = mockSpanner.getSession(requests.get(0).getSession()); + assertNotNull(session); + assertTrue(session.getMultiplexed()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + + testSpanner.close(); + + System.out.println( + "---------------------------------------------------------------------------"); + } + @Test public void testUnimplementedErrorOnCreation_firstReceivesError_secondFallsBackToRegularSessions() { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java index ce1bb87ed4..aabdab480d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java @@ -339,7 +339,7 @@ public void testNoNetworkConnection() { // Attempt count should have a failed metric point for CreateSession. assertEquals( - 1, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0); + 2, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0); assertTrue( checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME)); assertTrue(