From a62fcc4fbc70a1f81b22f798a824969bab62f021 Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Tue, 9 Feb 2021 12:55:51 +0100 Subject: [PATCH 1/8] Separate documents by a unique sequence --- src/Partition/WritablePartition.js | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Partition/WritablePartition.js b/src/Partition/WritablePartition.js index 5ef1719..54b6f6c 100644 --- a/src/Partition/WritablePartition.js +++ b/src/Partition/WritablePartition.js @@ -7,17 +7,18 @@ const Clock = require('../Clock'); const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024; const DOCUMENT_HEADER_SIZE = 16; const DOCUMENT_ALIGNMENT = 4; -const DOCUMENT_PAD = ' '.repeat(15) + "\n"; +const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n"; +const DOCUMENT_PAD = ' '.repeat(16 - DOCUMENT_SEPARATOR.length) + DOCUMENT_SEPARATOR; const NES_EPOCH = new Date('2020-01-01T00:00:00'); /** * @param {number} dataSize - * @returns {string} The data padded to 16 bytes alignment and ended with a line break. + * @returns {string} The data padded to 16 bytes alignment and ended with \0x1E (record separator) and a line break. */ function padData(dataSize) { - const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 1) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; - return DOCUMENT_PAD.substr(-padSize - 1); + const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + DOCUMENT_SEPARATOR.length) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; + return DOCUMENT_PAD.substr(-padSize - DOCUMENT_SEPARATOR.length); } /** From f28c444bee3b01ab12cf3fdc89f07cc759e5eb58 Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Tue, 9 Feb 2021 22:51:54 +0100 Subject: [PATCH 2/8] Fix ReadablePartition for document separator --- src/Partition/ReadablePartition.js | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Partition/ReadablePartition.js b/src/Partition/ReadablePartition.js index e3dded9..b8bd578 100644 --- a/src/Partition/ReadablePartition.js +++ b/src/Partition/ReadablePartition.js @@ -6,9 +6,10 @@ const { assert } = require('../util'); const DEFAULT_READ_BUFFER_SIZE = 64 * 1024; const DOCUMENT_HEADER_SIZE = 16; const DOCUMENT_ALIGNMENT = 4; +const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n"; -// node-event-store partition V02 -const HEADER_MAGIC = "nesprt02"; +// node-event-store partition V03 +const HEADER_MAGIC = "nesprt03"; class CorruptFileError extends Error {} class InvalidDataSizeError extends Error {} @@ -160,8 +161,8 @@ class ReadablePartition extends events.EventEmitter { * @returns {number} The size of the data including header, padded to 16 bytes alignment and ended with a line break. */ documentWriteSize(dataSize) { - const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 1) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; - return dataSize + 1 + padSize + DOCUMENT_HEADER_SIZE; + const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + DOCUMENT_SEPARATOR.length) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; + return dataSize + DOCUMENT_SEPARATOR.length + padSize + DOCUMENT_HEADER_SIZE; } /** @@ -221,7 +222,8 @@ class ReadablePartition extends events.EventEmitter { throw new InvalidDataSizeError(`Invalid document size ${dataSize} at position ${position}, expected ${size}.`); } - if (position + dataSize + DOCUMENT_HEADER_SIZE > this.size) { + const writeSize = this.documentWriteSize(dataSize); + if (position + writeSize > this.size) { throw new CorruptFileError(`Invalid document at position ${position}. This may be caused by an unfinished write.`); } From 9bd75918ff89b4f143ed3d3153fa05961c53e415 Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sun, 21 Feb 2021 22:30:19 +0100 Subject: [PATCH 3/8] Add support for reading partition backwards --- src/Partition/ReadablePartition.js | 72 ++++++++++++++++++++++++++++++ test/Partition.spec.js | 30 +++++++++++++ 2 files changed, 102 insertions(+) diff --git a/src/Partition/ReadablePartition.js b/src/Partition/ReadablePartition.js index b8bd578..da23139 100644 --- a/src/Partition/ReadablePartition.js +++ b/src/Partition/ReadablePartition.js @@ -251,6 +251,25 @@ class ReadablePartition extends events.EventEmitter { return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength }); } + /** + * Prepare the read buffer for reading *before* the specified position. Don't try to reader *after* the returned cursor. + * + * @protected + * @param {number} position The position in the file to prepare the read buffer for reading before. + * @returns {{ buffer: Buffer|null, cursor: number, length: number }} A reader object with properties `buffer`, `cursor` and `length`. + */ + prepareReadBufferBackwards(position) { + if (position < 0) { + return ({ buffer: null, cursor: 0, length: 0 }); + } + let bufferCursor = position - this.readBufferPos; + if (this.readBufferPos < 0 || (this.readBufferPos > 0 && bufferCursor < DOCUMENT_SEPARATOR.length + 4)) { + this.fillBuffer(Math.max(position - this.readBuffer.byteLength, 0)); + bufferCursor = this.readBufferLength; + } + return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength }); + } + /** * Read the data from the given position. * @@ -292,6 +311,45 @@ class ReadablePartition extends events.EventEmitter { return reader.buffer.toString('utf8', dataPosition, dataPosition + dataSize); } + /** + * Find the start position of the document that precedes the given position. + * + * @protected + * @param {number} position The file position to read backwards from. + * @returns {number|boolean} The start position of the first document before the given position or false if no header could be found. + */ + findDocumentPositionBefore(position) { + assert(this.fd, 'Partition is not opened.'); + if (position <= 0) { + return false; + } + + assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`); + + const separatorSize = DOCUMENT_SEPARATOR.length; + // Optimization if we are at an exact document boundary, where we can just read the document size + let reader/* = this.prepareReadBufferBackwards(position); + const block = reader.buffer.toString('ascii', reader.cursor - separatorSize, reader.cursor); + if (block === DOCUMENT_SEPARATOR) { + const dataSize = reader.buffer.readUInt32BE(reader.cursor - separatorSize - 4); + return position - this.documentWriteSize(dataSize); + }*/ + + do { + reader = this.prepareReadBufferBackwards(position - separatorSize); + + const bufferSeparatorPosition = reader.buffer.lastIndexOf(DOCUMENT_SEPARATOR, reader.cursor - separatorSize, 'ascii'); + if (bufferSeparatorPosition >= 0) { + position = this.readBufferPos + bufferSeparatorPosition + separatorSize; + break; + } + position -= this.readBufferLength; + } while (position > 0); + return position; + /*const header = this.readDocumentHeader(reader.buffer, reader.cursor, position); + return ({ position, ...header });*/ + } + /** * @api * @returns {Generator} A generator that returns all documents in this partition. @@ -305,6 +363,20 @@ class ReadablePartition extends events.EventEmitter { } } + /** + * @api + * @returns {Generator} A generator that returns all documents in this partition in reverse order. + */ + *readAllBackwards() { + let position = this.size; + while ((position = this.findDocumentPositionBefore(position)) !== false) { + const data = this.readFrom(position); + if (data === false) { + break; + } + yield data; + } + } } module.exports = ReadablePartition; diff --git a/test/Partition.spec.js b/test/Partition.spec.js index ba1f261..72d040a 100644 --- a/test/Partition.spec.js +++ b/test/Partition.spec.js @@ -156,6 +156,36 @@ describe('Partition', function() { }); + describe('readAll', function() { + + it('reads all documents in write order', function() { + partition.open(); + fillPartition(50, i => 'foo-' + i.toString()); + partition.close(); + partition.open(); + let i = 1; + for (let data of partition.readAll()) { + expect(data).to.be('foo-' + i.toString()); + i++; + } + expect(i).to.be(51); + }); + + it('reads all documents in backwards write order', function() { + partition.open(); + fillPartition(50, i => 'foo-' + i.toString()); + partition.close(); + partition.open(); + let i = 50; + for (let data of partition.readAllBackwards()) { + expect(data).to.be('foo-' + i.toString()); + i--; + } + expect(i).to.be(0); + }); + + }); + describe('readFrom', function() { it('returns false when partition is not open', function() { From 7c8d829e50f08ab052c9fd38c700f33ca39d3ff6 Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sun, 21 Feb 2021 22:51:38 +0100 Subject: [PATCH 4/8] Add document size before separator --- src/Partition/ReadablePartition.js | 8 ++++---- src/Partition/WritablePartition.js | 16 ++++++++++++---- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/Partition/ReadablePartition.js b/src/Partition/ReadablePartition.js index da23139..eb3792a 100644 --- a/src/Partition/ReadablePartition.js +++ b/src/Partition/ReadablePartition.js @@ -161,8 +161,8 @@ class ReadablePartition extends events.EventEmitter { * @returns {number} The size of the data including header, padded to 16 bytes alignment and ended with a line break. */ documentWriteSize(dataSize) { - const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + DOCUMENT_SEPARATOR.length) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; - return dataSize + DOCUMENT_SEPARATOR.length + padSize + DOCUMENT_HEADER_SIZE; + const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 4 + DOCUMENT_SEPARATOR.length) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; + return dataSize + DOCUMENT_SEPARATOR.length + 4 + padSize + DOCUMENT_HEADER_SIZE; } /** @@ -328,12 +328,12 @@ class ReadablePartition extends events.EventEmitter { const separatorSize = DOCUMENT_SEPARATOR.length; // Optimization if we are at an exact document boundary, where we can just read the document size - let reader/* = this.prepareReadBufferBackwards(position); + let reader = this.prepareReadBufferBackwards(position); const block = reader.buffer.toString('ascii', reader.cursor - separatorSize, reader.cursor); if (block === DOCUMENT_SEPARATOR) { const dataSize = reader.buffer.readUInt32BE(reader.cursor - separatorSize - 4); return position - this.documentWriteSize(dataSize); - }*/ + } do { reader = this.prepareReadBufferBackwards(position - separatorSize); diff --git a/src/Partition/WritablePartition.js b/src/Partition/WritablePartition.js index 54b6f6c..8644eb6 100644 --- a/src/Partition/WritablePartition.js +++ b/src/Partition/WritablePartition.js @@ -8,7 +8,7 @@ const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024; const DOCUMENT_HEADER_SIZE = 16; const DOCUMENT_ALIGNMENT = 4; const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n"; -const DOCUMENT_PAD = ' '.repeat(16 - DOCUMENT_SEPARATOR.length) + DOCUMENT_SEPARATOR; +const DOCUMENT_PAD = ' '.repeat(16 - 4 - DOCUMENT_SEPARATOR.length)/* + DOCUMENT_SEPARATOR*/; const NES_EPOCH = new Date('2020-01-01T00:00:00'); @@ -17,8 +17,8 @@ const NES_EPOCH = new Date('2020-01-01T00:00:00'); * @returns {string} The data padded to 16 bytes alignment and ended with \0x1E (record separator) and a line break. */ function padData(dataSize) { - const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + DOCUMENT_SEPARATOR.length) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; - return DOCUMENT_PAD.substr(-padSize - DOCUMENT_SEPARATOR.length); + const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 4 + DOCUMENT_SEPARATOR.length) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; + return DOCUMENT_PAD.substr(0, padSize); } /** @@ -219,6 +219,11 @@ class WritablePartition extends ReadablePartition { let bytesWritten = 0; bytesWritten += fs.writeSync(this.fd, dataHeader); bytesWritten += fs.writeSync(this.fd, data); + bytesWritten += fs.writeSync(this.fd, padData(dataSize)); + const dataSizeBuffer = Buffer.alloc(4); + dataSizeBuffer.writeUInt32BE(dataSize, 0); + bytesWritten += fs.writeSync(this.fd, dataSizeBuffer); + bytesWritten += fs.writeSync(this.fd, DOCUMENT_SEPARATOR); if (typeof callback === 'function') { process.nextTick(callback); } @@ -241,6 +246,10 @@ class WritablePartition extends ReadablePartition { let bytesWritten = 0; bytesWritten += this.writeDocumentHeader(this.writeBuffer, this.writeBufferCursor, dataSize, sequenceNumber); bytesWritten += this.writeBuffer.write(data, this.writeBufferCursor + bytesWritten, 'utf8'); + bytesWritten += this.writeBuffer.write(padData(dataSize), this.writeBufferCursor + bytesWritten, 'utf8'); + this.writeBuffer.writeUInt32BE(dataSize, this.writeBufferCursor + bytesWritten); + bytesWritten += 4; + bytesWritten += this.writeBuffer.write(DOCUMENT_SEPARATOR, this.writeBufferCursor + bytesWritten, 'utf8'); this.writeBufferCursor += bytesWritten; this.writeBufferDocuments++; if (typeof callback === 'function') { @@ -268,7 +277,6 @@ class WritablePartition extends ReadablePartition { const dataSize = Buffer.byteLength(data, 'utf8'); assert(dataSize <= 64 * 1024 * 1024, 'Document is too large! Maximum is 64 MB'); - data += padData(dataSize); const dataPosition = this.size; if (dataSize + DOCUMENT_HEADER_SIZE >= this.writeBuffer.byteLength * 4 / 5) { this.size += this.writeUnbuffered(data, dataSize, sequenceNumber, callback); From ff85df0b985dae9fb0a345689d69e2892e337f7e Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sun, 21 Feb 2021 23:21:01 +0100 Subject: [PATCH 5/8] Some code and docblock cleanups --- src/Partition/ReadablePartition.js | 4 ++-- src/Partition/WritablePartition.js | 14 ++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/Partition/ReadablePartition.js b/src/Partition/ReadablePartition.js index eb3792a..c827ce4 100644 --- a/src/Partition/ReadablePartition.js +++ b/src/Partition/ReadablePartition.js @@ -209,7 +209,7 @@ class ReadablePartition extends events.EventEmitter { * @param {number} offset The position inside the buffer to start reading from. * @param {number} position The file position to start reading from. * @param {number} [size] The expected byte size of the document at the given position. - * @returns {{ dataSize: number, sequenceNumber, number, time64: number }} The metadata fields of the document + * @returns {{ dataSize: number, sequenceNumber: number, time64: number }} The metadata fields of the document * @throws {Error} if the storage entry at the given position is corrupted. * @throws {InvalidDataSizeError} if the document size at the given position does not match the provided size. * @throws {CorruptFileError} if the document at the given position can not be read completely. @@ -286,7 +286,7 @@ class ReadablePartition extends events.EventEmitter { return false; } - assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`); + assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position ${position}. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`); const reader = this.prepareReadBuffer(position); if (reader.length < size + DOCUMENT_HEADER_SIZE) { diff --git a/src/Partition/WritablePartition.js b/src/Partition/WritablePartition.js index 8644eb6..3f4bb0d 100644 --- a/src/Partition/WritablePartition.js +++ b/src/Partition/WritablePartition.js @@ -8,13 +8,13 @@ const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024; const DOCUMENT_HEADER_SIZE = 16; const DOCUMENT_ALIGNMENT = 4; const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n"; -const DOCUMENT_PAD = ' '.repeat(16 - 4 - DOCUMENT_SEPARATOR.length)/* + DOCUMENT_SEPARATOR*/; +const DOCUMENT_PAD = ' '.repeat(DOCUMENT_ALIGNMENT)/* + DOCUMENT_SEPARATOR*/; const NES_EPOCH = new Date('2020-01-01T00:00:00'); /** * @param {number} dataSize - * @returns {string} The data padded to 16 bytes alignment and ended with \0x1E (record separator) and a line break. + * @returns {string} The data needed to pad to DOCUMENT_ALIGNMENT bytes alignment (including the DOCUMENT_SEPARATOR). */ function padData(dataSize) { const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 4 + DOCUMENT_SEPARATOR.length) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; @@ -84,11 +84,10 @@ class WritablePartition extends ReadablePartition { this.flushCallbacks = []; if (super.open() === false) { - const stat = fs.statSync(this.fileName); - if (stat.size !== 0) { + if (this.size !== -this.headerSize) { + // If file is not empty, we can not open and initialize it return false; } - this.metadata.epoch = Date.now(); this.writeMetadata(); this.size = 0; } @@ -123,6 +122,7 @@ class WritablePartition extends ReadablePartition { * @returns void */ writeMetadata() { + this.metadata.epoch = Date.now(); const metadataBuffer = buildMetadataHeader(ReadablePartition.HEADER_MAGIC, this.metadata); fs.writeSync(this.fd, metadataBuffer, 0, metadataBuffer.byteLength, 0); this.headerSize = metadataBuffer.byteLength; @@ -328,9 +328,7 @@ class WritablePartition extends ReadablePartition { if (after > this.size) { return; } - if (after < 0) { - after = 0; - } + after = Math.max(0, after); this.flush(); let position = after, data; From b360946197ca7faff5ca4eca19675e55d9ccfd0c Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Sun, 21 Feb 2021 23:54:35 +0100 Subject: [PATCH 6/8] Refactor document padding calculation --- src/Partition/ReadablePartition.js | 13 +++++++++---- src/Partition/WritablePartition.js | 25 ++++++++----------------- src/util.js | 14 +++++++++++++- 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/src/Partition/ReadablePartition.js b/src/Partition/ReadablePartition.js index c827ce4..784c180 100644 --- a/src/Partition/ReadablePartition.js +++ b/src/Partition/ReadablePartition.js @@ -1,12 +1,13 @@ const fs = require('fs'); const path = require('path'); const events = require('events'); -const { assert } = require('../util'); +const { assert, alignTo } = require('../util'); const DEFAULT_READ_BUFFER_SIZE = 64 * 1024; const DOCUMENT_HEADER_SIZE = 16; const DOCUMENT_ALIGNMENT = 4; const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n"; +const DOCUMENT_FOOTER_SIZE = 4 /* additional data size footer */ + DOCUMENT_SEPARATOR.length; // node-event-store partition V03 const HEADER_MAGIC = "nesprt03"; @@ -161,8 +162,8 @@ class ReadablePartition extends events.EventEmitter { * @returns {number} The size of the data including header, padded to 16 bytes alignment and ended with a line break. */ documentWriteSize(dataSize) { - const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 4 + DOCUMENT_SEPARATOR.length) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; - return dataSize + DOCUMENT_SEPARATOR.length + 4 + padSize + DOCUMENT_HEADER_SIZE; + const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT); + return DOCUMENT_HEADER_SIZE + dataSize + padSize + DOCUMENT_FOOTER_SIZE; } /** @@ -382,4 +383,8 @@ class ReadablePartition extends events.EventEmitter { module.exports = ReadablePartition; module.exports.CorruptFileError = CorruptFileError; module.exports.InvalidDataSizeError = InvalidDataSizeError; -module.exports.HEADER_MAGIC = HEADER_MAGIC; \ No newline at end of file +module.exports.HEADER_MAGIC = HEADER_MAGIC; +module.exports.DOCUMENT_SEPARATOR = DOCUMENT_SEPARATOR; +module.exports.DOCUMENT_ALIGNMENT = DOCUMENT_ALIGNMENT; +module.exports.DOCUMENT_HEADER_SIZE = DOCUMENT_HEADER_SIZE; +module.exports.DOCUMENT_FOOTER_SIZE = DOCUMENT_FOOTER_SIZE; \ No newline at end of file diff --git a/src/Partition/WritablePartition.js b/src/Partition/WritablePartition.js index 3f4bb0d..b7835cb 100644 --- a/src/Partition/WritablePartition.js +++ b/src/Partition/WritablePartition.js @@ -1,26 +1,15 @@ const fs = require('fs'); const mkdirpSync = require('mkdirp').sync; const ReadablePartition = require('./ReadablePartition'); -const { assert, buildMetadataHeader } = require('../util'); +const { assert, buildMetadataHeader, alignTo } = require('../util'); const Clock = require('../Clock'); const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024; -const DOCUMENT_HEADER_SIZE = 16; -const DOCUMENT_ALIGNMENT = 4; -const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n"; -const DOCUMENT_PAD = ' '.repeat(DOCUMENT_ALIGNMENT)/* + DOCUMENT_SEPARATOR*/; +const { DOCUMENT_ALIGNMENT, DOCUMENT_SEPARATOR, DOCUMENT_HEADER_SIZE, DOCUMENT_FOOTER_SIZE } = ReadablePartition; +const DOCUMENT_PAD = ' '.repeat(DOCUMENT_ALIGNMENT); const NES_EPOCH = new Date('2020-01-01T00:00:00'); -/** - * @param {number} dataSize - * @returns {string} The data needed to pad to DOCUMENT_ALIGNMENT bytes alignment (including the DOCUMENT_SEPARATOR). - */ -function padData(dataSize) { - const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 4 + DOCUMENT_SEPARATOR.length) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; - return DOCUMENT_PAD.substr(0, padSize); -} - /** * A partition is a single file where the storage will write documents to depending on some partitioning rules. * In the case of an event store, this is most likely the (write) streams. @@ -219,7 +208,8 @@ class WritablePartition extends ReadablePartition { let bytesWritten = 0; bytesWritten += fs.writeSync(this.fd, dataHeader); bytesWritten += fs.writeSync(this.fd, data); - bytesWritten += fs.writeSync(this.fd, padData(dataSize)); + const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT); + bytesWritten += fs.writeSync(this.fd, DOCUMENT_PAD.substr(0, padSize)); const dataSizeBuffer = Buffer.alloc(4); dataSizeBuffer.writeUInt32BE(dataSize, 0); bytesWritten += fs.writeSync(this.fd, dataSizeBuffer); @@ -240,13 +230,14 @@ class WritablePartition extends ReadablePartition { * @returns {number} Number of bytes written. */ writeBuffered(data, dataSize, sequenceNumber, callback) { - const bytesToWrite = Buffer.byteLength(data, 'utf8') + DOCUMENT_HEADER_SIZE; + const bytesToWrite = this.documentWriteSize(Buffer.byteLength(data, 'utf8')); this.flushIfWriteBufferTooSmall(bytesToWrite); let bytesWritten = 0; bytesWritten += this.writeDocumentHeader(this.writeBuffer, this.writeBufferCursor, dataSize, sequenceNumber); bytesWritten += this.writeBuffer.write(data, this.writeBufferCursor + bytesWritten, 'utf8'); - bytesWritten += this.writeBuffer.write(padData(dataSize), this.writeBufferCursor + bytesWritten, 'utf8'); + const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT); + bytesWritten += this.writeBuffer.write(DOCUMENT_PAD.substr(0, padSize), this.writeBufferCursor + bytesWritten, 'utf8'); this.writeBuffer.writeUInt32BE(dataSize, this.writeBufferCursor + bytesWritten); bytesWritten += 4; bytesWritten += this.writeBuffer.write(DOCUMENT_SEPARATOR, this.writeBufferCursor + bytesWritten, 'utf8'); diff --git a/src/util.js b/src/util.js index aad331c..703d805 100644 --- a/src/util.js +++ b/src/util.js @@ -26,6 +26,17 @@ function assert(condition, message, ErrorType = Error) { } } +/** + * Return the amount required to align value to the given alignment. + * It calculates the difference of the alignment and the modulo of value by alignment. + * @param {number} value + * @param {number} alignment + * @returns {number} + */ +function alignTo(value, alignment) { + return (alignment - (value % alignment)) % alignment; +} + /** * @param {string} secret The secret to use for calculating further HMACs * @returns {function(string)} A function that calculates the HMAC for a given string @@ -177,5 +188,6 @@ module.exports = { matches, buildMetadataForMatcher, buildMatcherFromMetadata, - buildMetadataHeader + buildMetadataHeader, + alignTo }; \ No newline at end of file From b05c3096ef9f499ff6e80e2572f0f894ba06f5c5 Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Mon, 22 Feb 2021 01:19:40 +0100 Subject: [PATCH 7/8] Increase partition test coverage --- src/Partition/ReadablePartition.js | 21 ++++++++++---------- test/Partition.spec.js | 13 ++++++++++++ test/Storage.spec.js | 32 ++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 11 deletions(-) diff --git a/src/Partition/ReadablePartition.js b/src/Partition/ReadablePartition.js index 784c180..d2b3172 100644 --- a/src/Partition/ReadablePartition.js +++ b/src/Partition/ReadablePartition.js @@ -22,6 +22,7 @@ class InvalidDataSizeError extends Error {} * @returns {number} */ function hash(str) { + /* istanbul ignore if */ if (str.length === 0) { return 0; } @@ -264,9 +265,9 @@ class ReadablePartition extends events.EventEmitter { return ({ buffer: null, cursor: 0, length: 0 }); } let bufferCursor = position - this.readBufferPos; - if (this.readBufferPos < 0 || (this.readBufferPos > 0 && bufferCursor < DOCUMENT_SEPARATOR.length + 4)) { + if (this.readBufferPos < 0 || (this.readBufferPos > 0 && bufferCursor < DOCUMENT_FOOTER_SIZE)) { this.fillBuffer(Math.max(position - this.readBuffer.byteLength, 0)); - bufferCursor = this.readBufferLength; + bufferCursor = position - this.readBufferPos; } return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength }); } @@ -321,12 +322,11 @@ class ReadablePartition extends events.EventEmitter { */ findDocumentPositionBefore(position) { assert(this.fd, 'Partition is not opened.'); + position -= (position % DOCUMENT_ALIGNMENT); if (position <= 0) { return false; } - assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`); - const separatorSize = DOCUMENT_SEPARATOR.length; // Optimization if we are at an exact document boundary, where we can just read the document size let reader = this.prepareReadBufferBackwards(position); @@ -353,10 +353,11 @@ class ReadablePartition extends events.EventEmitter { /** * @api + * @param {number} [after] The document position to start reading from. * @returns {Generator} A generator that returns all documents in this partition. */ - *readAll() { - let position = 0; + *readAll(after = 0) { + let position = after < 0 ? this.size + after + 1 : after; let data; while ((data = this.readFrom(position)) !== false) { yield data; @@ -366,15 +367,13 @@ class ReadablePartition extends events.EventEmitter { /** * @api + * @param {number} [before] The document position to start reading backward from. * @returns {Generator} A generator that returns all documents in this partition in reverse order. */ - *readAllBackwards() { - let position = this.size; + *readAllBackwards(before = -1) { + let position = before < 0 ? this.size + before + 1 : before; while ((position = this.findDocumentPositionBefore(position)) !== false) { const data = this.readFrom(position); - if (data === false) { - break; - } yield data; } } diff --git a/test/Partition.spec.js b/test/Partition.spec.js index 72d040a..106bcd9 100644 --- a/test/Partition.spec.js +++ b/test/Partition.spec.js @@ -184,6 +184,19 @@ describe('Partition', function() { expect(i).to.be(0); }); + it('reads all documents in backwards write order from arbitary position', function() { + partition.open(); + fillPartition(50, i => 'foo-' + i.toString()); + partition.close(); + partition.open(); + let i = 50; + for (let data of partition.readAllBackwards(-9)) { + expect(data).to.be('foo-' + i.toString()); + i--; + } + expect(i).to.be(0); + }); + }); describe('readFrom', function() { diff --git a/test/Storage.spec.js b/test/Storage.spec.js index 3691ec2..f72e91e 100644 --- a/test/Storage.spec.js +++ b/test/Storage.spec.js @@ -239,6 +239,23 @@ describe('Storage', function() { expect(index.isOpen()).to.be(true); }); + it('works with arbitrarily sized documents', function() { + storage = createStorage({ writeBufferSize: 1024 }); + storage.open(); + + for (let i = 1; i <= 10; i++) { + storage.write({ foo: i, pad: ' '.repeat(storage.partitionConfig.writeBufferSize * i / 12) }); + } + + storage.close(); + storage = createStorage(); + storage.open(); + + for (let i = 1; i <= 8; i++) { + expect(storage.read(i).foo).to.eql(i); + } + }); + }); describe('readRange', function() { @@ -981,5 +998,20 @@ describe('Storage', function() { reader.close(); }); + it('partitions are opened only once', function(done){ + storage = createStorage({ syncOnFlush: true, partitioner: (document, number) => document.type }); + storage.open(); + + let reader = createReader(); + reader.open(); + reader.on('partition-created', (id) => { + const partitionInstance = reader.getPartition(id); + expect(reader.getPartition(id)).to.be(partitionInstance); + reader.close(); + done(); + }); + + storage.getPartition(''); + }); }); }); From c5fe2fdca946a6aa667928b3709e29a6f70b95e1 Mon Sep 17 00:00:00 2001 From: Alexander Berl Date: Mon, 22 Feb 2021 02:06:39 +0100 Subject: [PATCH 8/8] Increase partition test coverage --- src/Partition/ReadOnlyPartition.js | 1 + src/Partition/ReadablePartition.js | 2 -- test/Partition.spec.js | 35 ++++++++++++++++++++++++++++-- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/Partition/ReadOnlyPartition.js b/src/Partition/ReadOnlyPartition.js index 9b8f492..948168e 100644 --- a/src/Partition/ReadOnlyPartition.js +++ b/src/Partition/ReadOnlyPartition.js @@ -18,6 +18,7 @@ class ReadOnlyPartition extends WatchesFile(ReadablePartition) { * @param {string} filename */ onChange(filename) { + /* istanbul ignore if */ if (!this.fd) { return; } diff --git a/src/Partition/ReadablePartition.js b/src/Partition/ReadablePartition.js index d2b3172..b462ad9 100644 --- a/src/Partition/ReadablePartition.js +++ b/src/Partition/ReadablePartition.js @@ -347,8 +347,6 @@ class ReadablePartition extends events.EventEmitter { position -= this.readBufferLength; } while (position > 0); return position; - /*const header = this.readDocumentHeader(reader.buffer, reader.cursor, position); - return ({ position, ...header });*/ } /** diff --git a/test/Partition.spec.js b/test/Partition.spec.js index 106bcd9..a42b668 100644 --- a/test/Partition.spec.js +++ b/test/Partition.spec.js @@ -26,8 +26,8 @@ describe('Partition', function() { /** * @returns {ReadOnlyPartition} */ - function createReader() { - const reader = new Partition.ReadOnly(partition.name, { dataDirectory }); + function createReader(options = {}) { + const reader = new Partition.ReadOnly(partition.name, { ...options, dataDirectory }); readers[readers.length] = reader; return reader; } @@ -171,6 +171,19 @@ describe('Partition', function() { expect(i).to.be(51); }); + it('reads all documents in write order from arbitrary position', function() { + partition.open(); + fillPartition(50, i => 'foo-' + i.toString()); + partition.close(); + partition.open(); + let i = 50; + for (let data of partition.readAll(-partition.documentWriteSize('foo-50'.length)-1)) { + expect(data).to.be('foo-' + i.toString()); + i++; + } + expect(i).to.be(51); + }); + it('reads all documents in backwards write order', function() { partition.open(); fillPartition(50, i => 'foo-' + i.toString()); @@ -195,6 +208,24 @@ describe('Partition', function() { i--; } expect(i).to.be(0); + i = 50; + for (let data of partition.readAllBackwards(partition.size - 12)) { + expect(data).to.be('foo-' + i.toString()); + i--; + } + expect(i).to.be(0); + }); + + it('can find document boundaries by scanning across readbuffers', function() { + partition.open(); + fillPartition(2, i => '0xFF'.repeat(64)); + const lastPosition = partition.write('0xFF'.repeat(64)); + partition.close(); + + const reader = createReader({ readBufferSize: 64 }); + reader.open(); + expect(reader.findDocumentPositionBefore(reader.size - 8)).to.be(lastPosition); + reader.close(); }); });