From ef9cb212e27d58f16a5b8358837aa4f98a458f16 Mon Sep 17 00:00:00 2001 From: Renat Zubairov Date: Fri, 8 Feb 2019 15:04:43 +0100 Subject: [PATCH 1/9] Custom EventEmitter --- lib/emitter.js | 198 ++++++++++++++++++++++++++++++++++++++++++++++ lib/executor.js | 2 +- package-lock.json | 5 ++ package.json | 1 + 4 files changed, 205 insertions(+), 1 deletion(-) create mode 100644 lib/emitter.js diff --git a/lib/emitter.js b/lib/emitter.js new file mode 100644 index 00000000..90fe722e --- /dev/null +++ b/lib/emitter.js @@ -0,0 +1,198 @@ +'use strict'; + + +/** + * Object#toString reference. + */ +const objToString = Object.prototype.toString; + +/** + * Check if a value is an array. + * + * @api private + * @param {*} val The value to test. + * @return {boolean} true if the value is an array, otherwise false. + */ +function isArray(val) { + return objToString.call(val) === '[object Array]'; +} + +/** + * Event emitter constructor. + * + * @api public + */ +function EventEmitter() { + // Empty constructor +} + +/** + * Add a listener. + * + * @api public + * @param {string} name Event name. + * @param {Function} fn Event handler. + * @return {EventEmitter} Emitter instance. + */ +EventEmitter.prototype.on = function on(name, fn) { + if (!this.$events) { + this.$events = {}; + } + + if (!this.$events[name]) { + this.$events[name] = fn; + } else if (isArray(this.$events[name])) { + this.$events[name].push(fn); + } else { + this.$events[name] = [this.$events[name], fn]; + } + + return this; +}; + +EventEmitter.prototype.addListener = EventEmitter.prototype.on; + +/** + * Adds a volatile listener. + * + * @api public + * @param {string} name Event name. + * @param {Function} fn Event handler. + * @return {EventEmitter} Emitter instance. + */ +EventEmitter.prototype.once = function once(name, fn) { + const that = this; + + function on() { + that.removeListener(name, on); + //eslint-disable-next-line no-invalid-this + fn.apply(this, arguments); + } + + on.listener = fn; + this.on(name, on); + + return this; +}; + +/** + * Remove a listener. + * + * @api public + * @param {string} name Event name. + * @param {Function} fn Event handler. + * @return {EventEmitter} Emitter instance. + */ +EventEmitter.prototype.removeListener = function removeListener(name, fn) { + if (this.$events && this.$events[name]) { + var list = this.$events[name]; + + if (isArray(list)) { + var pos = -1; + + for (var i = 0, l = list.length; i < l; i++) { + if (list[i] === fn || (list[i].listener && list[i].listener === fn)) { + pos = i; + break; + } + } + + if (pos < 0) { + return this; + } + + list.splice(pos, 1); + + if (!list.length) { + delete this.$events[name]; + } + } else if (list === fn || (list.listener && list.listener === fn)) { + delete this.$events[name]; + } + } + + return this; +}; + +/** + * Remove all listeners for an event. + * + * @api public + * @param {string} name Event name. + * @return {EventEmitter} Emitter instance. + */ +EventEmitter.prototype.removeAllListeners = function removeAllListeners(name) { + if (name === undefined) { + this.$events = {}; + return this; + } + + if (this.$events && this.$events[name]) { + this.$events[name] = null; + } + + return this; +}; + +/** + * Get all listeners for a given event. + * + * @api public + * @param {string} name Event name. + * @return {EventEmitter} Emitter instance. + */ +EventEmitter.prototype.listeners = function listeners(name) { + if (!this.$events) { + this.$events = {}; + } + + if (!this.$events[name]) { + this.$events[name] = []; + } + + if (!isArray(this.$events[name])) { + this.$events[name] = [this.$events[name]]; + } + + return this.$events[name]; +}; + +/** + * Emit an event. + * + * @api public + * @param {string} name Event name. + * @return {boolean} true if at least one handler was invoked, else false. + */ +EventEmitter.prototype.emit = function emit(name) { + if (!this.$events) { + return false; + } + + var handler = this.$events[name]; + + if (!handler) { + return false; + } + + var args = Array.prototype.slice.call(arguments, 1); + + if (typeof handler === 'function') { + handler.apply(this, args); + } else if (isArray(handler)) { + var listeners = handler.slice(); + + for (var i = 0, l = listeners.length; i < l; i++) { + listeners[i].apply(this, args); + } + } else { + return false; + } + + return true; +}; + +/** + * Module exports. + */ +exports.EventEmitter = EventEmitter; diff --git a/lib/executor.js b/lib/executor.js index cba591bd..5cc4f580 100644 --- a/lib/executor.js +++ b/lib/executor.js @@ -1,4 +1,4 @@ -var EventEmitter = require('events').EventEmitter; +var EventEmitter = require('./emitter').EventEmitter; var _ = require('lodash'); var util = require('util'); diff --git a/package-lock.json b/package-lock.json index a2e8211e..217fc680 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4552,6 +4552,11 @@ "integrity": "sha1-u+Z0BseaqFxc/sdm/lc0VV36EnQ=", "dev": true }, + "p-throttle": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/p-throttle/-/p-throttle-2.1.0.tgz", + "integrity": "sha512-DvChtxq2k1PfiK4uZXKA4IvRyuq/gP55tb6MQyMLGfYJifCjJY5lDMb94IQHZss/K/tmZx3fAsSC1IqP0e1OnA==" + }, "parse-filepath": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/parse-filepath/-/parse-filepath-1.0.2.tgz", diff --git a/package.json b/package.json index 22ec85bc..ef31da19 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "debug": "3.1.0", "elasticio-rest-node": "1.2.2", "lodash": "4.17.4", + "p-throttle": "^2.1.0", "q": "1.4.1", "request-promise-native": "^1.0.5", "requestretry": "^3.1.0", From 39a15b563d6d4250fc4e3591b497cd40fb6a7323 Mon Sep 17 00:00:00 2001 From: Renat Zubairov Date: Fri, 8 Feb 2019 15:53:02 +0100 Subject: [PATCH 2/9] Now event emitter works on async listeners. This changes the semantic of the exception handling in the async functions -> exception thrown in the listeners are not automatically propogated to the emitter if emitter is not an `async` function --- lib/emitter.js | 20 ++++++++++---------- spec/component/actions/httpReply.js | 8 ++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/lib/emitter.js b/lib/emitter.js index 90fe722e..d5aa238f 100644 --- a/lib/emitter.js +++ b/lib/emitter.js @@ -85,12 +85,12 @@ EventEmitter.prototype.once = function once(name, fn) { */ EventEmitter.prototype.removeListener = function removeListener(name, fn) { if (this.$events && this.$events[name]) { - var list = this.$events[name]; + const list = this.$events[name]; if (isArray(list)) { - var pos = -1; + let pos = -1; - for (var i = 0, l = list.length; i < l; i++) { + for (let i = 0, l = list.length; i < l; i++) { if (list[i] === fn || (list[i].listener && list[i].listener === fn)) { pos = i; break; @@ -164,26 +164,26 @@ EventEmitter.prototype.listeners = function listeners(name) { * @param {string} name Event name. * @return {boolean} true if at least one handler was invoked, else false. */ -EventEmitter.prototype.emit = function emit(name) { +EventEmitter.prototype.emit = async function emit(name) { if (!this.$events) { return false; } - var handler = this.$events[name]; + const handler = this.$events[name]; if (!handler) { return false; } - var args = Array.prototype.slice.call(arguments, 1); + const args = Array.prototype.slice.call(arguments, 1); if (typeof handler === 'function') { - handler.apply(this, args); + await handler.apply(this, args); } else if (isArray(handler)) { - var listeners = handler.slice(); + const listeners = handler.slice(); - for (var i = 0, l = listeners.length; i < l; i++) { - listeners[i].apply(this, args); + for (let i = 0, l = listeners.length; i < l; i++) { + await listeners[i].apply(this, args); } } else { return false; diff --git a/spec/component/actions/httpReply.js b/spec/component/actions/httpReply.js index c91b7d1f..a2f0ceab 100644 --- a/spec/component/actions/httpReply.js +++ b/spec/component/actions/httpReply.js @@ -1,7 +1,7 @@ exports.process = processAction; -function processAction(msg, cfg) { - this.emit('httpReply', { +async function processAction(msg, cfg) { + await this.emit('httpReply', { statusCode: 200, body: 'Ok', headers: { @@ -9,8 +9,8 @@ function processAction(msg, cfg) { } }); - this.emit('data', { + await this.emit('data', { body: {} }); - this.emit('end'); + await this.emit('end'); } From 210bddc03897f20cd0dc19dacb3a5c1cfe98e619 Mon Sep 17 00:00:00 2001 From: Renat Zubairov Date: Fri, 8 Feb 2019 22:16:38 +0100 Subject: [PATCH 3/9] Async functions + test fixes --- lib/amqp.js | 50 ++++++++++++++++++++++----------------------- lib/sailor.js | 40 ++++++++++++++++-------------------- spec/amqp.spec.js | 20 +++++++++--------- spec/sailor.spec.js | 38 +++++++++++++++++----------------- 4 files changed, 71 insertions(+), 77 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index 62c21af3..f7c5d968 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -3,6 +3,7 @@ const amqplib = require('amqplib'); const encryptor = require('./encryptor.js'); const co = require('co'); const _ = require('lodash'); +const pThrottle = require('p-throttle'); const HEADER_ROUTING_KEY = 'x-eio-routing-key'; const HEADER_ERROR_RESPONSE = 'x-eio-error-response'; @@ -100,35 +101,36 @@ class Amqp { return this.subscribeChannel.reject(message, false); } - sendToExchange(exchangeName, routingKey, payload, options) { - //eslint-disable-next-line max-len - log.trace('Pushing to exchange=%s, routingKey=%s, data=%j, options=%j', exchangeName, routingKey, payload, options); - try { - this.publishChannel.publish(exchangeName, routingKey, new Buffer(payload), options); - } catch (err) { - log.error('Failed to publish message to exchange %s, %s', exchangeName, err.message); + async sendToExchange(exchangeName, routingKey, payload, options) { + log.trace('Pushing to exchange=%s, routingKey=%s, data=%j, ' + + 'options=%j', exchangeName, routingKey, payload, options); + const result = this.publishChannel.publish(exchangeName, routingKey, Buffer.from(payload), options); + if (!result) { + log.error('WARNING - buffer full when publishing a message to ' + + 'exchange=%s with routingKey=%s', exchangeName, routingKey); } + await new Promise(resolve => setTimeout(resolve, 1000)); } - prepareMessageAndSendToExchange(data, properties, routingKey) { + async prepareMessageAndSendToExchange(data, properties, routingKey) { const settings = this.settings; data.headers = filterMessageHeaders(data.headers); const encryptedData = encryptor.encryptMessageContent(data); - this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties); + await this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties); } - sendData(data, properties) { + async sendData(data, properties) { const settings = this.settings; const msgHeaders = data.headers || {}; const routingKey = getRoutingKeyFromHeaders(msgHeaders) || settings.DATA_ROUTING_KEY; - this.prepareMessageAndSendToExchange(data, properties, routingKey); + await this.prepareMessageAndSendToExchange(data, properties, routingKey); } - sendHttpReply(data, properties) { + async sendHttpReply(data, properties) { const routingKey = properties.headers.reply_to; @@ -137,10 +139,10 @@ class Amqp { //eslint-disable-next-line no-useless-escape `Component emitted \'httpReply\' event but \'reply_to\' was not found in AMQP headers`); } - this.prepareMessageAndSendToExchange(data, properties, routingKey); + await this.prepareMessageAndSendToExchange(data, properties, routingKey); } - sendError(err, properties, originalMessageContent) { + async sendError(err, properties, originalMessageContent) { const settings = this.settings; const headers = properties.headers; @@ -159,24 +161,24 @@ class Amqp { } const errorPayload = JSON.stringify(payload); - this.sendToExchange(settings.PUBLISH_MESSAGES_TO, settings.ERROR_ROUTING_KEY, errorPayload, properties); + await this.sendToExchange(settings.PUBLISH_MESSAGES_TO, settings.ERROR_ROUTING_KEY, errorPayload, properties); if (headers.reply_to) { console.log('Sending error to', headers.reply_to); const replyToOptions = _.cloneDeep(properties); replyToOptions.headers[HEADER_ERROR_RESPONSE] = true; - this.sendToExchange(settings.PUBLISH_MESSAGES_TO, headers.reply_to, encryptedError, replyToOptions); + await this.sendToExchange(settings.PUBLISH_MESSAGES_TO, headers.reply_to, encryptedError, replyToOptions); } } - sendRebound(reboundError, originalMessage, properties) { + async sendRebound(reboundError, originalMessage, properties) { const settings = this.settings; log.trace('Rebound message: %j', originalMessage); const reboundIteration = getReboundIteration(originalMessage.properties.headers.reboundIteration); if (reboundIteration > settings.REBOUND_LIMIT) { - return this.sendError( + await this.sendError( new Error('Rebound limit exceeded'), properties, originalMessage.content @@ -185,7 +187,7 @@ class Amqp { properties.expiration = getExpiration(reboundIteration); properties.headers.reboundIteration = reboundIteration; - this.sendToExchange( + await this.sendToExchange( settings.PUBLISH_MESSAGES_TO, settings.REBOUND_ROUTING_KEY, originalMessage.content, @@ -206,17 +208,13 @@ class Amqp { } } - sendSnapshot(data, properties) { + async sendSnapshot(data, properties) { const settings = this.settings; const exchange = settings.PUBLISH_MESSAGES_TO; const routingKey = settings.SNAPSHOT_ROUTING_KEY; let payload; - try { - payload = JSON.stringify(data); - } catch (e) { - return console.error('A snapshot should be a valid JSON'); - } - this.sendToExchange(exchange, routingKey, payload, properties); + payload = JSON.stringify(data); + await this.sendToExchange(exchange, routingKey, payload, properties); } } diff --git a/lib/sailor.js b/lib/sailor.js index 5c1dfe2b..4749956f 100644 --- a/lib/sailor.js +++ b/lib/sailor.js @@ -271,7 +271,7 @@ class Sailor { taskExec.process(module, payload, cfg, snapshot); - function onData(data) { + async function onData(data) { const headers = _.clone(outgoingMessageHeaders); log.info({ deliveryTag, @@ -293,10 +293,10 @@ class Sailor { }); } - return promise(self.amqpConnection.sendData(data, props)); + await self.amqpConnection.sendData(data, props); } - function onHttpReply(reply) { + async function onHttpReply(reply) { const headers = _.clone(outgoingMessageHeaders); const props = createAmqpProperties(headers); log.info({ @@ -307,10 +307,10 @@ class Sailor { messageProcessingTime: Date.now() - timeStart }, 'processMessage emit HttpReply'); - return promise(self.amqpConnection.sendHttpReply(reply, props)); + await self.amqpConnection.sendHttpReply(reply, props); } - function onError(err) { + async function onError(err) { const headers = _.clone(outgoingMessageHeaders); err = formatError(err); taskExec.errorCount++; @@ -325,10 +325,10 @@ class Sailor { }, 'processMessage emit error'); headers.end = new Date().getTime(); const props = createAmqpProperties(headers); - return promise(self.amqpConnection.sendError(err, props, message.content)); + await self.amqpConnection.sendError(err, props, message.content); } - function onRebound(err) { + async function onRebound(err) { const headers = _.clone(outgoingMessageHeaders); err = formatError(err); log.info({ @@ -343,18 +343,18 @@ class Sailor { headers.end = new Date().getTime(); headers.reboundReason = err.message; const props = createAmqpProperties(headers); - return promise(self.amqpConnection.sendRebound(err, message, props)); + await self.amqpConnection.sendRebound(err, message, props); } - function onSnapshot(data) { + async function onSnapshot(data) { const headers = _.clone(outgoingMessageHeaders); headers.snapshotEvent = 'snapshot'; self.snapshot = data; //replacing `local` snapshot const props = createAmqpProperties(headers); - return promise(self.amqpConnection.sendSnapshot(data, props)); + await self.amqpConnection.sendSnapshot(data, props); } - function onUpdateSnapshot(data) { + async function onUpdateSnapshot(data) { const headers = _.clone(outgoingMessageHeaders); headers.snapshotEvent = 'updateSnapshot'; @@ -364,13 +364,13 @@ class Sailor { } _.extend(self.snapshot, data); //updating `local` snapshot const props = createAmqpProperties(headers); - self.amqpConnection.sendSnapshot(data, props); + await self.amqpConnection.sendSnapshot(data, props); } else { console.error('You should pass an object to the `updateSnapshot` event'); } } - function onUpdateKeys(keys) { + async function onUpdateKeys(keys) { log.info({ deliveryTag, messageId, @@ -379,18 +379,14 @@ class Sailor { messageProcessingTime: Date.now() - timeStart }, 'processMessage emit updateKeys'); - return promise(self.apiClient.accounts.update(cfg._account, { keys: keys }) - .then(onKeysUpdateSuccess) - .fail(onKeysUpdateError)); - - function onKeysUpdateSuccess() { + try { + await self.apiClient.accounts.update(cfg._account, { keys: keys }); log.debug('Successfully updated keys #%s', message.fields.deliveryTag); - } - - function onKeysUpdateError(err) { + } catch (error) { log.error('Failed to updated keys #%s', message.fields.deliveryTag); - return onError(err); + await onError(error); } + } function onEnd() { diff --git a/spec/amqp.spec.js b/spec/amqp.spec.js index fb2de55d..c0584170 100644 --- a/spec/amqp.spec.js +++ b/spec/amqp.spec.js @@ -143,7 +143,7 @@ describe('AMQP', () => { expect(payload).toEqual(msg); }); - it('Should throw error in sendHttpReply if reply_to header not found', () => { + it('Should throw error in sendHttpReply if reply_to header not found', done => { const amqp = new Amqp(settings); amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); @@ -155,9 +155,8 @@ describe('AMQP', () => { }, body: 'OK' }; - - expect(() => { - amqp.sendHttpReply(msg, { + async function test() { + await amqp.sendHttpReply(msg, { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, @@ -166,11 +165,12 @@ describe('AMQP', () => { stepId: 'step_456' } }); + } + test().then(() => done(new Error('should throw')),(err) => { + expect(amqp.publishChannel.publish).not.toHaveBeenCalled(); + done(); + }); - }).toThrow('Component emitted \'httpReply\' event but \'reply_to\' was not found in AMQP headers'); - - - expect(amqp.publishChannel.publish).not.toHaveBeenCalled(); }); it('Should send message to outgoing channel using routing key from headers when process data', () => { @@ -262,7 +262,7 @@ describe('AMQP', () => { }); }); - it('Should send message to errors using routing key from headers when process error', () => { + it('Should send message to errors using routing key from headers when process error', async () => { const expectedErrorPayload = { error: { @@ -289,7 +289,7 @@ describe('AMQP', () => { } }; - amqp.sendError(new Error('Test error'), props, message.content); + await amqp.sendError(new Error('Test error'), props, message.content); expect(amqp.publishChannel.publish).toHaveBeenCalled(); expect(amqp.publishChannel.publish.callCount).toEqual(2); diff --git a/spec/sailor.spec.js b/spec/sailor.spec.js index b654b8c8..76f89833 100644 --- a/spec/sailor.spec.js +++ b/spec/sailor.spec.js @@ -400,7 +400,7 @@ describe('Sailor', () => { spyOn(sailor.apiClient.tasks, 'retrieveStep').andCallFake((taskId, stepId) => { expect(taskId).toEqual('5559edd38968ec0736000003'); expect(stepId).toEqual('step_1'); - return Q({ + return Promise.resolve({ config: { _account: '1234567890' } @@ -410,25 +410,25 @@ describe('Sailor', () => { spyOn(sailor.apiClient.accounts, 'update').andCallFake((accountId, keys) => { expect(accountId).toEqual('1234567890'); expect(keys).toEqual({ keys: { oauth: { access_token: 'newAccessToken' } } }); - return Q.reject(new Error('Update keys error')); + return Promise.reject(new Error('Update keys error')); }); - - sailor.prepare() - .then(() => sailor.connect()) - .then(() => sailor.processMessage(payload, message)) - .then(() => { - expect(sailor.apiClient.tasks.retrieveStep).toHaveBeenCalled(); - expect(sailor.apiClient.accounts.update).toHaveBeenCalled(); - - expect(fakeAMQPConnection.connect).toHaveBeenCalled(); - expect(fakeAMQPConnection.sendError).toHaveBeenCalled(); - expect(fakeAMQPConnection.sendError.calls[0].args[0].message).toEqual('Update keys error'); - expect(fakeAMQPConnection.ack).toHaveBeenCalled(); - expect(fakeAMQPConnection.ack.callCount).toEqual(1); - expect(fakeAMQPConnection.ack.calls[0].args[0]).toEqual(message); - done(); - }) - .catch(done); + async function test() { + await sailor.prepare(); + await sailor.connect(); + await sailor.processMessage(payload, message); + // It will not throw an error because component + // process method is not `async` + expect(sailor.apiClient.tasks.retrieveStep).toHaveBeenCalled(); + expect(sailor.apiClient.accounts.update).toHaveBeenCalled(); + + expect(fakeAMQPConnection.connect).toHaveBeenCalled(); + expect(fakeAMQPConnection.sendError).toHaveBeenCalled(); + expect(fakeAMQPConnection.sendError.calls[0].args[0].message).toEqual('Update keys error'); + expect(fakeAMQPConnection.ack).toHaveBeenCalled(); + expect(fakeAMQPConnection.ack.callCount).toEqual(1); + expect(fakeAMQPConnection.ack.calls[0].args[0]).toEqual(message); + } + test().then(done,done); }); it('should call sendRebound() and ack()', done => { From 52c3565bcbe5ce154766b58521017e606d6de9ab Mon Sep 17 00:00:00 2001 From: Renat Zubairov Date: Fri, 8 Feb 2019 23:44:38 +0100 Subject: [PATCH 4/9] Added first throttle + first test for it --- lib/amqp.js | 15 +++++++------ lib/sailor.js | 6 ++++- spec/amqp.spec.js | 56 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 8 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index f7c5d968..6abd8eb8 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -3,7 +3,6 @@ const amqplib = require('amqplib'); const encryptor = require('./encryptor.js'); const co = require('co'); const _ = require('lodash'); -const pThrottle = require('p-throttle'); const HEADER_ROUTING_KEY = 'x-eio-routing-key'; const HEADER_ERROR_RESPONSE = 'x-eio-error-response'; @@ -101,7 +100,7 @@ class Amqp { return this.subscribeChannel.reject(message, false); } - async sendToExchange(exchangeName, routingKey, payload, options) { + async sendToExchange(exchangeName, routingKey, payload, options, throttle) { log.trace('Pushing to exchange=%s, routingKey=%s, data=%j, ' + 'options=%j', exchangeName, routingKey, payload, options); const result = this.publishChannel.publish(exchangeName, routingKey, Buffer.from(payload), options); @@ -109,25 +108,27 @@ class Amqp { log.error('WARNING - buffer full when publishing a message to ' + 'exchange=%s with routingKey=%s', exchangeName, routingKey); } - await new Promise(resolve => setTimeout(resolve, 1000)); + if (throttle) { + await throttle(); + } } - async prepareMessageAndSendToExchange(data, properties, routingKey) { + async prepareMessageAndSendToExchange(data, properties, routingKey, throttle) { const settings = this.settings; data.headers = filterMessageHeaders(data.headers); const encryptedData = encryptor.encryptMessageContent(data); - await this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties); + await this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties, throttle); } - async sendData(data, properties) { + async sendData(data, properties, throttle) { const settings = this.settings; const msgHeaders = data.headers || {}; const routingKey = getRoutingKeyFromHeaders(msgHeaders) || settings.DATA_ROUTING_KEY; - await this.prepareMessageAndSendToExchange(data, properties, routingKey); + await this.prepareMessageAndSendToExchange(data, properties, routingKey, throttle); } async sendHttpReply(data, properties) { diff --git a/lib/sailor.js b/lib/sailor.js index 4749956f..0179322e 100644 --- a/lib/sailor.js +++ b/lib/sailor.js @@ -10,6 +10,7 @@ const RestApiClient = require('elasticio-rest-node'); const assert = require('assert'); const co = require('co'); const uuid = require('uuid'); +const pThorottle = require('p-throttle'); const TIMEOUT = process.env.ELASTICIO_TIMEOUT || 20 * 60 * 1000; // 20 minutes const AMQP_HEADER_META_PREFIX = 'x-eio-meta-'; @@ -32,6 +33,9 @@ class Sailor { maxAttempts: settings.API_REQUEST_RETRY_ATTEMPTS, retryDelay: settings.API_REQUEST_RETRY_DELAY }); + this.throttles = { + data: new Promise(() => pThorottle(Promise.resolve(true), 100, 1000)) + }; } connect() { @@ -293,7 +297,7 @@ class Sailor { }); } - await self.amqpConnection.sendData(data, props); + await self.amqpConnection.sendData(data, props, self.throttles.data); } async function onHttpReply(reply) { diff --git a/spec/amqp.spec.js b/spec/amqp.spec.js index c0584170..e34e2f86 100644 --- a/spec/amqp.spec.js +++ b/spec/amqp.spec.js @@ -28,6 +28,7 @@ describe('AMQP', () => { const settings = require('../lib/settings.js').readFrom(envVars); const encryptor = require('../lib/encryptor.js'); const _ = require('lodash'); + const pThrottle = require('p-throttle'); const message = { fields: { @@ -105,6 +106,61 @@ describe('AMQP', () => { }); }); + it('Should send message async to outgoing channel when process data', done => { + const amqp = new Amqp(settings); + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + + const props = { + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + headers: { + taskId: 'task1234567890', + stepId: 'step_456' + } + }; + // One request every 500 ms + const throttle = pThrottle(() => Promise.resolve(), 1, 500); + const start = Date.now(); + async function test() { + for (let i = 0; i < 3; i++) { + await amqp.sendData({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }, props, throttle); + } + } + test().then(() => { + const duration = Math.round((Date.now() - start) / 1000); + // Total duration should be around 1 seconds, because + // first goes through + // second throttled for 500ms + // third throttled for another 500 ms + expect(duration).toEqual(1); + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(3); + + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + settings.DATA_ROUTING_KEY, + jasmine.any(Object), + props + ]); + + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }); + done(); + }, done); + }); + it('Should sendHttpReply to outgoing channel using routing key from headers when process data', () => { const amqp = new Amqp(settings); From 69a1e9ca2ffc149da613fdad542445219f6eb9b0 Mon Sep 17 00:00:00 2001 From: Renat Zubairov Date: Fri, 8 Feb 2019 23:46:14 +0100 Subject: [PATCH 5/9] Added new node.js versions to travis build --- .travis.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.travis.yml b/.travis.yml index b607749c..cba8d501 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,8 @@ language: node_js node_js: + - "8" + - "9" - "10" + - "11" script: - npm run test:jasmine From 7d29a3eb7a9187931899e5859eccea8fe81a5043 Mon Sep 17 00:00:00 2001 From: Renat Zubairov Date: Sat, 9 Feb 2019 00:00:22 +0100 Subject: [PATCH 6/9] Fixed bug in the Throttle definition --- lib/sailor.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sailor.js b/lib/sailor.js index 0179322e..76c4a2da 100644 --- a/lib/sailor.js +++ b/lib/sailor.js @@ -34,7 +34,7 @@ class Sailor { retryDelay: settings.API_REQUEST_RETRY_DELAY }); this.throttles = { - data: new Promise(() => pThorottle(Promise.resolve(true), 100, 1000)) + data: pThorottle(() => Promise.resolve(true), 100, 1000) }; } From 1546ddb26245c59c8b3e8c50cdb4afb750f7207a Mon Sep 17 00:00:00 2001 From: Renat Zubairov Date: Sat, 9 Feb 2019 00:31:44 +0100 Subject: [PATCH 7/9] Added integration test for throttling --- lib/sailor.js | 5 +- lib/settings.js | 4 +- .../integration_component/component.json | 4 + .../integration_component/triggers/async.js | 21 ++++ mocha_spec/run.spec.js | 98 +++++++++++++++++++ 5 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 mocha_spec/integration_component/triggers/async.js diff --git a/lib/sailor.js b/lib/sailor.js index 76c4a2da..7ab686d2 100644 --- a/lib/sailor.js +++ b/lib/sailor.js @@ -34,7 +34,10 @@ class Sailor { retryDelay: settings.API_REQUEST_RETRY_DELAY }); this.throttles = { - data: pThorottle(() => Promise.resolve(true), 100, 1000) + // 100 Messages per Second + data: pThorottle(() => Promise.resolve(true), + settings.DATA_RATE_LIMIT, + settings.RATE_INTERVAL) }; } diff --git a/lib/settings.js b/lib/settings.js index 90546357..7ff9d2d1 100644 --- a/lib/settings.js +++ b/lib/settings.js @@ -41,7 +41,9 @@ function readFrom(envVars) { STARTUP_REQUIRED: false, HOOK_SHUTDOWN: false, API_REQUEST_RETRY_ATTEMPTS: 3, - API_REQUEST_RETRY_DELAY: 100 + API_REQUEST_RETRY_DELAY: 100, + RATE_INTERVAL: 1000, // One second + DATA_RATE_LIMIT: 100 }; _.forEach(requiredAlways, function readRequired(key) { diff --git a/mocha_spec/integration_component/component.json b/mocha_spec/integration_component/component.json index af525130..c36d216d 100644 --- a/mocha_spec/integration_component/component.json +++ b/mocha_spec/integration_component/component.json @@ -23,6 +23,10 @@ "emit_data": { "main": "./triggers/emit_data.js", "title": "emits data" + }, + "async_trigger": { + "main": "./triggers/async.js", + "title": "async trigger" } }, "actions": { diff --git a/mocha_spec/integration_component/triggers/async.js b/mocha_spec/integration_component/triggers/async.js new file mode 100644 index 00000000..60c494e4 --- /dev/null +++ b/mocha_spec/integration_component/triggers/async.js @@ -0,0 +1,21 @@ +'use strict'; + +exports.process = process; + +async function process(msg, cfg, snapshot) { + for (let i = 0; i < 11; i++) { + console.log('Sending message %s', i); + //eslint-disable-next-line no-invalid-this + await this.emit('data', { + id: 'f45be600-f770-11e6-b42d-b187bfbf19fd', + headers: { + 'x-custom-component-header': '123_abc' + }, + body: { + id: 'someId', + hai: 'there' + } + }); + console.log('Message %s was sent', i); + } +} diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index 9862b3f1..2df3165a 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -230,6 +230,104 @@ describe('Integration Test', () => { run = requireRun(); }); + it('should work well with async process function emitting data', done => { + process.env.ELASTICIO_STEP_ID = 'step_2'; + process.env.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003'; + process.env.ELASTICIO_FUNCTION = 'async_trigger'; + process.env.ELASTICIO_DATA_RATE_LIMIT = '1'; + process.env.ELASTICIO_RATE_INTERVAL = '110'; + + helpers.mockApiTaskStepResponse({ + is_passthrough: true + }); + + const psMsg = Object.assign(inputMessage, { + passthrough: { + step_1: { + id: '34', + body: {}, + attachments: {} + } + } + }); + + amqpHelper.publishMessage(psMsg, { + parentMessageId, + traceId + }); + + let counter = 0; + const start = Date.now(); + amqpHelper.on('data', ({ properties, emittedMessage }, queueName) => { + + expect(queueName).to.eql(amqpHelper.nextStepQueue); + + expect(emittedMessage.passthrough).to.deep.eql({ + step_1: { + id: '34', + body: {}, + attachments: {} + }, + step_2: { + id: messageId, + headers: { + 'x-custom-component-header': '123_abc' + }, + body: { + id: 'someId', + hai: 'there' + } + } + }); + + + delete properties.headers.start; + delete properties.headers.end; + delete properties.headers.cid; + + expect(properties.headers).to.deep.equal({ + 'taskId': process.env.ELASTICIO_FLOW_ID, + 'execId': process.env.ELASTICIO_EXEC_ID, + 'userId': process.env.ELASTICIO_USER_ID, + 'x-eio-meta-trace-id': traceId, + 'stepId': process.env.ELASTICIO_STEP_ID, + 'compId': process.env.ELASTICIO_COMP_ID, + 'function': process.env.ELASTICIO_FUNCTION, + messageId, + parentMessageId + }); + + delete properties.headers; + + expect(properties).to.deep.eql({ + contentType: 'application/json', + contentEncoding: 'utf8', + deliveryMode: undefined, + priority: undefined, + correlationId: undefined, + replyTo: undefined, + expiration: undefined, + messageId: undefined, + timestamp: undefined, + type: undefined, + userId: undefined, + appId: undefined, + clusterId: undefined + }); + + counter++; + // We need 10 messages + if (counter > 10) { + const duration = Date.now() - start; + console.log(`Test duration was ${duration} milliseconds, it should be more than 1000`); + expect(duration > 1000).to.be.ok; + done(); + } + }); + + run = requireRun(); + }); + describe('when env ELASTICIO_STARTUP_REQUIRED is set', () => { beforeEach(() => { From db06fc6376fd487621066fd54a060d5e17825703 Mon Sep 17 00:00:00 2001 From: Renat Zubairov Date: Sat, 9 Feb 2019 00:48:06 +0100 Subject: [PATCH 8/9] Bumped version, added information to README --- README.md | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++ package.json | 2 +- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e7660fdc..db499600 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,61 @@ Here is a list of components build on Node.js: [daviddm-image]: https://david-dm.org/elasticio/sailor-nodejs.svg?theme=shields.io [daviddm-url]: https://david-dm.org/elasticio/sailor-nodejs +## Flow control + +When working in the multi-tenant integration environment it's important to obey the API and +consumption limits imposed by the platform. This is not only a condition for you integrations to +run on the platform (and not begin suspended), but also a good integration practice to sustainably +and efficiently use resources. + +Imagine a system where one party (published) publishes to the queue and one or more consumers +consume from the queue. If publishers are writing to the queue faster than consumers read data +from the queue, queue will earlier or later be overfilled. Once one queue of your integration flow +will grow to a particular limit, the complete integration flow will be suspended and author will be +informed about it. Flow control is a build-in mechanism in the SDK that will help you to prevent +the overflow to happen. + +There are two types of flow control: + * Static flow control - the hard limit of the events (e.g. messages published to the queue) that can + be generated by component. This limit is dictated by your pricing plan and will limit protect the + platform from extensive load. + * Dynamic flow control - the limit that is imposed based on the state of individual queue, more + messages are in the queue, slower publisher could write to it. + +Let's take a look at the simple example: + +```JavaScript +'use strict'; + +exports.process = process; + +async function process(msg, cfg, snapshot) { + for (let i = 0; i < 100000; i++) { + console.log('Sending message %s', i); + await this.emit('data', { + body: { + counter: i, + hi: 'there' + } + }); + console.log('Message %s was sent', i); + } +} +``` + +This simple component, once started on the platform will generate 100k messages. Without flow control this +example will quickly bring the integration queue to the limits and integration flow will be suspended. +With flow control the publishing rate of the messages will be slowed down so both publisher and consumers +will operate in balance. + +### How to configure it + +There is a set of environment variables that are responsible for the configuration of the static flow control +(dynamic flow control is implemented in the message-oriented middleware of the platform hence can't be configured +on the component level) + * ELASTICIO_DATA_RATE_LIMIT - a number of maximum `data` messages per second that could be emitted + by the component ## Sailor hooks diff --git a/package.json b/package.json index ef31da19..938a15ab 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.2.2", + "version": "2.2.3.dev.1", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", From 2d2f8902e24d7cb576deaa1e3853b8695f7662f7 Mon Sep 17 00:00:00 2001 From: Renat Zubairov Date: Sat, 9 Feb 2019 00:51:56 +0100 Subject: [PATCH 9/9] Fixed version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 938a15ab..a4adbbb2 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.2.3.dev.1", + "version": "2.2.3-dev.1", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js",