X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-clustering-commons%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fcommon%2Factor%2FQuarantinedMonitorActor.java;h=77dcba564decdfc1a031a688f19001fb7c59d9f3;hp=519fcd071c64b20baa2adcb5033c8fc38fa1f08a;hb=46fedf6c2d91ea96c453c059c3b2b7a6c16c370f;hpb=0f4c2236a45097dcac1d66415cb76aabcc4a873f diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/QuarantinedMonitorActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/QuarantinedMonitorActor.java index 519fcd071c..77dcba564d 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/QuarantinedMonitorActor.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/QuarantinedMonitorActor.java @@ -8,12 +8,17 @@ package org.opendaylight.controller.cluster.common.actor; +import akka.actor.Address; import akka.actor.Props; -import akka.actor.UntypedActor; +import akka.actor.UntypedAbstractActor; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; import akka.japi.Effect; import akka.remote.AssociationErrorEvent; -import akka.remote.InvalidAssociation; import akka.remote.RemotingLifecycleEvent; +import akka.remote.artery.ThisActorSystemQuarantinedEvent; +import java.util.HashSet; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,21 +30,25 @@ import org.slf4j.LoggerFactory; * @author Gary Wu gary.wu1@huawei.com * */ -public class QuarantinedMonitorActor extends UntypedActor { +public class QuarantinedMonitorActor extends UntypedAbstractActor { + public static final String ADDRESS = "quarantined-monitor"; private static final Logger LOG = LoggerFactory.getLogger(QuarantinedMonitorActor.class); - - public static final String ADDRESS = "quarantined-monitor"; + private static final Integer MESSAGE_THRESHOLD = 10; private final Effect callback; private boolean quarantined; - protected QuarantinedMonitorActor(Effect callback) { + private final Set
addressSet = new HashSet<>(); + private int count = 0; + + protected QuarantinedMonitorActor(final Effect callback) { this.callback = callback; LOG.debug("Created QuarantinedMonitorActor"); getContext().system().eventStream().subscribe(getSelf(), RemotingLifecycleEvent.class); + getContext().system().eventStream().subscribe(getSelf(), ClusterEvent.MemberDowned.class); } @Override @@ -48,35 +57,50 @@ public class QuarantinedMonitorActor extends UntypedActor { } @Override - public void onReceive(Object message) throws Exception { + public void onReceive(final Object message) throws Exception { final String messageType = message.getClass().getSimpleName(); LOG.trace("onReceive {} {}", messageType, message); // check to see if we got quarantined by another node - if (quarantined) { return; } - // TODO: follow https://github.com/akka/akka/issues/18758 to see if Akka adds a named - // exception for quarantine detection - if (message instanceof AssociationErrorEvent) { - AssociationErrorEvent event = (AssociationErrorEvent) message; - Throwable cause = event.getCause(); - if (cause instanceof InvalidAssociation) { - Throwable cause2 = ((InvalidAssociation) cause).getCause(); - if (cause2.getMessage().contains("quarantined this system")) { + if (message instanceof ThisActorSystemQuarantinedEvent) { + final ThisActorSystemQuarantinedEvent event = (ThisActorSystemQuarantinedEvent) message; + LOG.warn("Got quarantined by {}", event.remoteAddress()); + quarantined = true; + + // execute the callback + callback.apply(); + } else if (message instanceof AssociationErrorEvent) { + final String errorMessage = message.toString(); + LOG.trace("errorMessage:{}", errorMessage); + if (errorMessage.contains("The remote system has a UID that has been quarantined")) { + final Address address = ((AssociationErrorEvent) message).getRemoteAddress(); + addressSet.add(address); + count++; + LOG.trace("address:{} addressSet: {} count:{}", address, addressSet, count); + if (count >= MESSAGE_THRESHOLD && addressSet.size() > 1) { + count = 0; + addressSet.clear(); + final AssociationErrorEvent event = (AssociationErrorEvent) message; + LOG.warn("Got quarantined via AssociationEvent by {}", event.remoteAddress()); quarantined = true; - LOG.warn("Got quarantined by {}", event.getRemoteAddress()); - // execute the callback callback.apply(); - } else { - LOG.debug("received AssociationErrorEvent, cause: InvalidAssociation", cause2); } - } else { - LOG.debug("received AssociationErrorEvent", cause); + } else if (errorMessage.contains("The remote system explicitly disassociated")) { + count = 0; + addressSet.clear(); + } + } else if (message instanceof ClusterEvent.MemberDowned) { + final ClusterEvent.MemberDowned event = (ClusterEvent.MemberDowned) message; + if (Cluster.get(getContext().system()).selfMember().equals(event.member())) { + LOG.warn("This member has been downed, restarting"); + + callback.apply(); } } }