<feature version='${project.version}'>odl-mdsal-broker-local</feature>
<feature version='${akka.version}'>odl-akka-system</feature>
<feature version='${akka.version}'>odl-akka-persistence</feature>
+ <feature version='${akka.version}'>odl-akka-clustering</feature>
<bundle>mvn:org.opendaylight.controller/sal-clustering-commons/{{VERSION}}</bundle>
<bundle>mvn:org.opendaylight.controller/sal-akka-raft/{{VERSION}}</bundle>
<bundle>mvn:com.codahale.metrics/metrics-core/3.0.1</bundle>
<feature name ='odl-mdsal-distributed-datastore' version='${project.version}'>
<feature version='${project.version}'>odl-mdsal-broker-local</feature>
<feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
- <feature version='${akka.version}'>odl-akka-clustering</feature>
<bundle>mvn:org.opendaylight.controller/sal-distributed-datastore/{{VERSION}}</bundle>
</feature>
<feature name ='odl-mdsal-remoterpc-connector' version='${project.version}'>
<feature version='${project.version}'>odl-mdsal-broker-local</feature>
<feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
- <feature version='${akka.version}'>odl-akka-clustering</feature>
<feature version='0.7'>odl-akka-leveldb</feature>
<bundle>mvn:org.opendaylight.controller/sal-remoterpc-connector/{{VERSION}}</bundle>
</feature>
--- /dev/null
+/*
+ * Copyright (c) 2015 Huawei Technologies Co., Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.common.actor;
+
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+import akka.japi.Effect;
+import akka.remote.AssociationErrorEvent;
+import akka.remote.InvalidAssociation;
+import akka.remote.RemotingLifecycleEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class listens to Akka RemotingLifecycleEvent events to detect when this node has been
+ * quarantined by another. Once this node gets quarantined, restart the ActorSystem to allow this
+ * node to rejoin the cluster.
+ *
+ * @author Gary Wu <gary.wu1@huawei.com>
+ *
+ */
+public class QuarantinedMonitorActor extends UntypedActor {
+
+ private final Logger LOG = LoggerFactory.getLogger(QuarantinedMonitorActor.class);
+
+ public static final String ADDRESS = "quarantined-monitor";
+
+ private final Effect callback;
+
+ protected QuarantinedMonitorActor(Effect callback) {
+ this.callback = callback;
+
+ LOG.debug("Created QuarantinedMonitorActor");
+
+ getContext().system().eventStream().subscribe(getSelf(), RemotingLifecycleEvent.class);
+ }
+
+ @Override
+ public void postStop() {
+ LOG.debug("Stopping QuarantinedMonitorActor");
+ }
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ final String messageType = message.getClass().getSimpleName();
+ LOG.trace("onReceive {} {}", messageType, message);
+
+ // check to see if we got quarantined by another node
+
+ // TODO: follow https://github.com/akka/akka/issues/18758 to see if Akka adds a named
+ // exception for quarantine detection
+ if (message instanceof AssociationErrorEvent) {
+ AssociationErrorEvent event = (AssociationErrorEvent) message;
+ Throwable cause = event.getCause();
+ if (cause instanceof InvalidAssociation) {
+ Throwable cause2 = ((InvalidAssociation) cause).getCause();
+ if (cause2.getMessage().contains("quarantined this system")) {
+ LOG.warn("Got quarantined by {}", event.getRemoteAddress());
+
+ // execute the callback
+ callback.apply();
+ } else {
+ LOG.debug("received AssociationErrorEvent, cause: InvalidAssociation", cause2);
+ }
+ } else {
+ LOG.warn("received AssociationErrorEvent", cause);
+ }
+ }
+ }
+
+ public static Props props(final Effect callback) {
+ return Props.create(new Creator<QuarantinedMonitorActor>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public QuarantinedMonitorActor create() throws Exception {
+ return new QuarantinedMonitorActor(callback);
+ }
+ });
+ }
+
+}
cluster {
seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
- auto-down-unreachable-after = 300s
+ auto-down-unreachable-after = 30s
roles = [
"member-1"
public class TerminationMonitor extends UntypedActor{
private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class);
+ public static final String ADDRESS = "termination-monitor";
public TerminationMonitor(){
LOG.debug("Created TerminationMonitor");
package org.opendaylight.controller.config.yang.config.actor_system_provider.impl;
import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.japi.Effect;
import akka.osgi.BundleDelegatingClassLoader;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.ActorSystemProviderListener;
import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
+import org.opendaylight.controller.cluster.common.actor.QuarantinedMonitorActor;
+import org.opendaylight.controller.cluster.datastore.TerminationMonitor;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.ListenerRegistry;
+import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
private static final String CONFIGURATION_NAME = "odl-cluster-data";
static final Logger LOG = LoggerFactory.getLogger(ActorSystemProviderImpl.class);
-
- private ActorSystem actorSystem;
- private final BundleDelegatingClassLoader classLoader;
+ private final ActorSystem actorSystem;
private final ListenerRegistry<ActorSystemProviderListener> listeners = new ListenerRegistry<>();
- public ActorSystemProviderImpl(BundleContext bundleContext) {
+ public ActorSystemProviderImpl(final BundleContext bundleContext) {
LOG.info("Creating new ActorSystem");
- classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
- Thread.currentThread().getContextClassLoader());
+ final Bundle bundle = bundleContext.getBundle();
- createActorSystem();
- }
+ final BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundle, Thread.currentThread()
+ .getContextClassLoader());
+
+ final AkkaConfigurationReader configurationReader = new FileAkkaConfigurationReader();
+ final Config akkaConfig = ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME);
+
+ actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, akkaConfig, classLoader);
+
+ actorSystem.actorOf(Props.create(TerminationMonitor.class), TerminationMonitor.ADDRESS);
+
+ actorSystem.actorOf(QuarantinedMonitorActor.props(new Effect() {
+
+ @Override
+ public void apply() throws Exception {
+ // restart the entire karaf container
+ LOG.warn("Restarting karaf container");
+ System.setProperty("karaf.restart", "true");
+ bundleContext.getBundle(0).stop();
+ }
+ }), QuarantinedMonitorActor.ADDRESS);
- private void createActorSystem() {
- AkkaConfigurationReader configurationReader = new FileAkkaConfigurationReader();
- actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME,
- ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME), classLoader);
}
@Override
LOG.warn("Error awaiting actor termination", e);
}
}
-}
\ No newline at end of file
+}