diff --git a/CHANGELOG.md b/CHANGELOG.md index 727d3def2..32320f57d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,23 @@ [1]: https://www.npmjs.com/package/@google-cloud/bigtable?activeTab=versions +## [6.3.0](https://github.com/googleapis/nodejs-bigtable/compare/v6.2.0...v6.3.0) (2025-08-11) + + +### Features + +* Add client side metrics for checkAndMutateRow calls ([#1661](https://github.com/googleapis/nodejs-bigtable/issues/1661)) ([c258ea1](https://github.com/googleapis/nodejs-bigtable/commit/c258ea1b29203aad3eaaf9cfe64ddabb8c1018bf)) +* Add client side metrics for readModifyWriteRow calls ([#1656](https://github.com/googleapis/nodejs-bigtable/issues/1656)) ([2129312](https://github.com/googleapis/nodejs-bigtable/commit/2129312401bf9f5b8e51b13ac576cb765de401df)) +* Client side metrics support for mutateRows ([#1638](https://github.com/googleapis/nodejs-bigtable/issues/1638)) ([7601e4d](https://github.com/googleapis/nodejs-bigtable/commit/7601e4da115ff6a5da411cc857917b579c70ced7)) +* Collect client side metrics for sampleRowKeys calls ([#1660](https://github.com/googleapis/nodejs-bigtable/issues/1660)) ([6ed98fa](https://github.com/googleapis/nodejs-bigtable/commit/6ed98faefe446e67f83fd5394aae30374fd3ec3a)) +* For client side metrics, record metrics as MUTATE_ROW for single row mutates ([#1650](https://github.com/googleapis/nodejs-bigtable/issues/1650)) ([f190a8c](https://github.com/googleapis/nodejs-bigtable/commit/f190a8c322498ddfbe73406759a43a268c16bdc4)) +* Record ReadRows application latencies for client side metrics ([#1647](https://github.com/googleapis/nodejs-bigtable/issues/1647)) ([8af801b](https://github.com/googleapis/nodejs-bigtable/commit/8af801b3ecd7ff5e30e6c8cc67bd4123bdf34ee9)) + + +### Bug Fixes + +* FirstResponseLatencies should only be collected for readRows calls ([#1658](https://github.com/googleapis/nodejs-bigtable/issues/1658)) ([99cf5a6](https://github.com/googleapis/nodejs-bigtable/commit/99cf5a6010249ed0eedd88f23b2d32cacb106c07)) + ## [6.2.0](https://github.com/googleapis/nodejs-bigtable/compare/v6.1.0...v6.2.0) (2025-07-23) diff --git a/package.json b/package.json index 0bcb69e20..ac01f4faa 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@google-cloud/bigtable", - "version": "6.2.0", + "version": "6.3.0", "description": "Cloud Bigtable Client Library for Node.js", "keywords": [ "bigtable", @@ -57,8 +57,8 @@ "@opentelemetry/resources": "^1.30.0", "@opentelemetry/sdk-metrics": "^1.30.0", "@types/long": "^4.0.0", - "arrify": "2.0.0", "abort-controller": "^3.0.0", + "arrify": "2.0.0", "concat-stream": "^2.0.0", "dot-prop": "6.0.0", "escape-string-regexp": "4.0.0", diff --git a/samples/package.json b/samples/package.json index 26f229a44..d5f14feed 100644 --- a/samples/package.json +++ b/samples/package.json @@ -14,7 +14,7 @@ "node": ">=18" }, "dependencies": { - "@google-cloud/bigtable": "^6.2.0", + "@google-cloud/bigtable": "^6.3.0", "uuid": "^9.0.0", "yargs": "^16.0.0" }, diff --git a/src/client-side-metrics/gcp-metrics-handler.ts b/src/client-side-metrics/gcp-metrics-handler.ts index 73ebaf37a..6a9231760 100644 --- a/src/client-side-metrics/gcp-metrics-handler.ts +++ b/src/client-side-metrics/gcp-metrics-handler.ts @@ -31,6 +31,7 @@ const { } = require('@opentelemetry/sdk-metrics'); import * as os from 'os'; import * as crypto from 'crypto'; +import {MethodName} from './client-side-metrics-attributes'; /** * Generates a unique client identifier string. 
@@ -251,13 +252,19 @@ export class GCPMetricsHandler implements IMetricsHandler { status: data.status, ...commonAttributes, }); - otelInstruments.firstResponseLatencies.record(data.firstResponseLatency, { - status: data.status, - ...commonAttributes, - }); - for (const applicationLatency of data.applicationLatencies) { + if ( + data.metricsCollectorData.method === MethodName.READ_ROWS || + data.metricsCollectorData.method === MethodName.READ_ROW + ) { + otelInstruments.firstResponseLatencies.record(data.firstResponseLatency, { + status: data.status, + ...commonAttributes, + }); + } + + if (data.applicationLatency) { otelInstruments.applicationBlockingLatencies.record( - applicationLatency, + data.applicationLatency, commonAttributes, ); } diff --git a/src/interceptor.ts b/src/client-side-metrics/metric-interceptor.ts similarity index 76% rename from src/interceptor.ts rename to src/client-side-metrics/metric-interceptor.ts index c019f6e5e..89b23d4a3 100644 --- a/src/interceptor.ts +++ b/src/client-side-metrics/metric-interceptor.ts @@ -13,7 +13,7 @@ // limitations under the License. import {CallOptions} from 'google-gax'; -import {OperationMetricsCollector} from './client-side-metrics/operation-metrics-collector'; +import {OperationMetricsCollector} from './operation-metrics-collector'; // Mock Server Implementation import * as grpcJs from '@grpc/grpc-js'; @@ -49,16 +49,34 @@ function createMetricsInterceptorProvider( collector.onStatusMetadataReceived( status as unknown as ServerStatus, ); + collector.onAttemptComplete(status.code); nextStat(status); }, }; next(metadata, newListener); }, + sendMessage: function (message, next) { + collector.onAttemptStart(); + next(message); + }, }); }; } -export function withInterceptors( +/** + * Attaches a metrics interceptor to unary calls for collecting client-side metrics. + * + * This method modifies the given `gaxOptions` to include an interceptor that + * will be triggered during the execution of a unary gRPC call. The interceptor + * uses the provided `OperationMetricsCollector` to record various metrics + * related to the call, such as latency, retries, and errors. + * + * @param {CallOptions} gaxOptions The existing GAX call options to modify. + * @param {OperationMetricsCollector} metricsCollector The metrics collector + * for the operation. + * @returns {CallOptions} The modified `gaxOptions` with the interceptor attached. 
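For readers unfamiliar with gRPC client interceptors, here is a minimal sketch of the pattern this provider builds on: an interceptor that notifies a collector when the request message is sent and when the final status arrives. It is illustrative only; the `AttemptCollector` interface and the `exampleMetricsInterceptor` name are assumptions for the sketch, not the real `OperationMetricsCollector` API.

```ts
import * as grpcJs from '@grpc/grpc-js';

// Assumed minimal collector surface for this sketch only.
interface AttemptCollector {
  onAttemptStart(): void;
  onAttemptComplete(code: number): void;
}

// Illustrative sketch of a grpc-js interceptor reporting attempt boundaries.
function exampleMetricsInterceptor(collector: AttemptCollector): grpcJs.Interceptor {
  return (options, nextCall) =>
    new grpcJs.InterceptingCall(nextCall(options), {
      sendMessage(message, next) {
        // The attempt starts when the request message goes out.
        collector.onAttemptStart();
        next(message);
      },
      start(metadata, listener, next) {
        next(metadata, {
          ...listener,
          onReceiveStatus(status, nextStatus) {
            // The attempt ends when the final status is received.
            collector.onAttemptComplete(status.code);
            nextStatus(status);
          },
        });
      },
    });
}
```

The actual provider returns modified `gaxOptions` carrying an interceptor of this shape so that it runs on the unary call.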
+ */ +export function createMetricsUnaryInterceptorProvider( gaxOptions: CallOptions, metricsCollector?: OperationMetricsCollector, ) { diff --git a/src/client-side-metrics/metrics-handler.ts b/src/client-side-metrics/metrics-handler.ts index 6ce5bce12..3568f65e7 100644 --- a/src/client-side-metrics/metrics-handler.ts +++ b/src/client-side-metrics/metrics-handler.ts @@ -39,7 +39,7 @@ interface StandardData { export interface OnOperationCompleteData extends StandardData { firstResponseLatency?: number; operationLatency: number; - applicationLatencies: number[]; + applicationLatency?: number; retryCount?: number; } diff --git a/src/client-side-metrics/operation-metrics-collector.ts b/src/client-side-metrics/operation-metrics-collector.ts index 4851eddbb..52ed8c0eb 100644 --- a/src/client-side-metrics/operation-metrics-collector.ts +++ b/src/client-side-metrics/operation-metrics-collector.ts @@ -19,6 +19,7 @@ import * as gax from 'google-gax'; import {AbortableDuplex, BigtableOptions} from '../index'; import * as path from 'path'; import {IMetricsHandler} from './metrics-handler'; +import {TimedStream} from '../timed-stream'; // When this environment variable is set then print any errors associated // with failures in the metrics collector. @@ -115,9 +116,8 @@ export class OperationMetricsCollector { private serverTime: number | null; private connectivityErrorCount: number; private streamingOperation: StreamingState; - private applicationLatencies: number[]; - private lastRowReceivedTime: bigint | null; private handlers: IMetricsHandler[]; + public userStream?: TimedStream; /** * @param {ITabularApiSurface} tabularApiSurface Information about the Bigtable table being accessed. @@ -143,8 +143,6 @@ export class OperationMetricsCollector { this.serverTime = null; this.connectivityErrorCount = 0; this.streamingOperation = streamingOperation; - this.lastRowReceivedTime = null; - this.applicationLatencies = []; this.handlers = handlers; } @@ -168,7 +166,7 @@ export class OperationMetricsCollector { * * @param stream */ - handleStatusAndMetadata(stream: AbortableDuplex) { + wrapRequest(stream: AbortableDuplex) { stream .on( 'metadata', @@ -194,7 +192,6 @@ export class OperationMetricsCollector { checkState(this.state, [MetricsCollectorState.OPERATION_NOT_STARTED]); this.operationStartTime = hrtime.bigint(); this.firstResponseLatency = null; - this.applicationLatencies = []; this.state = MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS; }); @@ -251,7 +248,6 @@ export class OperationMetricsCollector { this.serverTime = null; this.serverTimeRead = false; this.connectivityErrorCount = 0; - this.lastRowReceivedTime = null; }); } @@ -284,10 +280,21 @@ export class OperationMetricsCollector { * Called when an operation completes (successfully or unsuccessfully). * Records operation latencies, retry counts, and connectivity error counts. * @param {grpc.status} finalOperationStatus Information about the completed operation. + * @param {number} applicationLatency The application latency measurement. 
*/ - onOperationComplete(finalOperationStatus: grpc.status) { - this.onAttemptComplete(finalOperationStatus); + onOperationComplete( + finalOperationStatus: grpc.status, + applicationLatency?: number, + ) { withMetricsDebug(() => { + if ( + this.state === + MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET || + this.state === + MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_SOME_ROWS_RECEIVED + ) { + this.onAttemptComplete(finalOperationStatus); + } checkState(this.state, [ MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS, ]); @@ -307,8 +314,8 @@ export class OperationMetricsCollector { client_name: `nodejs-bigtable/${version}`, operationLatency: totalMilliseconds, retryCount: this.attemptCount - 1, - firstResponseLatency: this.firstResponseLatency ?? undefined, - applicationLatencies: this.applicationLatencies, + firstResponseLatency: this.firstResponseLatency ?? 0, + applicationLatency: applicationLatency ?? 0, }); } }); @@ -352,38 +359,6 @@ export class OperationMetricsCollector { } } - /** - * Called when a row from the Bigtable stream reaches the application user. - * - * This method is used to calculate the latency experienced by the application - * when reading rows from a Bigtable stream. It records the time between the - * previous row being received and the current row reaching the user. These - * latencies are then collected and reported as `applicationBlockingLatencies` - * when the operation completes. - */ - onRowReachesUser() { - if ( - this.state === - MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET || - this.state === - MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_SOME_ROWS_RECEIVED || - this.state === - MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS - ) { - const currentTime = hrtime.bigint(); - if (this.lastRowReceivedTime) { - // application latency is measured in total milliseconds. - const applicationLatency = Number( - (currentTime - this.lastRowReceivedTime) / BigInt(1000000), - ); - this.applicationLatencies.push(applicationLatency); - } - this.lastRowReceivedTime = currentTime; - } else { - console.warn('Invalid state transition attempted'); - } - } - /** * Called when status information is received. Extracts zone and cluster information. * @param {object} status The received status information. diff --git a/src/row-data-utils.ts b/src/row-data-utils.ts index 070259146..65191057d 100644 --- a/src/row-data-utils.ts +++ b/src/row-data-utils.ts @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +import {OperationMetricsCollector} from './client-side-metrics/operation-metrics-collector'; + const dotProp = require('dot-prop'); import {Filter, RawFilter} from './filter'; import { @@ -30,6 +32,11 @@ import {TabularApiSurface} from './tabular-api-surface'; import arrify = require('arrify'); import {Bigtable} from './index'; import {CallOptions} from 'google-gax'; +import { + MethodName, + StreamingState, +} from './client-side-metrics/client-side-metrics-attributes'; +import {createMetricsUnaryInterceptorProvider} from './client-side-metrics/metric-interceptor'; interface TabularApiSurfaceRequest { tableName?: string; @@ -82,14 +89,32 @@ class RowDataUtils { properties.reqOpts, ); properties.requestData.data = {}; + // 1. Create a metrics collector. 
+ const metricsCollector = new OperationMetricsCollector( + properties.requestData.table, + MethodName.CHECK_AND_MUTATE_ROW, + StreamingState.UNARY, + ( + properties.requestData.table as any + ).bigtable._metricsConfigManager!.metricsHandlers, + ); + // 2. Tell the metrics collector an attempt has been started. + metricsCollector.onOperationStart(); + // 3. Make a unary call with gax options that include interceptors. The + // interceptors are built from a method that hooks them up to the + // metrics collector properties.requestData.bigtable.request( { client: 'BigtableClient', method: 'checkAndMutateRow', reqOpts, - gaxOpts: config.gaxOptions, + gaxOpts: createMetricsUnaryInterceptorProvider( + config.gaxOptions ?? {}, + metricsCollector, + ), }, (err, apiResponse) => { + metricsCollector.onOperationComplete(err ? err.code : 0); if (err) { callback(err, null, apiResponse); return; @@ -193,14 +218,34 @@ class RowDataUtils { properties.reqOpts, ); properties.requestData.data = {}; + // 1. Create a metrics collector. + const metricsCollector = new OperationMetricsCollector( + properties.requestData.table, + MethodName.READ_MODIFY_WRITE_ROW, + StreamingState.UNARY, + ( + properties.requestData.table as any + ).bigtable._metricsConfigManager!.metricsHandlers, + ); + // 2. Tell the metrics collector an attempt has been started. + metricsCollector.onOperationStart(); + // 3. Make a unary call with gax options that include interceptors. The + // interceptors are built from a method that hooks them up to the + // metrics collector properties.requestData.bigtable.request( { client: 'BigtableClient', method: 'readModifyWriteRow', reqOpts, - gaxOpts: gaxOptions, + gaxOpts: createMetricsUnaryInterceptorProvider( + gaxOptions, + metricsCollector, + ), + }, + (err, ...args) => { + metricsCollector.onOperationComplete(err ? 
err.code : 0); + callback(err, ...args); }, - callback, ); } diff --git a/src/row.ts b/src/row.ts index 39af7a32b..94155211d 100644 --- a/src/row.ts +++ b/src/row.ts @@ -36,6 +36,7 @@ import { MethodName, StreamingState, } from './client-side-metrics/client-side-metrics-attributes'; +import {mutateInternal} from './utils/mutateInternal'; export interface Rule { column: string; @@ -830,7 +831,19 @@ export class Row { method: Mutation.methods.INSERT, } as {} as Entry; this.data = {}; - this.table.mutate(mutation, gaxOptions as {} as MutateOptions, callback); + const metricsCollector = + this.bigtable._metricsConfigManager.createOperation( + MethodName.MUTATE_ROW, + StreamingState.UNARY, + this.table, + ); + mutateInternal( + this.table, + metricsCollector, + mutation, + gaxOptions as {} as MutateOptions, + callback, + ); } } diff --git a/src/tabular-api-surface.ts b/src/tabular-api-surface.ts index 462658e92..f84a4d25a 100644 --- a/src/tabular-api-surface.ts +++ b/src/tabular-api-surface.ts @@ -29,7 +29,6 @@ import {BackoffSettings} from 'google-gax/build/src/gax'; import {google} from '../protos/protos'; import {CallOptions, grpc, ServiceError} from 'google-gax'; import {Transform} from 'stream'; -import * as is from 'is'; import {GoogleInnerError} from './table'; import {createReadStreamInternal} from './utils/createReadStreamInternal'; import {getRowsInternal} from './utils/getRowsInternal'; @@ -37,6 +36,7 @@ import { MethodName, StreamingState, } from './client-side-metrics/client-side-metrics-attributes'; +import {mutateInternal} from './utils/mutateInternal'; // See protos/google/rpc/code.proto // (4=DEADLINE_EXCEEDED, 8=RESOURCE_EXHAUSTED, 10=ABORTED, 14=UNAVAILABLE) @@ -333,178 +333,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); optionsOrCallback?: MutateOptions | MutateCallback, cb?: MutateCallback, ): void | Promise { - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const options = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - const entries: Entry[] = (arrify(entriesRaw) as Entry[]).reduce( - (a, b) => a.concat(b), - [], - ); - - /* - The following line of code sets the timeout if it was provided while - creating the client. This will be used to determine if the client should - retry on errors. Eventually, this will be handled downstream in google-gax. - */ - const timeout = - options?.gaxOptions?.timeout || - (this?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces && - this?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces[ - 'google.bigtable.v2.Bigtable' - ]?.methods['MutateRows']?.timeout_millis); - const callTimeMillis = new Date().getTime(); - - let numRequestsMade = 0; - - const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 3; - const pendingEntryIndices = new Set( - entries.map((entry: Entry, index: number) => index), - ); - const entryToIndex = new Map( - entries.map((entry: Entry, index: number) => [entry, index]), - ); - const mutationErrorsByEntryIndex = new Map(); - - const isRetryable = ( - err: ServiceError | null, - timeoutExceeded: boolean, - ) => { - if (timeoutExceeded) { - // If the timeout has been exceeded then do not retry. 
- return false; - } - // Don't retry if there are no more entries or retry attempts - if (pendingEntryIndices.size === 0 || numRequestsMade >= maxRetries + 1) { - return false; - } - // If the error is empty but there are still outstanding mutations, - // it means that there are retryable errors in the mutate response - // even when the RPC succeeded - return !err || RETRYABLE_STATUS_CODES.has(err.code); - }; - - const onBatchResponse = (err: ServiceError | null) => { - // Return if the error happened before a request was made - if (numRequestsMade === 0) { - callback(err); - return; - } - - const timeoutExceeded = !!( - timeout && timeout < new Date().getTime() - callTimeMillis - ); - if (isRetryable(err, timeoutExceeded)) { - const backOffSettings = - options.gaxOptions?.retry?.backoffSettings || - DEFAULT_BACKOFF_SETTINGS; - const nextDelay = getNextDelay(numRequestsMade, backOffSettings); - setTimeout(makeNextBatchRequest, nextDelay); - return; - } - - // If there's no more pending mutations, set the error - // to null - if (pendingEntryIndices.size === 0) { - err = null; - } - - const mutationErrors = Array.from(mutationErrorsByEntryIndex.values()); - if (mutationErrorsByEntryIndex.size !== 0) { - callback(new PartialFailureError(mutationErrors, err)); - return; - } - if (err) { - /* If there's an RPC level failure and the mutation entries don't have - a status code, the RPC level failure error code will be used as the - entry failure code. - */ - (err as ServiceError & {errors?: ServiceError[]}).errors = - mutationErrors.concat( - [...pendingEntryIndices] - .filter(index => !mutationErrorsByEntryIndex.has(index)) - .map(() => err), - ); - callback(err); - return; - } - callback(err); - }; - - const makeNextBatchRequest = () => { - const entryBatch = entries.filter((entry: Entry, index: number) => { - return pendingEntryIndices.has(index); - }); - - // If the viewName is provided then request will be made for an - // authorized view. Otherwise, the request is made for a table. - const baseReqOpts = ( - this.viewName - ? { - authorizedViewName: `${this.name}/authorizedViews/${this.viewName}`, - } - : { - tableName: this.name, - } - ) as google.bigtable.v2.IReadRowsRequest; - const reqOpts = Object.assign(baseReqOpts, { - appProfileId: this.bigtable.appProfileId, - entries: options.rawMutation - ? entryBatch - : entryBatch.map(Mutation.parse), - }); - - const retryOpts = { - currentRetryAttempt: numRequestsMade, - // Handling retries in this client. Specify the retry options to - // make sure nothing is retried in retry-request. - noResponseRetries: 0, - shouldRetryFn: (_: any) => { - return false; - }, - }; - - options.gaxOptions = populateAttemptHeader( - numRequestsMade, - options.gaxOptions, + const metricsCollector = + this.bigtable._metricsConfigManager.createOperation( + MethodName.MUTATE_ROWS, + StreamingState.STREAMING, + this, ); - - this.bigtable - .request({ - client: 'BigtableClient', - method: 'mutateRows', - reqOpts, - gaxOpts: options.gaxOptions, - retryOpts, - }) - .on('error', (err: ServiceError) => { - onBatchResponse(err); - }) - .on('data', (obj: google.bigtable.v2.IMutateRowsResponse) => { - obj.entries!.forEach(entry => { - const originalEntry = entryBatch[entry.index as number]; - const originalEntriesIndex = entryToIndex.get(originalEntry)!; - - // Mutation was successful. 
- if (entry.status!.code === 0) { - pendingEntryIndices.delete(originalEntriesIndex); - mutationErrorsByEntryIndex.delete(originalEntriesIndex); - return; - } - if (!RETRYABLE_STATUS_CODES.has(entry.status!.code!)) { - pendingEntryIndices.delete(originalEntriesIndex); - } - const errorDetails = entry.status; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (errorDetails as any).entry = originalEntry; - mutationErrorsByEntryIndex.set(originalEntriesIndex, errorDetails); - }); - }) - .on('end', onBatchResponse); - numRequestsMade++; - }; - - makeNextBatchRequest(); + mutateInternal(this, metricsCollector, entriesRaw, optionsOrCallback, cb); } /** @@ -627,16 +462,29 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); }, objectMode: true, }); - - return pumpify.obj([ - this.bigtable.request({ - client: 'BigtableClient', - method: 'sampleRowKeys', - reqOpts, - gaxOpts: Object.assign({}, gaxOptions), - }), - rowKeysStream, - ]); + const metricsCollector = + this.bigtable._metricsConfigManager.createOperation( + MethodName.SAMPLE_ROW_KEYS, + StreamingState.STREAMING, + this, + ); + metricsCollector.onOperationStart(); + metricsCollector.onAttemptStart(); + const requestStream = this.bigtable.request({ + client: 'BigtableClient', + method: 'sampleRowKeys', + reqOpts, + gaxOpts: Object.assign({}, gaxOptions), + }); + metricsCollector.wrapRequest(requestStream); + const stream = pumpify.obj([requestStream, rowKeysStream]); + stream.on('end', () => { + metricsCollector.onOperationComplete(0); + }); + stream.on('error', (err: ServiceError) => { + metricsCollector.onOperationComplete(err.code); + }); + return stream; } } diff --git a/src/timed-stream.ts b/src/timed-stream.ts index 85e3a54a2..ffdce604b 100644 --- a/src/timed-stream.ts +++ b/src/timed-stream.ts @@ -13,6 +13,7 @@ // limitations under the License. import {PassThrough, TransformCallback, TransformOptions} from 'stream'; +const {hrtime} = require('node:process'); /** * This interface is the usual options that can be passed into a Transform plus @@ -42,11 +43,11 @@ class StreamTimer { } start() { - this.startTime = process.hrtime.bigint(); + this.startTime = hrtime.bigint(); } stop() { - const endTime = process.hrtime.bigint(); + const endTime = hrtime.bigint(); const duration = endTime - this.startTime; this.totalDuration += duration; } @@ -65,7 +66,8 @@ export class TimedStream extends PassThrough { super({ ...options, objectMode: true, - highWaterMark: 0, + readableHighWaterMark: 0, // We need to disable readside buffering to allow for acceptable behavior when the end user cancels the stream early. + writableHighWaterMark: 0, // We need to disable writeside buffering because in nodejs 14 the call to _transform happens after write buffering. This creates problems for tracking the last seen row key. transform: (event, _encoding, callback) => { /* When we iterate through a stream, time spent waiting for the user's application is added to totalDurationTransform. 
When we use handlers, @@ -79,7 +81,6 @@ export class TimedStream extends PassThrough { if (options?.transformHook) { options?.transformHook(event, _encoding, callback); } - callback(null, event); this.transformTimer.stop(); }, }); diff --git a/src/utils/createReadStreamInternal.ts b/src/utils/createReadStreamInternal.ts index 9158bdc03..15f3b168a 100644 --- a/src/utils/createReadStreamInternal.ts +++ b/src/utils/createReadStreamInternal.ts @@ -26,10 +26,6 @@ import { } from '../chunktransformer'; import {TableUtils} from './table'; import {Duplex, PassThrough, Transform} from 'stream'; -import { - MethodName, - StreamingState, -} from '../client-side-metrics/client-side-metrics-attributes'; import {google} from '../../protos/protos'; const pumpify = require('pumpify'); import {grpc, ServiceError} from 'google-gax'; @@ -42,6 +38,7 @@ import { TabularApiSurface, } from '../tabular-api-surface'; import {OperationMetricsCollector} from '../client-side-metrics/operation-metrics-collector'; +import {TimedStream} from '../timed-stream'; /** * Creates a readable stream of rows from a Bigtable table or authorized view. @@ -128,11 +125,8 @@ export function createReadStreamInternal( // discarded in the per attempt subpipeline (rowStream) let lastRowKey = ''; let rowsRead = 0; - const userStream = new PassThrough({ - objectMode: true, - readableHighWaterMark: 0, // We need to disable readside buffering to allow for acceptable behavior when the end user cancels the stream early. - writableHighWaterMark: 0, // We need to disable writeside buffering because in nodejs 14 the call to _transform happens after write buffering. This creates problems for tracking the last seen row key. - transform(event, _encoding, callback) { + const userStream = new TimedStream({ + transformHook(event, _encoding, callback) { if (userCanceled) { callback(); return; @@ -326,6 +320,10 @@ export function createReadStreamInternal( gaxOpts, retryOpts, }); + requestStream.on('data', () => { + // This handler is necessary for recording firstResponseLatencies. + metricsCollector.onResponse(); + }); activeRequestStream = requestStream!; @@ -364,7 +362,7 @@ export function createReadStreamInternal( rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]); - metricsCollector.handleStatusAndMetadata(requestStream); + metricsCollector.wrapRequest(requestStream); rowStream .on('error', (error: ServiceError) => { rowStreamUnpipe(rowStream, userStream); @@ -373,7 +371,10 @@ export function createReadStreamInternal( // We ignore the `cancelled` "error", since we are the ones who cause // it when the user calls `.abort()`. 
userStream.end(); - metricsCollector.onOperationComplete(error.code); + metricsCollector.onOperationComplete( + error.code, + userStream.getTotalDurationMs(), + ); return; } numConsecutiveErrors++; @@ -405,7 +406,10 @@ export function createReadStreamInternal( // error.code = grpc.status.CANCELLED; } - metricsCollector.onOperationComplete(error.code); + metricsCollector.onOperationComplete( + error.code, + userStream.getTotalDurationMs(), + ); userStream.emit('error', error); } }) @@ -413,11 +417,13 @@ export function createReadStreamInternal( // Reset error count after a successful read so the backoff // time won't keep increasing when as stream had multiple errors numConsecutiveErrors = 0; - metricsCollector.onResponse(); }) .on('end', () => { activeRequestStream = null; - metricsCollector.onOperationComplete(grpc.status.OK); + metricsCollector.onOperationComplete( + grpc.status.OK, + userStream.getTotalDurationMs(), + ); }); rowStreamPipe(rowStream, userStream); }; diff --git a/src/utils/mutateInternal.ts b/src/utils/mutateInternal.ts new file mode 100644 index 000000000..f068543cf --- /dev/null +++ b/src/utils/mutateInternal.ts @@ -0,0 +1,239 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { + Entry, + MutateCallback, + MutateOptions, + PartialFailureError, +} from '../table'; +import {OperationMetricsCollector} from '../client-side-metrics/operation-metrics-collector'; +import { + DEFAULT_BACKOFF_SETTINGS, + getNextDelay, + populateAttemptHeader, + RETRYABLE_STATUS_CODES, + TabularApiSurface, +} from '../tabular-api-surface'; +import {ServiceError} from 'google-gax'; +import {google} from '../../protos/protos'; +import * as is from 'is'; +import {Mutation} from '../mutation'; +import arrify = require('arrify'); + +export function mutateInternal( + table: TabularApiSurface, + metricsCollector: OperationMetricsCollector, + entriesRaw: Entry | Entry[], + optionsOrCallback?: MutateOptions | MutateCallback, + cb?: MutateCallback, +) { + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const entries: Entry[] = (arrify(entriesRaw) as Entry[]).reduce( + (a, b) => a.concat(b), + [], + ); + const collectMetricsCallback = ( + originalError: ServiceError | null, + err: ServiceError | PartialFailureError | null, + apiResponse?: google.protobuf.Empty, + ) => { + // originalError is the error that was sent from the gapic layer. The + // compiler guarantees that it contains a code which needs to be + // provided when an operation is marked complete. + // + // err is the error we intend to send back to the user. Often it is the + // same as originalError, but in one case we construct a + // PartialFailureError and send that back to the user instead. 
In this + // case, we still need to pass the originalError into the method + // because the PartialFailureError doesn't have a code, but we need to + // communicate a code to the metrics collector. + // + const code = originalError ? originalError.code : 0; + metricsCollector.onOperationComplete(code); + callback(err, apiResponse); + }; + + /* + The following line of code sets the timeout if it was provided while + creating the client. This will be used to determine if the client should + retry on errors. Eventually, this will be handled downstream in google-gax. + */ + const timeout = + options?.gaxOptions?.timeout || + (table?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces && + table?.bigtable?.options?.BigtableClient?.clientConfig?.interfaces[ + 'google.bigtable.v2.Bigtable' + ]?.methods['MutateRows']?.timeout_millis); + const callTimeMillis = new Date().getTime(); + + let numRequestsMade = 0; + + const maxRetries = is.number(table.maxRetries) ? table.maxRetries! : 3; + const pendingEntryIndices = new Set( + entries.map((entry: Entry, index: number) => index), + ); + const entryToIndex = new Map( + entries.map((entry: Entry, index: number) => [entry, index]), + ); + const mutationErrorsByEntryIndex = new Map(); + + const isRetryable = (err: ServiceError | null, timeoutExceeded: boolean) => { + if (timeoutExceeded) { + // If the timeout has been exceeded then do not retry. + return false; + } + // Don't retry if there are no more entries or retry attempts + if (pendingEntryIndices.size === 0 || numRequestsMade >= maxRetries + 1) { + return false; + } + // If the error is empty but there are still outstanding mutations, + // it means that there are retryable errors in the mutate response + // even when the RPC succeeded + return !err || RETRYABLE_STATUS_CODES.has(err.code); + }; + + const onBatchResponse = (err: ServiceError | null) => { + // Return if the error happened before a request was made + if (numRequestsMade === 0) { + collectMetricsCallback(err, err); + return; + } + + const timeoutExceeded = !!( + timeout && timeout < new Date().getTime() - callTimeMillis + ); + if (isRetryable(err, timeoutExceeded)) { + // If the timeout or max retries is exceeded or if there are no + // pending indices left then the client doesn't retry. + // Otherwise, the client will retry if there is no error or if the + // error has a retryable status code. + const backOffSettings = + options.gaxOptions?.retry?.backoffSettings || DEFAULT_BACKOFF_SETTINGS; + const nextDelay = getNextDelay(numRequestsMade, backOffSettings); + metricsCollector.onAttemptComplete(err ? err.code : 0); + setTimeout(makeNextBatchRequest, nextDelay); + return; + } + + // If there's no more pending mutations, set the error + // to null + if (pendingEntryIndices.size === 0) { + err = null; + } + + const mutationErrors = Array.from(mutationErrorsByEntryIndex.values()); + if (mutationErrorsByEntryIndex.size !== 0) { + collectMetricsCallback(err, new PartialFailureError(mutationErrors, err)); + return; + } + if (err) { + /* If there's an RPC level failure and the mutation entries don't have + a status code, the RPC level failure error code will be used as the + entry failure code. 
+ */ + (err as ServiceError & {errors?: ServiceError[]}).errors = + mutationErrors.concat( + [...pendingEntryIndices] + .filter(index => !mutationErrorsByEntryIndex.has(index)) + .map(() => err), + ); + collectMetricsCallback(err, err); + return; + } + collectMetricsCallback(null, null); + }; + + metricsCollector.onOperationStart(); + const makeNextBatchRequest = () => { + metricsCollector.onAttemptStart(); + const entryBatch = entries.filter((entry: Entry, index: number) => { + return pendingEntryIndices.has(index); + }); + + // If the viewName is provided then request will be made for an + // authorized view. Otherwise, the request is made for a table. + const baseReqOpts = ( + table.viewName + ? { + authorizedViewName: `${table.name}/authorizedViews/${table.viewName}`, + } + : { + tableName: table.name, + } + ) as google.bigtable.v2.IReadRowsRequest; + const reqOpts = Object.assign(baseReqOpts, { + appProfileId: table.bigtable.appProfileId, + entries: options.rawMutation + ? entryBatch + : entryBatch.map(Mutation.parse), + }); + + const retryOpts = { + currentRetryAttempt: numRequestsMade, + // Handling retries in this client. Specify the retry options to + // make sure nothing is retried in retry-request. + noResponseRetries: 0, + shouldRetryFn: (_: any) => { + return false; + }, + }; + + options.gaxOptions = populateAttemptHeader( + numRequestsMade, + options.gaxOptions, + ); + + const requestStream = + table.bigtable.request({ + client: 'BigtableClient', + method: 'mutateRows', + reqOpts, + gaxOpts: options.gaxOptions, + retryOpts, + }); + metricsCollector.wrapRequest(requestStream); + requestStream + .on('error', (err: ServiceError) => { + onBatchResponse(err); + }) + .on('data', (obj: google.bigtable.v2.IMutateRowsResponse) => { + obj.entries!.forEach(entry => { + const originalEntry = entryBatch[entry.index as number]; + const originalEntriesIndex = entryToIndex.get(originalEntry)!; + + // Mutation was successful. + if (entry.status!.code === 0) { + pendingEntryIndices.delete(originalEntriesIndex); + mutationErrorsByEntryIndex.delete(originalEntriesIndex); + return; + } + if (!RETRYABLE_STATUS_CODES.has(entry.status!.code!)) { + pendingEntryIndices.delete(originalEntriesIndex); + } + const errorDetails = entry.status; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (errorDetails as any).entry = originalEntry; + mutationErrorsByEntryIndex.set(originalEntriesIndex, errorDetails); + }); + }) + .on('end', onBatchResponse); + numRequestsMade++; + }; + + makeNextBatchRequest(); +} diff --git a/system-test/client-side-metrics-all-methods.ts b/system-test/client-side-metrics-all-methods.ts new file mode 100644 index 000000000..9dc6a64dd --- /dev/null +++ b/system-test/client-side-metrics-all-methods.ts @@ -0,0 +1,1927 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
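A note on the retry scheduling used in `mutateInternal` above: `getNextDelay` and `DEFAULT_BACKOFF_SETTINGS` are imported from `tabular-api-surface`, and the sketch below shows one plausible way a jittered exponential backoff delay can be derived from google-gax `BackoffSettings`. It is an illustration of the idea under that assumption, not the library's actual implementation.

```ts
import {BackoffSettings} from 'google-gax/build/src/gax';

// Illustrative sketch only: a jittered exponential backoff delay computed from
// BackoffSettings. The real getNextDelay lives in tabular-api-surface and may
// differ in detail.
function exampleNextDelay(attempt: number, backoff: BackoffSettings): number {
  // Exponential growth from the initial delay, capped at the maximum delay.
  const base = Math.min(
    backoff.initialRetryDelayMillis *
      Math.pow(backoff.retryDelayMultiplier, attempt),
    backoff.maxRetryDelayMillis,
  );
  // Full jitter: wait a random amount between 0 and the capped value.
  return Math.random() * base;
}

// Example with hypothetical settings (not the client's defaults): with
// initialRetryDelayMillis=100 and retryDelayMultiplier=2, the capped base for
// attempt 2 is 400 ms, and the actual wait is a random value up to that.
```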
+ +import {after, before, describe, it} from 'mocha'; +import * as mocha from 'mocha'; +import { + CloudMonitoringExporter, + ExportResult, +} from '../src/client-side-metrics/exporter'; +import {ResourceMetrics} from '@opentelemetry/sdk-metrics'; +import * as assert from 'assert'; +import {GCPMetricsHandler} from '../src/client-side-metrics/gcp-metrics-handler'; +import * as proxyquire from 'proxyquire'; +import {Bigtable, RawFilter} from '../src'; +import {Mutation} from '../src/mutation'; +import {Row} from '../src/row'; +import { + setupBigtableWithInsert, + setupBigtable, +} from './client-side-metrics-setup-table'; +import {TestMetricsHandler} from '../test-common/test-metrics-handler'; +import { + OnAttemptCompleteData, + OnOperationCompleteData, +} from '../src/client-side-metrics/metrics-handler'; +import {ClientOptions} from 'google-gax'; +import {ClientSideMetricsConfigManager} from '../src/client-side-metrics/metrics-config-manager'; +import {MetricServiceClient} from '@google-cloud/monitoring'; +import {MethodName} from '../src/client-side-metrics/client-side-metrics-attributes'; +import {generateId} from './common'; + +const SECOND_PROJECT_ID = 'cfdb-sdk-node-tests'; +const instanceId1 = generateId('instance'); +const instanceId2 = generateId('instance'); +const tableId1 = 'my-table'; +const tableId2 = 'my-table2'; +const columnFamilyId = 'cf1'; + +function getFakeBigtable( + projectId: string, + metricsHandlerClass: typeof GCPMetricsHandler | typeof TestMetricsHandler, + apiEndpoint?: string, +) { + const metricHandler = new metricsHandlerClass({ + apiEndpoint, + } as unknown as ClientOptions & {value: string}); + const newClient = new Bigtable({ + projectId, + apiEndpoint, + }); + newClient._metricsConfigManager = new ClientSideMetricsConfigManager([ + metricHandler, + ]); + return newClient; +} + +function getHandlerFromExporter(Exporter: typeof CloudMonitoringExporter) { + return proxyquire('../src/client-side-metrics/gcp-metrics-handler.js', { + './exporter': { + CloudMonitoringExporter: Exporter, + }, + }).GCPMetricsHandler; +} + +function checkFirstResponseLatency(requestHandled: OnOperationCompleteData) { + assert( + Object.prototype.hasOwnProperty.call( + requestHandled, + 'firstResponseLatency', + ), + ); + if ( + requestHandled.metricsCollectorData.method === MethodName.READ_ROWS || + requestHandled.metricsCollectorData.method === MethodName.READ_ROW + ) { + assert(requestHandled.firstResponseLatency); + assert(requestHandled.firstResponseLatency > 0); + } else { + assert.strictEqual(requestHandled.firstResponseLatency, 0); + } + delete requestHandled.firstResponseLatency; +} + +function readRowsAssertionCheck( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], + method: string, + streaming: string, +) { + assert.strictEqual(requestsHandled.length, 4); + const firstRequest = requestsHandled[0] as any; + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. 
+ assert(firstRequest.attemptLatency); + assert(firstRequest.serverLatency); + delete firstRequest.attemptLatency; + delete firstRequest.serverLatency; + delete firstRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(firstRequest, { + connectivityErrorCount: 0, + streaming, + status: '0', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table', + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method, + }, + projectId, + }); + const secondRequest = requestsHandled[1] as any; + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. + checkFirstResponseLatency(secondRequest); + assert(secondRequest.operationLatency); + delete secondRequest.operationLatency; + assert(secondRequest.applicationLatency < 10); + delete secondRequest.applicationLatency; + delete secondRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(secondRequest, { + status: '0', + streaming, + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method, + table: 'my-table', + }, + projectId, + retryCount: 0, + }); + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. + const thirdRequest = requestsHandled[2] as any; + assert(thirdRequest.attemptLatency); + assert(thirdRequest.serverLatency); + delete thirdRequest.attemptLatency; + delete thirdRequest.serverLatency; + delete thirdRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(thirdRequest, { + connectivityErrorCount: 0, + streaming, + status: '0', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table2', + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method, + }, + projectId, + }); + const fourthRequest = requestsHandled[3] as any; + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. 
+ checkFirstResponseLatency(fourthRequest); + assert(fourthRequest.operationLatency); + assert(fourthRequest.applicationLatency < 10); + delete fourthRequest.operationLatency; + delete fourthRequest.applicationLatency; + delete fourthRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(fourthRequest, { + status: '0', + streaming, + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method, + table: 'my-table2', + }, + projectId, + retryCount: 0, + }); +} + +function checkCheckAndMutateCall( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], +) { + readRowsAssertionCheck( + projectId, + requestsHandled, + 'Bigtable.CheckAndMutateRow', + 'false', + ); +} + +function checkMultiRowCall( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], +) { + readRowsAssertionCheck( + projectId, + requestsHandled, + 'Bigtable.ReadRows', + 'true', + ); +} + +function checkMutateRowsCall( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], +) { + readRowsAssertionCheck( + projectId, + requestsHandled, + 'Bigtable.MutateRows', + 'true', + ); +} + +function checkSampleRowKeysCall( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], +) { + readRowsAssertionCheck( + projectId, + requestsHandled, + 'Bigtable.SampleRowKeys', + 'true', + ); +} + +function checkMutateRowCall( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], +) { + readRowsAssertionCheck( + projectId, + requestsHandled, + 'Bigtable.MutateRow', + 'false', + ); +} + +function checkSingleRowCall( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], +) { + readRowsAssertionCheck( + projectId, + requestsHandled, + 'Bigtable.ReadRow', + 'false', + ); +} + +function checkReadModifyWriteRowCall( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[] = [], +) { + readRowsAssertionCheck( + projectId, + requestsHandled, + 'Bigtable.ReadModifyWriteRow', + 'false', + ); +} + +const entry = { + cf1: { + column: 1, + }, +}; + +const mutation = { + key: 'rowId', + data: entry, + method: Mutation.methods.INSERT, +}; + +const filter: RawFilter = { + family: 'cf1', + value: 'alincoln', +}; + +const mutations = [ + { + method: 'delete', + data: ['cf1:alincoln'], + }, +]; + +const rules = [ + { + column: 'cf1:column', + append: 'c', + }, +]; + +/** + * Checks if metrics have been published to Google Cloud Monitoring. + * + * This asynchronous function queries Google Cloud Monitoring to verify + * that the expected metrics from the Bigtable client library have been + * successfully published. It constructs a `MetricServiceClient` to + * interact with the Cloud Monitoring API and retrieves time series data + * for a predefined set of metrics. The test passes if time series data + * is found for each of the specified metrics within a defined time + * interval. + * + * @param {string} projectId The Google Cloud project ID where metrics are + * expected to be published. + * @throws {Error} If no time series data is found for any of the specified + * metrics, indicating that the metrics were not successfully published to + * Cloud Monitoring. 
+ */ +async function checkForPublishedMetrics(projectId: string) { + const monitoringClient = new MetricServiceClient(); // Correct instantiation + const now = Math.floor(Date.now() / 1000); + const filters = [ + 'metric.type="bigtable.googleapis.com/client/attempt_latencies"', + 'metric.type="bigtable.googleapis.com/client/operation_latencies"', + 'metric.type="bigtable.googleapis.com/client/retry_count"', + 'metric.type="bigtable.googleapis.com/client/server_latencies"', + 'metric.type="bigtable.googleapis.com/client/first_response_latencies"', + ]; + for (let i = 0; i < filters.length; i++) { + const filter = filters[i]; + const [series] = await monitoringClient.listTimeSeries({ + name: `projects/${projectId}`, + interval: { + endTime: { + seconds: now, + nanos: 0, + }, + startTime: { + seconds: now - 1000 * 60 * 60 * 24, + nanos: 0, + }, + }, + filter, + }); + assert(series.length > 0); + } +} + +describe('Bigtable/ClientSideMetricsAllMethods', () => { + let defaultProjectId: string; + + before(async () => { + for (const bigtable of [ + new Bigtable(), + new Bigtable({projectId: SECOND_PROJECT_ID}), + ]) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert(bigtable, columnFamilyId, instanceId, [ + tableId1, + tableId2, + ]); + } + defaultProjectId = await new Promise((resolve, reject) => { + bigtable.getProjectId_((err: Error | null, projectId?: string) => { + if (err) { + reject(err); + } else { + resolve(projectId as string); + } + }); + }); + } + }); + + after(async () => { + for (const bigtable of [ + new Bigtable(), + new Bigtable({projectId: SECOND_PROJECT_ID}), + ]) { + try { + // If the instance has been deleted already by another source, we don't + // want this after hook to block the continuous integration pipeline. + const instance = bigtable.instance(instanceId1); + await instance.delete({}); + } catch (e) { + console.warn('The instance has been deleted already'); + } + try { + // If the instance has been deleted already by another source, we don't + // want this after hook to block the continuous integration pipeline. + const instance = bigtable.instance(instanceId2); + await instance.delete({}); + } catch (e) { + console.warn('The instance has been deleted already'); + } + } + }); + + describe('Bigtable/ClientSideMetricsToGCM', () => { + // This test suite ensures that for each test all the export calls are + // successful even when multiple instances and tables are created. + async function mockBigtable( + projectId: string, + done: mocha.Done, + apiEndpoint?: string, + ) { + /* + The exporter is called every x seconds, but we only want to test the value + it receives once. Since done cannot be called multiple times in mocha, + exported variable ensures we only test the value export receives one time. + */ + let exported = false; + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. 
+ */ + const timeout = setTimeout(() => { + if (!exported) { + done( + new Error( + 'The exporters have not completed yet and the timeout is over', + ), + ); + } + }, 120000); + + class TestExporter extends CloudMonitoringExporter { + constructor(options: ClientOptions) { + super(options); + } + + async export( + metrics: ResourceMetrics, + resultCallback: (result: ExportResult) => void, + ): Promise { + try { + await super.export(metrics, (result: ExportResult) => { + if (!exported) { + exported = true; + try { + clearTimeout(timeout); + // The test passes when the code is 0 because that means the + // result from calling export was successful. + assert.strictEqual(result.code, 0); + resultCallback({code: 0}); + void checkForPublishedMetrics(projectId).then(() => { + done(); + }); + } catch (error) { + // The code here isn't 0 so we report the original error to the mocha test runner. + done(result); + done(error); + } + } else { + resultCallback({code: 0}); + } + }); + } catch (error) { + done(error); + } + } + } + + return getFakeBigtable( + projectId, + getHandlerFromExporter(TestExporter), + apiEndpoint, + ); + } + + describe('SampleRowKeys', () => { + it('should send the metrics to Google Cloud Monitoring for a SampleRowKeys call', done => { + (async () => { + try { + const bigtable = await mockBigtable(defaultProjectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.sampleRowKeys(); + const table2 = instance.table(tableId2); + await table2.sampleRowKeys(); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a custom endpoint', done => { + (async () => { + try { + const bigtable = await mockBigtable( + defaultProjectId, + done, + 'bogus-endpoint', + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + try { + // This call will fail because we are trying to hit a bogus endpoint. + // The idea here is that we just want to record at least one metric + // so that the exporter gets executed. + await table.sampleRowKeys(); + } catch (e: unknown) { + // Try blocks just need a catch/finally block. 
+ } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a SampleRowKeys call with a second project', done => { + (async () => { + try { + // This is the second project the test is configured to work with: + const projectId = SECOND_PROJECT_ID; + const bigtable = await mockBigtable(projectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.sampleRowKeys(); + const table2 = instance.table(tableId2); + await table2.sampleRowKeys(); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('ReadRows', () => { + it('should send the metrics to Google Cloud Monitoring for a ReadRows call', done => { + (async () => { + try { + const bigtable = await mockBigtable(defaultProjectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a custom endpoint', done => { + (async () => { + try { + const bigtable = await mockBigtable( + defaultProjectId, + done, + 'bogus-endpoint', + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + try { + // This call will fail because we are trying to hit a bogus endpoint. + // The idea here is that we just want to record at least one metric + // so that the exporter gets executed. + await table.getRows(); + } catch (e: unknown) { + // Try blocks just need a catch/finally block. 
+ } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a ReadRows call with a second project', done => { + (async () => { + try { + // This is the second project the test is configured to work with: + const projectId = SECOND_PROJECT_ID; + const bigtable = await mockBigtable(projectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('ReadModifyWriteRow', () => { + it('should send the metrics to Google Cloud Monitoring for a ReadModifyWriteRow call', done => { + (async () => { + try { + const bigtable = await mockBigtable(defaultProjectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.createRules(rules); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.createRules(rules); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a custom endpoint', done => { + (async () => { + try { + const bigtable = await mockBigtable( + defaultProjectId, + done, + 'bogus-endpoint', + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + try { + // This call will fail because we are trying to hit a bogus endpoint. + // The idea here is that we just want to record at least one metric + // so that the exporter gets executed. + const row = table.row(columnFamilyId); + await row.createRules(rules); + } catch (e: unknown) { + // Try blocks just need a catch/finally block. 
+ } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a ReadModifyWriteRow call with a second project', done => { + (async () => { + try { + // This is the second project the test is configured to work with: + const projectId = SECOND_PROJECT_ID; + const bigtable = await mockBigtable(projectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.createRules(rules); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.createRules(rules); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('MutateRows', () => { + it('should send the metrics to Google Cloud Monitoring for a MutateRows call', done => { + (async () => { + try { + const bigtable = await mockBigtable(defaultProjectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.mutate(mutation); + const table2 = instance.table(tableId2); + await table2.mutate(mutation); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a custom endpoint', done => { + (async () => { + try { + const bigtable = await mockBigtable( + defaultProjectId, + done, + 'bogus-endpoint', + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + try { + // This call will fail because we are trying to hit a bogus endpoint. + // The idea here is that we just want to record at least one metric + // so that the exporter gets executed. + await table.mutate(mutation); + } catch (e: unknown) { + // Try blocks just need a catch/finally block. 
+ } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a MutateRows call with a second project', done => { + (async () => { + try { + // This is the second project the test is configured to work with: + const projectId = SECOND_PROJECT_ID; + const bigtable = await mockBigtable(projectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.mutate(mutation); + const table2 = instance.table(tableId2); + await table2.mutate(mutation); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('MutateRow', () => { + it('should send the metrics to Google Cloud Monitoring for a single point MutateRows call', done => { + (async () => { + try { + const bigtable = await mockBigtable(defaultProjectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row('gwashington'); + await row.save(entry); + const table2 = instance.table(tableId2); + const row2 = table2.row('gwashington'); + await row2.save(entry); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a custom endpoint', done => { + (async () => { + try { + const bigtable = await mockBigtable( + defaultProjectId, + done, + 'bogus-endpoint', + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + try { + // This call will fail because we are trying to hit a bogus endpoint. + // The idea here is that we just want to record at least one metric + // so that the exporter gets executed. + const row = table.row('gwashington'); + await row.save(entry); + } catch (e: unknown) { + // Try blocks just need a catch/finally block. 
+ } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a MutateRow call with a second project and a single point', done => { + (async () => { + try { + // This is the second project the test is configured to work with: + const projectId = SECOND_PROJECT_ID; + const bigtable = await mockBigtable(projectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row('gwashington'); + await row.save(entry); + const table2 = instance.table(tableId2); + const row2 = table2.row('gwashington'); + await row2.save(entry); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('CheckAndMutateRow', () => { + it('should send the metrics to Google Cloud Monitoring for a CheckAndMutateRow call', done => { + (async () => { + try { + const bigtable = await mockBigtable(defaultProjectId, done); + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.filter(filter, {onMatch: mutations}); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.filter(filter, {onMatch: mutations}); + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a custom endpoint', done => { + (async () => { + try { + const bigtable = await mockBigtable( + defaultProjectId, + done, + 'bogus-endpoint', + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + try { + // This call will fail because we are trying to hit a bogus endpoint. + // The idea here is that we just want to record at least one metric + // so that the exporter gets executed. + const row = table.row(columnFamilyId); + await row.filter(filter, {onMatch: mutations}); + } catch (e: unknown) { + // Try blocks just need a catch/finally block. 
+ }
+ } catch (e) {
+ done(new Error('An error occurred while running the script'));
+ done(e);
+ }
+ })().catch(err => {
+ throw err;
+ });
+ });
+ it('should send the metrics to Google Cloud Monitoring for a CheckAndMutateRow call with a second project', done => {
+ (async () => {
+ try {
+ // This is the second project the test is configured to work with:
+ const projectId = SECOND_PROJECT_ID;
+ const bigtable = await mockBigtable(projectId, done);
+ for (const instanceId of [instanceId1, instanceId2]) {
+ await setupBigtableWithInsert(
+ bigtable,
+ columnFamilyId,
+ instanceId,
+ [tableId1, tableId2],
+ );
+ const instance = bigtable.instance(instanceId);
+ const table = instance.table(tableId1);
+ const row = table.row(columnFamilyId);
+ await row.filter(filter, {onMatch: mutations});
+ const table2 = instance.table(tableId2);
+ const row2 = table2.row(columnFamilyId);
+ await row2.filter(filter, {onMatch: mutations});
+ }
+ } catch (e) {
+ done(new Error('An error occurred while running the script'));
+ done(e);
+ }
+ })().catch(err => {
+ throw err;
+ });
+ });
+ });
+ });
+ describe('Bigtable/ClientSideMetricsToGCMTimeout', () => {
+ // This test suite simulates a situation where the user creates multiple
+ // clients and ensures that the exporter doesn't produce any errors even
+ // when multiple clients are attempting an export.
+ async function mockBigtable(
+ projectId: string,
+ done: mocha.Done,
+ onExportSuccess?: () => void,
+ ) {
+ class TestExporter extends CloudMonitoringExporter {
+ constructor(options: ClientOptions) {
+ super(options);
+ }
+
+ async export(
+ metrics: ResourceMetrics,
+ resultCallback: (result: ExportResult) => void,
+ ): Promise<void> {
+ try {
+ await super.export(metrics, async (result: ExportResult) => {
+ try {
+ // The code is expected to be 0 because the
+ // result from calling export was successful.
+ assert.strictEqual(result.code, 0);
+ resultCallback({code: 0});
+ if (onExportSuccess) {
+ onExportSuccess();
+ }
+ } catch (error) {
+ // The code here isn't 0 so we report the original error to the
+ // mocha test runner.
+ // The test fails here because it means that an export was
+ // unsuccessful.
+ done(result);
+ done(error);
+ resultCallback({code: 0});
+ }
+ });
+ } catch (error) {
+ done(error);
+ resultCallback({code: 0});
+ }
+ }
+ }
+
+ /*
+ Below we mock out the table so that it sends the metrics to a test exporter
+ that will still send the metrics to Google Cloud Monitoring, but then also
+ ensure the export was successful and pass the test with code 0 if it is
+ successful.
+ */
+ return getFakeBigtable(projectId, getHandlerFromExporter(TestExporter));
+ }
+
+ describe('SampleRowKeys', () => {
+ it('should send the metrics to Google Cloud Monitoring for a SampleRowKeys call', done => {
+ let testFinished = false;
+ /*
+ We need to create a timeout here because if we don't then mocha shuts down
+ the test as it is sleeping before the GCPMetricsHandler has a chance to
+ export the data. When the timeout is finished, if there were no export
+ errors then the test passes.
+ */ + setTimeout(() => { + testFinished = true; + done(); + }, 120000); + (async () => { + try { + const bigtable1 = await mockBigtable(defaultProjectId, done); + const bigtable2 = await mockBigtable(defaultProjectId, done); + for (const bigtable of [bigtable1, bigtable2]) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.sampleRowKeys(); + const table2 = instance.table(tableId2); + await table2.sampleRowKeys(); + } + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a SampleRowKeys call with thirty clients', done => { + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. + */ + const testTimeout = setTimeout(() => { + done(new Error('The test timed out')); + }, 480000); + let testComplete = false; + const numClients = 30; + (async () => { + try { + const bigtableList = []; + const completedSet = new Set(); + for ( + let bigtableCount = 0; + bigtableCount < numClients; + bigtableCount++ + ) { + const currentCount = bigtableCount; + const onExportSuccess = () => { + completedSet.add(currentCount); + if (completedSet.size === numClients) { + // If every client has completed the export then pass the test. + clearTimeout(testTimeout); + if (!testComplete) { + testComplete = true; + done(); + } + } + }; + bigtableList.push( + await mockBigtable(defaultProjectId, done, onExportSuccess), + ); + } + for (const bigtable of bigtableList) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.sampleRowKeys(); + const table2 = instance.table(tableId2); + await table2.sampleRowKeys(); + } + } + } catch (e) { + done(e); + done(new Error('An error occurred while running the script')); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('ReadRows', () => { + it('should send the metrics to Google Cloud Monitoring for a ReadRows call', done => { + let testFinished = false; + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. 
+ */ + setTimeout(() => { + testFinished = true; + done(); + }, 120000); + (async () => { + try { + const bigtable1 = await mockBigtable(defaultProjectId, done); + const bigtable2 = await mockBigtable(defaultProjectId, done); + for (const bigtable of [bigtable1, bigtable2]) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + } + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a ReadRows call with thirty clients', done => { + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. + */ + const testTimeout = setTimeout(() => { + done(new Error('The test timed out')); + }, 480000); + let testComplete = false; + const numClients = 30; + (async () => { + try { + const bigtableList = []; + const completedSet = new Set(); + for ( + let bigtableCount = 0; + bigtableCount < numClients; + bigtableCount++ + ) { + const currentCount = bigtableCount; + const onExportSuccess = () => { + completedSet.add(currentCount); + if (completedSet.size === numClients) { + // If every client has completed the export then pass the test. + clearTimeout(testTimeout); + if (!testComplete) { + testComplete = true; + done(); + } + } + }; + bigtableList.push( + await mockBigtable(defaultProjectId, done, onExportSuccess), + ); + } + for (const bigtable of bigtableList) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + } + } + } catch (e) { + done(e); + done(new Error('An error occurred while running the script')); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('ReadModifyWriteRow', () => { + it('should send the metrics to Google Cloud Monitoring for a ReadModifyWriteRow call', done => { + let testFinished = false; + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. 
+ */ + setTimeout(() => { + testFinished = true; + done(); + }, 120000); + (async () => { + try { + const bigtable1 = await mockBigtable(defaultProjectId, done); + const bigtable2 = await mockBigtable(defaultProjectId, done); + for (const bigtable of [bigtable1, bigtable2]) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.createRules(rules); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.createRules(rules); + } + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a ReadModifyWriteRow call with thirty clients', done => { + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. + */ + const testTimeout = setTimeout(() => { + done(new Error('The test timed out')); + }, 480000); + let testComplete = false; + const numClients = 30; + (async () => { + try { + const bigtableList = []; + const completedSet = new Set(); + for ( + let bigtableCount = 0; + bigtableCount < numClients; + bigtableCount++ + ) { + const currentCount = bigtableCount; + const onExportSuccess = () => { + completedSet.add(currentCount); + if (completedSet.size === numClients) { + // If every client has completed the export then pass the test. + clearTimeout(testTimeout); + if (!testComplete) { + testComplete = true; + done(); + } + } + }; + bigtableList.push( + await mockBigtable(defaultProjectId, done, onExportSuccess), + ); + } + for (const bigtable of bigtableList) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.createRules(rules); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.createRules(rules); + } + } + } catch (e) { + done(e); + done(new Error('An error occurred while running the script')); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('MutateRows', () => { + it('should send the metrics to Google Cloud Monitoring for a MutateRows call', done => { + let testFinished = false; + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. 
+ */ + setTimeout(() => { + testFinished = true; + done(); + }, 120000); + (async () => { + try { + const bigtable1 = await mockBigtable(defaultProjectId, done); + const bigtable2 = await mockBigtable(defaultProjectId, done); + for (const bigtable of [bigtable1, bigtable2]) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.mutate(mutation); + const table2 = instance.table(tableId2); + await table2.mutate(mutation); + } + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a MutateRows call with thirty clients', done => { + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. + */ + const testTimeout = setTimeout(() => { + done(new Error('The test timed out')); + }, 480000); + let testComplete = false; + const numClients = 30; + (async () => { + try { + const bigtableList = []; + const completedSet = new Set(); + for ( + let bigtableCount = 0; + bigtableCount < numClients; + bigtableCount++ + ) { + const currentCount = bigtableCount; + const onExportSuccess = () => { + completedSet.add(currentCount); + if (completedSet.size === numClients) { + // If every client has completed the export then pass the test. + clearTimeout(testTimeout); + if (!testComplete) { + testComplete = true; + done(); + } + } + }; + bigtableList.push( + await mockBigtable(defaultProjectId, done, onExportSuccess), + ); + } + for (const bigtable of bigtableList) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + await table.mutate(mutation); + const table2 = instance.table(tableId2); + await table2.mutate(mutation); + } + } + } catch (e) { + done(e); + done(new Error('An error occurred while running the script')); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('MutateRow', () => { + it('should send the metrics to Google Cloud Monitoring for a MutateRows call for a single point', done => { + let testFinished = false; + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. 
+ */ + setTimeout(() => { + testFinished = true; + done(); + }, 120000); + (async () => { + try { + const bigtable1 = await mockBigtable(defaultProjectId, done); + const bigtable2 = await mockBigtable(defaultProjectId, done); + for (const bigtable of [bigtable1, bigtable2]) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row('gwashington'); + await row.save(entry); + const table2 = instance.table(tableId2); + const row2 = table2.row('gwashington'); + await row2.save(entry); + } + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a single MutateRow call with thirty clients', done => { + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. + */ + const testTimeout = setTimeout(() => { + done(new Error('The test timed out')); + }, 480000); + let testComplete = false; + const numClients = 30; + (async () => { + try { + const bigtableList = []; + const completedSet = new Set(); + for ( + let bigtableCount = 0; + bigtableCount < numClients; + bigtableCount++ + ) { + const currentCount = bigtableCount; + const onExportSuccess = () => { + completedSet.add(currentCount); + if (completedSet.size === numClients) { + // If every client has completed the export then pass the test. + clearTimeout(testTimeout); + if (!testComplete) { + testComplete = true; + done(); + } + } + }; + bigtableList.push( + await mockBigtable(defaultProjectId, done, onExportSuccess), + ); + } + for (const bigtable of bigtableList) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row('gwashington'); + await row.save(entry); + const table2 = instance.table(tableId2); + const row2 = table2.row('gwashington'); + await row2.save(entry); + } + } + } catch (e) { + done(e); + done(new Error('An error occurred while running the script')); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('CheckAndMutateRow', () => { + it('should send the metrics to Google Cloud Monitoring for a CheckAndMutateRow call', done => { + let testFinished = false; + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. 
+ */ + setTimeout(() => { + testFinished = true; + done(); + }, 120000); + (async () => { + try { + const bigtable1 = await mockBigtable(defaultProjectId, done); + const bigtable2 = await mockBigtable(defaultProjectId, done); + for (const bigtable of [bigtable1, bigtable2]) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.filter(filter, {onMatch: mutations}); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.filter(filter, {onMatch: mutations}); + } + } + } catch (e) { + done(new Error('An error occurred while running the script')); + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to Google Cloud Monitoring for a single CheckAndMutateRow call with thirty clients', done => { + /* + We need to create a timeout here because if we don't then mocha shuts down + the test as it is sleeping before the GCPMetricsHandler has a chance to + export the data. When the timeout is finished, if there were no export + errors then the test passes. + */ + const testTimeout = setTimeout(() => { + done(new Error('The test timed out')); + }, 480000); + let testComplete = false; + const numClients = 30; + (async () => { + try { + const bigtableList = []; + const completedSet = new Set(); + for ( + let bigtableCount = 0; + bigtableCount < numClients; + bigtableCount++ + ) { + const currentCount = bigtableCount; + const onExportSuccess = () => { + completedSet.add(currentCount); + if (completedSet.size === numClients) { + // If every client has completed the export then pass the test. 
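+ // The testComplete flag guards against calling done() more than once if several exports finish at effectively the same time.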
+ clearTimeout(testTimeout); + if (!testComplete) { + testComplete = true; + done(); + } + } + }; + bigtableList.push( + await mockBigtable(defaultProjectId, done, onExportSuccess), + ); + } + for (const bigtable of bigtableList) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert( + bigtable, + columnFamilyId, + instanceId, + [tableId1, tableId2], + ); + const instance = bigtable.instance(instanceId); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.filter(filter, {onMatch: mutations}); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.filter(filter, {onMatch: mutations}); + } + } + } catch (e) { + done(e); + done(new Error('An error occurred while running the script')); + } + })().catch(err => { + throw err; + }); + }); + }); + }); + describe('Bigtable/ClientSideMetricsToMetricsHandler', () => { + async function getFakeBigtableWithHandler( + projectId: string, + done: mocha.Done, + checkFn: ( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[], + ) => void, + ) { + let handlerRequestCount = 0; + class TestGCPMetricsHandler extends TestMetricsHandler { + projectId = projectId; + onOperationComplete(data: OnOperationCompleteData) { + handlerRequestCount++; + try { + super.onOperationComplete(data); + if (handlerRequestCount > 1) { + checkFn(projectId, this.requestsHandled); + done(); + } + } catch (e) { + done(e); + } + } + } + return getFakeBigtable(projectId, TestGCPMetricsHandler); + } + + /** + * Returns a bigtable client with a test metrics handler that will check + * the metrics it receives and pass/fail the test if we get the right + * metrics. This method doesn't insert data so that extra mutateRows calls + * don't get made because those extra calls will produce different metrics. 
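+ * The injected metrics handler counts onOperationComplete calls and invokes checkFn (and then done) once more than one operation has completed, i.e. after both tables have been read.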
+ * + * @param projectId + * @param done + * @param checkFn + */ + async function mockBigtableWithNoInserts( + projectId: string, + done: mocha.Done, + checkFn: ( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[], + ) => void, + ) { + const bigtable = await getFakeBigtableWithHandler( + projectId, + done, + checkFn, + ); + await setupBigtable(bigtable, columnFamilyId, instanceId1, [ + tableId1, + tableId2, + ]); + return bigtable; + } + describe('SampleRowKeys', () => { + it('should send the metrics to the metrics handler for a SampleRowKeys call', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkSampleRowKeysCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + await table.sampleRowKeys(); + const table2 = instance.table(tableId2); + await table2.sampleRowKeys(); + })().catch(err => { + throw err; + }); + }); + it('should pass the projectId to the metrics handler properly', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkSampleRowKeysCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + await table.sampleRowKeys(); + const table2 = instance.table(tableId2); + await table2.sampleRowKeys(); + })().catch(err => { + throw err; + }); + }); + }); + describe('ReadRows', () => { + it('should send the metrics to the metrics handler for a ReadRows call', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkMultiRowCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + })().catch(err => { + throw err; + }); + }); + it('should pass the projectId to the metrics handler properly', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkMultiRowCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + })().catch(err => { + throw err; + }); + }); + it('should send the metrics to the metrics handler for a single row read', done => { + (async () => { + try { + const projectId = SECOND_PROJECT_ID; + const bigtable = await mockBigtableWithNoInserts( + projectId, + done, + checkSingleRowCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + const row = new Row(table, 'rowId'); + await row.get(); + const table2 = instance.table(tableId2); + const row2 = new Row(table2, 'rowId'); + await row2.get(); + } catch (e) { + done(e); + } + })().catch(err => { + throw err; + }); + }); + }); + describe('ReadModifyWriteRow', () => { + it('should send the metrics to the metrics handler for a ReadModifyWriteRow call', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkReadModifyWriteRowCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.createRules(rules); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.createRules(rules); + })().catch(err => { + throw err; + }); + }); + it('should pass 
the projectId to the metrics handler properly', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkReadModifyWriteRowCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.createRules(rules); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.createRules(rules); + })().catch(err => { + throw err; + }); + }); + }); + describe('MutateRows', () => { + it('should send the metrics to the metrics handler for a MutateRows call', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkMutateRowsCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + await table.mutate(mutation); + const table2 = instance.table(tableId2); + await table2.mutate(mutation); + })().catch(err => { + throw err; + }); + }); + it('should pass the projectId to the metrics handler properly', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkMutateRowsCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + await table.mutate(mutation); + const table2 = instance.table(tableId2); + await table2.mutate(mutation); + })().catch(err => { + throw err; + }); + }); + }); + describe('MutateRow', () => { + it('should send the metrics to the metrics handler for a MutateRows call for a single point', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkMutateRowCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + const row = table.row('gwashington'); + await row.save(entry); + const table2 = instance.table(tableId2); + const row2 = table2.row('gwashington'); + await row2.save(entry); + })().catch(err => { + throw err; + }); + }); + it('should pass the projectId to the metrics handler properly for a single mutateRow point', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkMutateRowCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + const row = table.row('gwashington'); + await row.save(entry); + const table2 = instance.table(tableId2); + const row2 = table2.row('gwashington'); + await row2.save(entry); + })().catch(err => { + throw err; + }); + }); + }); + describe('CheckAndMutateRow', () => { + it('should send the metrics to the metrics handler for a CheckAndMutateRow call for a single point', done => { + (async () => { + const bigtable = await mockBigtableWithNoInserts( + defaultProjectId, + done, + checkCheckAndMutateCall, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + const row = table.row(columnFamilyId); + await row.filter(filter, {onMatch: mutations}); + const table2 = instance.table(tableId2); + const row2 = table2.row(columnFamilyId); + await row2.filter(filter, {onMatch: mutations}); + })().catch(err => { + throw err; + }); + }); + }); + }); +}); diff --git a/system-test/client-side-metrics-setup-table.ts b/system-test/client-side-metrics-setup-table.ts index 003d9b3bf..4bac03e49 100644 --- a/system-test/client-side-metrics-setup-table.ts +++ b/system-test/client-side-metrics-setup-table.ts @@ -56,6 +56,19 @@ export async 
function setupBigtable( } } // Add some data so that a firstResponseLatency is recorded. + } +} + +export async function setupBigtableWithInsert( + bigtable: Bigtable, + columnFamilyId: string, + instanceId: string, + tableIds: string[], +) { + await setupBigtable(bigtable, columnFamilyId, instanceId, tableIds); + const instance = bigtable.instance(instanceId); + const tables = tableIds.map(tableId => instance.table(tableId)); + for (const currentTable of tables) { await currentTable.insert([ { key: 'rowId', diff --git a/system-test/client-side-metrics.ts b/system-test/client-side-metrics.ts index 3d6f9d450..819088d79 100644 --- a/system-test/client-side-metrics.ts +++ b/system-test/client-side-metrics.ts @@ -22,25 +22,119 @@ import {ResourceMetrics} from '@opentelemetry/sdk-metrics'; import * as assert from 'assert'; import {GCPMetricsHandler} from '../src/client-side-metrics/gcp-metrics-handler'; import * as proxyquire from 'proxyquire'; -import {Bigtable} from '../src'; +import {Bigtable, BigtableOptions} from '../src'; import {Row} from '../src/row'; -import {setupBigtable} from './client-side-metrics-setup-table'; +import { + setupBigtable, + setupBigtableWithInsert, +} from './client-side-metrics-setup-table'; import {TestMetricsHandler} from '../test-common/test-metrics-handler'; import { OnAttemptCompleteData, OnOperationCompleteData, } from '../src/client-side-metrics/metrics-handler'; import {ClientOptions} from 'google-gax'; -import {ClientSideMetricsConfigManager} from '../src/client-side-metrics/metrics-config-manager'; +import {PassThrough} from 'stream'; +import {generateChunksFromRequest} from '../test-common/utils/readRowsImpl'; +import {TabularApiSurface} from '../src/tabular-api-surface'; import {MetricServiceClient} from '@google-cloud/monitoring'; +import {generateId} from './common'; const SECOND_PROJECT_ID = 'cfdb-sdk-node-tests'; +const instanceId1 = generateId('instance'); +const instanceId2 = generateId('instance'); +const tableId1 = 'my-table'; +const tableId2 = 'my-table2'; +const columnFamilyId = 'cf1'; + +class FakeHRTime { + startTime = BigInt(0); + + bigint() { + this.startTime += BigInt(1000000000); + return this.startTime; + } +} + +/** + * Retrieves a mocked Bigtable client without a metrics handler attached. + * + * This function is used for testing purposes to create a Bigtable client + * with mocked dependencies, allowing for controlled simulation of stream + * behavior and time measurements. It does not include any metrics + * handling. + * + * @param {BigtableOptions} options Options to configure the Bigtable client. + * @param {FakeHRTime} hrtime An instance of FakeHRTime to control time. + * @returns {Bigtable} A mocked Bigtable client. 
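+ * (Internally, proxyquire substitutes node:process hrtime inside TimedStream and threads that fake TimedStream through createReadStreamInternal, TabularApiSurface, Table and Instance into the Bigtable client, so stream durations are measured with the fake clock.)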
+ */ +function getFakeBigtableWithoutHandler( + options: BigtableOptions, + hrtime: FakeHRTime, +) { + const FakeTimedStream = proxyquire('../src/timed-stream.js', { + 'node:process': { + hrtime, + }, + }).TimedStream; + const FakeCreateReadStreamInternal = proxyquire( + '../src/utils/createReadStreamInternal.js', + { + '../timed-stream.js': { + TimedStream: FakeTimedStream, + }, + }, + ).createReadStreamInternal; + const FakeTabularApiSurface = proxyquire('../src/tabular-api-surface.js', { + './utils/createReadStreamInternal.js': { + createReadStreamInternal: FakeCreateReadStreamInternal, + }, + }).TabularApiSurface; + const FakeTable: TabularApiSurface = proxyquire('../src/table.js', { + './tabular-api-surface.js': {TabularApiSurface: FakeTabularApiSurface}, + }).Table; + const FakeInstance = proxyquire('../src/instance.js', { + './table.js': {Table: FakeTable}, + }).Instance; + const FakeBigtable = proxyquire('../src/index.js', { + './instance.js': {Instance: FakeInstance}, + }).Bigtable; + return new FakeBigtable(options); +} +/** + * This method retrieves a bigtable client that sends metrics to the metrics + * handler class. The client also uses metrics collectors that have + * deterministic timestamps associated with the various latency metrics so that + * they can be tested. + * + * @param projectId + * @param metricsHandlerClass + * @param hrtime + * @param apiEndpoint + */ function getFakeBigtable( projectId: string, metricsHandlerClass: typeof GCPMetricsHandler | typeof TestMetricsHandler, + hrtime: FakeHRTime, apiEndpoint?: string, ) { + const FakeOperationsMetricsCollector = proxyquire( + '../src/client-side-metrics/operation-metrics-collector.js', + { + 'node:process': { + hrtime, + }, + }, + ).OperationMetricsCollector; + const FakeClientSideMetricsConfigManager = proxyquire( + '../src/client-side-metrics/metrics-config-manager.js', + { + './operation-metrics-collector.js': { + OperationMetricsCollector: FakeOperationsMetricsCollector, + }, + }, + ).ClientSideMetricsConfigManager; // Normally the options passed into the client are passed into the metrics // handler so when we mock out the metrics handler, it really should have // the same options that are passed into the client. @@ -49,8 +143,8 @@ function getFakeBigtable( apiEndpoint, }; const metricHandler = new metricsHandlerClass(options); - const newClient = new Bigtable(options); - newClient._metricsConfigManager = new ClientSideMetricsConfigManager([ + const newClient = getFakeBigtableWithoutHandler(options, hrtime); + newClient._metricsConfigManager = new FakeClientSideMetricsConfigManager([ metricHandler, ]); return newClient; @@ -85,7 +179,7 @@ function readRowsAssertionCheck( status: '0', client_name: 'nodejs-bigtable', metricsCollectorData: { - instanceId: 'emulator-test-instance', + instanceId: instanceId1, table: 'my-table', cluster: 'fake-cluster3', zone: 'us-west1-c', @@ -98,17 +192,17 @@ function readRowsAssertionCheck( // them from the comparison after checking they exist. 
assert(secondRequest.operationLatency); assert(secondRequest.firstResponseLatency); - assert(secondRequest.applicationLatencies); + assert.strictEqual(secondRequest.applicationLatency, 0); delete secondRequest.operationLatency; delete secondRequest.firstResponseLatency; - delete secondRequest.applicationLatencies; + delete secondRequest.applicationLatency; delete secondRequest.metricsCollectorData.appProfileId; assert.deepStrictEqual(secondRequest, { status: '0', streaming, client_name: 'nodejs-bigtable', metricsCollectorData: { - instanceId: 'emulator-test-instance', + instanceId: instanceId1, cluster: 'fake-cluster3', zone: 'us-west1-c', method, @@ -131,7 +225,7 @@ function readRowsAssertionCheck( status: '0', client_name: 'nodejs-bigtable', metricsCollectorData: { - instanceId: 'emulator-test-instance', + instanceId: instanceId1, table: 'my-table2', cluster: 'fake-cluster3', zone: 'us-west1-c', @@ -144,17 +238,17 @@ function readRowsAssertionCheck( // them from the comparison after checking they exist. assert(fourthRequest.operationLatency); assert(fourthRequest.firstResponseLatency); - assert(fourthRequest.applicationLatencies); + assert.strictEqual(fourthRequest.applicationLatency, 0); delete fourthRequest.operationLatency; delete fourthRequest.firstResponseLatency; - delete fourthRequest.applicationLatencies; + delete fourthRequest.applicationLatency; delete fourthRequest.metricsCollectorData.appProfileId; assert.deepStrictEqual(fourthRequest, { status: '0', streaming, client_name: 'nodejs-bigtable', metricsCollectorData: { - instanceId: 'emulator-test-instance', + instanceId: instanceId1, cluster: 'fake-cluster3', zone: 'us-west1-c', method, @@ -237,49 +331,52 @@ async function checkForPublishedMetrics(projectId: string) { } describe('Bigtable/ClientSideMetrics', () => { - const instanceId1 = 'emulator-test-instance'; - const instanceId2 = 'emulator-test-instance2'; - const tableId1 = 'my-table'; - const tableId2 = 'my-table2'; - const columnFamilyId = 'cf1'; let defaultProjectId: string; before(async () => { - const bigtable = new Bigtable(); - for (const instanceId of [instanceId1, instanceId2]) { - await setupBigtable(bigtable, columnFamilyId, instanceId, [ - tableId1, - tableId2, - ]); - } - defaultProjectId = await new Promise((resolve, reject) => { - bigtable.getProjectId_((err: Error | null, projectId?: string) => { - if (err) { - reject(err); - } else { - resolve(projectId as string); - } + for (const bigtable of [ + new Bigtable(), + new Bigtable({projectId: SECOND_PROJECT_ID}), + ]) { + for (const instanceId of [instanceId1, instanceId2]) { + await setupBigtableWithInsert(bigtable, columnFamilyId, instanceId, [ + tableId1, + tableId2, + ]); + } + defaultProjectId = await new Promise((resolve, reject) => { + bigtable.getProjectId_((err: Error | null, projectId?: string) => { + if (err) { + reject(err); + } else { + resolve(projectId as string); + } + }); }); - }); + } }); after(async () => { - const bigtable = new Bigtable(); - try { - // If the instance has been deleted already by another source, we don't - // want this after hook to block the continuous integration pipeline. - const instance = bigtable.instance(instanceId1); - await instance.delete({}); - } catch (e) { - console.warn('The instance has been deleted already'); - } - try { - // If the instance has been deleted already by another source, we don't - // want this after hook to block the continuous integration pipeline. 
- const instance = bigtable.instance(instanceId2); - await instance.delete({}); - } catch (e) { - console.warn('The instance has been deleted already'); + for (const bigtable of [ + new Bigtable(), + new Bigtable({projectId: SECOND_PROJECT_ID}), + ]) { + try { + // If the instance has been deleted already by another source, we don't + // want this after hook to block the continuous integration pipeline. + const instance = bigtable.instance(instanceId1); + await instance.delete({}); + } catch (e) { + console.warn('The instance has been deleted already'); + } + try { + // If the instance has been deleted already by another source, we don't + // want this after hook to block the continuous integration pipeline. + const instance = bigtable.instance(instanceId2); + await instance.delete({}); + } catch (e) { + console.warn('The instance has been deleted already'); + } } }); @@ -357,6 +454,7 @@ describe('Bigtable/ClientSideMetrics', () => { return getFakeBigtable( projectId, getHandlerFromExporter(TestExporter), + new FakeHRTime(), apiEndpoint, ); } @@ -487,7 +585,11 @@ describe('Bigtable/ClientSideMetrics', () => { ensure the export was successful and pass the test with code 0 if it is successful. */ - return getFakeBigtable(projectId, getHandlerFromExporter(TestExporter)); + return getFakeBigtable( + projectId, + getHandlerFromExporter(TestExporter), + new FakeHRTime(), + ); } it('should send the metrics to Google Cloud Monitoring for a ReadRows call', done => { @@ -587,6 +689,277 @@ describe('Bigtable/ClientSideMetrics', () => { }); }); describe('Bigtable/ClientSideMetricsToMetricsHandler', () => { + /** + * This method is called to do a bunch of basic assertion checks that are + * expected to pass when a client makes two getRows calls. + * + * @param projectId The projectId the request was made with + * @param requestsHandled The requests handled by the mock metrics handler + */ + function standardAssertionChecks( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[], + ) { + const firstRequest = requestsHandled[0] as any; + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. + assert(firstRequest.attemptLatency); + assert(firstRequest.serverLatency); + delete firstRequest.attemptLatency; + delete firstRequest.serverLatency; + delete firstRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(firstRequest, { + connectivityErrorCount: 0, + streaming: 'true', + status: '0', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table', + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method: 'Bigtable.ReadRows', + }, + projectId, + }); + const secondRequest = requestsHandled[1] as any; + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. 
+ assert(secondRequest.operationLatency); + assert(secondRequest.firstResponseLatency); + assert.strictEqual(secondRequest.applicationLatency, 0); + delete secondRequest.operationLatency; + delete secondRequest.firstResponseLatency; + delete secondRequest.applicationLatency; + delete secondRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(secondRequest, { + status: '0', + streaming: 'true', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method: 'Bigtable.ReadRows', + table: 'my-table', + }, + projectId, + retryCount: 0, + }); + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. + const thirdRequest = requestsHandled[2] as any; + assert(thirdRequest.attemptLatency); + assert(thirdRequest.serverLatency); + delete thirdRequest.attemptLatency; + delete thirdRequest.serverLatency; + delete thirdRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(thirdRequest, { + connectivityErrorCount: 0, + streaming: 'true', + status: '0', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table2', + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method: 'Bigtable.ReadRows', + }, + projectId, + }); + const fourthRequest = requestsHandled[3] as any; + // We would expect these parameters to be different every time so delete + // them from the comparison after checking they exist. + assert(fourthRequest.operationLatency); + assert(fourthRequest.firstResponseLatency); + assert.strictEqual(fourthRequest.applicationLatency, 0); + delete fourthRequest.operationLatency; + delete fourthRequest.firstResponseLatency; + delete fourthRequest.applicationLatency; + delete fourthRequest.metricsCollectorData.appProfileId; + assert.deepStrictEqual(fourthRequest, { + status: '0', + streaming: 'true', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + cluster: 'fake-cluster3', + zone: 'us-west1-c', + method: 'Bigtable.ReadRows', + table: 'my-table2', + }, + projectId, + retryCount: 0, + }); + } + + /** + * This method is called to check that the requests handled from the + * readRows streaming calls have the right application latencies and other + * appropriate metrics. 
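+ * In this variant the first table read accrues a non-zero applicationLatency from time spent in the user's 'data' handler, while the follow-up getRows call on the second table reports an applicationLatency of 0.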
+ * + * @param projectId The projectId the request was made with + * @param requestsHandled The requests handled by the mock metrics handler + */ + function applicationLatenciesChecksHandlers( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[], + ) { + const compareValue = [ + { + projectId, + serverLatency: undefined, + attemptLatency: 23000, + connectivityErrorCount: 0, + streaming: 'true', + status: '0', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table', + cluster: '', + zone: 'global', + method: 'Bigtable.ReadRows', + }, + }, + { + projectId, + status: '0', + streaming: 'true', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table', + cluster: '', + zone: 'global', + method: 'Bigtable.ReadRows', + }, + client_name: 'nodejs-bigtable', + operationLatency: 25000, + retryCount: 0, + firstResponseLatency: 2000, + applicationLatency: 18000, // From the stream for loop + }, + { + projectId, + attemptLatency: 2000, + serverLatency: undefined, + connectivityErrorCount: 0, + streaming: 'true', + status: '0', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table2', + cluster: '', + zone: 'global', + method: 'Bigtable.ReadRows', + }, + }, + { + projectId, + status: '0', + streaming: 'true', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table2', + cluster: '', + zone: 'global', + method: 'Bigtable.ReadRows', + }, + client_name: 'nodejs-bigtable', + operationLatency: 4000, + retryCount: 0, + firstResponseLatency: 2000, + applicationLatency: 0, // This is from the getRows call. + }, + ]; + assert.deepStrictEqual(requestsHandled, compareValue); + } + + /** + * This method is called to check that the requests handled from the + * readRows streaming calls have the right application latencies and other + * appropriate metrics. 
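+ * Every latency asserted here is a whole-second value because the FakeHRTime clock advances by exactly one second per reading.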
+ * + * @param projectId The projectId the request was made with + * @param requestsHandled The requests handled by the mock metrics handler + */ + function applicationLatenciesChecksIterative( + projectId: string, + requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[], + ) { + const compareValue = [ + { + projectId, + serverLatency: undefined, + attemptLatency: 28000, + connectivityErrorCount: 0, + streaming: 'true', + status: '0', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table', + cluster: '', + zone: 'global', + method: 'Bigtable.ReadRows', + }, + }, + { + projectId, + status: '0', + streaming: 'true', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table', + cluster: '', + zone: 'global', + method: 'Bigtable.ReadRows', + }, + client_name: 'nodejs-bigtable', + operationLatency: 30000, + retryCount: 0, + firstResponseLatency: 2000, + applicationLatency: 16000, // From the stream for loop + }, + { + projectId, + attemptLatency: 2000, + serverLatency: undefined, + connectivityErrorCount: 0, + streaming: 'true', + status: '0', + client_name: 'nodejs-bigtable', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table2', + cluster: '', + zone: 'global', + method: 'Bigtable.ReadRows', + }, + }, + { + projectId, + status: '0', + streaming: 'true', + metricsCollectorData: { + instanceId: instanceId1, + table: 'my-table2', + cluster: '', + zone: 'global', + method: 'Bigtable.ReadRows', + }, + client_name: 'nodejs-bigtable', + operationLatency: 4000, + retryCount: 0, + firstResponseLatency: 2000, + applicationLatency: 0, // This is from the getRows call. + }, + ]; + assert.deepStrictEqual(requestsHandled, compareValue); + } + async function mockBigtable( projectId: string, done: mocha.Done, @@ -594,6 +967,7 @@ describe('Bigtable/ClientSideMetrics', () => { projectId: string, requestsHandled: (OnOperationCompleteData | OnAttemptCompleteData)[], ) => void, + hrtime: FakeHRTime, ) { let handlerRequestCount = 0; class TestGCPMetricsHandler extends TestMetricsHandler { @@ -603,6 +977,7 @@ describe('Bigtable/ClientSideMetrics', () => { try { super.onOperationComplete(data); if (handlerRequestCount > 1) { + assert.strictEqual(this.requestsHandled.length, 4); checkFn(projectId, this.requestsHandled); done(); } @@ -612,7 +987,11 @@ describe('Bigtable/ClientSideMetrics', () => { } } - const bigtable = getFakeBigtable(projectId, TestGCPMetricsHandler); + const bigtable = getFakeBigtable( + projectId, + TestGCPMetricsHandler, + hrtime, + ); await setupBigtable(bigtable, columnFamilyId, instanceId1, [ tableId1, tableId2, @@ -622,10 +1001,12 @@ describe('Bigtable/ClientSideMetrics', () => { it('should send the metrics to the metrics handler for a ReadRows call', done => { (async () => { + const projectId = defaultProjectId; const bigtable = await mockBigtable( - defaultProjectId, + projectId, done, - checkMultiRowCall, + standardAssertionChecks, + new FakeHRTime(), ); const instance = bigtable.instance(instanceId1); const table = instance.table(tableId1); @@ -638,16 +1019,22 @@ describe('Bigtable/ClientSideMetrics', () => { }); it('should pass the projectId to the metrics handler properly', done => { (async () => { - const bigtable = await mockBigtable( - defaultProjectId, - done, - checkMultiRowCall, - ); - const instance = bigtable.instance(instanceId1); - const table = instance.table(tableId1); - await table.getRows(); - const table2 = instance.table(tableId2); - await table2.getRows(); + try { + const 
projectId = SECOND_PROJECT_ID; + const bigtable = await mockBigtable( + projectId, + done, + standardAssertionChecks, + new FakeHRTime(), + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + await table.getRows(); + const table2 = instance.table(tableId2); + await table2.getRows(); + } catch (e) { + done(e); + } })().catch(err => { throw err; }); @@ -660,6 +1047,7 @@ describe('Bigtable/ClientSideMetrics', () => { projectId, done, checkSingleRowCall, + new FakeHRTime(), ); const instance = bigtable.instance(instanceId1); const table = instance.table(tableId1); @@ -675,5 +1063,127 @@ describe('Bigtable/ClientSideMetrics', () => { throw err; }); }); + it('should record the right metrics when handling rows through readrows stream', done => { + (async () => { + try { + const hrtime = new FakeHRTime(); + const bigtable = await mockBigtable( + SECOND_PROJECT_ID, + done, + applicationLatenciesChecksHandlers, + hrtime, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + // Mock stream behaviour: + // @ts-ignore + table.bigtable.request = () => { + const chunks = generateChunksFromRequest( + {}, + { + chunkSize: 1, + valueSize: 1, + errorAfterChunkNo: 2, + keyFrom: 0, + keyTo: 3, + chunksPerResponse: 1, + debugLog: () => {}, + }, + ); + const data = { + lastRowKey: chunks[2].rowKey, + chunks, + }; + const stream = new PassThrough({ + objectMode: true, + }); + + setImmediate(() => { + stream.emit('data', data); + stream.emit('end'); + }); + + return stream; + }; + const stream = table.createReadStream(); + stream.on('data', () => { + // Simulate an application that takes 5 seconds between row reads. + hrtime.bigint(); + hrtime.bigint(); + hrtime.bigint(); + hrtime.bigint(); + hrtime.bigint(); + }); + stream.on('end', async () => { + const table2 = instance.table(tableId2); + await table2.getRows(); + }); + } catch (e) { + done(e); + } + })().catch(err => { + throw err; + }); + }); + it('should record the right metrics when iterating through readrows stream', done => { + (async () => { + try { + const hrtime = new FakeHRTime(); + const bigtable = await mockBigtable( + SECOND_PROJECT_ID, + done, + applicationLatenciesChecksIterative, + hrtime, + ); + const instance = bigtable.instance(instanceId1); + const table = instance.table(tableId1); + // Mock stream behaviour: + // @ts-ignore + table.bigtable.request = () => { + const chunks = generateChunksFromRequest( + {}, + { + chunkSize: 1, + valueSize: 1, + errorAfterChunkNo: 2, + keyFrom: 0, + keyTo: 3, + chunksPerResponse: 1, + debugLog: () => {}, + }, + ); + const data = { + lastRowKey: chunks[2].rowKey, + chunks, + }; + const stream = new PassThrough({ + objectMode: true, + }); + + setImmediate(() => { + stream.emit('data', data); + stream.emit('end'); + }); + + return stream; + }; + const stream = table.createReadStream(); + for await (const row of stream) { + // Simulate an application that takes 5 seconds between row reads. 
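+ // Each FakeHRTime.bigint() call advances the fake clock by one second (1e9 ns), so five calls model a five-second pause in user code.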
+ hrtime.bigint(); + hrtime.bigint(); + hrtime.bigint(); + hrtime.bigint(); + hrtime.bigint(); + } + const table2 = instance.table(tableId2); + await table2.getRows(); + } catch (e) { + done(e); + } + })().catch(err => { + throw err; + }); + }); }); }); diff --git a/system-test/read-modify-write-row-interceptors.ts b/system-test/read-modify-write-row-interceptors.ts index 2996e0b3d..96d847fe8 100644 --- a/system-test/read-modify-write-row-interceptors.ts +++ b/system-test/read-modify-write-row-interceptors.ts @@ -28,7 +28,7 @@ import { } from '../src/client-side-metrics/client-side-metrics-attributes'; import * as assert from 'assert'; import {status as GrpcStatus} from '@grpc/grpc-js'; -import {withInterceptors} from '../src/interceptor'; +import {createMetricsUnaryInterceptorProvider} from '../src/client-side-metrics/metric-interceptor'; const INSTANCE_ID = 'isolated-rmw-instance'; const TABLE_ID = 'isolated-rmw-table'; @@ -209,7 +209,10 @@ describe('Bigtable/ReadModifyWriteRowInterceptorMetrics', () => { ], appProfileId: undefined, }, - gaxOpts: withInterceptors({}, metricsCollector), + gaxOpts: createMetricsUnaryInterceptorProvider( + {}, + metricsCollector, + ), }, (err: ServiceError | null, resp?: any) => { if (err) { @@ -222,7 +225,7 @@ describe('Bigtable/ReadModifyWriteRowInterceptorMetrics', () => { }); // 4. Tell the metrics collector the attempt is over metricsCollector.onAttemptComplete(GrpcStatus.OK); - metricsCollector.onOperationComplete(GrpcStatus.OK); + metricsCollector.onOperationComplete(GrpcStatus.OK, 0); // 5. Return results of method call to the user return responseArray; }; diff --git a/system-test/read-rows-acceptance-tests.ts b/system-test/read-rows-acceptance-tests.ts index 8d70db610..530ac03d7 100644 --- a/system-test/read-rows-acceptance-tests.ts +++ b/system-test/read-rows-acceptance-tests.ts @@ -41,7 +41,7 @@ class FakeOperationMetricsCollector extends OperationMetricsCollector { onAttemptStart() {} onAttemptComplete() {} onOperationStart() {} - handleStatusAndMetadata() {} + wrapRequest() {} onMetadataReceived() {} onRowReachesUser() {} onStatusMetadataReceived() {} diff --git a/test-common/expected-otel-export-input.ts b/test-common/expected-otel-export-input.ts index 7561dda38..0833d169b 100644 --- a/test-common/expected-otel-export-input.ts +++ b/test-common/expected-otel-export-input.ts @@ -69,7 +69,7 @@ export const expectedOtelExportConvertedValue = { value: { distributionValue: { count: '1', - mean: 10000, + mean: 6000, bucketOptions: { explicitBuckets: { bounds: [ @@ -167,7 +167,7 @@ export const expectedOtelExportConvertedValue = { value: { distributionValue: { count: '1', - mean: 4000, + mean: 2000, bucketOptions: { explicitBuckets: { bounds: [ @@ -208,7 +208,6 @@ export const expectedOtelExportConvertedValue = { '0', '0', '0', - '0', '1', '0', '0', @@ -220,6 +219,7 @@ export const expectedOtelExportConvertedValue = { '0', '0', '0', + '0', ], }, }, @@ -265,7 +265,7 @@ export const expectedOtelExportConvertedValue = { value: { distributionValue: { count: '1', - mean: 3000, + mean: 1000, bucketOptions: { explicitBuckets: { bounds: [ @@ -305,9 +305,9 @@ export const expectedOtelExportConvertedValue = { '0', '0', '0', + '1', '0', '0', - '1', '0', '0', '0', @@ -437,8 +437,8 @@ export const expectedOtelExportConvertedValue = { }, value: { distributionValue: { - count: '2', - mean: 1000, + count: '1', + mean: 1256, bucketOptions: { explicitBuckets: { bounds: [ @@ -478,8 +478,8 @@ export const expectedOtelExportConvertedValue = { '0', '0', '0', - '2', '0', 
+ '1',
 '0',
 '0',
 '0',
@@ -947,9 +947,9 @@ export const expectedOtelExportInput = {
 startTime: [123, 789],
 endTime: [456, 789],
 value: {
- min: 10000,
- max: 10000,
- sum: 10000,
+ min: 6000,
+ max: 6000,
+ sum: 6000,
 buckets: {
 boundaries: [
 0, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65,
@@ -1003,9 +1003,9 @@ export const expectedOtelExportInput = {
 startTime: [123, 789],
 endTime: [456, 789],
 value: {
- min: 4000,
- max: 4000,
- sum: 4000,
+ min: 2000,
+ max: 2000,
+ sum: 2000,
 buckets: {
 boundaries: [
 0, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65,
@@ -1015,7 +1015,7 @@ export const expectedOtelExportInput = {
 ],
 counts: [
 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
 0,
 ],
 },
@@ -1037,9 +1037,9 @@ export const expectedOtelExportInput = {
 startTime: [123, 789],
 endTime: [456, 789],
 value: {
- min: 3000,
- max: 3000,
- sum: 3000,
+ min: 1000,
+ max: 1000,
+ sum: 1000,
 buckets: {
 boundaries: [
 0, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65,
@@ -1049,7 +1049,7 @@ export const expectedOtelExportInput = {
 ],
 counts: [
 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
 0,
 ],
 },
@@ -1136,9 +1136,9 @@ export const expectedOtelExportInput = {
 startTime: [123, 789],
 endTime: [456, 789],
 value: {
- min: 1000,
- max: 1000,
- sum: 2000,
+ min: 1256,
+ max: 1256,
+ sum: 1256,
 buckets: {
 boundaries: [
 0, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65,
@@ -1148,11 +1148,11 @@ export const expectedOtelExportInput = {
 ],
 counts: [
 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
 0,
 ],
 },
- count: 2,
+ count: 1,
 },
 },
 ],
diff --git a/test-common/metrics-handler-fixture.ts b/test-common/metrics-handler-fixture.ts
index 39e89cfcd..8fb6b2bb2 100644
--- a/test-common/metrics-handler-fixture.ts
+++ b/test-common/metrics-handler-fixture.ts
@@ -14,7 +14,7 @@ export const expectedRequestsHandled = [
 {
- attemptLatency: 4000,
+ attemptLatency: 2000,
 serverLatency: 101,
 connectivityErrorCount: 0,
 streaming: 'true',
@@ -30,7 +30,7 @@ export const expectedRequestsHandled = [
 projectId: 'my-project',
 },
 {
- attemptLatency: 3000,
+ attemptLatency: 1000,
 serverLatency: 103,
 connectivityErrorCount: 0,
 streaming: 'true',
@@ -57,8 +57,8 @@ export const expectedRequestsHandled = [
 },
 client_name: 'nodejs-bigtable',
 projectId: 'my-project',
- operationLatency: 10000,
- applicationLatencies: [1000, 1000],
+ operationLatency: 6000,
+ applicationLatency: 1256,
 retryCount: 1,
 firstResponseLatency: 2000,
 },
diff --git a/test/utils/readRowsImpl.ts b/test-common/utils/readRowsImpl.ts
similarity index 99%
rename from test/utils/readRowsImpl.ts
rename to test-common/utils/readRowsImpl.ts
index aad79706f..dc268def2 100644
--- a/test/utils/readRowsImpl.ts
+++ b/test-common/utils/readRowsImpl.ts
@@ -19,7 +19,7 @@ import {
 DebugLog,
 ReadRowsServiceParameters,
 ReadRowsWritableStream,
-} from './readRowsServiceParameters';
+} from '../../test/utils/readRowsServiceParameters';
 import {google} from '../../protos/protos';
 import IRowRange = google.bigtable.v2.IRowRange;
@@ -226,7 +226,7 @@ function getSelectedKey(
 * @param debugLog A function that logs debug messages.
 * @returns {protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[]} The generated chunks.
 */
-function generateChunksFromRequest(
+export function generateChunksFromRequest(
 request: protos.google.bigtable.v2.IReadRowsRequest,
 serviceParameters: ReadRowsServiceParameters,
 ) {
diff --git a/test/authorized-views.ts b/test/authorized-views.ts
index be4593d27..a36eefd43 100644
--- a/test/authorized-views.ts
+++ b/test/authorized-views.ts
@@ -50,6 +50,14 @@ describe('Bigtable/AuthorizedViews', () => {
 try {
 requestCount++;
 delete config['retryOpts'];
+ if (
+ config &&
+ config.gaxOpts &&
+ config.gaxOpts.otherArgs &&
+ config.gaxOpts.otherArgs.options
+ ) {
+ delete config.gaxOpts.otherArgs['options'];
+ }
 assert.deepStrictEqual(config, compareFn(requestCount));
 } catch (err: unknown) {
 done(err);
@@ -339,6 +347,7 @@ describe('Bigtable/AuthorizedViews', () => {
 method: 'readModifyWriteRow',
 gaxOpts: {
 maxRetries: 4,
+ otherArgs: {},
 },
 reqOpts: Object.assign(
 {
@@ -425,6 +434,7 @@ describe('Bigtable/AuthorizedViews', () => {
 method: 'checkAndMutateRow',
 gaxOpts: {
 maxRetries: 4,
+ otherArgs: {},
 },
 reqOpts: Object.assign(
 {
diff --git a/test/metric-service-client-credentials.ts b/test/metric-service-client-credentials.ts
index edd8001eb..f1ec1a881 100644
--- a/test/metric-service-client-credentials.ts
+++ b/test/metric-service-client-credentials.ts
@@ -15,7 +15,6 @@
 import * as proxyquire from 'proxyquire';
 import {ClientOptions, grpc} from 'google-gax';
 import * as assert from 'assert';
-import {setupBigtable} from '../system-test/client-side-metrics-setup-table';
 import {MetricServiceClient} from '@google-cloud/monitoring';
 describe('Bigtable/MetricServiceClientCredentials', () => {
diff --git a/test/metrics-collector/metrics-collector.ts b/test/metrics-collector/metrics-collector.ts
index 626c254c0..0436c4870 100644
--- a/test/metrics-collector/metrics-collector.ts
+++ b/test/metrics-collector/metrics-collector.ts
@@ -26,7 +26,6 @@ import * as path from 'path'; // Import the 'path' module
 import * as gax from 'google-gax';
 import * as proxyquire from 'proxyquire';
 import {GCPMetricsHandler} from '../../src/client-side-metrics/gcp-metrics-handler';
-import {ResourceMetrics} from '@opentelemetry/sdk-metrics';
 const protoPath = path.join(
 __dirname,
 '../../protos/google/bigtable/v2/response_params.proto',
@@ -90,6 +89,12 @@ describe('Bigtable/MetricsCollector', () => {
 bigtable = new FakeBigtable();
 async fakeMethod(): Promise {
+ class FakeUserStream {
+ getTotalDurationMs() {
+ return 1256;
+ }
+ }
+ const userStream = new FakeUserStream();
 function createMetadata(duration: string) {
 return {
 internalRepr: new Map([
@@ -135,13 +140,11 @@ describe('Bigtable/MetricsCollector', () => {
 logger.value += '5. Client receives first row.\n';
 metricsCollector.onResponse(this.bigtable.projectId);
 logger.value += '6. User receives first row.\n';
- metricsCollector.onRowReachesUser();
 logger.value += '7. Client receives metadata.\n';
 metricsCollector.onMetadataReceived(createMetadata('102'));
 logger.value += '8. Client receives second row.\n';
 metricsCollector.onResponse(this.bigtable.projectId);
 logger.value += '9. User receives second row.\n';
- metricsCollector.onRowReachesUser();
 logger.value += '10. A transient error occurs.\n';
 metricsCollector.onAttemptComplete(grpc.status.DEADLINE_EXCEEDED);
 logger.value += '11. After a timeout, the second attempt is made.\n';
@@ -153,16 +156,17 @@ describe('Bigtable/MetricsCollector', () => {
 logger.value += '14. Client receives third row.\n';
 metricsCollector.onResponse(this.bigtable.projectId);
 logger.value += '15. User receives third row.\n';
- metricsCollector.onRowReachesUser();
 logger.value += '16. Client receives metadata.\n';
 metricsCollector.onMetadataReceived(createMetadata('104'));
 logger.value += '17. Client receives fourth row.\n';
 metricsCollector.onResponse(this.bigtable.projectId);
 logger.value += '18. User receives fourth row.\n';
- metricsCollector.onRowReachesUser();
 logger.value += '19. User reads row 1\n';
 logger.value += '20. Stream ends, operation completes\n';
- metricsCollector.onOperationComplete(grpc.status.OK);
+ metricsCollector.onOperationComplete(
+ grpc.status.OK,
+ userStream.getTotalDurationMs(),
+ );
 }
 }
 }
diff --git a/test/metrics-collector/typical-method-call.txt b/test/metrics-collector/typical-method-call.txt
index 17a134445..9b563eb00 100644
--- a/test/metrics-collector/typical-method-call.txt
+++ b/test/metrics-collector/typical-method-call.txt
@@ -7,31 +7,27 @@ getDate call returns 2000 ms
 5. Client receives first row.
 getDate call returns 3000 ms
 6. User receives first row.
-getDate call returns 4000 ms
 7. Client receives metadata.
 8. Client receives second row.
 9. User receives second row.
-getDate call returns 5000 ms
 10. A transient error occurs.
-getDate call returns 6000 ms
+getDate call returns 4000 ms
 Recording parameters for onAttemptComplete:
-{"projectId":"my-project","attemptLatency":4000,"serverLatency":101,"connectivityErrorCount":0,"streaming":"true","status":"4","client_name":"nodejs-bigtable","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows"}}
+{"projectId":"my-project","attemptLatency":2000,"serverLatency":101,"connectivityErrorCount":0,"streaming":"true","status":"4","client_name":"nodejs-bigtable","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows"}}
 11. After a timeout, the second attempt is made.
-getDate call returns 7000 ms
+getDate call returns 5000 ms
 12. Client receives status information.
 13. Client receives metadata.
 14. Client receives third row.
 15. User receives third row.
-getDate call returns 8000 ms
 16. Client receives metadata.
 17. Client receives fourth row.
 18. User receives fourth row.
-getDate call returns 9000 ms
 19. User reads row 1
 20. Stream ends, operation completes
-getDate call returns 10000 ms
+getDate call returns 6000 ms
 Recording parameters for onAttemptComplete:
-{"projectId":"my-project","attemptLatency":3000,"serverLatency":103,"connectivityErrorCount":0,"streaming":"true","status":"0","client_name":"nodejs-bigtable","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows"}}
-getDate call returns 11000 ms
+{"projectId":"my-project","attemptLatency":1000,"serverLatency":103,"connectivityErrorCount":0,"streaming":"true","status":"0","client_name":"nodejs-bigtable","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows"}}
+getDate call returns 7000 ms
 Recording parameters for onOperationComplete:
-{"projectId":"my-project","status":"0","streaming":"true","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows"},"client_name":"nodejs-bigtable","operationLatency":10000,"retryCount":1,"firstResponseLatency":2000,"applicationLatencies":[1000,1000]}
+{"projectId":"my-project","status":"0","streaming":"true","metricsCollectorData":{"instanceId":"fakeInstanceId","table":"fakeTableId","cluster":"fake-cluster3","zone":"us-west1-c","method":"Bigtable.ReadRows"},"client_name":"nodejs-bigtable","operationLatency":6000,"retryCount":1,"firstResponseLatency":2000,"applicationLatency":1256}
diff --git a/test/readrows.ts b/test/readrows.ts
index 913223a1e..341f71052 100644
--- a/test/readrows.ts
+++ b/test/readrows.ts
@@ -21,7 +21,7 @@ import {GoogleError} from 'google-gax';
 import {MockServer} from '../src/util/mock-servers/mock-server';
 import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service';
 import {MockService} from '../src/util/mock-servers/mock-service';
-import {ReadRowsImpl} from './utils/readRowsImpl';
+import {ReadRowsImpl} from '../test-common/utils/readRowsImpl';
 import {
 ReadRowsServiceParameters,
diff --git a/test/row.ts b/test/row.ts
index 9e4833239..ca1f444cf 100644
--- a/test/row.ts
+++ b/test/row.ts
@@ -25,6 +25,8 @@ import {
 GetRowsOptions,
 GetRowsCallback,
 GetRowsResponse,
+ MutateOptions,
+ MutateCallback,
 } from '../src/table.js';
 import {Chunk} from '../src/chunktransformer.js';
 import {CallOptions, ServiceError} from 'google-gax';
@@ -32,6 +34,8 @@ import {ClientSideMetricsConfigManager} from '../src/client-side-metrics/metrics
 import {Bigtable} from '../src/';
 import {getRowsInternal} from '../src/utils/getRowsInternal';
 import {TabularApiSurface} from '../src/tabular-api-surface';
+import * as pumpify from 'pumpify';
+import {OperationMetricsCollector} from '../src/client-side-metrics/operation-metrics-collector';
 const sandbox = sinon.createSandbox();
@@ -88,6 +92,23 @@ describe('Bigtable/Row', () => {
 let RowError: typeof rw.RowError;
 let row: rw.Row;
+ function getFakeMutateRow(
+ fn: (
+ table: TabularApiSurface,
+ metricsCollector: OperationMetricsCollector,
+ entry: Entry | Entry[],
+ gaxOptions_: MutateOptions | MutateCallback,
+ callback: Function,
+ ) => void | Promise,
+ ) {
+ const Fake = proxyquire('../src/row.js', {
+ '../src/utils/mutateInternal': {
+ mutateInternal: fn,
+ },
+ });
+ return Fake;
+ }
+
 function getFakeRow(
 getRowsInternal: (
 table: TabularApiSurface,
@@ -941,8 +962,14 @@ describe('Bigtable/Row', () => {
 config.reqOpts.falseMutations,
 fakeMutations.mutations,
 );
-
- assert.strictEqual(config.gaxOpts, undefined);
+ config.gaxOpts.otherArgs.options.interceptors = [];
+ assert.deepStrictEqual(config.gaxOpts, {
+ otherArgs: {
+ options: {
+ interceptors: [],
+ },
+ },
+ });
 assert.strictEqual(FakeMutation.parse.callCount, 2);
 assert.strictEqual(FakeMutation.parse.getCall(0).args[0], mutations[0]);
 assert.strictEqual(FakeMutation.parse.getCall(1).args[0], mutations[0]);
@@ -1513,34 +1540,52 @@
 };
 it('should insert an object', done => {
- (row.table.mutate as Function) = (
+ const fn = (
+ table: TabularApiSurface,
+ metricsCollector: OperationMetricsCollector,
 // eslint-disable-next-line @typescript-eslint/no-explicit-any
- entry: any,
+ entry: Entry | Entry[],
 gaxOptions: {},
 callback: Function,
 ) => {
 assert.strictEqual(entry.data, data);
 callback(); // done()
 };
- row.save(data, done);
+ const SavedRow = getFakeMutateRow(fn).Row;
+ const savedRow = new SavedRow(TABLE, ROW_ID);
+ savedRow.save(data, done);
 });
 it('should accept gaxOptions', done => {
 const gaxOptions = {};
- sandbox.stub(row.table, 'mutate').callsFake((entry, gaxOptions_) => {
+ const fn = (
+ table: TabularApiSurface,
+ metricsCollector: OperationMetricsCollector,
+ entry: Entry | Entry[],
+ gaxOptions_: MutateOptions | MutateCallback,
+ ) => {
 assert.strictEqual(gaxOptions_, gaxOptions);
 done();
- });
- row.save(data, gaxOptions, assert.ifError);
+ };
+ const SavedRow = getFakeMutateRow(fn).Row;
+ const savedRow = new SavedRow(TABLE, ROW_ID);
+ savedRow.save(data, gaxOptions, assert.ifError);
 });
 it('should remove existing data', done => {
 const gaxOptions = {};
- sandbox.stub(row.table, 'mutate').callsFake((entry, gaxOptions_) => {
+ const fn = (
+ table: TabularApiSurface,
+ metricsCollector: OperationMetricsCollector,
+ entry: Entry | Entry[],
+ gaxOptions_: MutateOptions | MutateCallback,
+ ) => {
 assert.strictEqual(gaxOptions_, gaxOptions);
 done();
- });
- row.save(data, gaxOptions, assert.ifError);
+ };
+ const SavedRow = getFakeMutateRow(fn).Row;
+ const savedRow = new SavedRow(TABLE, ROW_ID);
+ savedRow.save(data, gaxOptions, assert.ifError);
 assert.strictEqual(row.data, undefined);
 });
 });
diff --git a/test/table.ts b/test/table.ts
index 913e77c12..c10e89145 100644
--- a/test/table.ts
+++ b/test/table.ts
@@ -35,6 +35,7 @@ import {OperationMetricsCollector} from '../src/client-side-metrics/operation-me
 import {SinonSpy} from 'sinon';
 import {TabularApiSurface} from '../src/tabular-api-surface';
 import {GetRowsOptions} from '../src/table';
+import {mutateInternal} from '../src/utils/mutateInternal';
 const sandbox = sinon.createSandbox();
 const noop = () => {};
@@ -71,7 +72,7 @@ class FakeMetricsCollector {
 onAttemptStart() {}
 onAttemptComplete() {}
 onMetadataReceived() {}
- handleStatusAndMetadata() {}
+ wrapRequest() {}
 onStatusMetadataReceived() {}
 onRowReachesUser() {}
 }
@@ -135,6 +136,13 @@ function getTableMock(
 createReadStreamInternal: createReadStreamInternal,
 },
 });
+ const FakeMutateInternal = proxyquire('../src/utils/mutateInternal.js', {
+ '../row.js': {Row: FakeRow},
+ '../chunktransformer.js': {ChunkTransformer: FakeChunkTransformer},
+ '../filter.js': {Filter: FakeFilter},
+ '../mutation.js': {Mutation: FakeMutation},
+ pumpify,
+ }).mutateInternal;
 const FakeTabularApiSurface = proxyquire('../src/tabular-api-surface.js', {
 '@google-cloud/promisify': fakePromisify,
 './family.js': {Family: FakeFamily},
@@ -146,6 +154,9 @@ function getTableMock(
 './utils/createReadStreamInternal': {
 createReadStreamInternal,
 },
+ './utils/mutateInternal': {
+ mutateInternal: FakeMutateInternal,
+ },
 './utils/getRowsInternal': {
 getRowsInternal: FakeGetRows.getRowsInternal,
 },
diff --git a/test/timed-stream.ts b/test/timed-stream.ts
index c58cf6552..7c278142f 100644
--- a/test/timed-stream.ts
+++ b/test/timed-stream.ts
@@ -24,13 +24,23 @@ function* numberGenerator(n: number) {
 }
 }
+class UserStream extends TimedStream {
+ constructor() {
+ super({
+ transformHook(event, _encoding, callback) {
+ callback(null, event);
+ },
+ });
+ }
+}
+
 describe('Bigtable/TimedStream', () => {
 describe('with handlers', () => {
 describe('with no delay from server', () => {
 it('should measure the total time accurately for a series of 30 rows with a synchronous call', function (done) {
 this.timeout(200000);
 const sourceStream = Readable.from(numberGenerator(30));
- const timedStream = new TimedStream({});
+ const timedStream = new UserStream();
 // @ts-ignore
 sourceStream.pipe(timedStream as unknown as WritableStream);
 // iterate stream
@@ -55,7 +65,7 @@ describe('Bigtable/TimedStream', () => {
 it('should measure the total time accurately for a series of 30 rows with an async call', function (done) {
 this.timeout(200000);
 const sourceStream = Readable.from(numberGenerator(30));
- const timedStream = new TimedStream({});
+ const timedStream = new UserStream();
 // @ts-ignore
 sourceStream.pipe(timedStream as unknown as WritableStream);
 // iterate stream
@@ -78,7 +88,7 @@ describe('Bigtable/TimedStream', () => {
 it('should measure the total time accurately for a series of 30 rows with a sync then an async call', function (done) {
 this.timeout(200000);
 const sourceStream = Readable.from(numberGenerator(30));
- const timedStream = new TimedStream({});
+ const timedStream = new UserStream();
 // @ts-ignore
 sourceStream.pipe(timedStream as unknown as WritableStream);
 // iterate stream
@@ -106,7 +116,7 @@ describe('Bigtable/TimedStream', () => {
 it('should measure the total time accurately for a series of 30 rows with an async call then a sync call', function (done) {
 this.timeout(200000);
 const sourceStream = Readable.from(numberGenerator(30));
- const timedStream = new TimedStream({});
+ const timedStream = new UserStream();
 // @ts-ignore
 sourceStream.pipe(timedStream as unknown as WritableStream);
 // iterate stream
@@ -139,7 +149,7 @@ describe('Bigtable/TimedStream', () => {
 i.toString(),
 );
 const sourceStream = new PassThrough();
- const timedStream = new TimedStream({});
+ const timedStream = new UserStream();
 // @ts-ignore
 sourceStream.pipe(timedStream);
@@ -184,7 +194,7 @@ describe('Bigtable/TimedStream', () => {
 }
 const dataEvents = eventNumbers.map(i => i.toString());
 const sourceStream = new PassThrough();
- const timedStream = new TimedStream({});
+ const timedStream = new UserStream();
 // @ts-ignore
 sourceStream.pipe(timedStream as unknown as WritableStream);
 // iterate stream
@@ -199,8 +209,8 @@ describe('Bigtable/TimedStream', () => {
 clearInterval(interval);
 const totalMilliseconds = timedStream.getTotalDurationMs();
 try {
- assert(totalMilliseconds > 39000);
- assert(totalMilliseconds < 41000);
+ assert(totalMilliseconds > 38000);
+ assert(totalMilliseconds < 42000);
 done();
 } catch (e) {
 done(e);
@@ -228,7 +238,7 @@ describe('Bigtable/TimedStream', () => {
 it('should measure the total time accurately for a series of 30 rows', async function () {
 this.timeout(200000);
 const sourceStream = Readable.from(numberGenerator(30));
- const timedStream = new TimedStream({});
+ const timedStream = new UserStream();
 // @ts-ignore
 sourceStream.pipe(timedStream as unknown as WritableStream);
 // iterate stream
@@ -246,7 +256,7 @@ describe('Bigtable/TimedStream', () => {
 it('should measure the total time accurately for a series of 30 rows with an async call', async function () {
 this.timeout(200000);
 const sourceStream = Readable.from(numberGenerator(30));
- const timedStream = new TimedStream({});
+ const timedStream = new UserStream();
 // @ts-ignore
 sourceStream.pipe(timedStream as unknown as WritableStream);
 // iterate stream
@@ -267,7 +277,7 @@ describe('Bigtable/TimedStream', () => {
 i.toString(),
 );
 const sourceStream = new PassThrough();
- const timedStream = new TimedStream({});
+ const timedStream = new UserStream();
 // @ts-ignore
 sourceStream.pipe(timedStream);
@@ -309,7 +319,7 @@ describe('Bigtable/TimedStream', () => {
 }
 const dataEvents = eventNumbers.map(i => i.toString());
 const sourceStream = new PassThrough();
- const timedStream = new TimedStream({});
+ const timedStream = new UserStream();
 // @ts-ignore
 sourceStream.pipe(timedStream as unknown as WritableStream);
 // First load the stream with events.
diff --git a/tsconfig.json b/tsconfig.json
index c78adddb7..c0c10fd48 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -20,10 +20,8 @@
 "src/v2/bigtable_client_config.json",
 "src/v2/bigtable_table_admin_client_config.json",
 "src/v2/bigtable_instance_admin_client_config.json",
- "test-common/test-metrics-handler.ts",
- "test-common/replace-timestamps.ts",
- "test-common/expected-otel-export-input.ts",
- "test-common/metrics-handler-fixture.ts",
+ "test-common/*.ts",
+ "test-common/**/*.ts",
 "protos/protos.json"
 ]
 }