diff --git a/gobblefile.js b/gobblefile.js index bd87c2b..3acfd4a 100644 --- a/gobblefile.js +++ b/gobblefile.js @@ -9,6 +9,9 @@ module.exports = gobble('src').transform('babel', { ], plugins: [ 'syntax-decorators', + ['babel-plugin-transform-builtin-extend', { + globals: ['Error'] + }], 'transform-decorators-legacy', 'transform-runtime', 'add-module-exports' diff --git a/package.json b/package.json index c31ae25..e3e2774 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,7 @@ "babel": "6.5.2", "babel-plugin-add-module-exports": "0.1.2", "babel-plugin-syntax-decorators": "6.5.0", + "babel-plugin-transform-builtin-extend": "1.1.2", "babel-plugin-transform-decorators-legacy": "1.3.4", "babel-plugin-transform-runtime": "6.6.0", "babel-preset-es2015": "6.6.0", diff --git a/src/defaults.js b/src/defaults.js index e3e2943..d454bd5 100644 --- a/src/defaults.js +++ b/src/defaults.js @@ -10,6 +10,8 @@ export const portSecure = 443; export const reconnectInitialDelay = 1000; +export const reconnectFailAfter = Infinity; + export const reconnectMaxDelay = 15 * 1000; export const reconnectBackoffStrategy = 'fibonacci'; diff --git a/src/error.js b/src/error.js index ae0d704..8efa22c 100644 --- a/src/error.js +++ b/src/error.js @@ -16,3 +16,9 @@ export class BadOptionsError extends LogentriesError { this.options = opts; } } + +export class ReconnectFailedError extends LogentriesError { + constructor(msg = 'Reconnect failed') { + super(msg); + } +} diff --git a/src/logger.js b/src/logger.js index f5b5bcf..39f6d80 100644 --- a/src/logger.js +++ b/src/logger.js @@ -13,7 +13,8 @@ import text from './text'; import build from './serialize'; import { BadOptionsError, - LogentriesError + LogentriesError, + ReconnectFailedError, } from './error'; import RingBuffer from './ringbuffer'; import BunyanStream from './bunyanstream'; @@ -140,6 +141,8 @@ class Logger extends Writable { this.inactivityTimeout = opts.inactivityTimeout || defaults.inactivityTimeout; this.disableTimeout = opts.disableTimeout; this.token = opts.token; + this.reconnectFailed = false; + this.reconnectFailAfter = opts.reconnectFailAfter || defaults.reconnectFailAfter; this.reconnectInitialDelay = opts.reconnectInitialDelay || defaults.reconnectInitialDelay; this.reconnectMaxDelay = opts.reconnectMaxDelay || defaults.reconnectMaxDelay; this.reconnectBackoffStrategy = @@ -187,6 +190,7 @@ class Logger extends Writable { this.on(bufferDrainEvent, () => { this.debugLogger.log('RingBuffer drained.'); this.drained = true; + this.reconnectFailed = false; }); this.on(timeoutEvent, () => { @@ -211,6 +215,21 @@ class Logger extends Writable { */ _write(ch, enc, cb) { this.drained = false; + + if (this.reconnectFailed) { + this.ringBuffer.read(); + + if (this.ringBuffer.isEmpty()) { + this.emit(bufferDrainEvent); + // this event is DEPRECATED - will be removed in next major release. + // new users should use 'buffer drain' event instead. + this.emit('connection drain'); + } + + cb(); + return; + } + this.connection.then(conn => { const record = this.ringBuffer.read(); if (record) { @@ -233,8 +252,15 @@ class Logger extends Writable { } cb(); }).catch(err => { - this.emit(errorEvent, err); - this.debugLogger.log(`Error: ${err}`); + if (err instanceof ReconnectFailedError) { + this.ringBuffer.read(); // this connection failed, shift the buffer + this.reconnectFailed = true; + this.debugLogger.log('Error: Reconnect failed'); + } else { + this.emit(errorEvent, err); + this.debugLogger.log(`Error: ${err}`); + } + cb(); }); } @@ -413,12 +439,12 @@ class Logger extends Writable { initialDelay: this.reconnectInitialDelay, maxDelay: this.reconnectMaxDelay, strategy: this.reconnectBackoffStrategy, - failAfter: Infinity, + failAfter: this.reconnectFailAfter, randomisationFactor: 0, immediate: false }); - this.connection = new Promise((resolve) => { + this.connection = new Promise((resolve, reject) => { const connOpts = { host: this.host, port: this.port @@ -442,6 +468,10 @@ class Logger extends Writable { } }); + this.reconnection.once('fail', () => { + reject(new ReconnectFailedError()); + }); + this.reconnection.once('disconnect', () => { this.debugLogger.log('Socket was disconnected'); this.connection = null; @@ -579,6 +609,14 @@ class Logger extends Writable { this._json = val; } + get reconnectFailed() { + return this._reconnectFailed; + } + + set reconnectFailed(val) { + this._reconnectFailed = val; + } + get reconnectMaxDelay() { return this._reconnectMaxDelay; } @@ -595,6 +633,14 @@ class Logger extends Writable { this._reconnectInitialDelay = val; } + get reconnectFailAfter() { + return this._reconnectFailAfter; + } + + set reconnectFailAfter(val) { + this._reconnectFailAfter = val; + } + get reconnectBackoffStrategy() { return this._reconnectBackoffStrategy; } diff --git a/src/ringbuffer.js b/src/ringbuffer.js index e2c467b..628a725 100644 --- a/src/ringbuffer.js +++ b/src/ringbuffer.js @@ -27,6 +27,11 @@ class RingBuffer extends EventEmitter { return this.records.shift(); } + empty() { + this.bufferWasFull = false; + this.records = []; + } + isEmpty() { return this.records.length === 0; } diff --git a/test/test.js b/test/test.js index 939b2ea..bcb2e36 100644 --- a/test/test.js +++ b/test/test.js @@ -3,6 +3,7 @@ 'use strict'; const _ = require('lodash'); +const EventEmitter = require('events'); const bunyan = require('bunyan'); const defaults = require('../lib/defaults.js'); const levels = require('../lib/levels.js'); @@ -660,6 +661,58 @@ tape('Socket is not closed after inactivity timeout when buffer is not empty.', logger.log(lvl, 'first log'); }); +tape('Socket will not reconnect indefinitely when fail after is configured', function (t) { + t.plan(5); + t.timeoutAfter(1000); + const lvl = defaults.levels[3]; + const tkn = x; + const logger = new Logger({ token: x, reconnectFailAfter: 3, reconnectInitialDelay: 100, reconnectMaxDelay: 101 }); + + const mock = new mitm(); + let retryCounter = 0; + + const origConnect = mock.connect; + + const sendMoreLogs = () => { + mock.connect = origConnect; + + mock.on('connection', (socket) => { + socket.once('data', (buffer) => { + const log = buffer.toString(); + const expectedLog = [tkn, lvl, 'other log' + '\n'].join(' '); + t.equals(log, expectedLog, 'log received.'); + mock.disable(); + }); + }); + + logger.log(lvl, 'other log'); + }; + + logger.once('buffer drain', sendMoreLogs); + + mock.connect = function() { + const emitter = new EventEmitter(); + + // mock properties + emitter.end = _.noop; + emitter.setTimeout = _.noop; + emitter.server = new EventEmitter(); + + t.true(retryCounter <= 3, `retry ${retryCounter}`); + retryCounter += 1; + + setTimeout(() => { + emitter.emit('error', new Error('connection failed')) + }, 0); + + return emitter; + }; + + mock.enable(); + + logger.log(lvl, 'test log 1'); + logger.log(lvl, 'test log 2'); +}); tape('RingBuffer buffers and shifts when it is full', function (t) { t.plan(5);