import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedAbstractActor;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
import akka.japi.Effect;
import akka.remote.AssociationErrorEvent;
import akka.remote.RemotingLifecycleEvent;
-import akka.remote.ThisActorSystemQuarantinedEvent;
+import akka.remote.artery.ThisActorSystemQuarantinedEvent;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.HashSet;
import java.util.Set;
import org.slf4j.Logger;
private final Effect callback;
private boolean quarantined;
- private Set<Address> addressSet = new HashSet<>();
+ private final Set<Address> addressSet = new HashSet<>();
private int count = 0;
+ @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design")
protected QuarantinedMonitorActor(final Effect callback) {
this.callback = callback;
LOG.debug("Created QuarantinedMonitorActor");
getContext().system().eventStream().subscribe(getSelf(), RemotingLifecycleEvent.class);
+ getContext().system().eventStream().subscribe(getSelf(), ClusterEvent.MemberDowned.class);
}
@Override
return;
}
- if (message instanceof ThisActorSystemQuarantinedEvent) {
- final ThisActorSystemQuarantinedEvent event = (ThisActorSystemQuarantinedEvent) message;
+ if (message instanceof ThisActorSystemQuarantinedEvent event) {
LOG.warn("Got quarantined by {}", event.remoteAddress());
quarantined = true;
// execute the callback
callback.apply();
- } else if (message instanceof AssociationErrorEvent) {
- String errorMessage = message.toString();
+ } else if (message instanceof AssociationErrorEvent event) {
+ final String errorMessage = message.toString();
LOG.trace("errorMessage:{}", errorMessage);
if (errorMessage.contains("The remote system has a UID that has been quarantined")) {
- Address address = ((AssociationErrorEvent) message).getRemoteAddress();
+ final Address address = event.getRemoteAddress();
addressSet.add(address);
count++;
LOG.trace("address:{} addressSet: {} count:{}", address, addressSet, count);
if (count >= MESSAGE_THRESHOLD && addressSet.size() > 1) {
count = 0;
addressSet.clear();
- final AssociationErrorEvent event = (AssociationErrorEvent) message;
LOG.warn("Got quarantined via AssociationEvent by {}", event.remoteAddress());
quarantined = true;
count = 0;
addressSet.clear();
}
+ } else if (message instanceof ClusterEvent.MemberDowned event) {
+ if (Cluster.get(getContext().system()).selfMember().equals(event.member())) {
+ LOG.warn("This member has been downed, restarting");
+
+ callback.apply();
+ }
}
}