From 65d701765758007bd8dd7e8851931f41398f945a Mon Sep 17 00:00:00 2001 From: Lena Kaplan Date: Wed, 19 Jul 2017 21:07:41 +0300 Subject: [PATCH 1/2] change async apply each to asyn paralllel --- lib/actor-pool.js | 243 ++++++++++++++++++++++++---------------------- 1 file changed, 129 insertions(+), 114 deletions(-) diff --git a/lib/actor-pool.js b/lib/actor-pool.js index 396c7c0..7cf8203 100644 --- a/lib/actor-pool.js +++ b/lib/actor-pool.js @@ -13,162 +13,177 @@ var MEMORY_POOL_SIZE = config.memoryPoolSize; var log = require('./logger')('memory-pool'); var actorEnvelopes = new LRU({ - max: MEMORY_POOL_SIZE, - dispose: function dispose(key, value) { - log.info(log.defaultContext(), 'removing actor: ', key, ' from LRU'); - } + max: MEMORY_POOL_SIZE, + dispose: function dispose(key, value) { + log.info(log.defaultContext(), 'removing actor: ', key, ' from LRU'); + } }); var firstInit = true; var PROCESS_QUEUE_INTERVAL; var SET_MIN_STATE_INTERVAL; var singleState = (function singleStateFn() { - var currentMinimum = new Date(); + var currentMinimum = new Date(); - return { - getMinimum: function getMinimumFn() { - return currentMinimum; - }, + return { + getMinimum: function getMinimumFn() { + return currentMinimum; + }, - setMinimum: function setMinimumFn(newMinimum) { - currentMinimum = newMinimum; - return; - } - }; + setMinimum: function setMinimumFn(newMinimum) { + currentMinimum = newMinimum; + return; + } + }; })(); function getOrCreate(context, options, initCb) { - var modelName; - var id; - var idx; - - modelName = context.activity.modelName; - id = '' + context.activity.entityId; - idx = modelName + id; - if (!actorEnvelopes.get(idx)) { - var key = context.actorEntity._version; - context.actorEntity.constructor.instanceLocker().acquire(key, function instanceLockerAcquireFn(cb) { - if (!actorEnvelopes.get(idx)) { - var envelope = {}; - envelope.actorId = id; - envelope.modelName = modelName; - actorEnvelopes.set(idx, envelope); - return context.actorEntity.performStartOperation(context.journalEntity, options, envelope, function performStartOperationCbFn(err) { - if (err) { - actorEnvelopes.del(idx); - return cb(err); - } - context.envelope = envelope; - return cb(); + var modelName; + var id; + var idx; + + modelName = context.activity.modelName; + id = '' + context.activity.entityId; + idx = modelName + id; + if (!actorEnvelopes.get(idx)) { + var key = context.actorEntity._version; + context.actorEntity.constructor.instanceLocker().acquire(key, function instanceLockerAcquireFn(cb) { + if (!actorEnvelopes.get(idx)) { + var envelope = {}; + envelope.actorId = id; + envelope.modelName = modelName; + actorEnvelopes.set(idx, envelope); + return context.actorEntity.performStartOperation(context.journalEntity, options, envelope, function performStartOperationCbFn(err) { + if (err) { + actorEnvelopes.del(idx); + return cb(err); + } + context.envelope = envelope; + return cb(); + }); + } + }, function instanceLockerAcquireCbFn(err) { + if (err) { + return initCb(err, context); + } + return initCb(null, context); }); - } - }, function instanceLockerAcquireCbFn(err) { - if (err) { - return initCb(err, context); - } - return initCb(null, context); - }); - } else { - var envelope = actorEnvelopes.get(idx); - context.envelope = envelope; - return initCb(null, context); - } + } else { + var envelope = actorEnvelopes.get(idx); + context.envelope = envelope; + return initCb(null, context); + } } function destroy(modelName, id) { - log.debug(log.defaultContext(), 'removing actor from memory\n' + 'model: ' + modelName + '\n' + 'id: ' + id); - var idx = modelName + id; - if (typeof actorEnvelopes.get(idx) !== 'undefined') { - actorEnvelopes.del(idx); - } + log.debug(log.defaultContext(), 'removing actor from memory\n' + 'model: ' + modelName + '\n' + 'id: ' + id); + var idx = modelName + id; + if (typeof actorEnvelopes.get(idx) !== 'undefined') { + actorEnvelopes.del(idx); + } } function processMemoryMessages() { - actorEnvelopes.values().forEach(function actorEnvelopesForEachFn(actorEnvelope) { - if (typeof actorEnvelope.isCurrentlyProcessing !== 'undefined' && actorEnvelope.isCurrentlyProcessing === false) { - actorEnvelope.isCurrentlyProcessing = true; - var actorModel = loopback.getModel(actorEnvelope.modelName); - var query = { where: { id: actorEnvelope.actorId }, limit: 1 }; - var options = { 'fetchAllScopes': true }; - actorModel.find(query, options, function actorModelFindFn(err, result) { + var tasks = []; + async.each(actorEnvelopes.values(), function (actorEnvelope, cb) { + tasks.push(function () { + if (typeof actorEnvelope.isCurrentlyProcessing !== 'undefined' && actorEnvelope.isCurrentlyProcessing === false) { + actorEnvelope.isCurrentlyProcessing = true; + var actorModel = loopback.getModel(actorEnvelope.modelName); + var query = {where: {id: actorEnvelope.actorId}, limit: 1}; + var options = {'fetchAllScopes': true}; + actorModel.find(query, options, function actorModelFindFn(err, result) { + if (err) { + logError(err); + actorEnvelope.isCurrentlyProcessing = false; + } else if (!result[0]) { + actorEnvelopes.del(actorEnvelope.modelName + actorEnvelope.actorId); + } else { + var actor = result[0]; + actor.processMessagesBackground(actorEnvelope, options, function processMessagesBackgroundFn() { + actorEnvelope.isCurrentlyProcessing = false; + }); + } + }); + } + }); + cb(); + }, function (err) { + log.error(`error in processMemoryMessages ${err}`); + }); + + async.parallel(tasks, function (err) { if (err) { - logError(err); - actorEnvelope.isCurrentlyProcessing = false; - } else if (!result[0]) { - actorEnvelopes.del(actorEnvelope.modelName + actorEnvelope.actorId); - } else { - var actor = result[0]; - actor.processMessagesBackground(actorEnvelope, options, function processMessagesBackgroundFn() { - actorEnvelope.isCurrentlyProcessing = false; - }); + log.error(`error in processMemoryMessages ${err}`); } - }); - } - }); + }); } function logError(err) { - if (err) { - log.error(log.defaultContext(), err); - } + if (err) { + log.error(log.defaultContext(), err); + } } function checkIfMinimum(actorEnvelope, callback) { - var currentMinimum = singleState.getMinimum(); - if (currentMinimum > actorEnvelope.actorLastPersistedOn) { - singleState.setMinimum(actorEnvelope.actorLastPersistedOn); - } - return callback(); + var currentMinimum = singleState.getMinimum(); + if (currentMinimum > actorEnvelope.actorLastPersistedOn) { + singleState.setMinimum(actorEnvelope.actorLastPersistedOn); + } + return callback(); } function setMinState() { - async.each(actorEnvelopes.values(), checkIfMinimum, function setMinStateAsyncFn() { - var minStateModel = loopback.getModel('MinimumState'); - minStateModel.findOne({}, { fetchAllScopes: true, ctx: { tenantId: 'default' } }, function minStateModelFindOneCbFn(err, instance) { - var instanceId; - if (err) { - // TO DO: better error handling - log.error(log.defaultContext(), err); - } else if (instance !== null) { - instanceId = instance.id; - } else { - instanceId = uuid.v4(); - } - - var currentMinimum = singleState.getMinimum(); - var data = { time: currentMinimum, id: instanceId }; - - minStateModel.upsert(data, { ctx: { tenantId: 'default' } }, function minStateModelUpsertCbFn(err, instance) { - if (err) { - log.error(log.defaultContext(), err); - // TO DO: better error handling - } - }); + async.each(actorEnvelopes.values(), checkIfMinimum, function setMinStateAsyncFn() { + var minStateModel = loopback.getModel('MinimumState'); + minStateModel.findOne({}, { + fetchAllScopes: true, + ctx: {tenantId: 'default'} + }, function minStateModelFindOneCbFn(err, instance) { + var instanceId; + if (err) { + // TO DO: better error handling + log.error(log.defaultContext(), err); + } else if (instance !== null) { + instanceId = instance.id; + } else { + instanceId = uuid.v4(); + } + + var currentMinimum = singleState.getMinimum(); + var data = {time: currentMinimum, id: instanceId}; + + minStateModel.upsert(data, {ctx: {tenantId: 'default'}}, function minStateModelUpsertCbFn(err, instance) { + if (err) { + log.error(log.defaultContext(), err); + // TO DO: better error handling + } + }); + }); }); - }); } function initWithCustomInterval(app) { - if (!firstInit) { - clearInterval(PROCESS_QUEUE_INTERVAL); - clearInterval(SET_MIN_STATE_INTERVAL); - } - PROCESS_QUEUE_INTERVAL = setInterval(processMemoryMessages, app.get('memoryInterval') || config.memoryInterval); - SET_MIN_STATE_INTERVAL = setInterval(setMinState, app.get('minStateInterval') || config.minStateInterval); - - firstInit = false; + if (!firstInit) { + clearInterval(PROCESS_QUEUE_INTERVAL); + clearInterval(SET_MIN_STATE_INTERVAL); + } + PROCESS_QUEUE_INTERVAL = setInterval(processMemoryMessages, app.get('memoryInterval') || config.memoryInterval); + SET_MIN_STATE_INTERVAL = setInterval(setMinState, app.get('minStateInterval') || config.minStateInterval); + + firstInit = false; } module.exports.getOrCreateInstance = getOrCreate; module.exports.destroy = destroy; module.exports.initPool = function initPool(app) { - initWithCustomInterval(app); + initWithCustomInterval(app); }; module.exports.getEnvelope = function getEnvelope(modelName, id) { - var envelope = actorEnvelopes.get(modelName + id); - return envelope; + var envelope = actorEnvelopes.get(modelName + id); + return envelope; }; From f86ed0d9dcc128ed48c17a698bdc83cf5212b455 Mon Sep 17 00:00:00 2001 From: Lena Kaplan Date: Wed, 19 Jul 2017 21:26:49 +0300 Subject: [PATCH 2/2] add limit to async parallel in proccess message background --- lib/actor-pool.js | 2 +- server/config.json | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/actor-pool.js b/lib/actor-pool.js index 7cf8203..3cb1b06 100644 --- a/lib/actor-pool.js +++ b/lib/actor-pool.js @@ -112,7 +112,7 @@ function processMemoryMessages() { log.error(`error in processMemoryMessages ${err}`); }); - async.parallel(tasks, function (err) { + async.parallelLimit(tasks, config.actorChunkSize, function (err) { if (err) { log.error(`error in processMemoryMessages ${err}`); } diff --git a/server/config.json b/server/config.json index c93bee9..5029ee7 100644 --- a/server/config.json +++ b/server/config.json @@ -92,5 +92,6 @@ "userName":"admin", "password":"admin", "email":"admin@mycompany.com" - } + }, + "actorChunkSize" : 10 }