What's Actually Happening
Your MongoDB Change Stream application crashes or restarts, and when it tries to resume from the saved resume token, MongoDB rejects the token as invalid or returns an error indicating the token has expired or the oplog has been purged.
The Error You'll See
Resume token invalid:

```
MongoServerError: Resume token is not valid. The token may have been
generated by a different MongoDB version or the resume token data has
been corrupted.
```

Oplog purged:

```
MongoServerError: Cannot resume a change stream because the resume
token references an oplog entry that has been purged. The resume token
was created at 2026-04-01T00:00:00.000Z which is older than the oldest
oplog entry.
```

Pre-image required:

```
MongoServerError: Change stream was configured to require pre-images,
but the collection does not have pre-image recording enabled. Cannot
resume without pre-image.
```

Change event missing:

```
MongoServerError: Failed to resume change stream after 3 attempts.
The resume point is no longer available in the oplog.
```

Why This Happens
1. Token expired - Too much time passed before resuming
2. Oplog purged - Token references an oplog entry that has been overwritten
3. Version mismatch - Token from a different MongoDB version
4. Pre-image disabled - Collection needs pre-images but isn't configured for them
5. Cluster change - Token predates a resharding or migration
6. Improper storage - Token corrupted during persistence
7. Wrong resume point - Using the wrong token after partial processing
8. Replica set reconfig - Token from a different replica set configuration
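These causes call for different recovery actions: a purged or corrupt token must be discarded, a missing pre-image is a configuration problem, and transient network or election errors should simply be retried. A minimal classifier sketch; the message substrings are assumptions based on common server wording, so prefer matching on server error codes (such as 260, `ChangeStreamHistoryLost`) when your driver exposes them:

```javascript
// Classify a change stream error into a recovery action.
// NOTE: the message substrings are illustrative; error codes are more stable.
function classifyResumeError(error) {
  const msg = (error.message || '').toLowerCase();
  if (error.code === 260 || msg.includes('oplog')) {
    return 'discard-token'; // history purged: this token can never work again
  }
  if (msg.includes('resume token')) {
    return 'discard-token'; // corrupt or version-mismatched token
  }
  if (msg.includes('pre-image')) {
    return 'fix-config'; // enable changeStreamPreAndPostImages first
  }
  return 'retry'; // assume transient (network, election) and reconnect
}

console.log(classifyResumeError({ message: 'Resume token is not valid.' }));
// 'discard-token'
```

Steps 3 and 10 below use inline `error.message.includes(...)` checks; centralizing the decision in one function like this keeps the retry loop readable.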
Step 1: Understand Resume Token Structure
```javascript
// Resume token structure:
const token = {
  _data: "8266067B2B000000012B022C0100296E5A1004ABCD..." // hex-encoded KeyString
};

// The _data field is a hex-encoded internal KeyString, not base64-encoded
// BSON, so it cannot be parsed with the bson library. Its exact layout is
// undocumented and may change between server versions, but the leading
// bytes reliably encode the cluster timestamp: byte 0 is a type marker,
// bytes 1-4 are the seconds (big-endian), bytes 5-8 the increment.
function getTokenTimestamp(resumeToken) {
  const buf = Buffer.from(resumeToken._data, 'hex');
  return {
    t: buf.readUInt32BE(1), // seconds since epoch
    i: buf.readUInt32BE(5)  // ordinal within that second
  };
}

// Check token age:
const { t } = getTokenTimestamp(token);
const tokenTime = new Date(t * 1000);
const age = Date.now() - tokenTime.getTime();
console.log(`Token age: ${age / 1000 / 60} minutes`);

// Besides the timestamp, the token also encodes the token version,
// the collection UUID, and the document key of the change event.
```
Step 2: Configure Pre-Images for Change Streams
```javascript
// Enable pre-images on a new collection (MongoDB 6.0+):
db.createCollection("orders", {
  changeStreamPreAndPostImages: { enabled: true }
});

// Or update an existing collection:
db.runCommand({
  collMod: "orders",
  changeStreamPreAndPostImages: { enabled: true }
});

// Check pre-image configuration:
db.getCollectionInfos({ name: "orders" })[0]
  .options.changeStreamPreAndPostImages;

// Watch with pre-images:
const changeStream = db.orders.watch([], {
  fullDocument: "required",
  fullDocumentBeforeChange: "required"
});

// Handle the pre-image in the change event:
changeStream.on("change", (change) => {
  console.log("Document before:", change.fullDocumentBeforeChange);
  console.log("Document after:", change.fullDocument);
  console.log("Operation:", change.operationType);
});

// Disable when pre-images are no longer needed:
db.runCommand({
  collMod: "orders",
  changeStreamPreAndPostImages: { enabled: false }
});
```
Step 3: Implement Robust Resume Strategy
```javascript
const { MongoClient, Timestamp } = require('mongodb');
const fs = require('fs');

class ResilientChangeStream {
  constructor(uri, dbName, collectionName) {
    this.uri = uri;
    this.dbName = dbName;
    this.collectionName = collectionName;
    this.tokenFile = './resume_token.json';
    this.client = null;
    this.changeStream = null;
    this.maxRetries = 5;
    this.retryDelay = 5000;
  }

  async connect() {
    this.client = new MongoClient(this.uri, {
      maxPoolSize: 10,
      serverSelectionTimeoutMS: 5000,
      connectTimeoutMS: 10000
    });
    await this.client.connect();
    this.collection = this.client.db(this.dbName).collection(this.collectionName);
  }

  loadResumeToken() {
    try {
      if (fs.existsSync(this.tokenFile)) {
        return JSON.parse(fs.readFileSync(this.tokenFile, 'utf8'));
      }
    } catch (error) {
      console.error('Error loading resume token:', error);
    }
    return null;
  }

  saveResumeToken(token) {
    try {
      fs.writeFileSync(this.tokenFile, JSON.stringify(token));
    } catch (error) {
      console.error('Error saving resume token:', error);
    }
  }

  clearResumeToken() {
    try {
      fs.unlinkSync(this.tokenFile);
    } catch {
      // File may not exist; nothing to clear
    }
  }

  async startWatching() {
    const savedToken = this.loadResumeToken();
    const options = { fullDocument: 'updateLookup', maxAwaitTimeMS: 30000 };

    // Try to resume from the saved token
    if (savedToken) {
      try {
        console.log('Attempting to resume from saved token...');
        this.changeStream = this.collection.watch([], {
          ...options,
          resumeAfter: savedToken
        });

        // Force the driver to open the cursor so an invalid token
        // fails here rather than inside the processing loop
        const first = await this.changeStream.tryNext();
        console.log('Successfully resumed from token');
        if (first) {
          // Don't drop the event consumed by the validation read
          await this.processChange(first);
        }
        return;
      } catch (error) {
        console.error('Resume failed:', error.message);
        console.log('Falling back to startAtOperationTime...');
      }
    }

    // Fallback: start from the current time
    this.changeStream = this.collection.watch([], {
      ...options,
      startAtOperationTime: new Timestamp({
        t: Math.floor(Date.now() / 1000),
        i: 0
      })
    });
    console.log('Started new change stream from current time');
  }

  async processChange(change) {
    // Your processing logic here
    console.log(`Processing ${change.operationType} on ${change.ns.coll}`);

    // Simulate processing
    await new Promise(resolve => setTimeout(resolve, 100));

    // Save the token only after successful processing
    this.saveResumeToken(change._id);
  }

  async run() {
    let retryCount = 0;

    while (retryCount < this.maxRetries) {
      try {
        if (!this.client) {
          await this.connect();
        }
        await this.startWatching();
        retryCount = 0; // Reset on success

        // Process changes
        for await (const change of this.changeStream) {
          await this.processChange(change);
        }
      } catch (error) {
        retryCount++;
        console.error(`Error (attempt ${retryCount}/${this.maxRetries}):`, error.message);

        // An invalid token or purged oplog entry means the saved
        // resume point is unusable; clear it so the fallback path runs
        if (error.message.includes('resume token') ||
            error.message.includes('oplog entry')) {
          console.log('Resume point unusable, clearing token and restarting...');
          this.clearResumeToken();
        }

        // Close and reconnect
        if (this.changeStream) await this.changeStream.close();
        if (this.client) {
          await this.client.close();
          this.client = null;
        }

        // Back off before retrying, scaling with the attempt count
        await new Promise(resolve => setTimeout(resolve, this.retryDelay * retryCount));
      }
    }

    throw new Error('Max retries exceeded');
  }

  async stop() {
    if (this.changeStream) await this.changeStream.close();
    if (this.client) await this.client.close();
  }
}

// Usage
const watcher = new ResilientChangeStream(
  'mongodb://localhost:27017',
  'mydb',
  'orders'
);

watcher.run().catch(console.error);
```
Step 4: Handle Token Expiration and Oplog Purge
```javascript
// Check oplog size and retention:
const localDb = client.db('local');
const oplogStats = await localDb.command({ collStats: 'oplog.rs' });

console.log('Oplog size:', oplogStats.size);
console.log('Oplog max size:', oplogStats.maxSize);

// Calculate the oplog window:
const firstEntry = await localDb.collection('oplog.rs')
  .findOne({}, { sort: { $natural: 1 } });
const lastEntry = await localDb.collection('oplog.rs')
  .findOne({}, { sort: { $natural: -1 } });

// The seconds live in the high 32 bits of a BSON Timestamp
const firstTime = firstEntry.ts.getHighBits();
const lastTime = lastEntry.ts.getHighBits();
const windowHours = (lastTime - firstTime) / 3600;

console.log(`Oplog window: ${windowHours.toFixed(2)} hours`);

// Increase the oplog size to retain more history.
// In mongod.conf:
// replication:
//   oplogSizeMB: 10240  # 10GB

// Or on a running replica set member (size in MB):
// db.adminCommand({ replSetResizeOplog: 1, size: 10240 });

// Implement a token freshness check, using the timestamp bytes at the
// start of the hex-encoded token as in Step 1:
function isTokenFresh(token, maxAgeMs = 24 * 60 * 60 * 1000) { // default 24 hours
  try {
    const buf = Buffer.from(token._data, 'hex');
    const tokenTime = buf.readUInt32BE(1) * 1000;
    return Date.now() - tokenTime < maxAgeMs;
  } catch {
    return false;
  }
}

// Use in resume logic:
const savedToken = loadResumeToken();
if (savedToken && isTokenFresh(savedToken)) {
  // Safe to resume
} else {
  // Token too old, start fresh
}
```
Step 5: Monitor Change Stream Health
```javascript
// Comprehensive monitoring:
class ChangeStreamMonitor {
  constructor(client, collection) {
    this.client = client;
    this.collection = collection;
    this.metrics = {
      eventsProcessed: 0,
      errors: 0,
      reconnects: 0,
      lastEventTime: null,
      latency: 0
    };
  }

  async getServerStatus() {
    const admin = this.client.db('admin');
    const status = await admin.command({ serverStatus: 1 });

    return {
      connections: status.connections,
      network: status.network,
      opcounters: status.opcounters,
      replication: status.repl
    };
  }

  async getChangeStreamStats() {
    const admin = this.client.db('admin');

    try {
      // Change streams run as aggregations with a $changeStream stage
      const stats = await admin.command({ currentOp: true, $all: true });

      return stats.inprog.filter(op =>
        op.command &&
        Array.isArray(op.command.pipeline) &&
        op.command.pipeline.some(stage => stage.$changeStream)
      );
    } catch (error) {
      console.error('Error getting change stream stats:', error);
      return [];
    }
  }

  calculateLatency(changeEvent) {
    // clusterTime is a BSON Timestamp; the seconds are in the high 32 bits
    const eventTime = changeEvent.clusterTime.getHighBits() * 1000;
    return Date.now() - eventTime;
  }

  recordEvent(change) {
    this.metrics.eventsProcessed++;
    this.metrics.lastEventTime = new Date();
    this.metrics.latency = this.calculateLatency(change);
  }

  recordError(error) {
    this.metrics.errors++;
    console.error('Change stream error:', error);
  }

  getMetrics() {
    return {
      ...this.metrics,
      uptime: process.uptime(),
      timestamp: new Date()
    };
  }

  startReporting(interval = 60000) {
    setInterval(() => {
      const metrics = this.getMetrics();
      console.log('Change Stream Metrics:', JSON.stringify(metrics, null, 2));

      // Alert on issues
      if (metrics.latency > 10000) {
        console.warn('High latency detected:', metrics.latency);
      }
      if (metrics.errors > 10) {
        console.warn('High error count:', metrics.errors);
      }
    }, interval);
  }
}

// Usage with a change stream:
const monitor = new ChangeStreamMonitor(client, collection);
monitor.startReporting();

changeStream.on('change', (change) => {
  monitor.recordEvent(change);
  // Process change
});

changeStream.on('error', (error) => {
  monitor.recordError(error);
});
```
Step 6: Handle Cluster Changes and Migrations
```javascript
// Handle replica set elections and migrations. Note that recent drivers
// already retry resumable errors automatically; manual handling like this
// is useful when you need custom backoff or logging around reconnects.
class ClusterAwareChangeStream {
  constructor(client, collection) {
    this.client = client;
    this.collection = collection;
    this.lastKnownToken = null;
    this.topologyVersion = null;
  }

  async initialize() {
    // Get initial topology info ("hello" replaces the legacy isMaster command)
    const admin = this.client.db('admin');
    const status = await admin.command({ hello: 1 });
    this.topologyVersion = status.topologyVersion;

    console.log('Initial topology version:', this.topologyVersion);
  }

  async watchWithTopologyHandling(pipeline = [], options = {}) {
    let changeStream = this.createChangeStream(pipeline, options);

    changeStream.on('change', async (change) => {
      // Save the token before processing
      this.lastKnownToken = change._id;
      await this.processChange(change);
    });

    // Handle resumable errors
    changeStream.on('error', async (error) => {
      if (this.isResumableError(error)) {
        console.log('Resumable error, reconnecting...');

        // Wait for the cluster to stabilize
        await this.waitForClusterStable();

        // Resume from the last token
        changeStream = this.createChangeStream(pipeline, {
          ...options,
          resumeAfter: this.lastKnownToken
        });
      } else {
        throw error;
      }
    });

    return changeStream;
  }

  isResumableError(error) {
    const resumableCodes = [
      6,     // HostUnreachable
      7,     // HostNotFound
      89,    // NetworkTimeout
      91,    // ShutdownInProgress
      189,   // PrimarySteppedDown
      262,   // ExceededTimeLimit
      9001,  // SocketException
      10107, // NotPrimary
      11600, // InterruptedAtShutdown
      11602, // InterruptedDueToReplStateChange
      13435, // NotPrimaryNoSecondaryOk
      13436  // NotPrimaryOrSecondary
    ];
    return resumableCodes.includes(error.code);
  }

  async waitForClusterStable(maxWaitMs = 30000) {
    const startTime = Date.now();

    while (Date.now() - startTime < maxWaitMs) {
      try {
        const admin = this.client.db('admin');
        await admin.command({ ping: 1 });

        const status = await admin.command({ hello: 1 });
        if (status.isWritablePrimary || status.secondary) {
          return true;
        }
      } catch (error) {
        console.log('Cluster not ready, waiting...');
      }
      await new Promise(r => setTimeout(r, 1000));
    }

    throw new Error('Cluster did not stabilize');
  }

  createChangeStream(pipeline, options) {
    return this.collection.watch(pipeline, {
      fullDocument: 'updateLookup',
      ...options
    });
  }

  async processChange(change) {
    // Override with your processing logic
    console.log('Processing change:', change.operationType);
  }
}
```
Step 7: Implement Idempotent Processing
```javascript
// Ensure processing is idempotent for resume safety:
class IdempotentProcessor {
  constructor(db) {
    this.processedCollection = db.collection('processed_events');
  }

  async isProcessed(changeId) {
    const existing = await this.processedCollection.findOne({ _id: changeId });
    return existing !== null;
  }

  async markProcessed(changeId, documentKey) {
    await this.processedCollection.updateOne(
      { _id: changeId },
      { $set: { processedAt: new Date(), documentKey } },
      { upsert: true }
    );
  }

  async processChange(change) {
    const changeId = change._id._data; // Resume token data

    // Skip if already processed
    if (await this.isProcessed(changeId)) {
      console.log('Change already processed, skipping');
      return;
    }

    // Process based on operation type
    switch (change.operationType) {
      case 'insert':
        await this.handleInsert(change);
        break;
      case 'update':
        await this.handleUpdate(change);
        break;
      case 'delete':
        await this.handleDelete(change);
        break;
      default:
        console.log('Unhandled operation type:', change.operationType);
    }

    // Mark as processed
    await this.markProcessed(changeId, change.documentKey);
  }

  async handleInsert(change) {
    // Idempotent insert handling
    console.log('Processing insert:', change.fullDocument);
  }

  async handleUpdate(change) {
    // Idempotent update handling
    console.log('Processing update:', change.documentKey);
  }

  async handleDelete(change) {
    // Idempotent delete handling
    console.log('Processing delete:', change.documentKey);
  }
}

// Clean up old processed events:
async function cleanupProcessedEvents(collection, maxAgeMs = 7 * 24 * 60 * 60 * 1000) {
  const cutoff = new Date(Date.now() - maxAgeMs);
  const result = await collection.deleteMany({ processedAt: { $lt: cutoff } });
  console.log(`Cleaned up ${result.deletedCount} old processed events`);
}
```
Step 8: Handle Sharded Cluster Change Streams
```javascript
// Change streams in sharded clusters require special handling:
class ShardedChangeStream {
  constructor(client, ns) {
    this.client = client;
    this.ns = ns; // { db: 'mydb', coll: 'mycollection' }
  }

  async watchSharded(pipeline = [], options = {}) {
    // Always watch through mongos, never directly on individual shards;
    // mongos merges and globally orders the events from all shards
    const changeStream = this.client
      .db(this.ns.db)
      .collection(this.ns.coll)
      .watch(pipeline, {
        fullDocument: 'updateLookup',
        ...options
      });

    // Track the last token seen per shard-key value
    const shardTokens = new Map();

    changeStream.on('change', (change) => {
      // Extract the shard key from the document
      const shardKey = this.extractShardKey(change);

      // Sanity-check ordering per key
      if (shardKey) {
        const lastToken = shardTokens.get(shardKey);
        if (lastToken && this.compareTokens(change._id, lastToken) <= 0) {
          console.warn('Out of order event detected');
        }
        shardTokens.set(shardKey, change._id);
      }

      this.processChange(change);
    });

    return changeStream;
  }

  extractShardKey(change) {
    // Extract the shard key value from the document;
    // this depends on your shard key configuration
    if (change.fullDocument) {
      return change.fullDocument._id; // Example: sharding on _id
    }
    return null;
  }

  compareTokens(token1, token2) {
    // Resume token _data strings are KeyStrings, which sort
    // lexicographically, so a plain string comparison preserves order
    if (token1._data === token2._data) return 0;
    return token1._data > token2._data ? 1 : -1;
  }

  async processChange(change) {
    // Your processing logic
    console.log('Processing change from shard:', change);
  }
}

// For cross-shard distribution info: getShardDistribution() is a mongosh
// shell helper, not a server command, so from the driver read the chunk
// metadata in the config database instead:
async function getShardDistribution(client, dbName, collName) {
  const configDb = client.db('config');
  const collMeta = await configDb.collection('collections')
    .findOne({ _id: `${dbName}.${collName}` });
  const distribution = await configDb.collection('chunks')
    .aggregate([
      { $match: { uuid: collMeta.uuid } },
      { $group: { _id: '$shard', chunks: { $sum: 1 } } }
    ])
    .toArray();

  console.log('Shard distribution:', distribution);
}
```
Step 9: Test Resume Token Handling
```javascript
// Comprehensive test suite:
const { MongoClient, Timestamp } = require('mongodb');
const assert = require('assert');

async function testChangeStreamResume() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();

  const db = client.db('testdb');
  const collection = db.collection('testcollection');

  try {
    // Test 1: Basic resume
    console.log('Test 1: Basic resume...');
    await collection.deleteMany({});

    const stream1 = collection.watch();
    const token1 = await new Promise((resolve) => {
      stream1.on('change', resolve);
      collection.insertOne({ test: 1 });
    }).then(async (change) => {
      await stream1.close();
      return change._id;
    });

    // Resume from the token
    const stream2 = collection.watch([], { resumeAfter: token1 });
    const change2Promise = new Promise((resolve) => {
      stream2.on('change', resolve);
    });
    await collection.insertOne({ test: 2 });
    const change2 = await change2Promise;

    assert.strictEqual(change2.fullDocument.test, 2);
    await stream2.close();
    console.log('✓ Test 1 passed');

    // Test 2: Invalid token handling
    console.log('Test 2: Invalid token handling...');
    const fakeToken = { _data: 'invalid_token' };

    try {
      const stream3 = collection.watch([], { resumeAfter: fakeToken });
      await stream3.tryNext();
      assert.fail('Should have thrown error');
    } catch (error) {
      assert.ok(/resume token/i.test(error.message));
      console.log('✓ Test 2 passed');
    }

    // Test 3: Start at operation time
    console.log('Test 3: Start at operation time...');
    await collection.insertOne({ test: 'before' });
    const startTime = new Timestamp({ t: Math.floor(Date.now() / 1000), i: 0 });

    const stream4 = collection.watch([], { startAtOperationTime: startTime });
    const change4Promise = new Promise((resolve) => {
      // Operation times have one-second granularity, so the stream may
      // replay the 'before' insert; read until 'after' arrives
      stream4.on('change', (change) => {
        if (change.fullDocument && change.fullDocument.test === 'after') {
          resolve(change);
        }
      });
    });
    await collection.insertOne({ test: 'after' });
    const change4 = await change4Promise;

    assert.strictEqual(change4.fullDocument.test, 'after');
    await stream4.close();
    console.log('✓ Test 3 passed');

    // Test 4: Token persistence
    console.log('Test 4: Token persistence...');
    const watcher = new ResilientChangeStream(
      'mongodb://localhost:27017',
      'testdb',
      'testcollection'
    );

    await watcher.connect();
    const testToken = { _data: 'test_token_data' };
    watcher.saveResumeToken(testToken);

    const loadedToken = watcher.loadResumeToken();
    assert.deepStrictEqual(loadedToken, testToken);
    console.log('✓ Test 4 passed');

    console.log('All tests passed!');
  } finally {
    await client.close();
  }
}

// Run tests
testChangeStreamResume().catch(console.error);
```
Step 10: Production Configuration and Best Practices
```javascript
// Complete production setup:
const { MongoClient } = require('mongodb');

// 1. MongoDB configuration for change streams (mongod.conf):
/*
replication:
  oplogSizeMB: 10240
*/
// (Majority read concern, which change streams depend on, is always
// enabled in MongoDB 5.0+ and no longer needs to be configured.)

// 2. Production change stream implementation:
class ProductionChangeStream {
  constructor(config) {
    this.config = {
      uri: config.uri,
      database: config.database,
      collection: config.collection,
      batchSize: config.batchSize || 100,
      maxAwaitTimeMS: config.maxAwaitTimeMS || 30000,
      resumeTokenPath: config.resumeTokenPath || './resume_token.json',
      maxRetries: config.maxRetries || 10,
      retryDelayMs: config.retryDelayMs || 5000,
      processTimeoutMs: config.processTimeoutMs || 30000
    };

    this.client = null;
    this.changeStream = null;
    this.isShuttingDown = false;
  }

  async initialize() {
    // Connect with production settings
    this.client = new MongoClient(this.config.uri, {
      maxPoolSize: 50,
      minPoolSize: 5,
      maxIdleTimeMS: 30000,
      connectTimeoutMS: 10000,
      socketTimeoutMS: 0,
      serverSelectionTimeoutMS: 10000,
      heartbeatFrequencyMS: 10000,
      retryWrites: true,
      retryReads: true
    });

    await this.client.connect();

    // Set up graceful shutdown
    process.on('SIGTERM', () => this.shutdown());
    process.on('SIGINT', () => this.shutdown());
  }

  async run() {
    await this.initialize();

    while (!this.isShuttingDown) {
      try {
        await this.processChanges();
      } catch (error) {
        console.error('Error in change stream:', error);
        await this.handleError(error);
      }
    }
  }

  async processChanges() {
    const resumeToken = this.loadResumeToken();

    const options = {
      fullDocument: 'updateLookup',
      maxAwaitTimeMS: this.config.maxAwaitTimeMS,
      batchSize: this.config.batchSize
    };

    if (resumeToken) {
      options.resumeAfter = resumeToken;
    }

    this.changeStream = this.client
      .db(this.config.database)
      .collection(this.config.collection)
      .watch([], options);

    for await (const change of this.changeStream) {
      if (this.isShuttingDown) break;

      await this.processWithTimeout(change);
      this.saveResumeToken(change._id);
    }
  }

  async processWithTimeout(change) {
    return Promise.race([
      this.processChange(change),
      new Promise((_, reject) =>
        setTimeout(
          () => reject(new Error('Processing timeout')),
          this.config.processTimeoutMs
        )
      )
    ]);
  }

  async processChange(change) {
    // Your processing logic here
    console.log(`Processing ${change.operationType} on ${change.documentKey._id}`);
  }

  async handleError(error) {
    if (error.message.includes('resume token')) {
      // Clear the invalid token so the next attempt starts fresh
      this.saveResumeToken(null);
    }

    // Wait before retrying
    await new Promise(r => setTimeout(r, this.config.retryDelayMs));
  }

  async shutdown() {
    console.log('Shutting down gracefully...');
    this.isShuttingDown = true;

    if (this.changeStream) await this.changeStream.close();
    if (this.client) await this.client.close();

    process.exit(0);
  }

  loadResumeToken() {
    // Implementation from Step 3
  }

  saveResumeToken(token) {
    // Implementation from Step 3
  }
}

// 3. Kubernetes deployment (persist the resume token on a PVC so it
// survives pod restarts):
/*
apiVersion: apps/v1
kind: Deployment
metadata:
  name: change-stream-processor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: change-stream-processor
  template:
    metadata:
      labels:
        app: change-stream-processor
    spec:
      containers:
        - name: processor
          image: myapp:latest
          env:
            - name: MONGODB_URI
              valueFrom:
                secretKeyRef:
                  name: mongodb-secret
                  key: uri
          volumeMounts:
            - name: resume-token
              mountPath: /app/tokens
      volumes:
        - name: resume-token
          persistentVolumeClaim:
            claimName: resume-token-pvc
*/

// Usage
const processor = new ProductionChangeStream({
  uri: process.env.MONGODB_URI,
  database: 'mydb',
  collection: 'orders'
});

processor.run().catch(console.error);
```
MongoDB Change Stream Resume Checklist
| Check | Command | Expected |
|---|---|---|
| Token valid | decode and check | Not expired |
| Oplog retention | check oplog window | > token age |
| Pre-images | collection options | Enabled if needed |
| Cluster stable | isMaster | Primary exists |
| Processing idempotent | test with reprocess | Same result |
| Error handling | test invalid token | Proper fallback |
| Graceful shutdown | test SIGTERM | Token saved |
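Several of these checks reduce to a single comparison: the token's timestamp must fall inside the oplog window. A small helper sketch that combines the token-timestamp extraction from Step 1 with the oldest oplog entry from Step 4 (the function name and the safety margin are illustrative choices, not driver APIs):

```javascript
// Decide whether a saved resume token is still inside the oplog window.
// tokenData: the hex _data string of the token.
// oldestOplogSeconds: seconds of the first oplog entry (see Step 4).
// marginSeconds: safety buffer so we don't resume right at the edge
// of the window while the oplog keeps rolling forward.
function canResumeFromToken(tokenData, oldestOplogSeconds, marginSeconds = 300) {
  let tokenSeconds;
  try {
    // Bytes 1-4 of the hex-decoded token are the big-endian seconds
    tokenSeconds = Buffer.from(tokenData, 'hex').readUInt32BE(1);
  } catch {
    return false; // unparsable token: treat as unusable
  }
  return tokenSeconds >= oldestOplogSeconds + marginSeconds;
}

// Example: token minted at t=1711700779, oldest oplog entry at t=1711690000
console.log(canResumeFromToken('8266067B2B0000000100', 1711690000)); // true
```

Running this check before calling `watch()` lets you choose between `resumeAfter` and the `startAtOperationTime` fallback up front, instead of waiting for the server to reject the token.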
Verify the Fix
```bash
# After implementing the robust change stream:

# 1. Test resume after restart
# Insert a document and note the resume token in the logs
db.orders.insertOne({ test: 1 })

# Restart the application
pm2 restart change-stream-app

# Verify it resumes correctly
db.orders.insertOne({ test: 2 })
# Check logs for processing

# 2. Test token expiration handling
# Wait for the token to age past your freshness threshold,
# insert a document, and verify the app falls back gracefully

# 3. Test cluster failover
# Step down the primary
rs.stepDown()
# Verify the app reconnects and resumes

# 4. Check metrics: events processed, latency,
# error count, resume success rate

# Compare before/after:
# Before: token errors, lost events, crashes
# After: graceful resume, no lost events, stable
```
Related Issues
- [Fix MongoDB Connection Timeout](/articles/fix-mongodb-connection-timeout)
- [Fix MongoDB Replica Set Primary Not Found](/articles/fix-mongodb-replica-set-primary-not-found)
- [Fix MongoDB Oplog Too Small](/articles/fix-mongodb-oplog-too-small)