package org.opendaylight.controller.cluster.common.actor;
+import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.UntypedActor;
+import akka.actor.UntypedAbstractActor;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
import akka.japi.Effect;
import akka.remote.AssociationErrorEvent;
-import akka.remote.InvalidAssociation;
import akka.remote.RemotingLifecycleEvent;
+import akka.remote.artery.ThisActorSystemQuarantinedEvent;
+import java.util.HashSet;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @author Gary Wu gary.wu1@huawei.com
*
*/
-public class QuarantinedMonitorActor extends UntypedActor {
+public class QuarantinedMonitorActor extends UntypedAbstractActor {
// String constant exposed to other classes.
// NOTE(review): presumably the name this actor is registered under - its usage is not visible here.
+ public static final String ADDRESS = "quarantined-monitor";
private static final Logger LOG = LoggerFactory.getLogger(QuarantinedMonitorActor.class);
-
- public static final String ADDRESS = "quarantined-monitor";
// Minimum number of "UID has been quarantined" AssociationErrorEvents that onReceive must count
// before it treats this node as quarantined.
// NOTE(review): declared as a boxed Integer but only compared against the primitive counter;
// a primitive int would avoid the unboxing - confirm and simplify.
+ private static final Integer MESSAGE_THRESHOLD = 10;
// Action invoked by onReceive when quarantine (or downing of this member) is detected.
private final Effect callback;
// Set to true by onReceive once quarantine has been detected; while true, onReceive
// returns immediately without processing the message.
private boolean quarantined;
// Constructor: stores the callback and subscribes this actor to the actor system's event stream.
- protected QuarantinedMonitorActor(Effect callback) {
// Remote addresses collected by onReceive from quarantine-related AssociationErrorEvents;
// cleared whenever onReceive resets its counters.
+ private final Set<Address> addressSet = new HashSet<>();
// Number of quarantine-related AssociationErrorEvents counted by onReceive since the last reset.
+ private int count = 0;
+
+ protected QuarantinedMonitorActor(final Effect callback) {
this.callback = callback;
LOG.debug("Created QuarantinedMonitorActor");
// Have RemotingLifecycleEvent messages published on the event stream delivered to this actor.
getContext().system().eventStream().subscribe(getSelf(), RemotingLifecycleEvent.class);
// Additionally have ClusterEvent.MemberDowned messages delivered to this actor.
+ getContext().system().eventStream().subscribe(getSelf(), ClusterEvent.MemberDowned.class);
}
@Override
}
/**
 * Processes a message delivered to this actor and invokes the callback when the message
 * indicates that this node was quarantined or that this cluster member was downed.
 *
 * <p>Once the quarantined flag has been set, every later message is ignored.
 *
 * @param message the received message; it is examined for ThisActorSystemQuarantinedEvent,
 *                AssociationErrorEvent and ClusterEvent.MemberDowned, anything else is only traced
 * @throws Exception presumably whatever callback.apply() throws - confirm against Effect's contract
 */
@Override
- public void onReceive(Object message) throws Exception {
+ public void onReceive(final Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
LOG.trace("onReceive {} {}", messageType, message);
// check to see if we got quarantined by another node
-
// Already flagged as quarantined: drop the message so the callback is not triggered again.
if (quarantined) {
return;
}
- // TODO: follow https://github.com/akka/akka/issues/18758 to see if Akka adds a named
- // exception for quarantine detection
- if (message instanceof AssociationErrorEvent) {
- AssociationErrorEvent event = (AssociationErrorEvent) message;
- Throwable cause = event.getCause();
- if (cause instanceof InvalidAssociation) {
- Throwable cause2 = ((InvalidAssociation) cause).getCause();
- if (cause2.getMessage().contains("quarantined this system")) {
// Case 1: a dedicated quarantine event - flag the state and fire the callback right away.
+ if (message instanceof ThisActorSystemQuarantinedEvent) {
+ final ThisActorSystemQuarantinedEvent event = (ThisActorSystemQuarantinedEvent) message;
+ LOG.warn("Got quarantined by {}", event.remoteAddress());
+ quarantined = true;
+
+ // execute the callback
+ callback.apply();
// Case 2: an association error - classified by searching the event's string form.
// NOTE(review): matching on toString() text depends on Akka's exact message wording - confirm
// these strings against the Akka version in use.
+ } else if (message instanceof AssociationErrorEvent) {
+ final String errorMessage = message.toString();
+ LOG.trace("errorMessage:{}", errorMessage);
+ if (errorMessage.contains("The remote system has a UID that has been quarantined")) {
// Record which remote address reported the error and count the occurrence.
+ final Address address = ((AssociationErrorEvent) message).getRemoteAddress();
+ addressSet.add(address);
+ count++;
+ LOG.trace("address:{} addressSet: {} count:{}", address, addressSet, count);
// Trigger only after at least MESSAGE_THRESHOLD such events involving more than one
// distinct remote address; the counters are reset before the callback runs.
+ if (count >= MESSAGE_THRESHOLD && addressSet.size() > 1) {
+ count = 0;
+ addressSet.clear();
// NOTE(review): the same message is cast a second time here, and the address is read via
// remoteAddress() whereas getRemoteAddress() is used above - confirm both accessors exist
// on AssociationErrorEvent and consider reusing the 'address' local.
+ final AssociationErrorEvent event = (AssociationErrorEvent) message;
+ LOG.warn("Got quarantined via AssociationEvent by {}", event.remoteAddress());
quarantined = true;
- LOG.warn("Got quarantined by {}", event.getRemoteAddress());
-
// execute the callback
callback.apply();
- } else {
- LOG.debug("received AssociationErrorEvent, cause: InvalidAssociation", cause2);
}
- } else {
- LOG.debug("received AssociationErrorEvent", cause);
// An explicit-disassociation error discards the quarantine evidence gathered so far.
+ } else if (errorMessage.contains("The remote system explicitly disassociated")) {
+ count = 0;
+ addressSet.clear();
+ }
// Case 3: a cluster member was downed - act only if that member is this actor system's own member.
// NOTE(review): unlike the two quarantine paths, this branch does not set 'quarantined', so a
// later event can still invoke the callback again - confirm this is intended.
+ } else if (message instanceof ClusterEvent.MemberDowned) {
+ final ClusterEvent.MemberDowned event = (ClusterEvent.MemberDowned) message;
+ if (Cluster.get(getContext().system()).selfMember().equals(event.member())) {
+ LOG.warn("This member has been downed, restarting");
+
+ callback.apply();
}
}
}