22 changes: 15 additions & 7 deletions README.md
@@ -264,13 +264,21 @@ The storage engine is not strictly designed to follow ACID semantics. However, i

#### Atomicity

A single document write is guaranteed to be atomic. Unless specifically configured, atomicity extends to all subsequent
writes until the write buffer is flushed, which happens either when the current document doesn't fully fit into the write
buffer or on the next node event loop.
This can be (ab)used to create a reduced form of transactional behaviour: all writes that happen within a single event loop
and still fit into the write buffer will either happen together or not at all.
If strict atomicity for single documents is required, you can set the option `maxWriteBufferDocuments` to 1, which
causes every single document to be flushed immediately.
A single document write is guaranteed to be atomic through the deserializer: incomplete writes cannot be deserialized, due
to the nature of JSON. If you use a custom serialization format, e.g. msgpack or protobuf, you should make use of a checksum.
If deserialization of the last document fails on startup, the storage will be truncated and hence repair itself. This covers
cases of "torn writes", where the disk does not persist all blocks, e.g. due to power failure.
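
For illustration, here is a minimal sketch of checksummed framing for a custom binary format (the `frame`/`unframe` helpers are hypothetical; the store itself relies on failing JSON deserialization instead):

```js
const crypto = require('crypto');

// Prefix each serialized document with a 4-byte checksum of its payload.
function frame(payload) {
    const checksum = crypto.createHash('sha1').update(payload).digest().slice(0, 4);
    return Buffer.concat([checksum, payload]);
}

// Verify the checksum on read; a mismatch indicates a torn or corrupt write.
function unframe(buffer) {
    const stored = buffer.slice(0, 4);
    const payload = buffer.slice(4);
    const actual = crypto.createHash('sha1').update(payload).digest().slice(0, 4);
    if (!stored.equals(actual)) {
        throw new Error('Checksum mismatch: torn or corrupt write');
    }
    return payload;
}
```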

Multi-document writes (a commit of multiple events) are guaranteed to be atomic by checking during startup that the last
commit finished completely. If the last event's `commitVersion` does not match `commitSize - 1`, the last commit was incomplete
and will be rolled back by truncating the storage to the position before the commit, i.e. its `commitId`.
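
For illustration, a commit of three events starting at global sequence number 12 carries metadata like this (field names as used in `src/EventStore.js` below; the values are made up):

```js
// event 0: { commitId: 12, commitVersion: 0, commitSize: 3, committedAt: ... }
// event 1: { commitId: 12, commitVersion: 1, commitSize: 3, committedAt: ... }
// event 2: { commitId: 12, commitVersion: 2, commitSize: 3, committedAt: ... }
// Startup check: the last event must satisfy commitVersion === commitSize - 1,
// otherwise the storage is truncated back to position commitId (here: 12).
```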

Due to the write buffering applied, writes typically also happen in batches of multiple documents, so logical atomicity spreads
over multiple documents. This can be controlled through the option `maxWriteBufferDocuments`, which defines how many documents
may at most sit inside a single write buffer before being flushed, and the option `writeBufferSize`, which puts a size limit on the
write buffer. For optimal performance, a write buffer of 16 kB has turned out to be good, at least on the SSD I use, but YMMV.
Generally, not limiting the write buffer fill level through `maxWriteBufferDocuments` is recommended, since flushing only part of
a full page (typically 4 kB) increases write amplification.
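
As a sketch, the buffering options could be configured like this (assuming the `EventStore` constructor forwards these storage options; the 16 kB value is just the SSD-specific observation from above):

```js
const EventStore = require('./src/EventStore');

const eventstore = new EventStore({
    storageDirectory: './data',
    writeBufferSize: 16 * 1024 // 16 kB write buffer; tune for your disk
    // maxWriteBufferDocuments is left unset on purpose, so flushes happen on
    // full buffers only and write amplification stays low
});
```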

#### Consistency

95 changes: 93 additions & 2 deletions src/EventStore.js
@@ -75,11 +75,101 @@ class EventStore extends EventEmitter {
this.storage.close();
throw err;
}
this.checkUnfinishedCommits();
this.emit('ready');
});
}

/**
* Check if the last commit in the store was unfinished, which is the case if not all events of the commit have been written.
* Requires that no torn writes exist at the end of the storage, so a check for that is done as a first step.
* @private
*/
checkUnfinishedCommits() {
this.checkTornWrite();
let position = this.storage.length;
let lastEvent;
let truncateIndex = false;
while (position > 0 && (lastEvent = this.storage.read(position)) === false) {
truncateIndex = true;
position--;
}

if (lastEvent && lastEvent.metadata.commitSize && lastEvent.metadata.commitVersion !== lastEvent.metadata.commitSize - 1) {
this.emit('unfinished-commit', lastEvent);
// commitId = global sequence number at which the commit started
this.storage.truncate(lastEvent.metadata.commitId);
} else if (truncateIndex) {
// The index contained items that are not in the storage file, so truncate it after the last valid event
this.storage.truncate(position + 1);
}
// TODO: Somehow detect unfinished index updates and reindex documents
// For this, it needs to be possible to read from a storage without an index
// The complexity herein lies in how to correctly interleave reads from different partitions deterministically,
// so that they match the order at write-time. With a global sequence number stored in the document meta, this
// would be doable relatively easily. Otherwise, we need to:
// - store some other ordering information in the documents, e.g. monotonic timestamp
// - define a deterministic order across partitions, e.g. by ordering by partitionId
// - make sure the order at write-time holds the defined determinism, by guaranteeing that
// given concurrent writes A (partitionId = 1, timestamp = X) and B (partitionId = 2, timestamp = X)
// any index spanning both partitions will always have A come before B
// If the index writer happens to receive B, it hence needs to make sure no write C (partitionId <= 2, timestamp <= X)
// can still occur, before actually appending write B.
// For all partitions part of the index, wait until it is guaranteed that no write to any of those will
// happen with a timestamp <= current write timestamp. This can be guaranteed by delaying the index insert
// by an amount of time that is the maximum delay for a write reaching the index writer. Since there is no guarantee
// for an upper bound this needs to be tackled probabilistically, e.g. by measuring the 99th percentile of latencies.
// Then on any index append, it needs to be checked that the determinism rule is still held. If not, an error needs
// to be thrown, which would mean the according document write needs to be rolled back.
// With this we basically have built a cross-partition transactional scheme, which is contrary to the goals.
// So in contrast, no hard guarantee can be given for repeatable cross-partition reads.
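// Illustrative-only sketch of that probabilistic scheme (not implemented, and
// contrary to the goals as noted above; p99WriteToIndexDelay would have to be
// measured at runtime):
// scheduleIndexAppend(entry) {
//     setTimeout(() => {
//         if (this.index.lastEntry.number >= entry.number) {
//             // Determinism rule violated - the document write must be rolled back
//             throw new Error('Out-of-order index append');
//         }
//         this.index.add(entry);
//     }, this.p99WriteToIndexDelay);
// }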
// TODO (WIP): iterate partitions directly once reading without an index is possible
// this.storage.forEachPartition(partition => { ... });
const indexes = [];

let leastConsistentEntry = this.storage.index.lastEntry;
this.storage.forEachSecondaryIndex(index => {
Owner Author commented:
Note: Updating secondary indexes is problematic, because they can fall behind the primary index. However, we do not know whether an index has actually fallen behind or whether it just didn't match any later documents. This leads to the worst case of one secondary index matching only the first document written and no other, which would mean the whole storage needs to be reindexed.

indexes.push(index);
if (leastConsistentEntry.number > index.lastEntry.number) {
leastConsistentEntry = index.lastEntry;
}

const lastEntry = index.lastEntry;
// TODO (WIP): Start reading forward from that last index entry and update the index
// to be in line with the storage again, e.g.:
// let nextEvent, position = lastEntry.position;
// while ((nextEvent = this.storage.readFrom(lastEntry.partition, position, lastEntry.size)) !== false) { ... }
});
const firstReindexSequenceNumber = leastConsistentEntry.number;
// TODO (WIP): reindex all documents starting from firstReindexSequenceNumber,
// reading forward with this.storage.readFrom(partitionId, position) once
// partitionId and position can be derived without a consistent index.
}

/**
* Check if the last document in the storage is a torn write and if so, truncate the storage before that.
* @private
*/
checkTornWrite() {
let position = this.storage.length;
Owner Author commented:
To fix torn writes (which should be a storage-level concern) we need to infer the position from the actual documents in the storage rather than from the information in the index. Hence the storage needs to read the last document of each partition and return the highest document header sequenceNumber.

Owner Author commented:
See #155

if (position === 0) {
return;
}

try {
this.storage.read(position);
} catch (e) {
if (e instanceof Storage.CorruptFileError) {
console.log('Torn write detected. Truncating after', position - 1);
this.storage.truncate(position - 1);
}
}
}

/**
* @private
* @param {string} name
* @param {object} config
* @returns {ReadableStorage|WritableStorage}
@@ -202,6 +292,7 @@ class EventStore extends EventEmitter {

const commitId = this.length;
let commitVersion = 0;
const commitSize = events.length;
const committedAt = Date.now();
const commit = Object.assign({
commitId,
@@ -216,7 +307,7 @@
callback(commit);
};
for (let event of events) {
const eventMetadata = Object.assign({ commitId, committedAt }, metadata, { commitVersion, streamVersion });
const eventMetadata = Object.assign({ commitId, committedAt }, metadata, { commitVersion, commitSize, streamVersion });
Owner Author commented:
This is already implemented as of #159

const storedEvent = { stream: streamName, payload: event, metadata: eventMetadata };
commitVersion++;
streamVersion++;
@@ -267,7 +358,7 @@ class EventStore extends EventEmitter {
}

/**
* Create a new event stream from existing streams by joining them.
* Create a new (transient) event stream from existing streams by joining them.
*
* @param {string} streamName The (transient) name of the joined stream.
* @param {Array<string>} streamNames An array of the stream names to join.
5 changes: 5 additions & 0 deletions src/Index/WritableIndex.js
@@ -182,6 +182,11 @@ class WritableIndex extends ReadableIndex {
assertEqual(entry.constructor.name, this.EntryClass.name, `Wrong entry object.`);
assertEqual(entry.constructor.size, this.EntryClass.size, `Invalid entry size.`);

const lastEntry = this.lastEntry;
if (lastEntry !== false && lastEntry.number >= entry.number) {
throw new Error('Consistency error. Tried to add an index entry that should come before the existing last entry.');
}

if (this.readUntil === this.data.length - 1) {
this.readUntil++;
}
28 changes: 16 additions & 12 deletions src/Partition/WritablePartition.js
@@ -319,26 +319,30 @@ class WritablePartition extends ReadablePartition {
if (after > this.size) {
return;
}
if (after < 0) {
after = 0;
}
after = Math.max(0, after);
this.flush();

let position = after, data;
let skipDeleteLog = false;
try {
data = this.readFrom(position);
} catch (e) {
throw new Error('Can only truncate on valid document boundaries.');
if (!(e instanceof ReadablePartition.CorruptFileError)) {
throw new Error('Can only truncate on valid document boundaries.');
}
skipDeleteLog = true;
}
// copy all truncated documents to some delete log
const deletedBranch = new WritablePartition(this.name + '-' + after + '.branch', { dataDirectory: this.dataDirectory });
deletedBranch.open();
while (data) {
deletedBranch.write(data);
position += this.documentWriteSize(Buffer.byteLength(data, 'utf8'));
data = this.readFrom(position);
if (!skipDeleteLog) {
// copy all truncated documents to some delete log
const deletedBranch = new WritablePartition(this.name + '-' + after + '.branch', { dataDirectory: this.dataDirectory });
deletedBranch.open();
while (data) {
deletedBranch.write(data);
position += this.documentWriteSize(Buffer.byteLength(data, 'utf8'));
data = this.readFrom(position);
}
deletedBranch.close();
}
deletedBranch.close();

fs.truncateSync(this.fileName, this.headerSize + after);
this.truncateReadBuffer(after);
3 changes: 2 additions & 1 deletion src/Storage/ReadableStorage.js
@@ -50,8 +50,8 @@ class ReadableStorage extends EventEmitter {

this.dataDirectory = path.resolve(config.dataDirectory);

this.initializeIndexes(config);
this.scanPartitions(config);
this.initializeIndexes(config);
}

/**
@@ -325,3 +325,4 @@

module.exports = ReadableStorage;
module.exports.matches = matches;
module.exports.CorruptFileError = Partition.CorruptFileError;
6 changes: 5 additions & 1 deletion src/Storage/WritableStorage.js
@@ -309,6 +309,9 @@ class WritableStorage extends ReadableStorage {
if (!this.index.isOpen()) {
this.index.open();
}
if (after < 0) {
after += this.index.length;
}

this.truncatePartitions(after);

@@ -366,4 +369,5 @@
}

module.exports = WritableStorage;
module.exports.StorageLockedError = StorageLockedError;
module.exports.StorageLockedError = StorageLockedError;
module.exports.CorruptFileError = ReadableStorage.CorruptFileError;
49 changes: 49 additions & 0 deletions test/EventStore.spec.js
@@ -46,6 +46,55 @@ describe('EventStore', function() {
fs.readdir = originalReaddir;
});

it('repairs unfinished commits', function(done) {
eventstore = new EventStore({
storageDirectory
});

let events = [{foo: 'bar'}, {foo: 'baz'}, {foo: 'quux'}];
eventstore.on('ready', () => {
eventstore.commit('foo-bar', events, () => {
// Simulate an unfinished write after the second event (but indexes are still written)
const entry = eventstore.storage.index.get(3);
eventstore.storage.getPartition(entry.partition).truncate(entry.position);
eventstore.close();

eventstore = new EventStore({
storageDirectory
});
eventstore.on('ready', () => {
expect(eventstore.length).to.be(0);
expect(eventstore.getStreamVersion('foo-bar')).to.be(0);
done();
});
});
});
});

it('repairs torn writes', function(done) {
eventstore = new EventStore({
storageDirectory
});

let events = [{foo: 'bar'.repeat(500)}];
eventstore.on('ready', () => {
eventstore.commit('foo-bar', events, () => {
// Simulate a torn write (but indexes are still written)
fs.truncateSync(eventstore.storage.getPartition('foo-bar').fileName, 512);
eventstore.close();

eventstore = new EventStore({
storageDirectory
});
eventstore.on('ready', () => {
expect(eventstore.length).to.be(0);
expect(eventstore.getStreamVersion('foo-bar')).to.be(0);
done();
});
});
});
});

it('throws when trying to open non-existing store read-only', function() {
expect(() => new EventStore({
storageDirectory,