Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions common/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
akka {
actor {
default-dispatcher {
# Configurable thread pool
thread-pool-executor {
fixed-pool-size = 32
}
}

# Deployment configuration
deployment {
/parent-actor {
router = round-robin-pool
nr-of-instances = 5
}
}
}

# Cluster configuration
cluster {
min-nr-of-members = 2
role {
frontend.min-nr-of-members = 1
backend.min-nr-of-members = 2
}
}
}
25 changes: 15 additions & 10 deletions parent-child/src/main/java/org/royrusso/actor/ParentActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,21 @@ public ParentActor() {

@Override
public void onReceive(Object msg) throws Exception {

log.info("Received Command: " + msg);

if (msg instanceof Command) {
final String data = ((Command) msg).getData();
final Event event = new Event(data, UUID.randomUUID().toString());

childActor.tell(event, getSelf());
} else if (msg.equals("echo")) {
log.info("ECHO!");
try {
log.info("Received Command: " + msg);

if (msg instanceof Command) {
final String data = ((Command) msg).getData();
final Event event = new Event(data, UUID.randomUUID().toString());
childActor.tell(event, getSelf());
} else if (msg.equals("echo")) {
log.info("ECHO!");
} else {
unhandled(msg); // Handle unknown messages
}
} catch (Exception e) {
log.error("Error processing message: " + msg, e);
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import akka.persistence.PersistentChannelSettings;
import org.royrusso.command.ChannelReply;
import scala.concurrent.duration.Duration;
import akka.pattern.CircuitBreaker;

import java.util.concurrent.TimeUnit;

Expand All @@ -42,9 +43,20 @@ public class BaseProcessor extends UntypedActor {

private ActorRef receiver;
private ActorRef channel;
private final CircuitBreaker circuitBreaker;

public BaseProcessor(ActorRef receiver) {
this.receiver = receiver;

// Configure circuit breaker
this.circuitBreaker = new CircuitBreaker(
getContext().dispatcher(),
getContext().system().scheduler(),
5, // max failures
Duration.create(10, TimeUnit.SECONDS), // call timeout
Duration.create(1, TimeUnit.MINUTES) // reset timeout
);

this.channel = getContext().actorOf(PersistentChannel.props(
PersistentChannelSettings.create()
.withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))
Expand Down