From: Gary Wu Date: Fri, 2 Oct 2015 20:12:43 +0000 (-0700) Subject: Bug 4037: Allow auto-downed node to rejoin cluster X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=fcc138511afe71502ac092f297c618167e01d80e Bug 4037: Allow auto-downed node to rejoin cluster This patch will detect when a node has been auto-downed/quarantined by another node. When this happens, the ActorSystem of the datastore will be restarted to allow the node to rejoin the cluster. Change-Id: I0913bf455d426b6a0fccb17eac61b74f0911fa5d Signed-off-by: Gary Wu --- diff --git a/features/mdsal/src/main/features/features.xml b/features/mdsal/src/main/features/features.xml index 269eddd8aa..ebfdad94e4 100644 --- a/features/mdsal/src/main/features/features.xml +++ b/features/mdsal/src/main/features/features.xml @@ -70,6 +70,7 @@ odl-mdsal-broker-local odl-akka-system odl-akka-persistence + odl-akka-clustering mvn:org.opendaylight.controller/sal-clustering-commons/{{VERSION}} mvn:org.opendaylight.controller/sal-akka-raft/{{VERSION}} mvn:com.codahale.metrics/metrics-core/3.0.1 @@ -77,13 +78,11 @@ odl-mdsal-broker-local odl-mdsal-clustering-commons - odl-akka-clustering mvn:org.opendaylight.controller/sal-distributed-datastore/{{VERSION}} odl-mdsal-broker-local odl-mdsal-clustering-commons - odl-akka-clustering odl-akka-leveldb mvn:org.opendaylight.controller/sal-remoterpc-connector/{{VERSION}} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/QuarantinedMonitorActor.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/QuarantinedMonitorActor.java new file mode 100644 index 0000000000..9d79e0f5e1 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/QuarantinedMonitorActor.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2015 Huawei Technologies Co., Ltd. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.common.actor; + +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.japi.Creator; +import akka.japi.Effect; +import akka.remote.AssociationErrorEvent; +import akka.remote.InvalidAssociation; +import akka.remote.RemotingLifecycleEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class listens to Akka RemotingLifecycleEvent events to detect when this node has been + * quarantined by another. Once this node gets quarantined, restart the ActorSystem to allow this + * node to rejoin the cluster. + * + * @author Gary Wu + * + */ +public class QuarantinedMonitorActor extends UntypedActor { + + private final Logger LOG = LoggerFactory.getLogger(QuarantinedMonitorActor.class); + + public static final String ADDRESS = "quarantined-monitor"; + + private final Effect callback; + + protected QuarantinedMonitorActor(Effect callback) { + this.callback = callback; + + LOG.debug("Created QuarantinedMonitorActor"); + + getContext().system().eventStream().subscribe(getSelf(), RemotingLifecycleEvent.class); + } + + @Override + public void postStop() { + LOG.debug("Stopping QuarantinedMonitorActor"); + } + + @Override + public void onReceive(Object message) throws Exception { + final String messageType = message.getClass().getSimpleName(); + LOG.trace("onReceive {} {}", messageType, message); + + // check to see if we got quarantined by another node + + // TODO: follow https://github.com/akka/akka/issues/18758 to see if Akka adds a named + // exception for quarantine detection + if (message instanceof AssociationErrorEvent) { + AssociationErrorEvent event = (AssociationErrorEvent) message; + Throwable cause = event.getCause(); + if (cause instanceof InvalidAssociation) { + Throwable cause2 = ((InvalidAssociation) cause).getCause(); + if (cause2.getMessage().contains("quarantined this system")) { + LOG.warn("Got quarantined by {}", event.getRemoteAddress()); + + // execute the callback + callback.apply(); + } else { + LOG.debug("received AssociationErrorEvent, cause: InvalidAssociation", cause2); + } + } else { + LOG.warn("received AssociationErrorEvent", cause); + } + } + } + + public static Props props(final Effect callback) { + return Props.create(new Creator() { + private static final long serialVersionUID = 1L; + + @Override + public QuarantinedMonitorActor create() throws Exception { + return new QuarantinedMonitorActor(callback); + } + }); + } + +} diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf index 5a6dbe5879..96f97cd0c8 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf @@ -52,7 +52,7 @@ odl-cluster-data { cluster { seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"] - auto-down-unreachable-after = 300s + auto-down-unreachable-after = 30s roles = [ "member-1" diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java index 6dd0ab1230..ab06ff10a1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java @@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory; public class TerminationMonitor extends UntypedActor{ private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class); + public static final String ADDRESS = "termination-monitor"; public TerminationMonitor(){ LOG.debug("Created TerminationMonitor"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/actor_system_provider/impl/ActorSystemProviderImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/actor_system_provider/impl/ActorSystemProviderImpl.java index 22d728dd75..5970ea47e5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/actor_system_provider/impl/ActorSystemProviderImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/actor_system_provider/impl/ActorSystemProviderImpl.java @@ -8,15 +8,21 @@ package org.opendaylight.controller.config.yang.config.actor_system_provider.impl; import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.japi.Effect; import akka.osgi.BundleDelegatingClassLoader; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.cluster.ActorSystemProviderListener; import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader; import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader; +import org.opendaylight.controller.cluster.common.actor.QuarantinedMonitorActor; +import org.opendaylight.controller.cluster.datastore.TerminationMonitor; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.util.ListenerRegistry; +import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,24 +32,35 @@ public class ActorSystemProviderImpl implements ActorSystemProvider, AutoCloseab private static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data"; private static final String CONFIGURATION_NAME = "odl-cluster-data"; static final Logger LOG = LoggerFactory.getLogger(ActorSystemProviderImpl.class); - - private ActorSystem actorSystem; - private final BundleDelegatingClassLoader classLoader; + private final ActorSystem actorSystem; private final ListenerRegistry listeners = new ListenerRegistry<>(); - public ActorSystemProviderImpl(BundleContext bundleContext) { + public ActorSystemProviderImpl(final BundleContext bundleContext) { LOG.info("Creating new ActorSystem"); - classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(), - Thread.currentThread().getContextClassLoader()); + final Bundle bundle = bundleContext.getBundle(); - createActorSystem(); - } + final BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundle, Thread.currentThread() + .getContextClassLoader()); + + final AkkaConfigurationReader configurationReader = new FileAkkaConfigurationReader(); + final Config akkaConfig = ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME); + + actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, akkaConfig, classLoader); + + actorSystem.actorOf(Props.create(TerminationMonitor.class), TerminationMonitor.ADDRESS); + + actorSystem.actorOf(QuarantinedMonitorActor.props(new Effect() { + + @Override + public void apply() throws Exception { + // restart the entire karaf container + LOG.warn("Restarting karaf container"); + System.setProperty("karaf.restart", "true"); + bundleContext.getBundle(0).stop(); + } + }), QuarantinedMonitorActor.ADDRESS); - private void createActorSystem() { - AkkaConfigurationReader configurationReader = new FileAkkaConfigurationReader(); - actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, - ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME), classLoader); } @Override @@ -68,4 +85,4 @@ public class ActorSystemProviderImpl implements ActorSystemProvider, AutoCloseab LOG.warn("Error awaiting actor termination", e); } } -} \ No newline at end of file +}