As per the comments, upstream has provided a dedicated event, hence
use that instead of digging inside akka internals.
Change-Id: I4731dfbbdd228d562ddd32ec5fd3d0e9af0855d0
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Effect;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Effect;
-import akka.remote.AssociationErrorEvent;
-import akka.remote.InvalidAssociation;
-import akka.remote.RemotingLifecycleEvent;
+import akka.remote.ThisActorSystemQuarantinedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Effect callback;
private boolean quarantined;
private final Effect callback;
private boolean quarantined;
- protected QuarantinedMonitorActor(Effect callback) {
+ protected QuarantinedMonitorActor(final Effect callback) {
this.callback = callback;
LOG.debug("Created QuarantinedMonitorActor");
this.callback = callback;
LOG.debug("Created QuarantinedMonitorActor");
- getContext().system().eventStream().subscribe(getSelf(), RemotingLifecycleEvent.class);
+ getContext().system().eventStream().subscribe(getSelf(), ThisActorSystemQuarantinedEvent.class);
- public void onReceive(Object message) throws Exception {
+ public void onReceive(final Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
LOG.trace("onReceive {} {}", messageType, message);
// check to see if we got quarantined by another node
final String messageType = message.getClass().getSimpleName();
LOG.trace("onReceive {} {}", messageType, message);
// check to see if we got quarantined by another node
if (quarantined) {
return;
}
if (quarantined) {
return;
}
- // TODO: follow https://github.com/akka/akka/issues/18758 to see if Akka adds a named
- // exception for quarantine detection
- if (message instanceof AssociationErrorEvent) {
- AssociationErrorEvent event = (AssociationErrorEvent) message;
- Throwable cause = event.getCause();
- if (cause instanceof InvalidAssociation) {
- Throwable cause2 = ((InvalidAssociation) cause).getCause();
- if (cause2.getMessage().contains("quarantined this system")) {
- quarantined = true;
-
- LOG.warn("Got quarantined by {}", event.getRemoteAddress());
+ if (message instanceof ThisActorSystemQuarantinedEvent) {
+ final ThisActorSystemQuarantinedEvent event = (ThisActorSystemQuarantinedEvent) message;
+ LOG.warn("Got quarantined by {}", event.remoteAddress());
+ quarantined = true;
- // execute the callback
- callback.apply();
- } else {
- LOG.debug("received AssociationErrorEvent, cause: InvalidAssociation", cause2);
- }
- } else {
- LOG.debug("received AssociationErrorEvent", cause);
- }
+ // execute the callback
+ callback.apply();
import akka.japi.Effect;
import akka.remote.AssociationErrorEvent;
import akka.remote.InvalidAssociation;
import akka.japi.Effect;
import akka.remote.AssociationErrorEvent;
import akka.remote.InvalidAssociation;
+import akka.remote.ThisActorSystemQuarantinedEvent;
import akka.testkit.JavaTestKit;
import org.junit.After;
import org.junit.Before;
import akka.testkit.JavaTestKit;
import org.junit.After;
import org.junit.Before;
public void testOnReceiveQuarantined() throws Exception {
final Throwable t = new RuntimeException("Remote has quarantined this system");
final InvalidAssociation cause = InvalidAssociation.apply(LOCAL, REMOTE, t, Option.apply(null));
public void testOnReceiveQuarantined() throws Exception {
final Throwable t = new RuntimeException("Remote has quarantined this system");
final InvalidAssociation cause = InvalidAssociation.apply(LOCAL, REMOTE, t, Option.apply(null));
- final AssociationErrorEvent event = new AssociationErrorEvent(cause, LOCAL, REMOTE, true, Logging.ErrorLevel());
+ final ThisActorSystemQuarantinedEvent event = new ThisActorSystemQuarantinedEvent(LOCAL, REMOTE);
actor.tell(event, ActorRef.noSender());
verify(callback, timeout(1000)).apply();
}
actor.tell(event, ActorRef.noSender());
verify(callback, timeout(1000)).apply();
}