BUG-7965 Switch distributed-data backend to a separate shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / IntegrationTestKit.java
index 1d501e1cdb387ccfbe13859d4f3fb6c84baf607d..86c875993ae1e7697c77ff226b5cf6b719dd5e1d 100644 (file)
@@ -25,14 +25,19 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
@@ -40,12 +45,16 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 
 public class IntegrationTestKit extends ShardTestKit {
 
+    private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestKit.class);
+
     protected DatastoreContext.Builder datastoreContextBuilder;
     protected DatastoreSnapshot restoreFromSnapshot;
 
@@ -76,8 +85,15 @@ public class IntegrationTestKit extends ShardTestKit {
 
     public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
             final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) {
+        return setupDistributedDataStore(typeName, moduleShardsConfig, "modules.conf", waitUntilLeader,
+                schemaContext, shardNames);
+    }
+
+    public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+                                                       final String modulesConfig, final boolean waitUntilLeader,
+                                                       final SchemaContext schemaContext, final String... shardNames) {
         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
-        final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
+        final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig);
 
         datastoreContextBuilder.dataStoreName(typeName);
 
@@ -99,6 +115,52 @@ public class IntegrationTestKit extends ShardTestKit {
         return dataStore;
     }
 
+    public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
+                                                                       final SchemaContext schemaContext) {
+        final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
+        final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
+
+        getDatastoreContextBuilder().dataStoreName(typeName);
+
+        final DatastoreContext datastoreContext = getDatastoreContextBuilder().build();
+
+        final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+
+        final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
+                configuration, mockContextFactory, restoreFromSnapshot);
+
+        dataStore.onGlobalContextUpdated(schemaContext);
+
+        datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
+        return dataStore;
+    }
+
+    public DistributedDataStore setupDistributedDataStoreWithoutConfig(final String typeName,
+                                                                       final SchemaContext schemaContext,
+                                                                       final LogicalDatastoreType storeType) {
+        final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
+        final ConfigurationImpl configuration = new ConfigurationImpl(new EmptyModuleShardConfigProvider());
+
+        getDatastoreContextBuilder().dataStoreName(typeName);
+
+        final DatastoreContext datastoreContext =
+                getDatastoreContextBuilder().logicalStoreType(storeType).build();
+
+        final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
+        Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
+
+        final DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
+                configuration, mockContextFactory, restoreFromSnapshot);
+
+        dataStore.onGlobalContextUpdated(schemaContext);
+
+        datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext);
+        return dataStore;
+    }
+
     public void waitUntilLeader(final ActorContext actorContext, final String... shardNames) {
         for (String shardName: shardNames) {
             ActorRef shard = findLocalShard(actorContext, shardName);
@@ -148,6 +210,19 @@ public class IntegrationTestKit extends ShardTestKit {
         return shard;
     }
 
+    public static void waitUntilShardIsDown(ActorContext actorContext, String shardName) {
+        for (int i = 0; i < 20 * 5 ; i++) {
+            LOG.debug("Waiting for shard down {}", shardName);
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
+            if (!shardReply.isPresent()) {
+                return;
+            }
+        }
+
+        throw new IllegalStateException("Shard[" + shardName + " did not shutdown in time");
+    }
+
     public static void verifyShardStats(final AbstractDataStore datastore, final String shardName,
             final ShardStatsVerifier verifier) throws Exception {
         ActorContext actorContext = datastore.getActorContext();
@@ -173,6 +248,31 @@ public class IntegrationTestKit extends ShardTestKit {
         throw lastError;
     }
 
+    public static void verifyShardState(final AbstractDataStore datastore, final String shardName,
+            final Consumer<OnDemandShardState> verifier) throws Exception {
+        ActorContext actorContext = datastore.getActorContext();
+
+        Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
+        ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
+
+        AssertionError lastError = null;
+        Stopwatch sw = Stopwatch.createStarted();
+        while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
+            OnDemandShardState shardState = (OnDemandShardState)actorContext
+                    .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
+
+            try {
+                verifier.accept(shardState);
+                return;
+            } catch (AssertionError e) {
+                lastError = e;
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        throw lastError;
+    }
+
     void testWriteTransaction(final AbstractDataStore dataStore, final YangInstanceIdentifier nodePath,
             final NormalizedNode<?, ?> nodeToWrite) throws Exception {