BUG 3156 : Recreating CDS should not fail 57/19957/6
authorMoiz Raja <moraja@cisco.com>
Thu, 7 May 2015 01:43:31 +0000 (18:43 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 12 May 2015 21:14:50 +0000 (21:14 +0000)
When CDS is recreated it can fail because we try to
create actors on a datastore that is already terminated.

To address this issue I have enhanced DistributedDataStoreFactory
as follows:

- Simplified the actor system creation code by synchronizing
  all static methods. Since these methods are not used in any
  fast path anyway, this is a fair enough solution.
- Added another static field in DistributedDataStoreFactory to
  track the createdInstances. This is so that we can properly
  clean up the actor system as instances of the data store are destroyed.
- The actor system is now shut down when there are no datastore instances.
- Removed actor system shutdown from ActorContext to ensure that we
  use the symmetric method destroyInstance

Now that we do not shut down the actor system until both data stores
are destroyed, we may end up trying to create an actor on the old actor
system with an already used name like shardmanager-config or
shardmanager-operational. To avoid this I have added a loop when creating
the shardmanager which catches the thrown exception and makes up to 100
attempts, waiting 100 milliseconds after each failure. This is to give
the old actor some time to properly die.

Change-Id: I3c8b2b3b21a1519610bf2ed5e1af8be93f3120ce
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/concurrent_data_broker/DomConcurrentDataBrokerModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java

index a136cc6f75392e9d95426fe786d10e9381a1a6cd..8051f7d49bab43e48ad1a82215d35b49a55281e5 100644 (file)
@@ -8,9 +8,11 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
@@ -77,10 +79,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         String shardDispatcher =
                 new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
 
-        actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
-                ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch)
-                        .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ),
-                cluster, configuration, datastoreContext);
+        actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration,
+                datastoreContext, shardDispatcher, shardManagerId ), cluster, configuration, datastoreContext);
 
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
@@ -186,11 +186,12 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
             try {
                 closeable.close();
             } catch (Exception e) {
-                LOG.debug("Error closing insance", e);
+                LOG.debug("Error closing instance", e);
             }
         }
 
         actorContext.shutdown();
+        DistributedDataStoreFactory.destroyInstance(this);
     }
 
     @VisibleForTesting
@@ -212,6 +213,25 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         }
     }
 
+    private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration,
+                                        DatastoreContext datastoreContext, String shardDispatcher, String shardManagerId){
+        Exception lastException = null;
+
+        for(int i=0;i<100;i++) {
+            try {
+                return actorSystem.actorOf(
+                        ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch)
+                                .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId);
+            } catch (Exception e){
+                lastException = e;
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+                LOG.debug(String.format("Could not create actor %s because of %s - waiting for sometime before retrying (retry count = %d)", shardManagerId, e.getMessage(), i));
+            }
+        }
+
+        throw new IllegalStateException("Failed to create Shard Manager", lastException);
+    }
+
     @VisibleForTesting
     public CountDownLatch getWaitTillReadyCountDownLatch() {
         return waitTillReadyCountDownLatch;
index 8199e33294874f729dcc886fa482c9937eb73f1e..ee8ac61ecf6da73a410a1ebc87315389dd715d42 100644 (file)
@@ -10,26 +10,36 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.osgi.BundleDelegatingClassLoader;
+import com.google.common.base.Preconditions;
 import com.typesafe.config.ConfigFactory;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
 
 public class DistributedDataStoreFactory {
     private static final String ACTOR_SYSTEM_NAME = "opendaylight-cluster-data";
     private static final String CONFIGURATION_NAME = "odl-cluster-data";
+    private static ActorSystem actorSystem = null;
+    private static final Set<DistributedDataStore> createdInstances = new HashSet<>(2);
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreFactory.class);
 
-    private static volatile ActorSystem persistentActorSystem = null;
-
-    public static DistributedDataStore createInstance(SchemaService schemaService,
+    public static synchronized DistributedDataStore createInstance(SchemaService schemaService,
             DatastoreContext datastoreContext, BundleContext bundleContext) {
 
+        LOG.info("Create data store instance of type : {}", datastoreContext.getDataStoreType());
+
         DatastoreContextIntrospector introspector = new DatastoreContextIntrospector(datastoreContext);
         DatastoreContextConfigAdminOverlay overlay = new DatastoreContextConfigAdminOverlay(
                 introspector, bundleContext);
 
-        ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader());
+        ActorSystem actorSystem = getActorSystem(bundleContext, datastoreContext.getConfigurationReader());
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
                 new ClusterWrapperImpl(actorSystem), config, introspector.getContext());
@@ -41,28 +51,44 @@ public class DistributedDataStoreFactory {
 
         dataStore.setCloseable(overlay);
         dataStore.waitTillReady();
+
+        createdInstances.add(dataStore);
         return dataStore;
     }
 
-    private static final ActorSystem getOrCreateInstance(final BundleContext bundleContext, ConfigurationReader configurationReader) {
-        ActorSystem ret = persistentActorSystem;
-        if (ret == null) {
-            synchronized (DistributedDataStoreFactory.class) {
-                ret = persistentActorSystem;
-                if (ret == null) {
-                    // Create an OSGi bundle classloader for actor system
-                    BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
-                        Thread.currentThread().getContextClassLoader());
-
-                    ret = ActorSystem.create(ACTOR_SYSTEM_NAME,
-                        ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME), classLoader);
-                    ret.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-
-                    persistentActorSystem = ret;
+    private static synchronized final ActorSystem getActorSystem(final BundleContext bundleContext,
+                                                                 ConfigurationReader configurationReader) {
+        if (actorSystem == null) {
+            // Create an OSGi bundle classloader for actor system
+            BundleDelegatingClassLoader classLoader = new BundleDelegatingClassLoader(bundleContext.getBundle(),
+                Thread.currentThread().getContextClassLoader());
+
+            actorSystem = ActorSystem.create(ACTOR_SYSTEM_NAME,
+                ConfigFactory.load(configurationReader.read()).getConfig(CONFIGURATION_NAME), classLoader);
+            actorSystem.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+        }
+
+        return actorSystem;
+    }
+
+    public static synchronized void destroyInstance(DistributedDataStore dataStore){
+        Preconditions.checkNotNull(dataStore, "dataStore should not be null");
+
+        LOG.info("Destroy data store instance of type : {}", dataStore.getActorContext().getDataStoreType());
+
+        if(createdInstances.remove(dataStore)){
+            if(createdInstances.size() == 0){
+                if(actorSystem != null) {
+                    actorSystem.shutdown();
+                    try {
+                        actorSystem.awaitTermination(Duration.create(10, TimeUnit.SECONDS));
+                    } catch (Exception e) {
+                        LOG.warn("Error awaiting actor termination", e);
+                    }
+                    actorSystem = null;
                 }
             }
         }
-
-        return ret;
     }
+
 }
index 73f1a8f328d2a671c5699d770b782a860afcc7f7..18a8798c4755d80a982550ffc3fee42c9d1f9d0f 100644 (file)
@@ -378,8 +378,7 @@ public class ActorContext {
     }
 
     public void shutdown() {
-        shardManager.tell(PoisonPill.getInstance(), null);
-        actorSystem.shutdown();
+        shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
     public ClusterWrapper getClusterWrapper() {
index 62da307979a1a6372601e9a857e8a0cd4bd4bd3a..6412231171db7bdc68cb980541f1a81138dad9be 100644 (file)
@@ -34,6 +34,11 @@ public class DomConcurrentDataBrokerModule extends AbstractDomConcurrentDataBrok
         super(identifier, dependencyResolver, oldModule, oldInstance);
     }
 
+    @Override
+    public boolean canReuseInstance(AbstractDomConcurrentDataBrokerModule oldModule) {
+        return true;
+    }
+
     @Override
     public AutoCloseable createInstance() {
         //Initializing Operational DOM DataStore defaulting to InMemoryDOMDataStore if one is not configured
index f78b134d42a5b6cdda965bf090ea18c64bfdea69..252788203f5972746d1259a03901b30e1278feed 100644 (file)
@@ -28,8 +28,12 @@ public class DistributedConfigDataStoreProviderModule extends
     }
 
     @Override
-    public java.lang.AutoCloseable createInstance() {
+    public boolean canReuseInstance(AbstractDistributedConfigDataStoreProviderModule oldModule) {
+        return true;
+    }
 
+    @Override
+    public java.lang.AutoCloseable createInstance() {
         ConfigProperties props = getConfigProperties();
         if(props == null) {
             props = new ConfigProperties();
index 6711a1007475d8a0cbb18ecb204a35e39491513a..08845654a5fb34ff77227eb05f8eb08ef4e51caf 100644 (file)
@@ -27,6 +27,11 @@ public class DistributedOperationalDataStoreProviderModule extends
         // add custom validation form module attributes here.
     }
 
+    @Override
+    public boolean canReuseInstance(AbstractDistributedOperationalDataStoreProviderModule oldModule) {
+        return true;
+    }
+
     @Override
     public java.lang.AutoCloseable createInstance() {
 
index c2e8125df2b5be843f2a6ed560ea3c211817c1b1..5d231417fb7058c0b3f7c0be10d927c8d48ce46b 100644 (file)
@@ -21,6 +21,11 @@ public class RemoteRPCBrokerModule extends org.opendaylight.controller.config.ya
      // add custom validation form module attributes here.
   }
 
+  @Override
+  public boolean canReuseInstance(AbstractRemoteRPCBrokerModule oldModule) {
+      return true;
+  }
+
   @Override
   public java.lang.AutoCloseable createInstance() {
     Broker broker = getDomBrokerDependency();