Switch to Akka Artery
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / common / actor / QuarantinedMonitorActor.java
index 9cb592a4c7713f5d533870c92af38d0e0dfdf4e6..dfafb82b6172f1da6814e9e3ee6001e758562651 100644 (file)
@@ -8,13 +8,15 @@
 
 package org.opendaylight.controller.cluster.common.actor;
 
+import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.japi.Creator;
+import akka.actor.UntypedAbstractActor;
 import akka.japi.Effect;
 import akka.remote.AssociationErrorEvent;
-import akka.remote.InvalidAssociation;
 import akka.remote.RemotingLifecycleEvent;
+import akka.remote.artery.ThisActorSystemQuarantinedEvent;
+import java.util.HashSet;
+import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,16 +28,19 @@ import org.slf4j.LoggerFactory;
  * @author Gary Wu gary.wu1@huawei.com
  *
  */
-public class QuarantinedMonitorActor extends UntypedActor {
+public class QuarantinedMonitorActor extends UntypedAbstractActor {
+    public static final String ADDRESS = "quarantined-monitor";
 
     private static final Logger LOG = LoggerFactory.getLogger(QuarantinedMonitorActor.class);
-
-    public static final String ADDRESS = "quarantined-monitor";
+    private static final Integer MESSAGE_THRESHOLD = 10;
 
     private final Effect callback;
     private boolean quarantined;
 
-    protected QuarantinedMonitorActor(Effect callback) {
+    private Set<Address> addressSet = new HashSet<>();
+    private int count = 0;
+
+    protected QuarantinedMonitorActor(final Effect callback) {
         this.callback = callback;
 
         LOG.debug("Created QuarantinedMonitorActor");
@@ -49,48 +54,48 @@ public class QuarantinedMonitorActor extends UntypedActor {
     }
 
     @Override
-    public void onReceive(Object message) throws Exception {
+    public void onReceive(final Object message) throws Exception {
         final String messageType = message.getClass().getSimpleName();
         LOG.trace("onReceive {} {}", messageType, message);
 
         // check to see if we got quarantined by another node
-
         if (quarantined) {
             return;
         }
 
-        // TODO: follow https://github.com/akka/akka/issues/18758 to see if Akka adds a named
-        // exception for quarantine detection
-        if (message instanceof AssociationErrorEvent) {
-            AssociationErrorEvent event = (AssociationErrorEvent) message;
-            Throwable cause = event.getCause();
-            if (cause instanceof InvalidAssociation) {
-                Throwable cause2 = ((InvalidAssociation) cause).getCause();
-                if (cause2.getMessage().contains("quarantined this system")) {
+        if (message instanceof ThisActorSystemQuarantinedEvent) {
+            final ThisActorSystemQuarantinedEvent event = (ThisActorSystemQuarantinedEvent) message;
+            LOG.warn("Got quarantined by {}", event.remoteAddress());
+            quarantined = true;
+
+            // execute the callback
+            callback.apply();
+        } else  if (message instanceof AssociationErrorEvent) {
+            String errorMessage = message.toString();
+            LOG.trace("errorMessage:{}", errorMessage);
+            if (errorMessage.contains("The remote system has a UID that has been quarantined")) {
+                Address address = ((AssociationErrorEvent) message).getRemoteAddress();
+                addressSet.add(address);
+                count++;
+                LOG.trace("address:{} addressSet: {} count:{}", address, addressSet, count);
+                if (count >= MESSAGE_THRESHOLD && addressSet.size() > 1) {
+                    count = 0;
+                    addressSet.clear();
+                    final AssociationErrorEvent event = (AssociationErrorEvent) message;
+                    LOG.warn("Got quarantined via AssociationEvent by {}", event.remoteAddress());
                     quarantined = true;
 
-                    LOG.warn("Got quarantined by {}", event.getRemoteAddress());
-
                     // execute the callback
                     callback.apply();
-                } else {
-                    LOG.debug("received AssociationErrorEvent, cause: InvalidAssociation", cause2);
                 }
-            } else {
-                LOG.debug("received AssociationErrorEvent", cause);
+            } else if (errorMessage.contains("The remote system explicitly disassociated")) {
+                count = 0;
+                addressSet.clear();
             }
         }
     }
 
     public static Props props(final Effect callback) {
-        return Props.create(new Creator<QuarantinedMonitorActor>() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public QuarantinedMonitorActor create() throws Exception {
-                return new QuarantinedMonitorActor(callback);
-            }
-        });
+        return Props.create(QuarantinedMonitorActor.class, callback);
     }
-
 }