Bug 4037: Allow auto-downed node to rejoin cluster 52/27852/8
authorGary Wu <Gary.Wu1@huawei.com>
Fri, 2 Oct 2015 20:12:43 +0000 (13:12 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 23 Oct 2015 18:13:32 +0000 (18:13 +0000)
This patch will detect when a node has been
auto-downed/quarantined by another node. When this
happens, the ActorSystem of the datastore will be
restarted to allow the node to rejoin the cluster.

Change-Id: I0913bf455d426b6a0fccb17eac61b74f0911fa5d
Signed-off-by: Gary Wu <Gary.Wu1@huawei.com>
features/mdsal/src/main/features/features.xml
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/QuarantinedMonitorActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/actor_system_provider/impl/ActorSystemProviderImpl.java

index 269eddd8aa0d078332ba072fe04454190f1248b9..ebfdad94e48580ba0300fe523275e9708efd1e1e 100644 (file)
@@ -70,6 +70,7 @@
         <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${akka.version}'>odl-akka-system</feature>
         <feature version='${akka.version}'>odl-akka-persistence</feature>
         <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${akka.version}'>odl-akka-system</feature>
         <feature version='${akka.version}'>odl-akka-persistence</feature>
+        <feature version='${akka.version}'>odl-akka-clustering</feature>
         <bundle>mvn:org.opendaylight.controller/sal-clustering-commons/{{VERSION}}</bundle>
         <bundle>mvn:org.opendaylight.controller/sal-akka-raft/{{VERSION}}</bundle>
         <bundle>mvn:com.codahale.metrics/metrics-core/3.0.1</bundle>
         <bundle>mvn:org.opendaylight.controller/sal-clustering-commons/{{VERSION}}</bundle>
         <bundle>mvn:org.opendaylight.controller/sal-akka-raft/{{VERSION}}</bundle>
         <bundle>mvn:com.codahale.metrics/metrics-core/3.0.1</bundle>
     <feature name ='odl-mdsal-distributed-datastore' version='${project.version}'>
         <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
     <feature name ='odl-mdsal-distributed-datastore' version='${project.version}'>
         <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
-        <feature version='${akka.version}'>odl-akka-clustering</feature>
         <bundle>mvn:org.opendaylight.controller/sal-distributed-datastore/{{VERSION}}</bundle>
     </feature>
     <feature name ='odl-mdsal-remoterpc-connector' version='${project.version}'>
         <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
         <bundle>mvn:org.opendaylight.controller/sal-distributed-datastore/{{VERSION}}</bundle>
     </feature>
     <feature name ='odl-mdsal-remoterpc-connector' version='${project.version}'>
         <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
-        <feature version='${akka.version}'>odl-akka-clustering</feature>
         <feature version='0.7'>odl-akka-leveldb</feature>
         <bundle>mvn:org.opendaylight.controller/sal-remoterpc-connector/{{VERSION}}</bundle>
     </feature>
         <feature version='0.7'>odl-akka-leveldb</feature>
         <bundle>mvn:org.opendaylight.controller/sal-remoterpc-connector/{{VERSION}}</bundle>
     </feature>
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/QuarantinedMonitorActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/QuarantinedMonitorActor.java
new file mode 100644 (file)
index 0000000..9d79e0f
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2015 Huawei Technologies Co., Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import akka.japi.Effect;
+import akka.remote.AssociationErrorEvent;
+import akka.remote.InvalidAssociation;
+import akka.remote.RemotingLifecycleEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class listens to Akka RemotingLifecycleEvent events to detect when this node has been
+ * quarantined by another. Once this node gets quarantined, restart the ActorSystem to allow this
+ * node to rejoin the cluster.
+ *
+ * @author Gary Wu <gary.wu1@huawei.com>
+ *
+ */
+public class QuarantinedMonitorActor extends UntypedActor {
+
+    private final Logger LOG = LoggerFactory.getLogger(QuarantinedMonitorActor.class);
+
+    public static final String ADDRESS = "quarantined-monitor";
+
+    private final Effect callback;
+
+    protected QuarantinedMonitorActor(Effect callback) {
+        this.callback = callback;
+
+        LOG.debug("Created QuarantinedMonitorActor");
+
+        getContext().system().eventStream().subscribe(getSelf(), RemotingLifecycleEvent.class);
+    }
+
+    @Override
+    public void postStop() {
+        LOG.debug("Stopping QuarantinedMonitorActor");
+    }
+
+    @Override
+    public void onReceive(Object message) throws Exception {
+        final String messageType = message.getClass().getSimpleName();
+        LOG.trace("onReceive {} {}", messageType, message);
+
+        // check to see if we got quarantined by another node
+
+        // TODO: follow https://github.com/akka/akka/issues/18758 to see if Akka adds a named
+        // exception for quarantine detection
+        if (message instanceof AssociationErrorEvent) {
+            AssociationErrorEvent event = (AssociationErrorEvent) message;
+            Throwable cause = event.getCause();
+            if (cause instanceof InvalidAssociation) {
+                Throwable cause2 = ((InvalidAssociation) cause).getCause();
+                if (cause2.getMessage().contains("quarantined this system")) {
+                    LOG.warn("Got quarantined by {}", event.getRemoteAddress());
+
+                    // execute the callback
+                    callback.apply();
+                } else {
+                    LOG.debug("received AssociationErrorEvent, cause: InvalidAssociation", cause2);
+                }
+            } else {
+                LOG.warn("received AssociationErrorEvent", cause);
+            }
+        }
+    }
+
+    public static Props props(final Effect callback) {
+        return Props.create(new Creator<QuarantinedMonitorActor>() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public QuarantinedMonitorActor create() throws Exception {
+                return new QuarantinedMonitorActor(callback);
+            }
+        });
+    }
+
+}
index 5a6dbe587984c5310e141c79bea8d1e08f457b87..96f97cd0c8772dfd8889fd6ca7ec40393d424af3 100644 (file)
@@ -52,7 +52,7 @@ odl-cluster-data {
     cluster {
       seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
 
     cluster {
       seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
 
-      auto-down-unreachable-after = 300s
+      auto-down-unreachable-after = 30s
 
       roles = [
         "member-1"
 
       roles = [
         "member-1"
index 6dd0ab123006a7180722ed456e3202319cf19a15..ab06ff10a182c606c34d71f26893a5fbc1a2de5c 100644 (file)
@@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory;
 
 public class TerminationMonitor extends UntypedActor{
     private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class);
 
 public class TerminationMonitor extends UntypedActor{
     private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class);
+    public static final String ADDRESS = "termination-monitor";
 
     public TerminationMonitor(){
         LOG.debug("Created TerminationMonitor");
 
     public TerminationMonitor(){
         LOG.debug("Created TerminationMonitor");
index 22d728dd75df7130b6c7f63ea4791004e2e21cb2..5970ea47e5413d4c76c19e212632589f60341351 100644 (file)
@@ -8,15 +8,21 @@
 package org.opendaylight.controller.config.yang.config.actor_system_provider.impl;
 
 import akka.actor.ActorSystem;
 package org.opendaylight.controller.config.yang.config.actor_system_provider.impl;
 
 import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.japi.Effect;
 import akka.osgi.BundleDelegatingClassLoader;
 import akka.osgi.BundleDelegatingClassLoader;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
 import org.opendaylight.controller.cluster.ActorSystemProviderListener;
 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
 import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
 import org.opendaylight.controller.cluster.ActorSystemProviderListener;
 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
+import org.opendaylight.controller.cluster.common.actor.QuarantinedMonitorActor;
+import org.opendaylight.controller.cluster.datastore.TerminationMonitor;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.util.ListenerRegistry;
+import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,24 +32,35 @@ public class ActorSystemProviderImpl implements ActorSystemProvider, AutoCloseab
     private static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
     private static final String CONFIGURATION_NAME = "odl-cluster-data";
     static final Logger LOG = LoggerFactory.getLogger(ActorSystemProviderImpl.class);
     private static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
     private static final String CONFIGURATION_NAME = "odl-cluster-data";
     static final Logger LOG = LoggerFactory.getLogger(ActorSystemProviderImpl.class);
-
-    private ActorSystem actorSystem;
-    private final BundleDelegatingClassLoader classLoader;
+    private final ActorSystem actorSystem;
     private final ListenerRegistry<ActorSystemProviderListener> listeners = new ListenerRegistry<>();
 
     private final ListenerRegistry<ActorSystemProviderListener> listeners = new ListenerRegistry<>();
 
-    public ActorSystemProviderImpl(BundleContext bundleContext) {
+    public ActorSystemProviderImpl(final BundleContext bundleContext) {
         LOG.info("Creating new ActorSystem");
 
         LOG.info("Creating new ActorSystem");
 
-        classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
-                Thread.currentThread().getContextClassLoader());
+        final Bundle bundle = bundleContext.getBundle();
 
 
-        createActorSystem();
-    }
+        final BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundle, Thread.currentThread()
+                .getContextClassLoader());
+
+        final AkkaConfigurationReader configurationReader = new FileAkkaConfigurationReader();
+        final Config akkaConfig = ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME);
+
+        actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, akkaConfig, classLoader);
+
+        actorSystem.actorOf(Props.create(TerminationMonitor.class), TerminationMonitor.ADDRESS);
+
+        actorSystem.actorOf(QuarantinedMonitorActor.props(new Effect() {
+
+            @Override
+            public void apply() throws Exception {
+                // restart the entire karaf container
+                LOG.warn("Restarting karaf container");
+                System.setProperty("karaf.restart", "true");
+                bundleContext.getBundle(0).stop();
+            }
+        }), QuarantinedMonitorActor.ADDRESS);
 
 
-    private void createActorSystem() {
-        AkkaConfigurationReader configurationReader = new FileAkkaConfigurationReader();
-        actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME,
-                ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME), classLoader);
     }
 
     @Override
     }
 
     @Override
@@ -68,4 +85,4 @@ public class ActorSystemProviderImpl implements ActorSystemProvider, AutoCloseab
             LOG.warn("Error awaiting actor termination", e);
         }
     }
             LOG.warn("Error awaiting actor termination", e);
         }
     }
-}
\ No newline at end of file
+}