# Start implementing auto-repair #107
base: main
```diff
@@ -75,11 +75,101 @@ class EventStore extends EventEmitter {
                this.storage.close();
                throw err;
            }
            this.checkUnfinishedCommits();
            this.emit('ready');
        });
    }

    /**
     * Check if the last commit in the store was unfinished, which is the case if not all events of the commit have been written.
     * This depends on no torn writes existing at the end of the storage, so a check for that is done as a first step.
     * @private
     */
    checkUnfinishedCommits() {
        this.checkTornWrite();
        let position = this.storage.length;
        let lastEvent;
        let truncateIndex = false;
        while (position > 0 && (lastEvent = this.storage.read(position)) === false) {
            truncateIndex = true;
            position--;
        }

        if (lastEvent && lastEvent.metadata.commitSize && lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1) {
            this.emit('unfinished-commit', lastEvent);
            // commitId = global sequence number at which the commit started
            this.storage.truncate(lastEvent.metadata.commitId);
        } else if (truncateIndex) {
            // The index contained items that are not in the storage file, so truncate it after the last valid event
            this.storage.truncate(position + 1);
        }
        // TODO: Somehow detect unfinished index updates and reindex documents.
        // For this, it needs to be possible to read from a storage without an index.
        // The complexity herein lies in how to correctly interleave reads from different partitions deterministically,
        // so that they match the order at write time. With a global sequence number stored in the document meta, this
        // would be doable relatively easily. Otherwise, we need to:
        // - store some other ordering information in the documents, e.g. a monotonic timestamp
        // - define a deterministic order across partitions, e.g. by ordering by partitionId
        // - make sure the write-time order upholds the defined determinism, by guaranteeing that
        //   given concurrent writes A (partitionId = 1, timestamp = X) and B (partitionId = 2, timestamp = X),
        //   any index spanning both partitions will always have A come before B.
        // If the index writer happens to receive B, it hence needs to make sure that no write C (partitionId <= 2, timestamp <= X)
        // can still occur before actually appending write B.
        // For all partitions that are part of the index, wait until it is guaranteed that no write to any of those will
        // happen with a timestamp <= the current write timestamp. This can be guaranteed by delaying the index insert
        // by an amount of time that is the maximum delay for a write reaching the index writer. Since there is no guaranteed
        // upper bound, this needs to be tackled probabilistically, e.g. by measuring the 99th percentile of latencies.
        // Then on any index append, it needs to be checked that the determinism rule still holds. If not, an error needs
        // to be thrown, which would mean the corresponding document write needs to be rolled back.
        // With this we would basically have built a cross-partition transactional scheme, which is contrary to the goals.
        // So instead, no hard guarantee can be given for repeatable cross-partition reads.
        this.storage.forEachPartition(partition => {
            partition.readFi
        });
        const indexes = [];

        let leastConsistentEntry = this.storage.index.lastEntry;
        this.storage.forEachSecondaryIndex(index => {
            indexes.push(index);
            if (leastConsistentEntry.number > index.lastEntry.number) {
                leastConsistentEntry = index.lastEntry;
            }

            const lastEntry = index.lastEntry;
            // Start reading forward from that last index entry and update the index to be in line with the storage again
            let nextEvent, position = lastEntry.position;
            while (nextEvent = this.storage.readFrom(lastEntry.partition, lastEntry.position, lastEntry.size)) {

            }
        });
        let firstReindexSequenceNumber = leastConsistentEntry.number;
        while (true) {
            let document = this.storage.readFrom(partitionId, position)
        }
    }
```
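The ordering rule sketched in the TODO above can be made concrete. The following is purely illustrative and not part of this diff (the buffer, function names, and latency bound are invented): order cross-partition writes by `(timestamp, partitionId)`, and have the index writer hold back an append until no earlier-ordered write can plausibly still arrive.

```js
// Illustrative sketch of the determinism rule from the TODO above; all names
// here are hypothetical and not part of this codebase.
// Deterministic cross-partition order: by timestamp, then by partitionId, so
// concurrent writes A (partition 1, ts X) and B (partition 2, ts X) always
// order as A before B.
function compareWrites(a, b) {
    if (a.timestamp !== b.timestamp) {
        return a.timestamp - b.timestamp;
    }
    return a.partitionId - b.partitionId;
}

// The index writer buffers incoming writes and only appends those older than
// an assumed maximum in-flight delay (a probabilistic bound, e.g. a measured
// 99th-percentile latency), so that no earlier-ordered write can still arrive.
function takeAppendable(buffer, now, maxDelayMs) {
    buffer.sort(compareWrites);
    return buffer.filter(write => now - write.timestamp > maxDelayMs);
}
```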
```diff
    /**
     * Check if the last document in the storage is a torn write and if so, truncate the storage before that.
     * @private
     */
    checkTornWrite() {
        let position = this.storage.length;
```
**Owner (Author):** To fix torn writes (which should be a storage-level concern), we need to infer the position from the actual documents in the storage rather than from the information in the index. Hence the storage needs to read the last document of each partition and return the highest document header `sequenceNumber`.

**Owner (Author):** See #155
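A rough sketch of what that storage-side inference could look like. `readLastDocumentHeader` and the header's `sequenceNumber` field are assumptions for illustration, not the actual storage API:

```js
// Hypothetical sketch: derive the true end of the storage from the documents
// themselves instead of trusting the index. readLastDocumentHeader is an
// invented method that would skip any torn bytes at the tail of a partition.
function findHighestSequenceNumber(storage) {
    let highest = 0;
    storage.forEachPartition(partition => {
        const header = partition.readLastDocumentHeader();
        if (header && header.sequenceNumber > highest) {
            highest = header.sequenceNumber;
        }
    });
    return highest;
}
```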
```diff
        if (position === 0) {
            return;
        }

        try {
            this.storage.read(position);
        } catch (e) {
            if (e instanceof Storage.CorruptFileError) {
                console.log('Torn write detected. Truncating after', position - 1);
                this.storage.truncate(position - 1);
            }
        }
    }

    /**
     * @private
     * @param {string} name
     * @param {object} config
     * @returns {ReadableStorage|WritableStorage}
```
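Taken together, opening a store now runs the torn-write check and the unfinished-commit check before `ready` fires. A minimal usage sketch based on the events shown in this diff (the require path and store name are assumptions, not taken from this PR):

```js
// Usage sketch; the require path and store name below are hypothetical.
const EventStore = require('event-storage');

const eventstore = new EventStore('my-store');

// Emitted from checkUnfinishedCommits() when the last commit was only
// partially written; the store truncates back to the start of that commit.
eventstore.on('unfinished-commit', lastEvent => {
    console.warn('Dropped partial commit', lastEvent.metadata.commitId);
});

// Emitted once the storage is open and any auto-repair has run.
eventstore.on('ready', () => {
    console.log('Event store ready.');
});
```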
```diff
@@ -202,6 +292,7 @@ class EventStore extends EventEmitter {

         const commitId = this.length;
         let commitVersion = 0;
+        const commitSize = events.length;
         const committedAt = Date.now();
         const commit = Object.assign({
             commitId,
```
```diff
@@ -216,7 +307,7 @@ class EventStore extends EventEmitter {
             callback(commit);
         };
         for (let event of events) {
-            const eventMetadata = Object.assign({ commitId, committedAt }, metadata, { commitVersion, streamVersion });
+            const eventMetadata = Object.assign({ commitId, committedAt }, metadata, { commitVersion, commitSize, streamVersion });
```
**Owner (Author):** This is already implemented as of #159
```diff
             const storedEvent = { stream: streamName, payload: event, metadata: eventMetadata };
             commitVersion++;
             streamVersion++;
```
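With `commitSize` stored on every event, the completeness check in `checkUnfinishedCommits()` becomes a pure function of the last event on disk. A small illustration derived from the check in this diff:

```js
// A commit of three events is written with commitVersion 0, 1, 2 and
// commitSize 3 on each event, so the commit is complete exactly when the
// last stored event has commitVersion === commitSize - 1.
function isCommitComplete(lastEvent) {
    const { commitVersion, commitSize } = lastEvent.metadata;
    // Events written before this change carry no commitSize; they cannot be
    // checked, so treat them as complete (mirrors the guard in the diff).
    if (!commitSize) {
        return true;
    }
    return commitVersion === commitSize - 1;
}
```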
```diff
@@ -267,7 +358,7 @@ class EventStore extends EventEmitter {
     }

     /**
-     * Create a new event stream from existing streams by joining them.
+     * Create a new (transient) event stream from existing streams by joining them.
      *
      * @param {string} streamName The (transient) name of the joined stream.
      * @param {Array<string>} streamNames An array of the stream names to join.
```
Note: Updating secondary indexes is problematic because they can fall behind the primary index. However, we do not know whether an index has actually fallen behind or simply didn't match any later documents. This leads to the worst case of a secondary index matching only the first document written and no others, which would mean the whole storage needs to be reindexed.
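One possible direction, sketched with invented names (nothing here is in the codebase): have each secondary index persist the highest sequence number it has processed, even when a document did not match its filter, so that "fallen behind" becomes distinguishable from "nothing matched".

```js
// Hypothetical bookkeeping sketch; lastProcessedNumber is an invented field
// that the index writer would persist after every document, matched or not.
function needsReindex(storage, index) {
    // storage.index.lastEntry.number: highest sequence number in the store.
    // Without lastProcessedNumber, index.lastEntry.number alone cannot tell
    // a lagging index apart from one whose filter matched nothing recently.
    return index.lastProcessedNumber < storage.index.lastEntry.number;
}
```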