From 69c78ac45c2da05921a987813ada649ef2288a82 Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sat, 27 Feb 2021 15:18:08 +0100 Subject: [PATCH 1/9] Automatically repair torn writes on opening --- src/Partition/ReadablePartition.js | 15 +++++++++++++++ src/Storage/WritableStorage.js | 22 ++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/src/Partition/ReadablePartition.js b/src/Partition/ReadablePartition.js index 616d21a..d586b73 100644 --- a/src/Partition/ReadablePartition.js +++ b/src/Partition/ReadablePartition.js @@ -123,6 +123,21 @@ class ReadablePartition extends events.EventEmitter { return true; } + /** + * @returns {number} -1 if the partition is ok, or the sequence number of the broken document if a torn write was detected. + */ + checkTornWrite() { + const reader = this.prepareReadBufferBackwards(this.size); + const separator = reader.buffer.toString('ascii', reader.cursor - DOCUMENT_SEPARATOR.length, reader.cursor); + if (separator !== DOCUMENT_SEPARATOR) { + const position = this.findDocumentPositionBefore(this.size); + const reader = this.prepareReadBuffer(position); + const { sequenceNumber } = this.readDocumentHeader(reader.buffer, reader.cursor, position); + return sequenceNumber; + } + return -1; + } + /** * Read the partition metadata from the file. * diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index 051ef1e..a19b697 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -72,9 +72,31 @@ class WritableStorage extends ReadableStorage { if (!this.lock()) { return true; } + this.checkTornWrites(); return super.open(); } + /** + * Check all partitions for torn writes and truncate the storage to the position before the first torn write. + * This might delete correctly written events in partitions, if their sequence number is higher than the + * torn write in another partition. + */ + checkTornWrites() { + // TODO: Only do this if a potential failed write is detected (e.g. 
a tx-marker exists) + let lastValidSequenceNumber = Number.MAX_SAFE_INTEGER; + this.forEachPartition(partition => { + partition.open(); + const tornSequenceNumber = partition.checkTornWrite(); + if (tornSequenceNumber >= 0) { + lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSequenceNumber - 1); + } + partition.close(); + }); + if (lastValidSequenceNumber < Number.MAX_SAFE_INTEGER) { + this.truncate(lastValidSequenceNumber); + } + } + /** * Attempt to lock this storage by means of a lock directory. * @returns {boolean} True if the lock was created or false if the lock is already in place. From f15457818ce57a4f0e95f57df13442dd310da8f8 Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sun, 28 Mar 2021 19:00:05 +0200 Subject: [PATCH 2/9] Only check for torn writes when unlocking --- src/Storage/WritableStorage.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index a19b697..98beb39 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -72,7 +72,6 @@ class WritableStorage extends ReadableStorage { if (!this.lock()) { return true; } - this.checkTornWrites(); return super.open(); } @@ -82,7 +81,6 @@ class WritableStorage extends ReadableStorage { * torn write in another partition. */ checkTornWrites() { - // TODO: Only do this if a potential failed write is detected (e.g. a tx-marker exists) let lastValidSequenceNumber = Number.MAX_SAFE_INTEGER; this.forEachPartition(partition => { partition.open(); @@ -127,6 +125,9 @@ class WritableStorage extends ReadableStorage { * Current implementation just deletes a lock file that is named like the storage. 
*/ unlock() { + if (fs.existsSync(this.lockFile)) { + this.checkTornWrites(); + } fs.rmdirSync(this.lockFile); this.locked = false; } From 8348038a3b8befc75047c1fd183c4d93d370932d Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sun, 28 Mar 2021 20:18:54 +0200 Subject: [PATCH 3/9] Close partitions after truncating --- src/Storage/WritableStorage.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index 98beb39..9bd6e5b 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -88,11 +88,11 @@ class WritableStorage extends ReadableStorage { if (tornSequenceNumber >= 0) { lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSequenceNumber - 1); } - partition.close(); }); if (lastValidSequenceNumber < Number.MAX_SAFE_INTEGER) { this.truncate(lastValidSequenceNumber); } + this.forEachPartition(partition => partition.close()); } /** From 8a6b82e51734a4b8165bedf58cad357acb0a04ba Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sat, 15 May 2021 14:35:22 +0200 Subject: [PATCH 4/9] Only repair when lock was not acquired --- src/Storage/WritableStorage.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index 9bd6e5b..6d41107 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -125,7 +125,7 @@ class WritableStorage extends ReadableStorage { * Current implementation just deletes a lock file that is named like the storage. 
*/ unlock() { - if (fs.existsSync(this.lockFile)) { + if (!this.locked && fs.existsSync(this.lockFile)) { this.checkTornWrites(); } fs.rmdirSync(this.lockFile); From e85e6e79629582519273f50ec65676867a7e8216 Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sat, 15 May 2021 17:58:03 +0200 Subject: [PATCH 5/9] Ignore locks when iterating partitions --- src/Storage/ReadableStorage.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Storage/ReadableStorage.js b/src/Storage/ReadableStorage.js index 47ea87b..8b69e62 100644 --- a/src/Storage/ReadableStorage.js +++ b/src/Storage/ReadableStorage.js @@ -138,6 +138,7 @@ class ReadableStorage extends events.EventEmitter { for (let file of files) { if (file.substr(-6) === '.index') continue; if (file.substr(-7) === '.branch') continue; + if (file.substr(-5) === '.lock') continue; if (file.substr(0, this.storageFile.length) !== this.storageFile) continue; const partition = this.createPartition(file, this.partitionConfig); From 1a970f84d86d26dd6ca922b842e58240e468fa92 Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sat, 15 May 2021 18:00:36 +0200 Subject: [PATCH 6/9] Truncate correct position on torn write --- src/Partition/ReadablePartition.js | 2 +- src/Storage/WritableStorage.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Partition/ReadablePartition.js b/src/Partition/ReadablePartition.js index d586b73..71c7c52 100644 --- a/src/Partition/ReadablePartition.js +++ b/src/Partition/ReadablePartition.js @@ -363,7 +363,7 @@ class ReadablePartition extends events.EventEmitter { } position -= this.readBufferLength; } while (position > 0); - return position; + return Math.max(0, position); } /** diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index 6d41107..0942236 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -86,7 +86,7 @@ class WritableStorage extends ReadableStorage { partition.open(); const tornSequenceNumber = 
partition.checkTornWrite(); if (tornSequenceNumber >= 0) { - lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSequenceNumber - 1); + lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSequenceNumber); } }); if (lastValidSequenceNumber < Number.MAX_SAFE_INTEGER) { From b058ae23edffee7e5c066975f6cfed36fb0b549f Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sat, 15 May 2021 20:42:51 +0200 Subject: [PATCH 7/9] Add lock option to configure how an existing lock should be handled --- README.md | 13 ++++++++++++- src/EventStore.js | 4 +++- src/Storage/WritableStorage.js | 22 +++++++++++++++++----- test/EventStore.spec.js | 27 +++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index f4c51d6..d7af612 100644 --- a/README.md +++ b/README.md @@ -413,7 +413,18 @@ leads to every single document being flushed directly. #### Consistency -Since the storage is append-only, consistency is automatically guaranteed. +Since the storage is append-only, consistency is automatically guaranteed for all successful writes. Writes that fail in +the middle, e.g. because the machine crashes before the full write buffer is flushed, will lead to a torn write. This is +a partial, invalid write. To recover from such a state, the storage will detect torn writes and truncate them when an existing +lock is reclaimed. This can be done by instantiating the store with the following option: + +```javascript +const eventstore = new EventStore('my-event-store', { storageConfig: { lock: EventStore.LOCK_RECLAIM } }); +``` + +Note that this option will effectively bypass the lock that prevents multiple instances from being created, so you should +not use this carelessly. Having multiple instances write to the same files will lead to inconsistent data that cannot be +easily recovered from. 
#### Isolation diff --git a/src/EventStore.js b/src/EventStore.js index 2fd12be..f31c9af 100644 --- a/src/EventStore.js +++ b/src/EventStore.js @@ -397,4 +397,6 @@ class EventStore extends events.EventEmitter { module.exports = EventStore; module.exports.ExpectedVersion = ExpectedVersion; -module.exports.OptimisticConcurrencyError = OptimisticConcurrencyError; \ No newline at end of file +module.exports.OptimisticConcurrencyError = OptimisticConcurrencyError; +module.exports.LOCK_THROW = Storage.LOCK_THROW; +module.exports.LOCK_RECLAIM = Storage.LOCK_RECLAIM; diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index 0942236..a3d0260 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -8,6 +8,9 @@ const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata } = r const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024; +const LOCK_RECLAIM = 0x1; +const LOCK_THROW = 0x2; + class StorageLockedError extends Error {} /** @@ -37,6 +40,7 @@ class WritableStorage extends ReadableStorage { * @param {function(object, number): string} [config.partitioner] A function that takes a document and sequence number and returns a partition name that the document should be stored in. Defaults to write all documents to the primary partition. * @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction. * @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes. + * @param {number} [config.lock] One of LOCK_* constants that defines how an existing lock should be handled. 
*/ constructor(storageName = 'storage', config = {}) { if (typeof storageName !== 'string') { @@ -60,6 +64,11 @@ class WritableStorage extends ReadableStorage { } } super(storageName, config); + + this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock'); + if (config.lock === LOCK_RECLAIM) { + this.unlock(); + } this.partitioner = config.partitioner; } @@ -105,7 +114,6 @@ class WritableStorage extends ReadableStorage { if (this.locked) { return false; } - this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock'); try { fs.mkdirSync(this.lockFile); this.locked = true; @@ -125,10 +133,12 @@ class WritableStorage extends ReadableStorage { * Current implementation just deletes a lock file that is named like the storage. */ unlock() { - if (!this.locked && fs.existsSync(this.lockFile)) { - this.checkTornWrites(); + if (fs.existsSync(this.lockFile)) { + if (!this.locked) { + this.checkTornWrites(); + } + fs.rmdirSync(this.lockFile); } - fs.rmdirSync(this.lockFile); this.locked = false; } @@ -393,4 +403,6 @@ class WritableStorage extends ReadableStorage { } module.exports = WritableStorage; -module.exports.StorageLockedError = StorageLockedError; \ No newline at end of file +module.exports.StorageLockedError = StorageLockedError; +module.exports.LOCK_THROW = LOCK_THROW; +module.exports.LOCK_RECLAIM = LOCK_RECLAIM; \ No newline at end of file diff --git a/test/EventStore.spec.js b/test/EventStore.spec.js index 5335aa8..e7d3ad8 100644 --- a/test/EventStore.spec.js +++ b/test/EventStore.spec.js @@ -60,6 +60,33 @@ describe('EventStore', function() { fs.readdir = originalReaddir; }); + it('repairs torn writes', function(done) { + eventstore = new EventStore({ + storageDirectory + }); + + const events = [{foo: 'bar'.repeat(500)}]; + eventstore.on('ready', () => { + eventstore.commit('foo-bar', events, () => { + // Simulate a torn write (but indexes are still written) + fs.truncateSync(eventstore.storage.getPartition('foo-bar').fileName, 
512); + + // The previous instance was not closed, so the lock still exists + eventstore = new EventStore({ + storageDirectory, + storageConfig: { + lock: EventStore.LOCK_RECLAIM + } + }); + eventstore.on('ready', () => { + expect(eventstore.length).to.be(0); + expect(eventstore.getStreamVersion('foo-bar')).to.be(0); + done(); + }); + }); + }); + }); + it('throws when trying to open non-existing store read-only', function() { expect(() => new EventStore({ storageDirectory, From d2cea70ec696b08b2354dede3b6c9da5109bd41b Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sat, 15 May 2021 21:27:13 +0200 Subject: [PATCH 8/9] Move logic to ensure directory into util function --- src/Consumer.js | 7 ++----- src/Index/WritableIndex.js | 7 ++----- src/Partition/WritablePartition.js | 7 ++----- src/Storage/WritableStorage.js | 10 ++-------- src/util.js | 26 +++++++++++++++++++++++++- 5 files changed, 33 insertions(+), 24 deletions(-) diff --git a/src/Consumer.js b/src/Consumer.js index aae189a..cb2e887 100644 --- a/src/Consumer.js +++ b/src/Consumer.js @@ -1,8 +1,7 @@ const stream = require('stream'); const fs = require('fs'); const path = require('path'); -const mkdirpSync = require('mkdirp').sync; -const { assert } = require('./util'); +const { assert, ensureDirectory } = require('./util'); const Storage = require('./Storage/ReadableStorage'); const MAX_CATCHUP_BATCH = 10; @@ -59,9 +58,7 @@ class Consumer extends stream.Readable { this.indexName = indexName; const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers'); this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' 
+ identifier); - if (!fs.existsSync(consumerDirectory)) { - mkdirpSync(consumerDirectory); - } else { + if (ensureDirectory(consumerDirectory)) { this.cleanUpFailedWrites(); } } diff --git a/src/Index/WritableIndex.js b/src/Index/WritableIndex.js index 2e735de..6ba1edc 100644 --- a/src/Index/WritableIndex.js +++ b/src/Index/WritableIndex.js @@ -1,7 +1,6 @@ const fs = require('fs'); -const mkdirpSync = require('mkdirp').sync; const ReadableIndex = require('./ReadableIndex'); -const { assertEqual, buildMetadataHeader } = require('../util'); +const { assertEqual, buildMetadataHeader, ensureDirectory } = require('../util'); /** * An index is a simple append-only file that stores an ordered list of entry elements pointing to the actual file position @@ -45,9 +44,7 @@ class WritableIndex extends ReadableIndex { */ initialize(options) { super.initialize(options); - if (!fs.existsSync(options.dataDirectory)) { - mkdirpSync(options.dataDirectory); - } + ensureDirectory(options.dataDirectory); this.fileMode = 'a+'; this.writeBuffer = Buffer.allocUnsafe(options.writeBufferSize >>> 0); // jshint ignore:line diff --git a/src/Partition/WritablePartition.js b/src/Partition/WritablePartition.js index 545d0b9..014387a 100644 --- a/src/Partition/WritablePartition.js +++ b/src/Partition/WritablePartition.js @@ -1,7 +1,6 @@ const fs = require('fs'); -const mkdirpSync = require('mkdirp').sync; const ReadablePartition = require('./ReadablePartition'); -const { assert, buildMetadataHeader, alignTo } = require('../util'); +const { assert, buildMetadataHeader, alignTo, ensureDirectory } = require('../util'); const Clock = require('../Clock'); const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024; @@ -40,9 +39,7 @@ class WritablePartition extends ReadablePartition { config.metadata = Object.assign(defaults.metadata, config.metadata); config = Object.assign(defaults, config); super(name, config); - if (!fs.existsSync(this.dataDirectory)) { - mkdirpSync(this.dataDirectory); - } + 
ensureDirectory(this.dataDirectory); this.fileMode = 'a+'; this.writeBufferSize = config.writeBufferSize >>> 0; // jshint ignore:line this.maxWriteBufferDocuments = config.maxWriteBufferDocuments >>> 0; // jshint ignore:line diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index a3d0260..4b5cf03 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -1,10 +1,9 @@ const fs = require('fs'); -const mkdirpSync = require('mkdirp').sync; const path = require('path'); const WritablePartition = require('../Partition/WritablePartition'); const WritableIndex = require('../Index/WritableIndex'); const ReadableStorage = require('./ReadableStorage'); -const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata } = require('../util'); +const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata, ensureDirectory } = require('../util'); const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024; @@ -57,12 +56,7 @@ class WritableStorage extends ReadableStorage { }; config = Object.assign(defaults, config); config.indexOptions = Object.assign({ syncOnFlush: config.syncOnFlush }, config.indexOptions); - if (!fs.existsSync(config.dataDirectory)) { - try { - mkdirpSync(config.dataDirectory); - } catch (e) { - } - } + ensureDirectory(config.dataDirectory); super(storageName, config); this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock'); diff --git a/src/util.js b/src/util.js index db13dbe..7b0b5e9 100644 --- a/src/util.js +++ b/src/util.js @@ -1,4 +1,6 @@ const crypto = require('crypto'); +const fs = require('fs'); +const mkdirpSync = require('mkdirp').sync; /** * Assert that actual and expected match or throw an Error with the given message appended by information about expected and actual value. @@ -183,6 +185,27 @@ function wrapAndCheck(index, length) { return index; } +/** + * Ensure that the given directory exists. 
+ * @param {string} dirName + * @return {boolean} true if the directory existed already + */ +function ensureDirectory(dirName) { + if (!fs.existsSync(dirName)) { + try { + mkdirpSync(dirName); + } catch (e) { + if (e.code !== 'EEXIST') { + throw e; + } + return true; + } + return false; + } + return true; +} + + module.exports = { assert, assertEqual, @@ -193,5 +216,6 @@ module.exports = { buildMetadataForMatcher, buildMatcherFromMetadata, buildMetadataHeader, - alignTo + alignTo, + ensureDirectory }; \ No newline at end of file From 03da3a7b0b4ca874882a0ea091ef1deee0068791 Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sat, 15 May 2021 21:54:32 +0200 Subject: [PATCH 9/9] Mkdirp already checks for existing directory --- src/util.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/util.js b/src/util.js index 7b0b5e9..0b92ed7 100644 --- a/src/util.js +++ b/src/util.js @@ -195,10 +195,6 @@ function ensureDirectory(dirName) { try { mkdirpSync(dirName); } catch (e) { - if (e.code !== 'EEXIST') { - throw e; - } - return true; } return false; }