## Introduction

MongoDB change streams use resume tokens to continue reading from where they left off after a disconnect. If the application is down longer than the oplog window, the resume token ages out of the oplog and resuming fails with `ChangeStreamHistoryLost`. This causes event loss unless the application can fall back to a full collection scan.
## Symptoms

- `MongoServerError: The resume token was not found in the oplog` or `ChangeStreamHistoryLost`
- Event processor misses updates that occurred during the downtime
- Application crashes with an unhandled change stream error
- Oplog window is smaller than the application downtime duration
- Duplicate events after reconnecting, due to the fallback rescan
## Common Causes

- Application downtime exceeding the oplog retention window
- Oplog size too small for the write volume, causing rapid token expiration
- Replica set failover causing a temporary change stream disconnection
- Slow event processing causing the consumer to fall behind the oplog
- No checkpoint persistence for the resume token
## Step-by-Step Fix

1. **Check the current oplog size and window**:

```javascript
// Check oplog status
db.getReplicationInfo()
// Look for:
//   "logSizeMB"      - configured oplog size in MB
//   "timeDiffHours"  - oplog window (hours between first and last entry)

// Detailed replication optimes
db.adminCommand({ replSetGetStatus: 1 }).optimes
```
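If you need the window programmatically (for monitoring or alerting), the same numbers can be derived from the first and last `oplog.rs` entries. This is a minimal sketch using the Node.js driver; `client` is assumed to be a connected `MongoClient`, and the time arithmetic lives in a pure helper:

```javascript
// Hours between two Dates (pure helper, used below).
function oplogWindowHours(firstEventTime, lastEventTime) {
  return (lastEventTime - firstEventTime) / (1000 * 60 * 60);
}

// Read the first and last oplog entries and compute the window.
// Assumes `client` is a connected MongoClient with access to `local`.
async function getOplogWindowHours(client) {
  const oplog = client.db('local').collection('oplog.rs');
  const [first] = await oplog.find().sort({ $natural: 1 }).limit(1).toArray();
  const [last] = await oplog.find().sort({ $natural: -1 }).limit(1).toArray();
  // `ts` is a BSON Timestamp; its high 32 bits are seconds since the epoch.
  return oplogWindowHours(
    new Date(first.ts.getHighBits() * 1000),
    new Date(last.ts.getHighBits() * 1000)
  );
}
```

If the returned window is routinely smaller than your worst-case downtime, the resume token will expire before the application can reconnect.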
2. **Increase the oplog size if the window is too small**:

```javascript
// Check current oplog size (bytes)
use local
db.oplog.rs.stats().maxSize

// Resize the oplog permanently via the config file (requires restart):
// in /etc/mongod.conf set
//   replication:
//     oplogSizeMB: 51200   (50GB)
```
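Since MongoDB 3.6 the oplog can also be resized online with the `replSetResizeOplog` admin command, without a restart. A sketch using the Node.js driver, where `resizeOplogMB` and its size check are illustrative additions (990 MB is the smallest size the command accepts):

```javascript
// MongoDB rejects replSetResizeOplog sizes below 990 MB (pure check).
function validOplogSizeMB(sizeMB) {
  return Number.isFinite(sizeMB) && sizeMB >= 990;
}

// Resize the oplog online on the connected member (MongoDB 3.6+).
// Assumes `client` is a connected MongoClient with admin privileges.
async function resizeOplogMB(client, sizeMB) {
  if (!validOplogSizeMB(sizeMB)) {
    throw new Error(`oplog size must be at least 990 MB, got ${sizeMB}`);
  }
  return client.db('admin').command({ replSetResizeOplog: 1, size: sizeMB });
}
```

Note that the command resizes only the member you are connected to, so it must be run against each replica set member in turn.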
3. **Implement resume token persistence with a fallback**:

```javascript
const { MongoClient } = require('mongodb');

async function startChangeStream(client) {
  const checkpointColl = client.db('admin').collection('change_stream_checkpoint');
  const checkpoint = await checkpointColl.findOne({ stream: 'main' });

  const pipeline = [
    { $match: { operationType: { $in: ['insert', 'update', 'replace'] } } }
  ];
  const options = {};
  if (checkpoint?.resumeToken) {
    options.resumeAfter = checkpoint.resumeToken;
  }

  const changeStream = client.db('mydb').collection('events').watch(pipeline, options);

  try {
    for await (const change of changeStream) {
      await processEvent(change);
      // Save the resume token after each event
      await checkpointColl.updateOne(
        { stream: 'main' },
        { $set: { resumeToken: change._id } },
        { upsert: true }
      );
    }
  } catch (err) {
    if (err.code === 286) { // ChangeStreamHistoryLost
      console.error('Resume token expired, performing full rescan');
      // Drop the stale checkpoint so the next start does not try to
      // resume with the expired token again
      await checkpointColl.deleteOne({ stream: 'main' });
      await fullCollectionRescan(client);
      return;
    }
    throw err;
  }
}
```
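Transient errors (e.g. a replica set failover) also terminate the loop, but those are resumable from the saved checkpoint, so a supervision loop around `startChangeStream` keeps the consumer alive. A sketch, where `runWithRetry` and the backoff helper are illustrative additions, not part of the driver API:

```javascript
// Exponential backoff delay, capped (pure helper).
function backoffDelayMs(attempt, baseMs = 1000, maxMs = 60000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Keep the change stream consumer alive across transient errors.
// `startChangeStream` is the function defined above; on restart it
// resumes from the persisted checkpoint.
async function runWithRetry(client) {
  for (let attempt = 0; ; attempt++) {
    try {
      await startChangeStream(client);
      attempt = 0; // clean exit: reset the backoff
    } catch (err) {
      const delay = backoffDelayMs(attempt);
      console.error(`Change stream error, retrying in ${delay} ms:`, err.message);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```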
4. **Perform a full collection rescan as the fallback**:

```javascript
async function fullCollectionRescan(client) {
  const events = client.db('mydb').collection('events');
  const cursor = events.find({}).sort({ _id: 1 });

  for await (const doc of cursor) {
    await processEvent({
      operationType: 'replace',
      fullDocument: doc,
      documentKey: { _id: doc._id }
    });
  }
}
```
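The rescan replays every document, so duplicate deliveries are expected (see Symptoms). The safest fix is to make event processing idempotent. A sketch: one possible implementation of `processEvent` (here taking the client explicitly, writing to a hypothetical `events_projection` collection), plus a small pure deduper for in-process filtering:

```javascript
// Idempotent handler: upserting by _id means replaying the same document
// rewrites identical state instead of producing a duplicate side effect.
// `events_projection` is a hypothetical target collection.
async function processEventIdempotent(client, change) {
  const projection = client.db('mydb').collection('events_projection');
  await projection.replaceOne(
    { _id: change.documentKey._id },
    change.fullDocument,
    { upsert: true }
  );
}

// Pure in-process deduper: returns true the first time a key is seen,
// false for any repeat.
function makeDeduper() {
  const seen = new Set();
  return (key) => {
    const k = String(key);
    if (seen.has(k)) return false; // duplicate, skip
    seen.add(k);
    return true;
  };
}
```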