From 2bb334e41d87214e4730ec27501738fe479da6c7 Mon Sep 17 00:00:00 2001
From: Alexander Berl
Date: Sat, 28 Sep 2019 18:06:14 +0200
Subject: [PATCH] Start implementing auto-repair

---
 README.md                          | 22 ++++---
 src/EventStore.js                  | 95 +++++++++++++++++++++++++++++-
 src/Index/WritableIndex.js         |  5 ++
 src/Partition/WritablePartition.js | 28 +++++----
 src/Storage/ReadableStorage.js     |  3 +-
 src/Storage/WritableStorage.js     |  6 +-
 test/EventStore.spec.js            | 49 +++++++++++++++
 7 files changed, 185 insertions(+), 23 deletions(-)

diff --git a/README.md b/README.md
index 4d81f84..e90e411 100644
--- a/README.md
+++ b/README.md
@@ -264,13 +264,21 @@ The storage engine is not strictly designed to follow ACID semantics. However, i
 
 #### Atomicity
 
-A single document write is guaranteed to be atomic. Unless specifically configured, atomicity spreads to all subsequent
-writes until the write buffer is flushed, which happens either if the current document doesn't fully fit into the write
-buffer or on the next node event loop.
-This can be (ab)used to create a reduced form of transactional behaviour: All writes that happen within a single event loop
-and still fit into the write buffer will all happen together or not at all.
-If strict atomicity for single documents is required, you can configure the option `maxWriteBufferDocuments` to 1, which
-leads to every single document being flushed directly.
+A single document write is guaranteed to be atomic through the deserializer: incomplete writes cannot be deserialized due
+to the nature of JSON. If you use a custom serialization format, e.g. msgpack or protobuf, you should make use of a checksum.
+If deserialization of the last document fails on startup, the storage will be truncated and hence repairs itself. This covers
+cases of "torn writes", where the disk did not persist all blocks of a write due to a power failure.
+
+Multi-document writes (a commit of multiple events) are guaranteed to be atomic by checking during startup that the last
+commit was fully finished. If the `commitVersion` of the last event does not equal its `commitSize - 1`, that commit was
+incomplete and will be rolled back by truncating the storage to the position before the commit.
+
+Due to write buffering, writes typically happen in batches of multiple documents, so logical atomicity spreads over
+multiple documents. This can be controlled through the option `maxWriteBufferDocuments`, which defines the maximum number
+of documents a single write buffer may hold before being flushed, and `writeBufferSize`, which limits the size of the
+write buffer. For optimal performance, a write buffer of 16 KB has turned out to be good, at least on the SSD I use, but YMMV.
+Generally it is recommended not to limit the write buffer through `maxWriteBufferDocuments`, since flushing less than a
+full page (typically 4 KB) will increase write amplification.
 
 #### Consistency
 
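To make the buffering trade-off above concrete, here is a minimal sketch of the two configurations the README describes. It is not part of the patch; it assumes the `maxWriteBufferDocuments` and `writeBufferSize` options are forwarded from the `EventStore` config to the underlying storage, as the README implies, and the paths and store setup are illustrative:

```js
const EventStore = require('./src/EventStore');

// Strict per-document atomicity: every document is flushed directly,
// trading throughput for the guarantee that writes never share a flush.
const strictStore = new EventStore({
    storageDirectory: './data',
    maxWriteBufferDocuments: 1
});

// Throughput-oriented setup: a 16 KB write buffer that is flushed when full
// or on the next event loop tick, keeping flushes close to page-aligned.
const batchedStore = new EventStore({
    storageDirectory: './data',
    writeBufferSize: 16 * 1024
});
```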
diff --git a/src/EventStore.js b/src/EventStore.js
index db0c2dd..9296e32 100644
--- a/src/EventStore.js
+++ b/src/EventStore.js
@@ -75,11 +75,101 @@ class EventStore extends EventEmitter {
                 this.storage.close();
                 throw err;
             }
+            this.checkUnfinishedCommits();
             this.emit('ready');
         });
     }
 
     /**
+     * Check if the last commit in the store was unfinished, which is the case if not all events of the commit have been written.
+     * This depends on no torn writes existing at the end of the storage, so a check for those is done as a first step.
+     * @private
+     */
+    checkUnfinishedCommits() {
+        this.checkTornWrite();
+        let position = this.storage.length;
+        let lastEvent;
+        let truncateIndex = false;
+        // Walk backwards past index entries that no longer have a matching document in the storage
+        while (position > 0 && (lastEvent = this.storage.read(position)) === false) {
+            truncateIndex = true;
+            position--;
+        }
+
+        if (lastEvent && lastEvent.metadata.commitSize && lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1) {
+            this.emit('unfinished-commit', lastEvent);
+            // commitId = global sequence number at which the commit started
+            this.storage.truncate(lastEvent.metadata.commitId);
+        } else if (truncateIndex) {
+            // The index contained items that are not in the storage file, so truncate it after the last valid event
+            this.storage.truncate(position + 1);
+        }
+        // TODO: Somehow detect unfinished index updates and reindex documents.
+        // For this, it needs to be possible to read from a storage without an index.
+        // The complexity herein lies in how to correctly interleave reads from different partitions deterministically,
+        // so that they match the order at write time. With a global sequence number stored in the document metadata, this
+        // would be doable relatively easily. Otherwise, we need to:
+        //  - store some other ordering information in the documents, e.g. a monotonic timestamp
+        //  - define a deterministic order across partitions, e.g. by ordering by partitionId
+        //  - make sure the order at write time upholds the defined determinism, by guaranteeing that,
+        //    given concurrent writes A (partitionId = 1, timestamp = X) and B (partitionId = 2, timestamp = X),
+        //    any index spanning both partitions will always have A come before B.
+        //    If the index writer happens to receive B, it hence needs to make sure that no write C (partitionId <= 2, timestamp <= X)
+        //    can still occur before actually appending write B.
+        //    For all partitions that are part of the index, wait until it is guaranteed that no write to any of them will
+        //    happen with a timestamp <= the current write timestamp. This can be guaranteed by delaying the index insert
+        //    by an amount of time that is the maximum delay for a write reaching the index writer. Since there is no guarantee
+        //    for an upper bound, this needs to be tackled probabilistically, e.g. by measuring the 99th percentile of latencies.
+        //    Then, on any index append, it needs to be checked that the determinism rule still holds. If not, an error needs
+        //    to be thrown, which would mean the according document write needs to be rolled back.
+        // With this we would basically have built a cross-partition transactional scheme, which is contrary to the goals.
+        // So, in contrast, no hard guarantee can be given for repeatable cross-partition reads.
+
+        // WIP: find the least consistent secondary index, i.e. the one that is furthest behind the storage,
+        // so that all documents from that sequence number on can be re-indexed.
+        let leastConsistentEntry = this.storage.index.lastEntry;
+        this.storage.forEachSecondaryIndex(index => {
+            const lastEntry = index.lastEntry;
+            if (lastEntry !== false && (leastConsistentEntry === false || leastConsistentEntry.number > lastEntry.number)) {
+                leastConsistentEntry = lastEntry;
+            }
+        });
+        // TODO: Read forward from leastConsistentEntry and re-add all following documents
+        // to the secondary indexes that are missing them.
+    }
+
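Taken together, the EventStore.js changes mean the repair runs before the `ready` event fires. A minimal sketch, not part of the patch, of how an application could observe the repair; the `unfinished-commit` event name and metadata fields come from the code above, while the handler bodies are illustrative:

```js
const EventStore = require('./src/EventStore');

const eventstore = new EventStore({ storageDirectory: './data' });

eventstore.on('unfinished-commit', lastEvent => {
    // Fired during startup when the trailing commit is incomplete, i.e. when
    // lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1.
    // The storage is truncated back to lastEvent.metadata.commitId.
    console.warn('Rolled back unfinished commit', lastEvent.metadata.commitId);
});

eventstore.on('ready', () => {
    // By now, torn writes and unfinished commits have been repaired.
    console.log('Store opened with', eventstore.length, 'events');
});
```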
+    /**
+     * Check if the last document in the storage is a torn write and if so, truncate the storage before that.
+     * @private
+     */
+    checkTornWrite() {
+        let position = this.storage.length;
+        if (position === 0) {
+            return;
+        }
+
+        try {
+            this.storage.read(position);
+        } catch (e) {
+            if (e instanceof Storage.CorruptFileError) {
+                console.log('Torn write detected. Truncating after', position - 1);
+                this.storage.truncate(position - 1);
+            } else {
+                // Unexpected errors must not be swallowed silently
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * @private
      * @param {string} name
      * @param {object} config
      * @returns {ReadableStorage|WritableStorage}
@@ -202,6 +292,7 @@
 
         const commitId = this.length;
         let commitVersion = 0;
+        const commitSize = events.length;
         const committedAt = Date.now();
         const commit = Object.assign({
             commitId,
@@ -216,7 +307,7 @@
             callback(commit);
         };
         for (let event of events) {
-            const eventMetadata = Object.assign({ commitId, committedAt }, metadata, { commitVersion, streamVersion });
+            const eventMetadata = Object.assign({ commitId, committedAt }, metadata, { commitVersion, commitSize, streamVersion });
             const storedEvent = { stream: streamName, payload: event, metadata: eventMetadata };
             commitVersion++;
             streamVersion++;
@@ -267,7 +358,7 @@
     }
 
     /**
-     * Create a new event stream from existing streams by joining them.
+     * Create a new (transient) event stream from existing streams by joining them.
      *
      * @param {string} streamName The (transient) name of the joined stream.
      * @param {Array} streamNames An array of the stream names to join.
diff --git a/src/Index/WritableIndex.js b/src/Index/WritableIndex.js
index ddf1f93..6152f65 100644
--- a/src/Index/WritableIndex.js
+++ b/src/Index/WritableIndex.js
@@ -182,6 +182,11 @@ class WritableIndex extends ReadableIndex {
         assertEqual(entry.constructor.name, this.EntryClass.name, `Wrong entry object.`);
         assertEqual(entry.constructor.size, this.EntryClass.size, `Invalid entry size.`);
 
+        const lastEntry = this.lastEntry;
+        if (lastEntry !== false && lastEntry.number >= entry.number) {
+            throw new Error('Consistency error. Tried to add an index entry that should come before the existing last entry.');
+        }
+
         if (this.readUntil === this.data.length - 1) {
             this.readUntil++;
         }
diff --git a/src/Partition/WritablePartition.js b/src/Partition/WritablePartition.js
index 5ef1719..c5a8c1f 100644
--- a/src/Partition/WritablePartition.js
+++ b/src/Partition/WritablePartition.js
@@ -319,26 +319,30 @@
         if (after > this.size) {
             return;
         }
-        if (after < 0) {
-            after = 0;
-        }
+        after = Math.max(0, after);
         this.flush();
 
         let position = after, data;
+        let skipDeleteLog = false;
         try {
             data = this.readFrom(position);
         } catch (e) {
-            throw new Error('Can only truncate on valid document boundaries.');
+            if (!(e instanceof ReadablePartition.CorruptFileError)) {
+                throw new Error('Can only truncate on valid document boundaries.');
+            }
+            skipDeleteLog = true;
         }
-        // copy all truncated documents to some delete log
-        const deletedBranch = new WritablePartition(this.name + '-' + after + '.branch', { dataDirectory: this.dataDirectory });
-        deletedBranch.open();
-        while (data) {
-            deletedBranch.write(data);
-            position += this.documentWriteSize(Buffer.byteLength(data, 'utf8'));
-            data = this.readFrom(position);
+        if (!skipDeleteLog) {
+            // copy all truncated documents to some delete log
+            const deletedBranch = new WritablePartition(this.name + '-' + after + '.branch', { dataDirectory: this.dataDirectory });
+            deletedBranch.open();
+            while (data) {
+                deletedBranch.write(data);
+                position += this.documentWriteSize(Buffer.byteLength(data, 'utf8'));
+                data = this.readFrom(position);
+            }
+            deletedBranch.close();
         }
-        deletedBranch.close();
 
         fs.truncateSync(this.fileName, this.headerSize + after);
         this.truncateReadBuffer(after);
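For clarity, a sketch (not part of the patch) of the delete-log behaviour that `WritablePartition.truncate()` keeps for valid document boundaries. The constructor usage, `documentWriteSize()` call, and the `<name>-<position>.branch` naming follow the code above; treating the offset arithmetic as public API is an assumption for illustration:

```js
const WritablePartition = require('./src/Partition/WritablePartition');

// Two documents, then truncate everything from the start of the second one.
const partition = new WritablePartition('example', { dataDirectory: './data' });
partition.open();
partition.write('{"foo":"bar"}');
partition.write('{"foo":"baz"}');
partition.flush();

// Offset of the second document: the write size of the first one.
const second = partition.documentWriteSize(Buffer.byteLength('{"foo":"bar"}', 'utf8'));
partition.truncate(second); // '{"foo":"baz"}' is copied to 'example-<offset>.branch'
```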
diff --git a/src/Storage/ReadableStorage.js b/src/Storage/ReadableStorage.js
index 9dfac75..af4c161 100644
--- a/src/Storage/ReadableStorage.js
+++ b/src/Storage/ReadableStorage.js
@@ -50,8 +50,8 @@ class ReadableStorage extends EventEmitter {
 
         this.dataDirectory = path.resolve(config.dataDirectory);
 
-        this.initializeIndexes(config);
         this.scanPartitions(config);
+        this.initializeIndexes(config);
     }
 
     /**
@@ -325,3 +325,4 @@ class ReadableStorage extends EventEmitter {
 
 module.exports = ReadableStorage;
 module.exports.matches = matches;
+module.exports.CorruptFileError = Partition.CorruptFileError;
diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js
index 57c982e..9085d45 100644
--- a/src/Storage/WritableStorage.js
+++ b/src/Storage/WritableStorage.js
@@ -309,6 +309,9 @@ class WritableStorage extends ReadableStorage {
         if (!this.index.isOpen()) {
             this.index.open();
         }
+        if (after < 0) {
+            after += this.index.length;
+        }
 
         this.truncatePartitions(after);
 
@@ -366,4 +369,5 @@ class WritableStorage extends ReadableStorage {
 }
 
 module.exports = WritableStorage;
-module.exports.StorageLockedError = StorageLockedError;
\ No newline at end of file
+module.exports.StorageLockedError = StorageLockedError;
+module.exports.CorruptFileError = ReadableStorage.CorruptFileError;
\ No newline at end of file
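A quick sketch, not part of the patch, of the slice-like negative-offset semantics that the `WritableStorage.truncate()` change above introduces; the constructor arguments and `write()` calls are assumptions made for illustration:

```js
const WritableStorage = require('./src/Storage/WritableStorage');

// Hypothetical storage with three documents; name and config are illustrative.
const storage = new WritableStorage('example', { dataDirectory: './data' });
storage.open();
storage.write({ foo: 'bar' });
storage.write({ foo: 'baz' });
storage.write({ foo: 'quux' });

// With the change above, a negative `after` counts back from the end of the
// index, so this keeps the first two documents and drops the last one:
storage.truncate(-1); // same as storage.truncate(storage.index.length - 1)
```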
diff --git a/test/EventStore.spec.js b/test/EventStore.spec.js
index 84c792d..e03f728 100644
--- a/test/EventStore.spec.js
+++ b/test/EventStore.spec.js
@@ -46,6 +46,55 @@ describe('EventStore', function() {
         fs.readdir = originalReaddir;
     });
 
+    it('repairs unfinished commits', function(done) {
+        eventstore = new EventStore({
+            storageDirectory
+        });
+
+        let events = [{foo: 'bar'}, {foo: 'baz'}, {foo: 'quux'}];
+        eventstore.on('ready', () => {
+            eventstore.commit('foo-bar', events, () => {
+                // Simulate an unfinished write after the second event (but indexes are still written)
+                const entry = eventstore.storage.index.get(3);
+                eventstore.storage.getPartition(entry.partition).truncate(entry.position);
+                eventstore.close();
+
+                eventstore = new EventStore({
+                    storageDirectory
+                });
+                eventstore.on('ready', () => {
+                    expect(eventstore.length).to.be(0);
+                    expect(eventstore.getStreamVersion('foo-bar')).to.be(0);
+                    done();
+                });
+            });
+        });
+    });
+
+    it('repairs torn writes', function(done) {
+        eventstore = new EventStore({
+            storageDirectory
+        });
+
+        let events = [{foo: 'bar'.repeat(500)}];
+        eventstore.on('ready', () => {
+            eventstore.commit('foo-bar', events, () => {
+                // Simulate a torn write (but indexes are still written)
+                fs.truncateSync(eventstore.storage.getPartition('foo-bar').fileName, 512);
+                eventstore.close();
+
+                eventstore = new EventStore({
+                    storageDirectory
+                });
+                eventstore.on('ready', () => {
+                    expect(eventstore.length).to.be(0);
+                    expect(eventstore.getStreamVersion('foo-bar')).to.be(0);
+                    done();
+                });
+            });
+        });
+    });
+
     it('throws when trying to open non-existing store read-only', function() {
         expect(() => new EventStore({
             storageDirectory,