diff --git a/common/src/main/resources/reference.conf b/common/src/main/resources/reference.conf new file mode 100644 index 0000000..4e928ff --- /dev/null +++ b/common/src/main/resources/reference.conf @@ -0,0 +1,27 @@ +akka { + actor { + default-dispatcher { + # Configurable thread pool + thread-pool-executor { + fixed-pool-size = 32 + } + } + + # Deployment configuration + deployment { + /parent-actor { + router = round-robin-pool + nr-of-instances = 5 + } + } + } + + # Cluster configuration + cluster { + min-nr-of-members = 2 + role { + frontend.min-nr-of-members = 1 + backend.min-nr-of-members = 2 + } + } +} \ No newline at end of file diff --git a/parent-child/src/main/java/org/royrusso/actor/ParentActor.java b/parent-child/src/main/java/org/royrusso/actor/ParentActor.java index 82c2f98..65c306c 100644 --- a/parent-child/src/main/java/org/royrusso/actor/ParentActor.java +++ b/parent-child/src/main/java/org/royrusso/actor/ParentActor.java @@ -47,16 +47,21 @@ public ParentActor() { @Override public void onReceive(Object msg) throws Exception { - - log.info("Received Command: " + msg); - - if (msg instanceof Command) { - final String data = ((Command) msg).getData(); - final Event event = new Event(data, UUID.randomUUID().toString()); - - childActor.tell(event, getSelf()); - } else if (msg.equals("echo")) { - log.info("ECHO!"); + try { + log.info("Received Command: " + msg); + + if (msg instanceof Command) { + final String data = ((Command) msg).getData(); + final Event event = new Event(data, UUID.randomUUID().toString()); + childActor.tell(event, getSelf()); + } else if (msg.equals("echo")) { + log.info("ECHO!"); + } else { + unhandled(msg); // Handle unknown messages + } + } catch (Exception e) { + log.error("Error processing message: " + msg, e); + throw e; } } } diff --git a/persistent-channel/src/main/java/org/royrusso/actor/BaseProcessor.java b/persistent-channel/src/main/java/org/royrusso/actor/BaseProcessor.java index 83f7eb4..7e05664 100644 --- a/persistent-channel/src/main/java/org/royrusso/actor/BaseProcessor.java +++ b/persistent-channel/src/main/java/org/royrusso/actor/BaseProcessor.java @@ -28,6 +28,7 @@ import akka.persistence.PersistentChannelSettings; import org.royrusso.command.ChannelReply; import scala.concurrent.duration.Duration; +import akka.pattern.CircuitBreaker; import java.util.concurrent.TimeUnit; @@ -42,9 +43,20 @@ public class BaseProcessor extends UntypedActor { private ActorRef receiver; private ActorRef channel; + private final CircuitBreaker circuitBreaker; public BaseProcessor(ActorRef receiver) { this.receiver = receiver; + + // Configure circuit breaker + this.circuitBreaker = new CircuitBreaker( + getContext().dispatcher(), + getContext().system().scheduler(), + 5, // max failures + Duration.create(10, TimeUnit.SECONDS), // call timeout + Duration.create(1, TimeUnit.MINUTES) // reset timeout + ); + this.channel = getContext().actorOf(PersistentChannel.props( PersistentChannelSettings.create() .withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))