Convert DatastoreSnapshotRestore to OSGi DS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / MemberNode.java
index bb2ce19deec0314b5fb409fe4d9b7f85e776ae41..b3fff4c6d77307f084ffb53d003e3eb6dff44f74 100644 (file)
@@ -7,36 +7,37 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent.CurrentClusterState;
 import akka.cluster.Member;
 import akka.cluster.MemberStatus;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
@@ -47,7 +48,7 @@ import scala.concurrent.duration.Duration;
  * @author Thomas Pantelis
  */
 public class MemberNode {
-    static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+    private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558";
 
     private IntegrationTestKit kit;
     private AbstractDataStore configDataStore;
@@ -110,6 +111,7 @@ public class MemberNode {
         fail("Member " + member + " is now down");
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     public void cleanup() {
         if (!cleanedUp) {
             cleanedUp = true;
@@ -120,21 +122,25 @@ public class MemberNode {
                 operDataStore.close();
             }
 
-            IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+            try {
+                IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+            } catch (RuntimeException e) {
+                LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e);
+            }
         }
     }
 
     public static void verifyRaftState(final AbstractDataStore datastore, final String shardName,
             final RaftStateVerifier verifier) throws Exception {
-        ActorContext actorContext = datastore.getActorContext();
+        ActorUtils actorUtils = datastore.getActorUtils();
 
-        Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
-        ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
+        Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
+        ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
 
         AssertionError lastError = null;
         Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
-            OnDemandRaftState raftState = (OnDemandRaftState)actorContext
+            OnDemandRaftState raftState = (OnDemandRaftState)actorUtils
                     .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
 
             try {
@@ -151,10 +157,10 @@ public class MemberNode {
 
     public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName,
             final String... peerMemberNames) throws Exception {
-        final Set<String> peerIds = Sets.newHashSet();
+        final Set<String> peerIds = new HashSet<>();
         for (String p: peerMemberNames) {
             peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
-                datastore.getActorContext().getDataStoreName()).toString());
+                datastore.getActorUtils().getDataStoreName()).toString());
         }
 
         verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
@@ -164,7 +170,7 @@ public class MemberNode {
     public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) {
         Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
-            Optional<ActorRef> shardReply = datastore.getActorContext().findLocalShard(shardName);
+            Optional<ActorRef> shardReply = datastore.getActorUtils().findLocalShard(shardName);
             if (!shardReply.isPresent()) {
                 return;
             }
@@ -179,9 +185,10 @@ public class MemberNode {
         private final List<MemberNode> members;
         private String moduleShardsConfig;
         private String akkaConfig;
+        private boolean useAkkaArtery = true;
         private String[] waitForshardLeader = new String[0];
         private String testName;
-        private SchemaContext schemaContext;
+        private EffectiveModelContext schemaContext;
         private boolean createOperDatastore = true;
         private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
                 .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
@@ -210,6 +217,16 @@ public class MemberNode {
             return this;
         }
 
+        /**
+         * Specifies whether or not to use akka artery for remoting. Default is true.
+         *
+         * @return this Builder
+         */
+        public Builder useAkkaArtery(final boolean newUseAkkaArtery) {
+            this.useAkkaArtery = newUseAkkaArtery;
+            return this;
+        }
+
         /**
          * Specifies the name of the test that is appended to the data store names. This is required.
          *
@@ -245,7 +262,7 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder schemaContext(final SchemaContext newSchemaContext) {
+        public Builder schemaContext(final EffectiveModelContext newSchemaContext) {
             this.schemaContext = newSchemaContext;
             return this;
         }
@@ -260,10 +277,10 @@ public class MemberNode {
             return this;
         }
 
-        public MemberNode build() {
-            Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
-            Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
-            Preconditions.checkNotNull(testName, "testName must be specified");
+        public MemberNode build() throws Exception {
+            requireNonNull(moduleShardsConfig, "moduleShardsConfig must be specified");
+            requireNonNull(akkaConfig, "akkaConfig must be specified");
+            requireNonNull(testName, "testName must be specified");
 
             if (schemaContext == null) {
                 schemaContext = SchemaContextHelper.full();
@@ -272,20 +289,30 @@ public class MemberNode {
             MemberNode node = new MemberNode();
             node.datastoreContextBuilder = datastoreContextBuilder;
 
-            ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
-            Cluster.get(system).join(MEMBER_1_ADDRESS);
+            Config baseConfig = ConfigFactory.load();
+            Config config;
+            if (useAkkaArtery) {
+                config = baseConfig.getConfig(akkaConfig);
+            } else {
+                config = baseConfig.getConfig(akkaConfig + "-without-artery")
+                        .withFallback(baseConfig.getConfig(akkaConfig));
+            }
+
+            ActorSystem system = ActorSystem.create("cluster-test", config);
+            String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp");
+            Cluster.get(system).join(AddressFromURIString.parse(member1Address));
 
             node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
 
             String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
             node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
-            node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
-                    true, schemaContext, waitForshardLeader);
+            node.configDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
+                    "config_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
 
             if (createOperDatastore) {
                 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
-                node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
-                        true, schemaContext, waitForshardLeader);
+                node.operDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
+                        "oper_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
             }
 
             members.add(node);