Switch to Akka Artery
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / common / actor / QuarantinedMonitorActor.java
index 8e2693e900dea8f2d0fa7bc1750eef126667d213..dfafb82b6172f1da6814e9e3ee6001e758562651 100644 (file)
@@ -8,13 +8,15 @@
 
 package org.opendaylight.controller.cluster.common.actor;
 
+import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.japi.Creator;
+import akka.actor.UntypedAbstractActor;
 import akka.japi.Effect;
 import akka.remote.AssociationErrorEvent;
-import akka.remote.InvalidAssociation;
 import akka.remote.RemotingLifecycleEvent;
+import akka.remote.artery.ThisActorSystemQuarantinedEvent;
+import java.util.HashSet;
+import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,18 +25,22 @@ import org.slf4j.LoggerFactory;
  * quarantined by another. Once this node gets quarantined, restart the ActorSystem to allow this
  * node to rejoin the cluster.
  *
- * @author Gary Wu <gary.wu1@huawei.com>
+ * @author Gary Wu gary.wu1@huawei.com
  *
  */
-public class QuarantinedMonitorActor extends UntypedActor {
-
-    private final Logger LOG = LoggerFactory.getLogger(QuarantinedMonitorActor.class);
-
+public class QuarantinedMonitorActor extends UntypedAbstractActor {
     public static final String ADDRESS = "quarantined-monitor";
 
+    private static final Logger LOG = LoggerFactory.getLogger(QuarantinedMonitorActor.class);
+    private static final Integer MESSAGE_THRESHOLD = 10;
+
     private final Effect callback;
+    private boolean quarantined;
+
+    private Set<Address> addressSet = new HashSet<>();
+    private int count = 0;
 
-    protected QuarantinedMonitorActor(Effect callback) {
+    protected QuarantinedMonitorActor(final Effect callback) {
         this.callback = callback;
 
         LOG.debug("Created QuarantinedMonitorActor");
@@ -48,42 +54,48 @@ public class QuarantinedMonitorActor extends UntypedActor {
     }
 
     @Override
-    public void onReceive(Object message) throws Exception {
+    public void onReceive(final Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
         LOG.trace("onReceive {} {}", messageType, message);
 
         // check to see if we got quarantined by another node
+        if (quarantined) {
+            return;
+        }
 
-        // TODO: follow https://github.com/akka/akka/issues/18758 to see if Akka adds a named
-        // exception for quarantine detection
-        if (message instanceof AssociationErrorEvent) {
-            AssociationErrorEvent event = (AssociationErrorEvent) message;
-            Throwable cause = event.getCause();
-            if (cause instanceof InvalidAssociation) {
-                Throwable cause2 = ((InvalidAssociation) cause).getCause();
-                if (cause2.getMessage().contains("quarantined this system")) {
-                    LOG.warn("Got quarantined by {}", event.getRemoteAddress());
+        if (message instanceof ThisActorSystemQuarantinedEvent) {
+            final ThisActorSystemQuarantinedEvent event = (ThisActorSystemQuarantinedEvent) message;
+            LOG.warn("Got quarantined by {}", event.remoteAddress());
+            quarantined = true;
+
+            // execute the callback
+            callback.apply();
+        } else  if (message instanceof AssociationErrorEvent) {
+            String errorMessage = message.toString();
+            LOG.trace("errorMessage:{}", errorMessage);
+            if (errorMessage.contains("The remote system has a UID that has been quarantined")) {
+                Address address = ((AssociationErrorEvent) message).getRemoteAddress();
+                addressSet.add(address);
+                count++;
+                LOG.trace("address:{} addressSet: {} count:{}", address, addressSet, count);
+                if (count >= MESSAGE_THRESHOLD && addressSet.size() > 1) {
+                    count = 0;
+                    addressSet.clear();
+                    final AssociationErrorEvent event = (AssociationErrorEvent) message;
+                    LOG.warn("Got quarantined via AssociationEvent by {}", event.remoteAddress());
+                    quarantined = true;
 
                     // execute the callback
                     callback.apply();
-                } else {
-                    LOG.debug("received AssociationErrorEvent, cause: InvalidAssociation", cause2);
                 }
-            } else {
-                LOG.debug("received AssociationErrorEvent", cause);
+            } else if (errorMessage.contains("The remote system explicitly disassociated")) {
+                count = 0;
+                addressSet.clear();
             }
         }
     }
 
     public static Props props(final Effect callback) {
-        return Props.create(new Creator<QuarantinedMonitorActor>() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public QuarantinedMonitorActor create() throws Exception {
-                return new QuarantinedMonitorActor(callback);
-            }
-        });
+        return Props.create(QuarantinedMonitorActor.class, callback);
     }
-
 }