Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions obp-api/src/main/resources/props/sample.props.template
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,8 @@ featured_apis=elasticSearchWarehouseV300
# rabbitmq_connector.username=obp
# rabbitmq_connector.password=obp
# rabbitmq_connector.virtual_host=/
# rabbitmq_connector.request_queue=obp_rpc_queue
# rabbitmq_connector.response_queue_prefix=obp_reply_queue
# -- RabbitMQ Adapter --------------------------------------------
#rabbitmq.adapter.enabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ object RabbitMQUtils extends MdcLoggable{

private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats

val RPC_QUEUE_NAME: String = "obp_rpc_queue"
val RPC_REPLY_TO_QUEUE_NAME_PREFIX: String = "obp_reply_queue"
val RPC_QUEUE_NAME: String = APIUtil.getPropsValue("rabbitmq_connector.request_queue", "obp_rpc_queue")
val RPC_REPLY_TO_QUEUE_NAME_PREFIX: String = APIUtil.getPropsValue("rabbitmq_connector.response_queue_prefix", "obp_reply_queue")

class ResponseCallback(val rabbitCorrelationId: String, channel: Channel) extends DeliverCallback {

Expand Down Expand Up @@ -92,14 +92,30 @@ object RabbitMQUtils extends MdcLoggable{
val rabbitRequestJsonString: String = write(outBound) // convert OutBound to json string

val connection = RabbitMQConnectionPool.borrowConnection()
// Check if queue already exists using a temporary channel (passive declare closes channel on failure)
val queueExists = try {
val tempChannel = connection.createChannel()
try {
tempChannel.queueDeclarePassive(RPC_QUEUE_NAME)
true
} finally {
if (tempChannel.isOpen) tempChannel.close()
}
} catch {
case _: java.io.IOException => false
}

val channel = connection.createChannel() // channel is not thread safe, so we always create new channel for each message.
channel.queueDeclare(
RPC_QUEUE_NAME, // Queue name
true, // durable: non-persis, here set durable = true
false, // exclusive: non-excl4, here set exclusive = false
false, // autoDelete: delete, here set autoDelete = false
rpcQueueArgs // extra arguments,
)
// Only declare queue if it doesn't already exist (avoids argument conflicts with external adapters)
if (!queueExists) {
channel.queueDeclare(
RPC_QUEUE_NAME, // Queue name
true, // durable: non-persis, here set durable = true
false, // exclusive: non-excl4, here set exclusive = false
false, // autoDelete: delete, here set autoDelete = false
rpcQueueArgs // extra arguments,
)
}

val replyQueueName:String = channel.queueDeclare(
s"${RPC_REPLY_TO_QUEUE_NAME_PREFIX}_${messageId.replace("obp_","")}_${UUID.randomUUID.toString}", // Queue name, it will be a unique name for each queue
Expand All @@ -112,6 +128,7 @@ object RabbitMQUtils extends MdcLoggable{
val rabbitResponseJsonFuture = {
try {
logger.debug(s"${RabbitMQConnector_vOct2024.toString} outBoundJson: $messageId = $rabbitRequestJsonString")
logger.info(s"[RabbitMQ] Sending message to queue: $RPC_QUEUE_NAME, messageId: $messageId, replyTo: $replyQueueName")

val rabbitMQCorrelationId = UUID.randomUUID().toString
val rabbitMQProps = new BasicProperties.Builder()
Expand All @@ -121,6 +138,7 @@ object RabbitMQUtils extends MdcLoggable{
.replyTo(replyQueueName)
.build()
channel.basicPublish("", RPC_QUEUE_NAME, rabbitMQProps, rabbitRequestJsonString.getBytes("UTF-8"))
logger.info(s"[RabbitMQ] Message published, correlationId: $rabbitMQCorrelationId, waiting for response on: $replyQueueName")

val responseCallback = new ResponseCallback(rabbitMQCorrelationId, channel)
channel.basicConsume(replyQueueName, true, responseCallback, cancelCallback)
Expand Down