Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

/**
* {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message
Expand Down Expand Up @@ -315,6 +316,18 @@ public void close() {
*/
private final AtomicBoolean unimplemented = new AtomicBoolean(false);

/**
* This flag is set to true if create session RPC is in progress. This flag prevents application
* from firing two requests concurrently
*/
private final AtomicBoolean retryingSessionCreation = new AtomicBoolean(false);

/**
* This lock is used to prevent two threads from retrying createSession RPC requests in
* concurrently.
*/
private final ReentrantLock sessionCreationLock = new ReentrantLock();

/**
* This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
Expand Down Expand Up @@ -358,11 +371,20 @@ public void close() {
SettableApiFuture.create();
this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
asyncCreateMultiplexedSession(initialSessionReferenceFuture);
maybeWaitForSessionCreation(
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
initialSessionReferenceFuture);
}

private void asyncCreateMultiplexedSession(
SettableApiFuture<SessionReference> sessionReferenceFuture) {
this.sessionClient.asyncCreateMultiplexedSession(
new SessionConsumer() {
@Override
public void onSessionReady(SessionImpl session) {
initialSessionReferenceFuture.set(session.getSessionReference());
retryingSessionCreation.set(false);
sessionReferenceFuture.set(session.getSessionReference());
// only start the maintainer if we actually managed to create a session in the first
// place.
maintainer.start();
Expand Down Expand Up @@ -394,13 +416,11 @@ public void onSessionReady(SessionImpl session) {
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
// Mark multiplexes sessions as unimplemented and fall back to regular sessions if
// UNIMPLEMENTED is returned.
retryingSessionCreation.set(false);
maybeMarkUnimplemented(t);
initialSessionReferenceFuture.setException(t);
sessionReferenceFuture.setException(t);
}
});
maybeWaitForSessionCreation(
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
initialSessionReferenceFuture);
}

void setPool(SessionPool pool) {
Expand Down Expand Up @@ -546,10 +566,36 @@ MultiplexedSessionMaintainer getMaintainer() {
return this.maintainer;
}

ApiFuture<SessionReference> getCurrentSessionReferenceFuture() {
return ApiFutures.catchingAsync(
this.multiplexedSessionReference.get(),
Throwable.class,
(throwable) -> {
maybeRetrySessionCreation();
return this.multiplexedSessionReference.get();
},
MoreExecutors.directExecutor());
}

private void maybeRetrySessionCreation() {
sessionCreationLock.lock();
try {
if (isValid()
&& isMultiplexedSessionsSupported()
&& retryingSessionCreation.compareAndSet(false, true)) {
SettableApiFuture<SessionReference> settableApiFuture = SettableApiFuture.create();
asyncCreateMultiplexedSession(settableApiFuture);
multiplexedSessionReference.set(settableApiFuture);
}
} finally {
sessionCreationLock.unlock();
}
}

@VisibleForTesting
SessionReference getCurrentSessionReference() {
try {
return this.multiplexedSessionReference.get().get();
return getCurrentSessionReferenceFuture().get();
} catch (ExecutionException executionException) {
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
Expand Down Expand Up @@ -587,28 +633,22 @@ private DatabaseClient createMultiplexedSessionTransaction(boolean singleUse) {

private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
boolean singleUse) {
try {
return new MultiplexedSessionTransaction(
this,
tracer.getCurrentSpan(),
// Getting the result of the SettableApiFuture that contains the multiplexed session will
// also automatically propagate any error that happened during the creation of the
// session, such as for example a DatabaseNotFound exception. We therefore do not need
// any special handling of such errors.
multiplexedSessionReference.get().get(),
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
singleUse,
this.pool);
} catch (ExecutionException executionException) {
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
}
return new MultiplexedSessionTransaction(
this,
tracer.getCurrentSpan(),
// Getting the result of the SettableApiFuture that contains the multiplexed session will
// also automatically propagate any error that happened during the creation of the
// session, such as for example a DatabaseNotFound exception. We therefore do not need
// any special handling of such errors.
getCurrentSessionReference(),
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
singleUse,
this.pool);
}

private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction() {
return new DelayedMultiplexedSessionTransaction(
this, tracer.getCurrentSpan(), multiplexedSessionReference.get(), this.pool);
this, tracer.getCurrentSpan(), getCurrentSessionReferenceFuture(), this.pool);
}

private int getSingleUseChannelHint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3411,9 +3411,11 @@ public void testGetInvalidatedClientMultipleTimes() {
ResourceNotFoundException.class,
() -> dbClient.singleUse().executeQuery(SELECT1).next());
if (spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession()) {
// We should only receive 1 CreateSession request. The query should never be executed,
// We should receive 2 CreateSession request. The query should never be executed,
// as the session creation fails before it gets to executing a query.
assertEquals(1, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
assertEquals(
(2 * run) + useClient + 2,
mockSpanner.countRequestsOfType(CreateSessionRequest.class));
assertEquals(0, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
} else {
// The server should only receive one BatchCreateSessions request for each run as we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -245,6 +247,143 @@ public void testUnimplementedErrorOnCreation_fallsBackToRegularSessions() {
assertEquals(0L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

@Test
public void testDeadlineExceededErrorWithOneRetry() {
// Setting up two exceptions
mockSpanner.setCreateSessionExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.DEADLINE_EXCEEDED
.withDescription(
"CallOptions deadline exceeded after 22.986872393s. "
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
.asRuntimeException(),
Status.DEADLINE_EXCEEDED
.withDescription(
"CallOptions deadline exceeded after 22.986872393s. "
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
.asRuntimeException())));
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
assertNotNull(client.multiplexedSessionDatabaseClient);

// initial fetch call fails with exception
// this call will try to fetch it again which again throws an exception
assertThrows(
SpannerException.class,
() -> {
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
//noinspection StatementWithEmptyBody
while (resultSet.next()) {
// ignore
}
}
});

// When third request comes it should succeed
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
//noinspection StatementWithEmptyBody
while (resultSet.next()) {
// ignore
}
}

// Verify that we received one ExecuteSqlRequest, and that it used a multiplexed session.
assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);

Session session = mockSpanner.getSession(requests.get(0).getSession());
assertNotNull(session);
assertTrue(session.getMultiplexed());

assertNotNull(client.multiplexedSessionDatabaseClient);
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

@Test
public void testDeadlineExceededErrorWithOneRetryWithParallelRequests()
throws InterruptedException {
mockSpanner.setCreateSessionExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTimeAndExceptions(
2000,
0,
Arrays.asList(
Status.DEADLINE_EXCEEDED
.withDescription(
"CallOptions deadline exceeded after 22.986872393s. "
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
.asRuntimeException(),
Status.DEADLINE_EXCEEDED
.withDescription(
"CallOptions deadline exceeded after 22.986872393s. "
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
.asRuntimeException())));
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
assertNotNull(client.multiplexedSessionDatabaseClient);

ExecutorService executor = Executors.newCachedThreadPool();

// First set of request should fail with an error
CountDownLatch failureCountDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
executor.submit(
() -> {
try {
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
//noinspection StatementWithEmptyBody
while (resultSet.next()) {
// ignore
}
}
} catch (SpannerException e) {
failureCountDownLatch.countDown();
}
});
}

assertTrue(failureCountDownLatch.await(5, TimeUnit.SECONDS));
assertEquals(0, failureCountDownLatch.getCount());

// Second set of requests should pass
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
executor.submit(
() -> {
try {
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
//noinspection StatementWithEmptyBody
while (resultSet.next()) {
// ignore
}
}
} catch (SpannerException e) {
countDownLatch.countDown();
}
});
}

assertFalse(countDownLatch.await(5, TimeUnit.SECONDS));
assertEquals(3, countDownLatch.getCount());

// Verify that we received 3 ExecuteSqlRequest, and that it used a multiplexed session.
assertEquals(3, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);

Session session = mockSpanner.getSession(requests.get(0).getSession());
assertNotNull(session);
assertTrue(session.getMultiplexed());

assertNotNull(client.multiplexedSessionDatabaseClient);
assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

@Test
public void
testUnimplementedErrorOnCreation_firstReceivesError_secondFallsBackToRegularSessions() {
Expand Down
Loading