Skip to content

Commit df7ca5d

Browse files
add token bucket
1 parent e70fdc9 commit df7ca5d

31 files changed

+12893
-41
lines changed

src/cmap/connect.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ export interface HandshakeDocument extends Document {
224224
compression: string[];
225225
saslSupportedMechs?: string;
226226
loadBalanced?: boolean;
227+
backpressure: true;
227228
}
228229

229230
/**
@@ -241,6 +242,7 @@ export async function prepareHandshakeDocument(
241242

242243
const handshakeDoc: HandshakeDocument = {
243244
[serverApi?.version || options.loadBalanced === true ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
245+
backpressure: true,
244246
helloOk: true,
245247
client: clientMetadata,
246248
compression: compressors

src/cmap/connection.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
582582
this.throwIfAborted();
583583
}
584584
} catch (error) {
585+
if (options.session != null && !(error instanceof MongoServerError)) {
586+
updateSessionFromResponse(options.session, MongoDBResponse.empty);
587+
}
585588
if (this.shouldEmitAndLogCommand) {
586589
this.emitAndLogCommand(
587590
this.monitorCommands,

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ export {
8787
MongoWriteConcernError,
8888
WriteConcernErrorResult
8989
} from './error';
90+
export { TokenBucket } from './token_bucket';
9091
export {
9192
AbstractCursor,
9293
// Actual driver classes exported

src/operations/execute_operation.ts

Lines changed: 83 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { setTimeout } from 'node:timers/promises';
2+
13
import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants';
24
import {
35
isRetryableReadError,
@@ -10,6 +12,7 @@ import {
1012
MongoInvalidArgumentError,
1113
MongoNetworkError,
1214
MongoNotConnectedError,
15+
MongoOperationTimeoutError,
1316
MongoRuntimeError,
1417
MongoServerError,
1518
MongoTransactionError,
@@ -26,9 +29,15 @@ import {
2629
import type { Topology } from '../sdam/topology';
2730
import type { ClientSession } from '../sessions';
2831
import { TimeoutContext } from '../timeout';
29-
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
32+
import { RETRY_COST, TOKEN_REFRESH_RATE } from '../token_bucket';
33+
import {
34+
abortable,
35+
ExponentialBackoffProvider,
36+
maxWireVersion,
37+
supportsRetryableWrites
38+
} from '../utils';
3039
import { AggregateOperation } from './aggregate';
31-
import { AbstractOperation, Aspect } from './operation';
40+
import { AbstractOperation, Aspect, RetryContext } from './operation';
3241

3342
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
3443
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -50,7 +59,7 @@ type ResultTypeFromOperation<TOperation extends AbstractOperation> = ReturnType<
5059
* The expectation is that this function:
5160
* - Connects the MongoClient if it has not already been connected, see {@link autoConnect}
5261
* - Creates a session if none is provided and cleans up the session it creates
53-
* - Tries an operation and retries under certain conditions, see {@link tryOperation}
62+
* - Tries an operation and retries under certain conditions, see {@link executeOperationWithRetries}
5463
*
5564
* @typeParam T - The operation's type
5665
* @typeParam TResult - The type of the operation's result, calculated from T
@@ -120,7 +129,7 @@ export async function executeOperation<
120129
});
121130

122131
try {
123-
return await tryOperation(operation, {
132+
return await executeOperationWithRetries(operation, {
124133
topology,
125134
timeoutContext,
126135
session,
@@ -184,7 +193,10 @@ type RetryOptions = {
184193
*
185194
* @param operation - The operation to execute
186195
* */
187-
async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFromOperation<T>>(
196+
async function executeOperationWithRetries<
197+
T extends AbstractOperation,
198+
TResult = ResultTypeFromOperation<T>
199+
>(
188200
operation: T,
189201
{ topology, timeoutContext, session, readPreference }: RetryOptions
190202
): Promise<TResult> {
@@ -233,11 +245,27 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
233245
session.incrementTransactionNumber();
234246
}
235247

236-
const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
237248
let previousOperationError: MongoError | undefined;
238249
const deprioritizedServers = new DeprioritizedServers();
239250

240-
for (let tries = 0; tries < maxTries; tries++) {
251+
const backoffDelayProvider = new ExponentialBackoffProvider(
252+
10_000, // MAX_BACKOFF
253+
100, // base backoff
254+
2 // backoff rate
255+
);
256+
257+
const retryContext =
258+
operation.retryContext ??
259+
new RetryContext(willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1);
260+
for (
261+
let attempt = 0;
262+
attempt < retryContext.maxAttempts;
263+
attempt++,
264+
retryContext.maxAttempts =
265+
willRetry && previousOperationError?.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
266+
? 6
267+
: retryContext.maxAttempts
268+
) {
241269
if (previousOperationError) {
242270
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
243271
throw new MongoServerError({
@@ -247,15 +275,39 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
247275
});
248276
}
249277

250-
if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) {
278+
const isRetryable =
279+
// bulk write commands are retryable if all operations in the batch are retryable
280+
(operation.hasAspect(Aspect.COMMAND_BATCHING) && operation.canRetryWrite) ||
281+
// if we have a retryable read or write operation, we can retry
282+
(hasWriteAspect && willRetryWrite && isRetryableWriteError(previousOperationError)) ||
283+
(hasReadAspect && willRetryRead && isRetryableReadError(previousOperationError)) ||
284+
// if we have a retryable, system overloaded error, we can retry
285+
(previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
286+
previousOperationError.hasErrorLabel(MongoErrorLabel.RetryableError));
287+
288+
if (!isRetryable) {
251289
throw previousOperationError;
252290
}
253291

254-
if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
255-
throw previousOperationError;
256-
257-
if (hasReadAspect && !isRetryableReadError(previousOperationError)) {
258-
throw previousOperationError;
292+
if (previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
293+
const delayMS = backoffDelayProvider.getNextBackoffDuration();
294+
295+
// if the delay would exhaust the CSOT timeout, short-circuit.
296+
if (timeoutContext.csotEnabled() && delayMS > timeoutContext.remainingTimeMS) {
297+
// TODO: is this the right error to throw?
298+
throw new MongoOperationTimeoutError(
299+
`MongoDB SystemOverload exponential backoff would exceed timeoutMS deadline: remaining CSOT deadline=${timeoutContext.remainingTimeMS}, backoff delayMS=${delayMS}`,
300+
{
301+
cause: previousOperationError
302+
}
303+
);
304+
}
305+
306+
if (!topology.tokenBucket.consume(RETRY_COST)) {
307+
throw previousOperationError;
308+
}
309+
310+
await setTimeout(delayMS);
259311
}
260312

261313
if (
@@ -285,19 +337,34 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
285337
operation.server = server;
286338

287339
try {
288-
// If tries > 0 and we are command batching we need to reset the batch.
289-
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
340+
const isRetry = attempt > 0;
341+
342+
// If attempt > 0 and we are command batching we need to reset the batch.
343+
if (isRetry && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
290344
operation.resetBatch();
291345
}
292346

293347
try {
294348
const result = await server.command(operation, timeoutContext);
349+
topology.tokenBucket.deposit(
350+
isRetry
351+
? // on successful retry, deposit the retry cost + the refresh rate.
352+
TOKEN_REFRESH_RATE + RETRY_COST
353+
: // otherwise, just deposit the refresh rate.
354+
TOKEN_REFRESH_RATE
355+
);
295356
return operation.handleOk(result);
296357
} catch (error) {
297358
return operation.handleError(error);
298359
}
299360
} catch (operationError) {
300361
if (!(operationError instanceof MongoError)) throw operationError;
362+
363+
if (!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
364+
// if an operation fails with an error that does not contain the SystemOverloadError, deposit 1 token.
365+
topology.tokenBucket.deposit(RETRY_COST);
366+
}
367+
301368
if (
302369
previousOperationError != null &&
303370
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
@@ -312,8 +379,5 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
312379
}
313380
}
314381

315-
throw (
316-
previousOperationError ??
317-
new MongoRuntimeError('Tried to propagate retryability error, but no error was found.')
318-
);
382+
throw previousOperationError ?? new MongoRuntimeError('ahh');
319383
}

src/operations/operation.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ export interface OperationOptions extends BSONSerializeOptions {
4545
timeoutMS?: number;
4646
}
4747

48+
export class RetryContext {
49+
constructor(public maxAttempts: number) {}
50+
}
51+
4852
/**
4953
* This class acts as a parent class for any operation and is responsible for setting this.options,
5054
* as well as setting and getting a session.
@@ -66,6 +70,8 @@ export abstract class AbstractOperation<TResult = any> {
6670
/** Specifies the time an operation will run until it throws a timeout error. */
6771
timeoutMS?: number;
6872

73+
retryContext?: RetryContext;
74+
6975
private _session: ClientSession | undefined;
7076

7177
static aspects?: Set<symbol>;

src/sdam/topology.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
3535
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3636
import type { ClientSession } from '../sessions';
3737
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
38+
import { TokenBucket } from '../token_bucket';
3839
import type { Transaction } from '../transactions';
3940
import {
4041
addAbortListener,
@@ -207,18 +208,15 @@ export type TopologyEvents = {
207208
* @internal
208209
*/
209210
export class Topology extends TypedEventEmitter<TopologyEvents> {
210-
/** @internal */
211211
s: TopologyPrivate;
212-
/** @internal */
213212
waitQueue: List<ServerSelectionRequest>;
214-
/** @internal */
215213
hello?: Document;
216-
/** @internal */
217214
_type?: string;
218215

216+
tokenBucket = new TokenBucket(1000);
217+
219218
client!: MongoClient;
220219

221-
/** @internal */
222220
private connectionLock?: Promise<Topology>;
223221

224222
/** @event */

src/sessions.ts

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
import type { MongoClient, MongoOptions } from './mongo_client';
2626
import { TypedEventEmitter } from './mongo_types';
2727
import { executeOperation } from './operations/execute_operation';
28+
import { RetryContext } from './operations/operation';
2829
import { RunCommandOperation } from './operations/run_command';
2930
import { ReadConcernLevel } from './read_concern';
3031
import { ReadPreference } from './read_preference';
@@ -468,7 +469,11 @@ export class ClientSession
468469
} else {
469470
const wcKeys = Object.keys(wc);
470471
if (wcKeys.length > 2 || (!wcKeys.includes('wtimeoutMS') && !wcKeys.includes('wTimeoutMS')))
471-
// if the write concern was specified with wTimeoutMS, then we set both wtimeoutMS and wTimeoutMS, guaranteeing at least two keys, so if we have more than two keys, then we can automatically assume that we should add the write concern to the command. If it has 2 or fewer keys, we need to check that those keys aren't the wtimeoutMS or wTimeoutMS options before we add the write concern to the command
472+
// if the write concern was specified with wTimeoutMS, then we set both wtimeoutMS
473+
// and wTimeoutMS, guaranteeing at least two keys, so if we have more than two keys,
474+
// then we can automatically assume that we should add the write concern to the command.
475+
// If it has 2 or fewer keys, we need to check that those keys aren't the wtimeoutMS
476+
// or wTimeoutMS options before we add the write concern to the command
472477
WriteConcern.apply(command, { ...wc, wtimeoutMS: undefined });
473478
}
474479
}
@@ -489,11 +494,14 @@ export class ClientSession
489494
command.recoveryToken = this.transaction.recoveryToken;
490495
}
491496

497+
const retryContext = new RetryContext(5);
498+
492499
const operation = new RunCommandOperation(new MongoDBNamespace('admin'), command, {
493500
session: this,
494501
readPreference: ReadPreference.primary,
495502
bypassPinningCheck: true
496503
});
504+
operation.retryContext = retryContext;
497505

498506
const timeoutContext =
499507
this.timeoutContext ??
@@ -518,15 +526,13 @@ export class ClientSession
518526
this.unpin({ force: true });
519527

520528
try {
521-
await executeOperation(
522-
this.client,
523-
new RunCommandOperation(new MongoDBNamespace('admin'), command, {
524-
session: this,
525-
readPreference: ReadPreference.primary,
526-
bypassPinningCheck: true
527-
}),
528-
timeoutContext
529-
);
529+
const op = new RunCommandOperation(new MongoDBNamespace('admin'), command, {
530+
session: this,
531+
readPreference: ReadPreference.primary,
532+
bypassPinningCheck: true
533+
});
534+
op.retryContext = retryContext;
535+
await executeOperation(this.client, op, timeoutContext);
530536
return;
531537
} catch (retryCommitError) {
532538
// If the retry failed, we process that error instead of the original
@@ -1013,6 +1019,11 @@ export class ServerSession {
10131019
id: ServerSessionId;
10141020
lastUse: number;
10151021
txnNumber: number;
1022+
1023+
/*
1024+
* Indicates that a network error has been encountered while using this session.
1025+
* Once a session is marked as dirty, it is always dirty.
1026+
*/
10161027
isDirty: boolean;
10171028

10181029
/** @internal */
@@ -1106,16 +1117,15 @@ export class ServerSessionPool {
11061117
* @param session - The session to release to the pool
11071118
*/
11081119
release(session: ServerSession): void {
1109-
const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;
1120+
if (this.client.topology?.loadBalanced) {
1121+
if (session.isDirty) return;
11101122

1111-
if (this.client.topology?.loadBalanced && !sessionTimeoutMinutes) {
11121123
this.sessions.unshift(session);
1113-
}
1114-
1115-
if (!sessionTimeoutMinutes) {
11161124
return;
11171125
}
11181126

1127+
const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;
1128+
11191129
this.sessions.prune(session => session.hasTimedOut(sessionTimeoutMinutes));
11201130

11211131
if (!session.hasTimedOut(sessionTimeoutMinutes)) {
@@ -1203,9 +1213,9 @@ export function applySession(
12031213
command.autocommit = false;
12041214

12051215
if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
1206-
session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
12071216
command.startTransaction = true;
12081217

1218+
// TODO: read concern only applied if it is not the same as the server's default
12091219
const readConcern =
12101220
session.transaction.options.readConcern || session?.clientOptions?.readConcern;
12111221
if (readConcern) {
@@ -1241,4 +1251,17 @@ export function updateSessionFromResponse(session: ClientSession, document: Mong
12411251
session.snapshotTime = atClusterTime;
12421252
}
12431253
}
1254+
1255+
if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
1256+
if (document.ok === 1) {
1257+
session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
1258+
} else {
1259+
const error = new MongoServerError(document.toObject());
1260+
const isBackpressureError = error.hasErrorLabel(MongoErrorLabel.RetryableError);
1261+
1262+
if (!isBackpressureError) {
1263+
session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
1264+
}
1265+
}
1266+
}
12441267
}

0 commit comments

Comments
 (0)