Skip to content

Commit ed9721b

Browse files
authored
New fields in rmq messages (#101)
* - add `workspaceId` and `containerId` in rabbit messages - minor refactoring * logger fixes * fixes
1 parent 4333f9e commit ed9721b

File tree

14 files changed

+172
-147
lines changed

14 files changed

+172
-147
lines changed

.npmignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
1+
.env
2+
.idea
3+
.circleci
4+
mocha_spec
15
spec

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ Sailor logger adds the following extra context to log messages:
7777
- `ELASTICIO_FLOW_ID`
7878
- `ELASTICIO_FLOW_VERSION`
7979
- `ELASTICIO_FUNCTION`
80-
- `ELASTICIO_ORGANIZATION_ID`
8180
- `ELASTICIO_STEP_ID`
8281
- `ELASTICIO_TASK_USER_EMAIL`
8382
- `ELASTICIO_TENANT_ID`

lib/amqp.js

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,10 @@ class Amqp {
164164
}
165165

166166
async sendHttpReply(data, properties) {
167-
168167
const routingKey = properties.headers.reply_to;
169168

170169
if (!routingKey) {
171-
throw new Error(
172-
//eslint-disable-next-line no-useless-escape
173-
`Component emitted \'httpReply\' event but \'reply_to\' was not found in AMQP headers`);
170+
throw new Error(`Component emitted 'httpReply' event but 'reply_to' was not found in AMQP headers`);
174171
}
175172
return this.prepareMessageAndSendToExchange(data, properties, routingKey);
176173
}

lib/logging.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ const data = Object.assign(
2424
'ELASTICIO_FLOW_ID',
2525
'ELASTICIO_FLOW_VERSION',
2626
'ELASTICIO_FUNCTION',
27-
'ELASTICIO_ORGANIZATION_ID',
2827
'ELASTICIO_STEP_ID',
2928
'ELASTICIO_TASK_USER_EMAIL',
3029
'ELASTICIO_TENANT_ID',

lib/sailor.js

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,14 @@ class Sailor {
8080

8181
reportError(err) {
8282
const headers = {
83-
execId: process.env.ELASTICIO_EXEC_ID,
84-
taskId: process.env.ELASTICIO_FLOW_ID,
85-
userId: process.env.ELASTICIO_USER_ID
83+
execId: this.settings.EXEC_ID,
84+
taskId: this.settings.FLOW_ID,
85+
workspaceId: this.settings.WORKSPACE_ID,
86+
containerId: this.settings.CONTAINER_ID,
87+
userId: this.settings.USER_ID,
88+
stepId: this.settings.STEP_ID,
89+
compId: this.settings.COMP_ID,
90+
function: this.settings.FUNCTION
8691
};
8792
const props = createDefaultAmqpProperties(headers);
8893
this.amqpConnection.sendError(err, props);
@@ -174,7 +179,7 @@ class Sailor {
174179
const metaHeaders = _.pick(headers, metaHeaderNames);
175180
const metaHeadersLowerCased = _.mapKeys(metaHeaders, (value, key) => key.toLowerCase());
176181

177-
let result = _.pick(headers, ['taskId', 'execId', 'userId']);
182+
let result = _.pick(headers, ['taskId', 'execId', 'workspaceId', 'containerId', 'userId', 'stepId', 'compId']);
178183
result = _.extend(result, metaHeadersLowerCased);
179184

180185
result.threadId = headers.threadId || headers['x-eio-meta-trace-id'];
@@ -240,6 +245,11 @@ class Sailor {
240245
let outgoingMessageHeaders = _.clone(incomingMessageHeaders);
241246

242247
outgoingMessageHeaders = _.extend(outgoingMessageHeaders, {
248+
execId: settings.EXEC_ID,
249+
taskId: settings.FLOW_ID,
250+
workspaceId: settings.WORKSPACE_ID,
251+
containerId: settings.CONTAINER_ID,
252+
userId: settings.USER_ID,
243253
stepId: settings.STEP_ID,
244254
compId: settings.COMP_ID,
245255
function: settings.FUNCTION,
@@ -386,11 +396,9 @@ class Sailor {
386396
logger.error('Failed to updated keys #%s', deliveryTag);
387397
await onError(error);
388398
}
389-
390399
}
391400

392401
function onEnd() {
393-
394402
if (endWasEmitted) {
395403
logger.warn({
396404
messagesCount: self.messagesCount,
@@ -471,4 +479,3 @@ function createDefaultAmqpProperties(headers) {
471479
}
472480

473481
exports.Sailor = Sailor;
474-

lib/settings.js

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
var _ = require('lodash');
1+
const _ = require('lodash');
22

33
exports.readFrom = readFrom;
44

55
function readFrom(envVars) {
6-
7-
var result = {};
6+
const result = {};
87

98
// required settings
10-
11-
var requiredAlways = [
9+
const requiredAlways = [
1210
'FLOW_ID',
1311
'EXEC_ID',
1412
'STEP_ID',
13+
'CONTAINER_ID',
14+
'WORKSPACE_ID',
1515

1616
'USER_ID',
1717
'COMP_ID',
@@ -22,7 +22,7 @@ function readFrom(envVars) {
2222
'API_KEY'
2323
];
2424

25-
var requiredForMessageProcessing = [
25+
const requiredForMessageProcessing = [
2626
'AMQP_URI',
2727
'LISTEN_MESSAGES_ON',
2828
'PUBLISH_MESSAGES_TO',
@@ -33,7 +33,7 @@ function readFrom(envVars) {
3333
'SNAPSHOT_ROUTING_KEY'
3434
];
3535

36-
var optional = {
36+
const optional = {
3737
REBOUND_INITIAL_EXPIRATION: 15000,
3838
REBOUND_LIMIT: 20,
3939
COMPONENT_PATH: '',
@@ -52,7 +52,7 @@ function readFrom(envVars) {
5252
};
5353

5454
_.forEach(requiredAlways, function readRequired(key) {
55-
var envVarName = 'ELASTICIO_' + key;
55+
const envVarName = 'ELASTICIO_' + key;
5656
result[key] = envVars[envVarName] || throwError(envVarName + ' is missing');
5757
});
5858

@@ -64,7 +64,7 @@ function readFrom(envVars) {
6464
}
6565

6666
_.forEach(optional, function readOptional(defaultValue, key) {
67-
var envVarName = 'ELASTICIO_' + key;
67+
const envVarName = 'ELASTICIO_' + key;
6868
if (typeof defaultValue === 'number' && envVars[envVarName]) {
6969
result[key] = parseInt(envVars[envVarName]) || defaultValue;
7070
} else {

mocha_spec/integration_helpers.js

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ const nock = require('nock');
88

99
const env = process.env;
1010

11-
1211
class AmqpHelper extends EventEmitter {
13-
1412
constructor() {
1513
super();
14+
1615
this.httpReplyQueueName = PREFIX + 'request_reply_queue';
1716
this.httpReplyQueueRoutingKey = PREFIX + 'request_reply_routing_key';
1817
this.nextStepQueue = PREFIX + '_next_step_queue';
@@ -37,11 +36,11 @@ class AmqpHelper extends EventEmitter {
3736
return this.subscriptionChannel.publish(
3837
env.ELASTICIO_LISTEN_MESSAGES_ON,
3938
env.ELASTICIO_DATA_ROUTING_KEY,
40-
new Buffer(JSON.stringify(message)),
41-
{
39+
new Buffer(JSON.stringify(message)), {
4240
headers: Object.assign({
4341
execId: env.ELASTICIO_EXEC_ID,
4442
taskId: env.ELASTICIO_FLOW_ID,
43+
workspaceId: env.ELASTICIO_WORKSPACE_ID,
4544
userId: env.ELASTICIO_USER_ID,
4645
threadId,
4746
messageId: parentMessageId
@@ -62,6 +61,7 @@ class AmqpHelper extends EventEmitter {
6261
durable: true,
6362
autoDelete: false
6463
};
64+
6565
yield subscriptionChannel.assertExchange(env.ELASTICIO_LISTEN_MESSAGES_ON, 'direct', exchangeOptions);
6666
yield publishChannel.assertExchange(env.ELASTICIO_PUBLISH_MESSAGES_TO, 'direct', exchangeOptions);
6767

@@ -114,11 +114,13 @@ class AmqpHelper extends EventEmitter {
114114
that.consumer.bind(that, that.nextStepQueue),
115115
{ consumerTag: 'sailor_nodejs_1' }
116116
);
117+
117118
yield that.publishChannel.consume(
118119
that.nextStepErrorQueue,
119120
that.consumer.bind(that, that.nextStepErrorQueue),
120121
{ consumerTag: 'sailor_nodejs_2' }
121122
);
123+
122124
yield that.publishChannel.consume(
123125
that.httpReplyQueueName,
124126
that.consumer.bind(that, that.httpReplyQueueName),
@@ -128,7 +130,6 @@ class AmqpHelper extends EventEmitter {
128130
}
129131

130132
consumer(queue, message) {
131-
132133
this.publishChannel.ack(message);
133134

134135
const emittedMessage = JSON.parse(message.content.toString());
@@ -138,22 +139,19 @@ class AmqpHelper extends EventEmitter {
138139
body: emittedMessage.body,
139140
emittedMessage
140141
};
142+
141143
this.dataMessages.push(data);
142144
this.emit('data', data, queue);
143145

144146
// publishChannel.cancel('sailor_nodejs');
145-
146147
// done();
147-
148148
}
149149
}
150150

151151
function amqp() {
152-
153152
const handle = {
154153
//eslint-disable-next-line no-empty-function
155154
getMessages() {
156-
157155
}
158156
};
159157
return handle;
@@ -166,6 +164,9 @@ function prepareEnv() {
166164
env.ELASTICIO_STEP_ID = 'step_1';
167165
env.ELASTICIO_EXEC_ID = 'some-exec-id';
168166

167+
env.ELASTICIO_WORKSPACE_ID = '5559edd38968ec073600683';
168+
env.ELASTICIO_CONTAINER_ID = 'dc1c8c3f-f9cb-49e1-a6b8-716af9e15948';
169+
169170
env.ELASTICIO_USER_ID = '5559edd38968ec0736000002';
170171
env.ELASTICIO_COMP_ID = '5559edd38968ec0736000456';
171172

@@ -177,8 +178,6 @@ function prepareEnv() {
177178
env.ELASTICIO_FLOW_WEBHOOK_URI = 'https://in.elastic.io/hooks/' + env.ELASTICIO_FLOW_ID;
178179

179180
env.DEBUG = 'sailor:debug';
180-
181-
182181
}
183182

184183
function mockApiTaskStepResponse(response) {
@@ -205,4 +204,3 @@ exports.amqp = function amqp() {
205204

206205
exports.prepareEnv = prepareEnv;
207206
exports.mockApiTaskStepResponse = mockApiTaskStepResponse;
208-

0 commit comments

Comments
 (0)