diff --git a/packages/core/src/__tests__/api.test.ts b/packages/core/src/__tests__/api.test.ts index f968a3201..ce7aaf021 100644 --- a/packages/core/src/__tests__/api.test.ts +++ b/packages/core/src/__tests__/api.test.ts @@ -25,7 +25,11 @@ describe('#sendEvents', () => { .mockReturnValue('2001-01-01T00:00:00.000Z'); }); - async function sendAnEventPer(writeKey: string, toUrl: string) { + async function sendAnEventPer( + writeKey: string, + toUrl: string, + retryCount?: number + ) { const mockResponse = Promise.resolve('MANOS'); // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore @@ -60,9 +64,19 @@ describe('#sendEvents', () => { writeKey: writeKey, url: toUrl, events: [event], + retryCount, }); - expect(fetch).toHaveBeenCalledWith(toUrl, { + return event; + } + + it('sends an event', async () => { + const toSegmentBatchApi = 'https://api.segment.io/v1.b'; + const writeKey = 'SEGMENT_KEY'; + + const event = await sendAnEventPer(writeKey, toSegmentBatchApi); + + expect(fetch).toHaveBeenCalledWith(toSegmentBatchApi, { method: 'POST', keepalive: true, body: JSON.stringify({ @@ -72,21 +86,67 @@ describe('#sendEvents', () => { }), headers: { 'Content-Type': 'application/json; charset=utf-8', + 'X-Retry-Count': '0', }, }); - } - - it('sends an event', async () => { - const toSegmentBatchApi = 'https://api.segment.io/v1.b'; - const writeKey = 'SEGMENT_KEY'; - - await sendAnEventPer(writeKey, toSegmentBatchApi); }); it('sends an event to proxy', async () => { const toProxyUrl = 'https://myprox.io/b'; const writeKey = 'SEGMENT_KEY'; - await sendAnEventPer(writeKey, toProxyUrl); + const event = await sendAnEventPer(writeKey, toProxyUrl); + + expect(fetch).toHaveBeenCalledWith(toProxyUrl, { + method: 'POST', + body: JSON.stringify({ + batch: [event], + sentAt: '2001-01-01T00:00:00.000Z', + writeKey: 'SEGMENT_KEY', + }), + headers: { + 'Content-Type': 'application/json; charset=utf-8', + 'X-Retry-Count': '0', + }, + keepalive: true, + }); + }); + + it('sends X-Retry-Count header with default value 0', async () => { + const url = 'https://api.segment.io/v1.b'; + await sendAnEventPer('KEY', url); + + expect(fetch).toHaveBeenCalledWith( + url, + expect.objectContaining({ + headers: expect.objectContaining({ + 'X-Retry-Count': '0', + }), + }) + ); + }); + + it('sends X-Retry-Count header with provided retry count', async () => { + const url = 'https://api.segment.io/v1.b'; + await sendAnEventPer('KEY', url, 5); + + expect(fetch).toHaveBeenCalledWith( + url, + expect.objectContaining({ + headers: expect.objectContaining({ + 'X-Retry-Count': '5', + }), + }) + ); + }); + + it('sends X-Retry-Count as string format', async () => { + const url = 'https://api.segment.io/v1.b'; + await sendAnEventPer('KEY', url, 42); + + const callArgs = (fetch as jest.Mock).mock.calls[0]; + const headers = callArgs[1].headers; + expect(typeof headers['X-Retry-Count']).toBe('string'); + expect(headers['X-Retry-Count']).toBe('42'); }); }); diff --git a/packages/core/src/__tests__/internal/fetchSettings.test.ts b/packages/core/src/__tests__/internal/fetchSettings.test.ts index f061afe5a..4599b55bf 100644 --- a/packages/core/src/__tests__/internal/fetchSettings.test.ts +++ b/packages/core/src/__tests__/internal/fetchSettings.test.ts @@ -1,5 +1,5 @@ import { SegmentClient } from '../../analytics'; -import { settingsCDN } from '../../constants'; +import { settingsCDN, defaultHttpConfig } from '../../constants'; import { SEGMENT_DESTINATION_KEY } from '../../plugins/SegmentDestination'; import { getMockLogger, MockSegmentStore } from '../../test-helpers'; import { getURL } from '../../util'; @@ -436,6 +436,80 @@ describe('internal #getSettings', () => { }); }); + describe('CDN integrations validation', () => { + it('treats null integrations as empty (no integrations configured)', async () => { + (fetch as jest.MockedFunction).mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ integrations: null }), + status: 200, + } as Response); + + const client = new SegmentClient(clientArgs); + await client.fetchSettings(); + + expect(setSettingsSpy).toHaveBeenCalledWith({}); + }); + + it('treats missing integrations as empty (no integrations configured)', async () => { + (fetch as jest.MockedFunction).mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({}), + status: 200, + } as Response); + + const client = new SegmentClient(clientArgs); + await client.fetchSettings(); + + expect(setSettingsSpy).toHaveBeenCalledWith({}); + }); + + it('falls back to defaults when CDN returns integrations as an array', async () => { + (fetch as jest.MockedFunction).mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ integrations: ['invalid'] }), + status: 200, + } as Response); + + const client = new SegmentClient(clientArgs); + await client.fetchSettings(); + + expect(setSettingsSpy).toHaveBeenCalledWith( + defaultIntegrationSettings.integrations + ); + }); + + it('falls back to defaults when CDN returns integrations as a string', async () => { + (fetch as jest.MockedFunction).mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ integrations: 'invalid' }), + status: 200, + } as Response); + + const client = new SegmentClient(clientArgs); + await client.fetchSettings(); + + expect(setSettingsSpy).toHaveBeenCalledWith( + defaultIntegrationSettings.integrations + ); + }); + + it('stores empty integrations when CDN returns null integrations and no defaults', async () => { + (fetch as jest.MockedFunction).mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ integrations: null }), + status: 200, + } as Response); + + const client = new SegmentClient({ + ...clientArgs, + config: { ...clientArgs.config, defaultSettings: undefined }, + }); + await client.fetchSettings(); + + expect(setSettingsSpy).toHaveBeenCalledWith({}); + }); + }); + describe('httpConfig extraction', () => { it('extracts httpConfig from CDN response and merges with defaults', async () => { const serverHttpConfig = { @@ -483,7 +557,7 @@ describe('internal #getSettings', () => { expect(result?.backoffConfig?.jitterPercent).toBe(20); }); - it('returns undefined httpConfig when CDN has no httpConfig', async () => { + it('returns defaultHttpConfig when CDN has no httpConfig', async () => { (fetch as jest.MockedFunction).mockResolvedValueOnce({ ok: true, json: () => Promise.resolve(defaultIntegrationSettings), @@ -496,7 +570,17 @@ describe('internal #getSettings', () => { }); await anotherClient.fetchSettings(); - expect(anotherClient.getHttpConfig()).toBeUndefined(); + const result = anotherClient.getHttpConfig(); + expect(result).toBeDefined(); + expect(result?.rateLimitConfig?.enabled).toBe( + defaultHttpConfig.rateLimitConfig!.enabled + ); + expect(result?.backoffConfig?.enabled).toBe( + defaultHttpConfig.backoffConfig!.enabled + ); + expect(result?.backoffConfig?.statusCodeOverrides).toEqual( + defaultHttpConfig.backoffConfig!.statusCodeOverrides + ); }); it('returns undefined httpConfig when fetch fails', async () => { diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 8017fcd57..33e8bb1f9 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -10,6 +10,7 @@ import { workspaceDestinationFilterKey, defaultFlushInterval, defaultFlushAt, + defaultHttpConfig, maxPendingEvents, } from './constants'; import { getContext } from './context'; @@ -73,7 +74,11 @@ import { SegmentError, translateHTTPError, } from './errors'; -import { validateIntegrations, extractHttpConfig } from './config-validation'; +import { + validateIntegrations, + validateRateLimitConfig, + validateBackoffConfig, +} from './config-validation'; import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin'; import { WaitingPlugin } from './plugin'; @@ -198,8 +203,8 @@ export class SegmentClient { } /** - * Retrieves the server-side httpConfig from CDN settings. - * Returns undefined if the CDN did not provide httpConfig (retry features disabled). + * Retrieves the merged httpConfig (defaultHttpConfig ← CDN ← config overrides). + * Returns undefined only if settings have not yet been fetched. */ getHttpConfig(): HttpConfig | undefined { return this.httpConfig; @@ -418,9 +423,43 @@ export class SegmentClient { resJson.middlewareSettings?.routingRules ?? [] ); - if (resJson.httpConfig) { - this.httpConfig = extractHttpConfig(resJson.httpConfig, this.logger); - this.logger.info('Loaded httpConfig from CDN settings.'); + // Merge httpConfig: defaultHttpConfig ← CDN ← config overrides + { + const cdnConfig = resJson.httpConfig ?? {}; + const clientConfig = this.config.httpConfig ?? {}; + + const mergedRateLimit = { + ...defaultHttpConfig.rateLimitConfig!, + ...(cdnConfig.rateLimitConfig ?? {}), + ...(clientConfig.rateLimitConfig ?? {}), + }; + + const mergedBackoff = { + ...defaultHttpConfig.backoffConfig!, + ...(cdnConfig.backoffConfig ?? {}), + ...(clientConfig.backoffConfig ?? {}), + statusCodeOverrides: { + ...defaultHttpConfig.backoffConfig!.statusCodeOverrides, + ...(cdnConfig.backoffConfig?.statusCodeOverrides ?? {}), + ...(clientConfig.backoffConfig?.statusCodeOverrides ?? {}), + }, + }; + + const validatedRateLimit = validateRateLimitConfig( + mergedRateLimit, + this.logger + ); + this.httpConfig = { + rateLimitConfig: validatedRateLimit, + backoffConfig: validateBackoffConfig( + mergedBackoff, + this.logger, + validatedRateLimit + ), + }; + if (resJson.httpConfig) { + this.logger.info('Loaded httpConfig from CDN settings.'); + } } this.logger.info('Received settings from Segment succesfully.'); diff --git a/packages/core/src/api.ts b/packages/core/src/api.ts index 6aa2851a7..8853234e7 100644 --- a/packages/core/src/api.ts +++ b/packages/core/src/api.ts @@ -4,10 +4,12 @@ export const uploadEvents = async ({ writeKey, url, events, + retryCount = 0, }: { writeKey: string; url: string; events: SegmentEvent[]; + retryCount?: number; }) => { return await fetch(url, { method: 'POST', @@ -19,6 +21,7 @@ export const uploadEvents = async ({ }), headers: { 'Content-Type': 'application/json; charset=utf-8', + 'X-Retry-Count': retryCount.toString(), }, }); }; diff --git a/packages/core/src/plugins/QueueFlushingPlugin.ts b/packages/core/src/plugins/QueueFlushingPlugin.ts index 1580ee288..fa92c9c9a 100644 --- a/packages/core/src/plugins/QueueFlushingPlugin.ts +++ b/packages/core/src/plugins/QueueFlushingPlugin.ts @@ -15,12 +15,12 @@ export class QueueFlushingPlugin extends UtilityPlugin { type = PluginType.after; private storeKey: string; - private isPendingUpload = false; private queueStore: Store<{ events: SegmentEvent[] }> | undefined; private onFlush: (events: SegmentEvent[]) => Promise; private isRestoredResolve: () => void; private isRestored: Promise; private timeoutWarned = false; + private flushPromise?: Promise; /** * @param onFlush callback to execute when the queue is flushed (either by reaching the limit or manually) e.g. code to upload events to your destination @@ -63,16 +63,35 @@ export class QueueFlushingPlugin extends UtilityPlugin { async execute(event: SegmentEvent): Promise { await this.queueStore?.dispatch((state) => { - const events = [...state.events, event]; + const stampedEvent = { ...event, _queuedAt: Date.now() }; + const events = [...state.events, stampedEvent]; return { events }; }); return event; } /** - * Calls the onFlush callback with the events in the queue + * Calls the onFlush callback with the events in the queue. + * Ensures only one flush operation runs at a time. */ async flush() { + if (this.flushPromise) { + this.analytics?.logger.info( + 'Flush already in progress, waiting for completion' + ); + await this.flushPromise; + return; + } + + this.flushPromise = this._doFlush(); + try { + await this.flushPromise; + } finally { + this.flushPromise = undefined; + } + } + + private async _doFlush(): Promise { // Wait for the queue to be restored try { await this.isRestored; @@ -103,33 +122,24 @@ export class QueueFlushingPlugin extends UtilityPlugin { } const events = (await this.queueStore?.getState(true))?.events ?? []; - if (!this.isPendingUpload) { - try { - this.isPendingUpload = true; - await this.onFlush(events); - } finally { - this.isPendingUpload = false; - } - } + await this.onFlush(events); } - /** - * Removes one or multiple events from the queue - * @param events events to remove - */ - async dequeue(events: SegmentEvent | SegmentEvent[]) { + async dequeueByMessageIds(messageIds: string[]): Promise { await this.queueStore?.dispatch((state) => { - const eventsToRemove = Array.isArray(events) ? events : [events]; - - if (eventsToRemove.length === 0 || state.events.length === 0) { + if (messageIds.length === 0 || state.events.length === 0) { return state; } - const setToRemove = new Set(eventsToRemove); - const filteredEvents = state.events.filter((e) => !setToRemove.has(e)); + const idsToRemove = new Set(messageIds); + const filteredEvents = state.events.filter( + (e) => e.messageId == null || !idsToRemove.has(e.messageId) + ); + return { events: filteredEvents }; }); } + /** * Clear all events from the queue */ diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index cc7e911e6..076008818 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -1,5 +1,6 @@ import { DestinationPlugin } from '../plugin'; import { + HttpConfig, PluginType, SegmentAPIIntegration, SegmentAPISettings, @@ -11,20 +12,45 @@ import { uploadEvents } from '../api'; import type { SegmentClient } from '../analytics'; import { DestinationMetadataEnrichment } from './DestinationMetadataEnrichment'; import { QueueFlushingPlugin } from './QueueFlushingPlugin'; -import { defaultApiHost } from '../constants'; -import { checkResponseForErrors, translateHTTPError } from '../errors'; -import { defaultConfig } from '../constants'; +import { defaultApiHost, defaultConfig } from '../constants'; +import { + SegmentError, + ErrorType, + translateHTTPError, + classifyError, + parseRetryAfter, +} from '../errors'; +import { RetryManager } from '../backoff/RetryManager'; +import type { RetryResult } from '../backoff'; const MAX_EVENTS_PER_BATCH = 100; const MAX_PAYLOAD_SIZE_IN_KB = 500; export const SEGMENT_DESTINATION_KEY = 'Segment.io'; +type BatchResult = { + batch: SegmentEvent[]; + messageIds: string[]; + status: 'success' | '429' | 'transient' | 'permanent' | 'network_error'; + statusCode?: number; + retryAfterSeconds?: number; +}; + +type ErrorAggregation = { + successfulMessageIds: string[]; + rateLimitResults: BatchResult[]; + hasTransientError: boolean; + permanentErrorMessageIds: string[]; + retryableMessageIds: string[]; +}; + export class SegmentDestination extends DestinationPlugin { type = PluginType.destination; key = SEGMENT_DESTINATION_KEY; private apiHost?: string; + private httpConfig?: HttpConfig; private settingsResolve: () => void; private settingsPromise: Promise; + private retryManager?: RetryManager; constructor() { super(); @@ -34,9 +60,184 @@ export class SegmentDestination extends DestinationPlugin { this.settingsResolve = resolve; } + private async uploadBatch(batch: SegmentEvent[]): Promise { + const config = this.analytics?.getConfig() ?? defaultConfig; + const messageIds = batch + .map((e) => e.messageId) + .filter((id): id is string => id !== undefined && id !== ''); + + const retryCount = this.retryManager + ? await this.retryManager.getRetryCount() + : 0; + + const cleanedBatch = batch.map(({ _queuedAt, ...event }) => event); + + try { + const res = await uploadEvents({ + writeKey: config.writeKey, + url: this.getEndpoint(), + events: cleanedBatch as SegmentEvent[], + retryCount, + }); + + if (res.ok) { + return { batch, messageIds, status: 'success', statusCode: res.status }; + } + + const retryAfterSeconds = + res.status === 429 + ? parseRetryAfter( + res.headers.get('Retry-After'), + this.httpConfig?.rateLimitConfig?.maxRetryInterval + ) + : undefined; + + const classification = classifyError(res.status, { + default4xxBehavior: this.httpConfig?.backoffConfig?.default4xxBehavior, + default5xxBehavior: this.httpConfig?.backoffConfig?.default5xxBehavior, + statusCodeOverrides: + this.httpConfig?.backoffConfig?.statusCodeOverrides, + rateLimitEnabled: this.httpConfig?.rateLimitConfig?.enabled, + }); + + if (classification.errorType === 'rate_limit') { + return { + batch, + messageIds, + status: '429', + statusCode: res.status, + retryAfterSeconds: retryAfterSeconds ?? 60, + }; + } else if (classification.errorType === 'transient') { + return { + batch, + messageIds, + status: 'transient', + statusCode: res.status, + }; + } else { + return { + batch, + messageIds, + status: 'permanent', + statusCode: res.status, + }; + } + } catch (e) { + this.analytics?.reportInternalError(translateHTTPError(e)); + return { batch, messageIds, status: 'network_error' }; + } + } + + private aggregateErrors(results: BatchResult[]): ErrorAggregation { + const aggregation: ErrorAggregation = { + successfulMessageIds: [], + rateLimitResults: [], + hasTransientError: false, + permanentErrorMessageIds: [], + retryableMessageIds: [], + }; + + for (const result of results) { + switch (result.status) { + case 'success': + aggregation.successfulMessageIds.push(...result.messageIds); + break; + case '429': + aggregation.rateLimitResults.push(result); + aggregation.retryableMessageIds.push(...result.messageIds); + break; + case 'transient': + case 'network_error': + aggregation.hasTransientError = true; + aggregation.retryableMessageIds.push(...result.messageIds); + break; + case 'permanent': + aggregation.permanentErrorMessageIds.push(...result.messageIds); + break; + } + } + + return aggregation; + } + + /** + * Drop events whose _queuedAt exceeds maxTotalBackoffDuration. + * Returns the remaining fresh events. + */ + private async pruneExpiredEvents( + events: SegmentEvent[] + ): Promise { + const maxAge = this.httpConfig?.backoffConfig?.maxTotalBackoffDuration ?? 0; + if (maxAge <= 0) { + return events; + } + + const now = Date.now(); + const maxAgeMs = maxAge * 1000; + const expiredMessageIds: string[] = []; + const freshEvents: SegmentEvent[] = []; + + for (const event of events) { + if (event._queuedAt !== undefined && now - event._queuedAt > maxAgeMs) { + if (event.messageId !== undefined && event.messageId !== '') { + expiredMessageIds.push(event.messageId); + } + } else { + freshEvents.push(event); + } + } + + if (expiredMessageIds.length > 0) { + await this.queuePlugin.dequeueByMessageIds(expiredMessageIds); + this.analytics?.reportInternalError( + new SegmentError( + ErrorType.EventsDropped, + `Dropped ${expiredMessageIds.length} events exceeding max age (${maxAge}s)`, + undefined, + { droppedCount: expiredMessageIds.length, reason: 'max_age_exceeded' } + ) + ); + this.analytics?.logger.warn( + `Pruned ${expiredMessageIds.length} events older than ${maxAge}s` + ); + } + + return freshEvents; + } + + /** + * Update retry state based on aggregated batch results. + * 429 takes precedence over transient errors. + * Returns true if retry limits were exceeded (caller should drop events). + */ + private async updateRetryState( + aggregation: ErrorAggregation + ): Promise { + if (!this.retryManager) { + return false; + } + + const has429 = aggregation.rateLimitResults.length > 0; + let result: RetryResult | undefined; + + if (has429) { + for (const r of aggregation.rateLimitResults) { + result = await this.retryManager.handle429(r.retryAfterSeconds ?? 60); + } + } else if (aggregation.hasTransientError) { + result = await this.retryManager.handleTransientError(); + } else if (aggregation.successfulMessageIds.length > 0) { + await this.retryManager.reset(); + } + + return result === 'limit_exceeded'; + } + private sendEvents = async (events: SegmentEvent[]): Promise => { if (events.length === 0) { - return Promise.resolve(); + await this.retryManager?.reset(); + return; } // We're not sending events until Segment has loaded all settings @@ -44,46 +245,92 @@ export class SegmentDestination extends DestinationPlugin { const config = this.analytics?.getConfig() ?? defaultConfig; - const chunkedEvents: SegmentEvent[][] = chunk( + events = await this.pruneExpiredEvents(events); + if (events.length === 0) { + await this.retryManager?.reset(); + return; + } + + if (this.retryManager && !(await this.retryManager.canRetry())) { + this.analytics?.logger.info('Upload blocked by retry manager'); + return; + } + + const batches: SegmentEvent[][] = chunk( events, config.maxBatchSize ?? MAX_EVENTS_PER_BATCH, MAX_PAYLOAD_SIZE_IN_KB ); - let sentEvents: SegmentEvent[] = []; - let numFailedEvents = 0; - - await Promise.all( - chunkedEvents.map(async (batch: SegmentEvent[]) => { - try { - const res = await uploadEvents({ - writeKey: config.writeKey, - url: this.getEndpoint(), - events: batch, - }); - checkResponseForErrors(res); - sentEvents = sentEvents.concat(batch); - } catch (e) { - this.analytics?.reportInternalError(translateHTTPError(e)); - this.analytics?.logger.warn(e); - numFailedEvents += batch.length; - } finally { - await this.queuePlugin.dequeue(sentEvents); - } - }) + const results: BatchResult[] = await Promise.all( + batches.map((batch) => this.uploadBatch(batch)) ); - if (sentEvents.length) { + const aggregation = this.aggregateErrors(results); + + const limitExceeded = await this.updateRetryState(aggregation); + + if (aggregation.successfulMessageIds.length > 0) { + await this.queuePlugin.dequeueByMessageIds( + aggregation.successfulMessageIds + ); if (config.debug === true) { - this.analytics?.logger.info(`Sent ${sentEvents.length} events`); + this.analytics?.logger.info( + `Sent ${aggregation.successfulMessageIds.length} events` + ); } } - if (numFailedEvents) { - this.analytics?.logger.error(`Failed to send ${numFailedEvents} events.`); + if (aggregation.permanentErrorMessageIds.length > 0) { + await this.queuePlugin.dequeueByMessageIds( + aggregation.permanentErrorMessageIds + ); + this.analytics?.reportInternalError( + new SegmentError( + ErrorType.EventsDropped, + `Dropped ${aggregation.permanentErrorMessageIds.length} events due to permanent errors`, + undefined, + { + droppedCount: aggregation.permanentErrorMessageIds.length, + reason: 'permanent_error', + } + ) + ); + this.analytics?.logger.error( + `Dropped ${aggregation.permanentErrorMessageIds.length} events due to permanent errors` + ); + } + + if (limitExceeded && aggregation.retryableMessageIds.length > 0) { + await this.queuePlugin.dequeueByMessageIds( + aggregation.retryableMessageIds + ); + this.analytics?.reportInternalError( + new SegmentError( + ErrorType.EventsDropped, + `Dropped ${aggregation.retryableMessageIds.length} events due to retry limit exceeded`, + undefined, + { + droppedCount: aggregation.retryableMessageIds.length, + reason: 'retry_limit_exceeded', + } + ) + ); + this.analytics?.logger.error( + `Dropped ${aggregation.retryableMessageIds.length} events due to retry limit exceeded` + ); } - return Promise.resolve(); + const failedCount = + events.length - + aggregation.successfulMessageIds.length - + aggregation.permanentErrorMessageIds.length; + if (failedCount > 0) { + const has429 = aggregation.rateLimitResults.length > 0; + this.analytics?.logger.warn( + `${failedCount} events will retry (429: ${has429}, transient: ${aggregation.hasTransientError})` + ); + } }; private readonly queuePlugin = new QueueFlushingPlugin(this.sendEvents); @@ -95,7 +342,7 @@ export class SegmentDestination extends DestinationPlugin { let baseURL = ''; let endpoint = ''; if (hasProxy) { - //baseURL is always config?.proxy if hasProxy + //baseURL is always config?.proxy if hasProxy baseURL = config?.proxy ?? ''; if (useSegmentEndpoints) { const isProxyEndsWithSlash = baseURL.endsWith('/'); @@ -111,12 +358,15 @@ export class SegmentDestination extends DestinationPlugin { return defaultApiHost; } } + configure(analytics: SegmentClient): void { super.configure(analytics); + const config = analytics.getConfig(); + // If the client has a proxy we don't need to await for settings apiHost, we can send events directly // Important! If new settings are required in the future you probably want to change this! - if (analytics.getConfig().proxy !== undefined) { + if (config.proxy !== undefined) { this.settingsResolve(); } @@ -137,13 +387,33 @@ export class SegmentDestination extends DestinationPlugin { //assign the api host from segment settings (domain/v1) this.apiHost = `https://${segmentSettings.apiHost}/b`; } + + const httpConfig = this.analytics?.getHttpConfig(); + if (httpConfig) { + this.httpConfig = httpConfig; + + if ( + !this.retryManager && + (httpConfig.rateLimitConfig || httpConfig.backoffConfig) + ) { + const config = this.analytics?.getConfig(); + this.retryManager = new RetryManager( + config?.writeKey ?? '', + config?.storePersistor, + httpConfig.rateLimitConfig, + httpConfig.backoffConfig, + this.analytics?.logger, + config?.retryStrategy ?? 'lazy' + ); + } + } + this.settingsResolve(); } execute(event: SegmentEvent): Promise { // Execute the internal timeline here, the queue plugin will pick up the event and add it to the queue automatically - const enrichedEvent = super.execute(event); - return enrichedEvent; + return super.execute(event); } async flush() { diff --git a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts index b3b02d802..1a483f73d 100644 --- a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts +++ b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts @@ -60,6 +60,7 @@ describe('QueueFlushingPlugin', () => { const event: SegmentEvent = { type: EventType.TrackEvent, event: 'test2', + messageId: 'msg-dequeue-1', properties: { test: 'test2', }, @@ -72,7 +73,7 @@ describe('QueueFlushingPlugin', () => { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore expect(queuePlugin.queueStore?.getState().events).toHaveLength(1); - await queuePlugin.dequeue(event); + await queuePlugin.dequeueByMessageIds(['msg-dequeue-1']); // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); @@ -111,6 +112,7 @@ describe('QueueFlushingPlugin', () => { const event1: SegmentEvent = { type: EventType.TrackEvent, event: 'test1', + messageId: 'msg-count-1', properties: { test: 'test1', }, @@ -119,6 +121,7 @@ describe('QueueFlushingPlugin', () => { const event2: SegmentEvent = { type: EventType.TrackEvent, event: 'test2', + messageId: 'msg-count-2', properties: { test: 'test2', }, @@ -130,7 +133,7 @@ describe('QueueFlushingPlugin', () => { let eventsCount = await queuePlugin.pendingEvents(); expect(eventsCount).toBe(2); - await queuePlugin.dequeue(event1); + await queuePlugin.dequeueByMessageIds(['msg-count-1']); eventsCount = await queuePlugin.pendingEvents(); expect(eventsCount).toBe(1); diff --git a/packages/core/src/plugins/__tests__/SegmentDestination.test.ts b/packages/core/src/plugins/__tests__/SegmentDestination.test.ts index 885097fd9..23f4d410c 100644 --- a/packages/core/src/plugins/__tests__/SegmentDestination.test.ts +++ b/packages/core/src/plugins/__tests__/SegmentDestination.test.ts @@ -9,6 +9,7 @@ import { import { Config, EventType, + HttpConfig, SegmentAPIIntegration, SegmentEvent, TrackEventType, @@ -260,10 +261,12 @@ describe('SegmentDestination', () => { config, settings, events, + httpConfig, }: { config?: Config; settings?: SegmentAPIIntegration; events: SegmentEvent[]; + httpConfig?: HttpConfig; }) => { const plugin = new SegmentDestination(); @@ -278,6 +281,13 @@ describe('SegmentDestination', () => { }); plugin.configure(analytics); + + if (httpConfig !== undefined) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + plugin.httpConfig = httpConfig; + } + // The settings store won't match but that's ok, the plugin should rely only on the settings it receives during update plugin.update( { @@ -322,6 +332,7 @@ describe('SegmentDestination', () => { expect(sendEventsSpy).toHaveBeenCalledWith({ url: getURL(defaultApiHost, ''), // default api already appended with '/b' writeKey: '123-456', + retryCount: 0, events: events.slice(0, 2).map((e) => ({ ...e, })), @@ -329,6 +340,7 @@ describe('SegmentDestination', () => { expect(sendEventsSpy).toHaveBeenCalledWith({ url: getURL(defaultApiHost, ''), // default api already appended with '/b' writeKey: '123-456', + retryCount: 0, events: events.slice(2, 4).map((e) => ({ ...e, })), @@ -356,6 +368,7 @@ describe('SegmentDestination', () => { expect(sendEventsSpy).toHaveBeenCalledWith({ url: getURL(customEndpoint, '/b'), writeKey: '123-456', + retryCount: 0, events: events.slice(0, 2).map((e) => ({ ...e, })), @@ -407,13 +420,129 @@ describe('SegmentDestination', () => { expect(sendEventsSpy).toHaveBeenCalledWith({ url: expectedUrl, writeKey: '123-456', + retryCount: 0, events: events.map((e) => ({ ...e, })), }); } ); + + describe('event age pruning', () => { + it('prunes events older than maxTotalBackoffDuration', async () => { + const now = Date.now(); + const events = [ + { messageId: 'old-1', _queuedAt: now - 50000 * 1000 }, + { messageId: 'fresh-1' }, + { messageId: 'fresh-2' }, + ] as SegmentEvent[]; + + const { plugin, sendEventsSpy } = createTestWith({ + events, + httpConfig: { + backoffConfig: { + enabled: true, + maxRetryCount: 100, + baseBackoffInterval: 0.5, + maxBackoffInterval: 300, + maxTotalBackoffDuration: 43200, + jitterPercent: 10, + default4xxBehavior: 'drop', + default5xxBehavior: 'retry', + statusCodeOverrides: {}, + }, + }, + }); + + await plugin.flush(); + + expect(sendEventsSpy).toHaveBeenCalledTimes(1); + const sentEvents = sendEventsSpy.mock.calls[0][0].events; + expect(sentEvents).toHaveLength(2); + expect(sentEvents.map((e: SegmentEvent) => e.messageId)).toEqual([ + 'fresh-1', + 'fresh-2', + ]); + }); + + it('does not prune when maxTotalBackoffDuration is 0', async () => { + const now = Date.now(); + const events = [ + { messageId: 'old-1', _queuedAt: now - 50000 * 1000 }, + { messageId: 'fresh-1' }, + ] as SegmentEvent[]; + + const { plugin, sendEventsSpy } = createTestWith({ + events, + httpConfig: { + backoffConfig: { + enabled: true, + maxRetryCount: 100, + baseBackoffInterval: 0.5, + maxBackoffInterval: 300, + maxTotalBackoffDuration: 0, + jitterPercent: 10, + default4xxBehavior: 'drop', + default5xxBehavior: 'retry', + statusCodeOverrides: {}, + }, + }, + }); + + await plugin.flush(); + + expect(sendEventsSpy).toHaveBeenCalledTimes(1); + const sentEvents = sendEventsSpy.mock.calls[0][0].events; + expect(sentEvents).toHaveLength(2); + }); + + it('does not prune events without _queuedAt', async () => { + const events = [ + { messageId: 'old-1' }, + { messageId: 'fresh-1' }, + ] as SegmentEvent[]; + + const { plugin, sendEventsSpy } = createTestWith({ + events, + httpConfig: { + backoffConfig: { + enabled: true, + maxRetryCount: 100, + baseBackoffInterval: 0.5, + maxBackoffInterval: 300, + maxTotalBackoffDuration: 43200, + jitterPercent: 10, + default4xxBehavior: 'drop', + default5xxBehavior: 'retry', + statusCodeOverrides: {}, + }, + }, + }); + + await plugin.flush(); + + expect(sendEventsSpy).toHaveBeenCalledTimes(1); + const sentEvents = sendEventsSpy.mock.calls[0][0].events; + expect(sentEvents).toHaveLength(2); + }); + + it('strips _queuedAt before upload', async () => { + const now = Date.now(); + const events = [ + { messageId: 'msg-1', _queuedAt: now - 1000 }, + ] as SegmentEvent[]; + + const { plugin, sendEventsSpy } = createTestWith({ events }); + + await plugin.flush(); + + expect(sendEventsSpy).toHaveBeenCalledTimes(1); + const sentEvents = sendEventsSpy.mock.calls[0][0].events; + expect(sentEvents[0]).not.toHaveProperty('_queuedAt'); + }); + }); }); + describe('getEndpoint', () => { it.each([ ['example.com/v1/', 'https://example.com/v1/'], diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index aef451e61..12e58a43c 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -154,6 +154,8 @@ export type Config = { cdnProxy?: string; useSegmentEndpoints?: boolean; // Use if you want to use Segment endpoints errorHandler?: (error: SegmentError) => void; + /** Client-side httpConfig overrides (highest precedence over defaults and CDN). */ + httpConfig?: DeepPartial; }; export type ClientMethods = {