Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
language: node_js
node_js:
- "8"
- "9"
- "10"
- "11"
script:
- npm run test:jasmine
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,61 @@ Here is a list of components build on Node.js:
[daviddm-image]: https://david-dm.org/elasticio/sailor-nodejs.svg?theme=shields.io
[daviddm-url]: https://david-dm.org/elasticio/sailor-nodejs

## Flow control

When working in the multi-tenant integration environment it's important to obey the API and
consumption limits imposed by the platform. This is not only a condition for you integrations to
run on the platform (and not begin suspended), but also a good integration practice to sustainably
and efficiently use resources.

Imagine a system where one party (published) publishes to the queue and one or more consumers
consume from the queue. If publishers are writing to the queue faster than consumers read data
from the queue, queue will earlier or later be overfilled. Once one queue of your integration flow
will grow to a particular limit, the complete integration flow will be suspended and author will be
informed about it. Flow control is a build-in mechanism in the SDK that will help you to prevent
the overflow to happen.

There are two types of flow control:
* Static flow control - the hard limit of the events (e.g. messages published to the queue) that can
be generated by component. This limit is dictated by your pricing plan and will limit protect the
platform from extensive load.
* Dynamic flow control - the limit that is imposed based on the state of individual queue, more
messages are in the queue, slower publisher could write to it.

Let's take a look at the simple example:

```JavaScript
'use strict';

exports.process = process;

async function process(msg, cfg, snapshot) {
for (let i = 0; i < 100000; i++) {
console.log('Sending message %s', i);
await this.emit('data', {
body: {
counter: i,
hi: 'there'
}
});
console.log('Message %s was sent', i);
}
}
```

This simple component, once started on the platform will generate 100k messages. Without flow control this
example will quickly bring the integration queue to the limits and integration flow will be suspended.
With flow control the publishing rate of the messages will be slowed down so both publisher and consumers
will operate in balance.

### How to configure it

There is a set of environment variables that are responsible for the configuration of the static flow control
(dynamic flow control is implemented in the message-oriented middleware of the platform hence can't be configured
on the component level)

* ELASTICIO_DATA_RATE_LIMIT - a number of maximum `data` messages per second that could be emitted
by the component

## Sailor hooks

Expand Down
51 changes: 25 additions & 26 deletions lib/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,35 +100,38 @@ class Amqp {
return this.subscribeChannel.reject(message, false);
}

sendToExchange(exchangeName, routingKey, payload, options) {
//eslint-disable-next-line max-len
log.trace('Pushing to exchange=%s, routingKey=%s, data=%j, options=%j', exchangeName, routingKey, payload, options);
try {
this.publishChannel.publish(exchangeName, routingKey, new Buffer(payload), options);
} catch (err) {
log.error('Failed to publish message to exchange %s, %s', exchangeName, err.message);
async sendToExchange(exchangeName, routingKey, payload, options, throttle) {
log.trace('Pushing to exchange=%s, routingKey=%s, data=%j, '
+ 'options=%j', exchangeName, routingKey, payload, options);
const result = this.publishChannel.publish(exchangeName, routingKey, Buffer.from(payload), options);
if (!result) {
log.error('WARNING - buffer full when publishing a message to '
+ 'exchange=%s with routingKey=%s', exchangeName, routingKey);
}
if (throttle) {
await throttle();
}
}

prepareMessageAndSendToExchange(data, properties, routingKey) {
async prepareMessageAndSendToExchange(data, properties, routingKey, throttle) {
const settings = this.settings;
data.headers = filterMessageHeaders(data.headers);
const encryptedData = encryptor.encryptMessageContent(data);

this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties);
await this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties, throttle);
}

sendData(data, properties) {
async sendData(data, properties, throttle) {
const settings = this.settings;

const msgHeaders = data.headers || {};

const routingKey = getRoutingKeyFromHeaders(msgHeaders) || settings.DATA_ROUTING_KEY;

this.prepareMessageAndSendToExchange(data, properties, routingKey);
await this.prepareMessageAndSendToExchange(data, properties, routingKey, throttle);
}

sendHttpReply(data, properties) {
async sendHttpReply(data, properties) {

const routingKey = properties.headers.reply_to;

Expand All @@ -137,10 +140,10 @@ class Amqp {
//eslint-disable-next-line no-useless-escape
`Component emitted \'httpReply\' event but \'reply_to\' was not found in AMQP headers`);
}
this.prepareMessageAndSendToExchange(data, properties, routingKey);
await this.prepareMessageAndSendToExchange(data, properties, routingKey);
}

sendError(err, properties, originalMessageContent) {
async sendError(err, properties, originalMessageContent) {
const settings = this.settings;
const headers = properties.headers;

Expand All @@ -159,24 +162,24 @@ class Amqp {
}
const errorPayload = JSON.stringify(payload);

this.sendToExchange(settings.PUBLISH_MESSAGES_TO, settings.ERROR_ROUTING_KEY, errorPayload, properties);
await this.sendToExchange(settings.PUBLISH_MESSAGES_TO, settings.ERROR_ROUTING_KEY, errorPayload, properties);

if (headers.reply_to) {
console.log('Sending error to', headers.reply_to);
const replyToOptions = _.cloneDeep(properties);
replyToOptions.headers[HEADER_ERROR_RESPONSE] = true;
this.sendToExchange(settings.PUBLISH_MESSAGES_TO, headers.reply_to, encryptedError, replyToOptions);
await this.sendToExchange(settings.PUBLISH_MESSAGES_TO, headers.reply_to, encryptedError, replyToOptions);
}
}

sendRebound(reboundError, originalMessage, properties) {
async sendRebound(reboundError, originalMessage, properties) {
const settings = this.settings;

log.trace('Rebound message: %j', originalMessage);
const reboundIteration = getReboundIteration(originalMessage.properties.headers.reboundIteration);

if (reboundIteration > settings.REBOUND_LIMIT) {
return this.sendError(
await this.sendError(
new Error('Rebound limit exceeded'),
properties,
originalMessage.content
Expand All @@ -185,7 +188,7 @@ class Amqp {
properties.expiration = getExpiration(reboundIteration);
properties.headers.reboundIteration = reboundIteration;

this.sendToExchange(
await this.sendToExchange(
settings.PUBLISH_MESSAGES_TO,
settings.REBOUND_ROUTING_KEY,
originalMessage.content,
Expand All @@ -206,17 +209,13 @@ class Amqp {
}
}

sendSnapshot(data, properties) {
async sendSnapshot(data, properties) {
const settings = this.settings;
const exchange = settings.PUBLISH_MESSAGES_TO;
const routingKey = settings.SNAPSHOT_ROUTING_KEY;
let payload;
try {
payload = JSON.stringify(data);
} catch (e) {
return console.error('A snapshot should be a valid JSON');
}
this.sendToExchange(exchange, routingKey, payload, properties);
payload = JSON.stringify(data);
await this.sendToExchange(exchange, routingKey, payload, properties);
}
}

Expand Down
198 changes: 198 additions & 0 deletions lib/emitter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
'use strict';


/**
* Object#toString reference.
*/
const objToString = Object.prototype.toString;

/**
* Check if a value is an array.
*
* @api private
* @param {*} val The value to test.
* @return {boolean} true if the value is an array, otherwise false.
*/
function isArray(val) {
return objToString.call(val) === '[object Array]';
}

/**
* Event emitter constructor.
*
* @api public
*/
function EventEmitter() {
// Empty constructor
}

/**
* Add a listener.
*
* @api public
* @param {string} name Event name.
* @param {Function} fn Event handler.
* @return {EventEmitter} Emitter instance.
*/
EventEmitter.prototype.on = function on(name, fn) {
if (!this.$events) {
this.$events = {};
}

if (!this.$events[name]) {
this.$events[name] = fn;
} else if (isArray(this.$events[name])) {
this.$events[name].push(fn);
} else {
this.$events[name] = [this.$events[name], fn];
}

return this;
};

EventEmitter.prototype.addListener = EventEmitter.prototype.on;

/**
* Adds a volatile listener.
*
* @api public
* @param {string} name Event name.
* @param {Function} fn Event handler.
* @return {EventEmitter} Emitter instance.
*/
EventEmitter.prototype.once = function once(name, fn) {
const that = this;

function on() {
that.removeListener(name, on);
//eslint-disable-next-line no-invalid-this
fn.apply(this, arguments);
}

on.listener = fn;
this.on(name, on);

return this;
};

/**
* Remove a listener.
*
* @api public
* @param {string} name Event name.
* @param {Function} fn Event handler.
* @return {EventEmitter} Emitter instance.
*/
EventEmitter.prototype.removeListener = function removeListener(name, fn) {
if (this.$events && this.$events[name]) {
const list = this.$events[name];

if (isArray(list)) {
let pos = -1;

for (let i = 0, l = list.length; i < l; i++) {
if (list[i] === fn || (list[i].listener && list[i].listener === fn)) {
pos = i;
break;
}
}

if (pos < 0) {
return this;
}

list.splice(pos, 1);

if (!list.length) {
delete this.$events[name];
}
} else if (list === fn || (list.listener && list.listener === fn)) {
delete this.$events[name];
}
}

return this;
};

/**
* Remove all listeners for an event.
*
* @api public
* @param {string} name Event name.
* @return {EventEmitter} Emitter instance.
*/
EventEmitter.prototype.removeAllListeners = function removeAllListeners(name) {
if (name === undefined) {
this.$events = {};
return this;
}

if (this.$events && this.$events[name]) {
this.$events[name] = null;
}

return this;
};

/**
* Get all listeners for a given event.
*
* @api public
* @param {string} name Event name.
* @return {EventEmitter} Emitter instance.
*/
EventEmitter.prototype.listeners = function listeners(name) {
if (!this.$events) {
this.$events = {};
}

if (!this.$events[name]) {
this.$events[name] = [];
}

if (!isArray(this.$events[name])) {
this.$events[name] = [this.$events[name]];
}

return this.$events[name];
};

/**
* Emit an event.
*
* @api public
* @param {string} name Event name.
* @return {boolean} true if at least one handler was invoked, else false.
*/
EventEmitter.prototype.emit = async function emit(name) {
if (!this.$events) {
return false;
}

const handler = this.$events[name];

if (!handler) {
return false;
}

const args = Array.prototype.slice.call(arguments, 1);

if (typeof handler === 'function') {
await handler.apply(this, args);
} else if (isArray(handler)) {
const listeners = handler.slice();

for (let i = 0, l = listeners.length; i < l; i++) {
await listeners[i].apply(this, args);
}
} else {
return false;
}

return true;
};

/**
* Module exports.
*/
exports.EventEmitter = EventEmitter;
2 changes: 1 addition & 1 deletion lib/executor.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var EventEmitter = require('events').EventEmitter;
var EventEmitter = require('./emitter').EventEmitter;
var _ = require('lodash');
var util = require('util');

Expand Down
Loading