## Introduction

MongoDB change streams use resume tokens to continue reading from the point where they left off after a disconnect. If the application is down for longer than the oplog window, the resume token expires and MongoDB throws `ChangeStreamHistoryLost`. This causes event loss unless the application can fall back to a full collection scan.

## Symptoms

- `MongoServerError: The resume token was not found in the oplog` or `ChangeStreamHistoryLost`
- The event processor misses updates that occurred during the downtime
- The application crashes with an unhandled change stream error
- The oplog window is smaller than the application's downtime
- Duplicate events after reconnecting, caused by the fallback rescan

## Common Causes

- Application downtime exceeding the oplog retention window
- An oplog too small for the write volume, causing rapid token expiration
- Replica set failover causing a temporary change stream disconnection
- Slow event processing causing the consumer to fall behind the oplog
- No checkpoint persistence for the resume token

## Step-by-Step Fix

1. **Check the current oplog size and window**:

```javascript
// Human-readable summary: configured oplog size (MB) and
// "log length start to end" (the oplog window, in secs/hrs)
db.printReplicationInfo()

// Programmatic access: returns logSizeMB, usedMB, timeDiffHours, tFirst, tLast
db.getReplicationInfo()

// Detailed replication timing info
db.adminCommand({ replSetGetStatus: 1 }).optimes
```
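To act on those numbers, the window can be estimated ahead of time. A minimal sketch, assuming a helper of our own (`estimateOplogWindowHours` is illustrative, not a driver API; `logSizeMB` is a real field of the `db.getReplicationInfo()` result, and the write rate must be measured from your own workload):

```javascript
// Hypothetical helper: estimate the oplog window at a given write rate.
// replInfo is shaped like db.getReplicationInfo() output.
function estimateOplogWindowHours(replInfo, writeMBPerHour) {
  if (writeMBPerHour <= 0) throw new Error('write rate must be positive');
  return replInfo.logSizeMB / writeMBPerHour;
}

// Example: a 51200 MB oplog absorbing 1024 MB/hour of writes
console.log(estimateOplogWindowHours({ logSizeMB: 51200 }, 1024)); // 50 (hours)
```

If the estimate is close to or below your worst-case downtime, resize the oplog as in the next step.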

2. **Increase the oplog size if the window is too small**:

```javascript
// Check the current oplog size (in bytes)
use local
db.oplog.rs.stats().maxSize

// Resize without a restart (MongoDB 3.6+; size is in MB)
db.adminCommand({ replSetResizeOplog: 1, size: 51200 }) // 50GB

// Or set it permanently in /etc/mongod.conf (requires restart):
// replication:
//   oplogSizeMB: 51200  # 50GB
```

3. **Implement resume token persistence with a fallback**:

```javascript
const { MongoClient } = require('mongodb');

async function startChangeStream(client) {
  const checkpointColl = client.db('admin').collection('change_stream_checkpoint');
  const checkpoint = await checkpointColl.findOne({ stream: 'main' });

  const pipeline = [{ $match: { operationType: { $in: ['insert', 'update', 'replace'] } } }];
  const options = {};

  if (checkpoint?.resumeToken) {
    options.resumeAfter = checkpoint.resumeToken;
  }

  const changeStream = client.db('mydb').collection('events').watch(pipeline, options);

  try {
    for await (const change of changeStream) {
      await processEvent(change);
      // Save the resume token after each event
      await checkpointColl.updateOne(
        { stream: 'main' },
        { $set: { resumeToken: change._id } },
        { upsert: true }
      );
    }
  } catch (err) {
    if (err.code === 286) { // ChangeStreamHistoryLost
      console.error('Resume token expired, performing full rescan');
      // Drop the stale checkpoint so the next start does not reuse the expired token
      await checkpointColl.deleteOne({ stream: 'main' });
      await fullCollectionRescan(client);
      return;
    }
    throw err;
  }
}
```
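Persisting a checkpoint per event doubles the write load of the consumer. A throttled variant can save the newest token at most once per interval; this is a sketch under assumed names (`createCheckpointer` is illustrative, and `save` stands in for the `updateOne` call above):

```javascript
// Throttled checkpoint writer: record() is called after every processed
// event, but save() runs at most once per intervalMs.
function createCheckpointer(save, intervalMs = 1000) {
  let lastSaved = -Infinity; // so the first token is saved immediately
  let pending = null;
  return {
    // Call with the event's resume token (change._id)
    record(token, now = Date.now()) {
      pending = token;
      if (now - lastSaved >= intervalMs) {
        save(pending);
        pending = null;
        lastSaved = now;
      }
    },
    // Call on shutdown so the newest token is not lost
    flush() {
      if (pending !== null) { save(pending); pending = null; }
    }
  };
}
```

Losing an unsaved token only costs reprocessing a second or so of events on restart, since tokens are recorded after `processEvent` succeeds.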

4. **Perform a full collection rescan as the fallback**:

```javascript
async function fullCollectionRescan(client) {
  const events = client.db('mydb').collection('events');
  const cursor = events.find({}).sort({ _id: 1 });

  for await (const doc of cursor) {
    // Synthesize a change-stream-shaped event for each document
    await processEvent({
      operationType: 'replace',
      fullDocument: doc,
      documentKey: { _id: doc._id }
    });
  }
}
```
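Because the rescan replays every document, consumers will see events they may already have handled (one of the symptoms above), so processing should be idempotent. A minimal in-memory sketch for skipping repeats within a single rescan pass (`createDedupe` is illustrative, not a driver API; a real deployment would persist processed keys or make the downstream write itself idempotent, e.g. an upsert):

```javascript
// Wrap an event handler so each documentKey is processed at most once
// per rescan pass; returns true if handled, false if skipped.
function createDedupe(handle) {
  const seen = new Set();
  return (event) => {
    const key = String(event.documentKey._id);
    if (seen.has(key)) return false; // duplicate: skip
    seen.add(key);
    handle(event);
    return true;
  };
}
```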

## Prevention

- Size the oplog to hold at least 72 hours of operations: `oplogSizeMB = (writes_per_hour * avg_doc_size_bytes * 72) / 1048576`
- Persist resume tokens after every event, or at least once per second
- Implement a full collection rescan fallback for when tokens expire
- Monitor the oplog window and alert when it drops below 24 hours
- Use multiple change stream consumers for parallel processing
- Set `maxTimeMS` on change stream operations to detect stalls early
- Consider a CDC tool such as Debezium for more robust change data capture
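The sizing rule above works out as a quick calculation; the inputs here are illustrative assumptions (100,000 writes/hour at an average of 2 KB per operation over a 72-hour window):

```javascript
// oplogSizeMB = (writes_per_hour * avg_doc_size_bytes * window_hours) / 1048576
// (1048576 converts bytes to MB)
function oplogSizeMB(writesPerHour, avgDocSizeBytes, windowHours) {
  return (writesPerHour * avgDocSizeBytes * windowHours) / 1048576;
}

// 100,000 writes/hour * 2048 bytes * 72 hours -> about 14 GB of oplog
console.log(Math.ceil(oplogSizeMB(100000, 2048, 72))); // 14063 MB
```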