33 */
44
55require ( './bootstrap' )
6+
7+ const AWSXRay = require ( 'aws-xray-sdk' )
8+ const ns = AWSXRay . getNamespace ( ) ;
9+
610const _ = require ( 'lodash' )
711const config = require ( 'config' )
812const Kafka = require ( 'no-kafka' )
@@ -11,8 +15,6 @@ const logger = require('./common/logger')
1115const helper = require ( './common/helper' )
1216const ProcessorService = require ( './services/ProcessorService' )
1317
14- const AWSXRay = require ( 'aws-xray-sdk' )
15- const segment = new AWSXRay . Segment ( 'legacy-challenge-processor' ) ;
1618
1719// Start kafka consumer
1820logger . info ( 'Starting kafka consumer' )
@@ -26,6 +28,9 @@ const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())
2628 * this function will be invoked
2729 */
2830const dataHandler = ( messageSet , topic , partition ) => Promise . each ( messageSet , async ( m ) => {
31+ const segment = new AWSXRay . Segment ( 'legacy-challenge-processor' ) ;
32+ AWSXRay . setSegment ( segment ) ;
33+
2934 const message = m . message . value . toString ( 'utf8' )
3035 logger . info ( `Handle Kafka event message; Topic: ${ topic } ; Partition: ${ partition } ; Offset: ${ m . offset } ; Message: ${ message } .` )
3136
@@ -57,67 +62,57 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
5762 return
5863 }
5964
60- const ns = AWSXRay . getNamespace ( ) ;
61- console . log ( 'Running within context' , ns ) ;
62- ns . run ( async ( ) => {
63- console . log ( 'creating segment' ) ;
64- console . log ( 'created segment' ) ;
65- AWSXRay . setSegment ( segment ) ;
66- console . log ( 'set segment' ) ;
67-
68- const { traceInformation : {
69- traceId,
70- parentSegmentId,
71- } = {
72- traceId : null ,
73- parentSegmentId : null
74- } } = messageJSON . payload ;
75-
76- console . log ( 'tracing information' , traceId , parentSegmentId ) ;
77-
78- if ( traceId ) {
79- segment . trace_id = traceId ;
80- segment . id = parentSegmentId ;
81- }
65+ const { traceInformation : {
66+ traceId,
67+ parentSegmentId,
68+ } = {
69+ traceId : null ,
70+ parentSegmentId : null
71+ } } = messageJSON . payload ;
8272
73+ console . log ( 'tracing information' , traceId , parentSegmentId ) ;
8374
84- // do not trust the message payload
85- // the message.payload will be replaced with the data from the API
86- try {
87- console . log ( 'Fetch challenge details' ) ;
88- const challengeUuid = _ . get ( messageJSON , 'payload.id' )
89- if ( _ . isEmpty ( challengeUuid ) ) {
90- segment . close ( ) ;
91- segment . addError ( new Error ( err ) ) ;
92- throw new Error ( 'Invalid payload' )
93- }
94- const m2mToken = await helper . getM2MToken ( )
95- const v5Challenge = await helper . getRequest ( `${ config . V5_CHALLENGE_API_URL } /${ challengeUuid } ` , m2mToken )
96- // TODO : Cleanup. Pulling the billingAccountId from the payload, it's not part of the challenge object
97- messageJSON . payload = { billingAccountId : messageJSON . payload . billingAccountId , ...v5Challenge . body }
98- } catch ( err ) {
99- segment . addError ( new Error ( err ) ) ;
100- logger . debug ( 'Failed to fetch challenge information' )
101- logger . logFullError ( err )
102- }
75+ if ( traceId ) {
76+ segment . trace_id = traceId ;
77+ segment . id = parentSegmentId ;
78+ }
10379
104- try {
105- console . log ( 'Process challenge' )
106- await ProcessorService . processMessage ( messageJSON )
10780
108- // logger.debug('Successfully processed message')
109- } catch ( err ) {
81+ // do not trust the message payload
82+ // the message.payload will be replaced with the data from the API
83+ try {
84+ console . log ( 'Fetch challenge details' ) ;
85+ const challengeUuid = _ . get ( messageJSON , 'payload.id' )
86+ if ( _ . isEmpty ( challengeUuid ) ) {
87+ segment . close ( ) ;
11088 segment . addError ( new Error ( err ) ) ;
111- logger . error ( `Error processing message ${ JSON . stringify ( messageJSON ) } ` )
112- logger . logFullError ( err )
113- } finally {
114- // Commit offset regardless of error
115- await consumer . commitOffset ( { topic, partition, offset : m . offset } )
89+ throw new Error ( 'Invalid payload' )
11690 }
91+ const m2mToken = await helper . getM2MToken ( )
92+ const v5Challenge = await helper . getRequest ( `${ config . V5_CHALLENGE_API_URL } /${ challengeUuid } ` , m2mToken )
93+ // TODO : Cleanup. Pulling the billingAccountId from the payload, it's not part of the challenge object
94+ messageJSON . payload = { billingAccountId : messageJSON . payload . billingAccountId , ...v5Challenge . body }
95+ } catch ( err ) {
96+ segment . addError ( new Error ( err ) ) ;
97+ logger . debug ( 'Failed to fetch challenge information' )
98+ logger . logFullError ( err )
99+ }
117100
118- segment . close ( ) ;
119- } )
101+ try {
102+ console . log ( 'Process challenge' )
103+ await ProcessorService . processMessage ( messageJSON )
104+
105+ // logger.debug('Successfully processed message')
106+ } catch ( err ) {
107+ segment . addError ( new Error ( err ) ) ;
108+ logger . error ( `Error processing message ${ JSON . stringify ( messageJSON ) } ` )
109+ logger . logFullError ( err )
110+ } finally {
111+ // Commit offset regardless of error
112+ await consumer . commitOffset ( { topic, partition, offset : m . offset } )
113+ }
120114
115+ segment . close ( ) ;
121116} )
122117
123118// check if there is kafka connection alive
@@ -135,20 +130,24 @@ const check = () => {
135130
136131const topics = [ config . CREATE_CHALLENGE_TOPIC , config . UPDATE_CHALLENGE_TOPIC ]
137132
138- consumer
139- . init ( [ {
140- subscriptions : topics ,
141- handler : dataHandler
142- } ] )
143- // consume configured topics
144- . then ( ( ) => {
145- logger . info ( 'Initialized.......' )
146- healthcheck . init ( [ check ] )
147- logger . info ( 'Adding topics successfully.......' )
148- logger . info ( topics )
149- logger . info ( 'Kick Start.......' )
133+ ( ( ) => {
134+ ns . run ( ( ) => {
135+ consumer
136+ . init ( [ {
137+ subscriptions : topics ,
138+ handler : dataHandler
139+ } ] )
140+ // consume configured topics
141+ . then ( ( ) => {
142+ logger . info ( 'Initialized.......' )
143+ healthcheck . init ( [ check ] )
144+ logger . info ( 'Adding topics successfully.......' )
145+ logger . info ( topics )
146+ logger . info ( 'Kick Start.......' )
147+ } )
148+ . catch ( ( err ) => logger . error ( err ) )
150149 } )
151- . catch ( ( err ) => logger . error ( err ) )
150+ } ) ( ) ;
152151
153152if ( process . env . NODE_ENV === 'test' ) {
154153 module . exports = consumer
0 commit comments