BUG-2138: Make DistributedShardFactory return Futures.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / MemberNode.java
index d5d4ad1afc26b8e7595b30e94ca707ba144bece7..e6ea97124707cbd2cf85c605dba139bcf81628a4 100644 (file)
@@ -9,9 +9,9 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent.CurrentClusterState;
@@ -22,16 +22,19 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -45,11 +48,11 @@ import scala.concurrent.duration.Duration;
  * @author Thomas Pantelis
  */
 public class MemberNode {
-    static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+    private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558";
 
     private IntegrationTestKit kit;
-    private DistributedDataStore configDataStore;
-    private DistributedDataStore operDataStore;
+    private AbstractDataStore configDataStore;
+    private AbstractDataStore operDataStore;
     private DatastoreContext.Builder datastoreContextBuilder;
     private boolean cleanedUp;
 
@@ -60,7 +63,7 @@ public class MemberNode {
      *                callers to cleanup instances on test completion.
      * @return a Builder instance
      */
-    public static Builder builder(List<MemberNode> members) {
+    public static Builder builder(final List<MemberNode> members) {
         return new Builder(members);
     }
 
@@ -69,12 +72,12 @@ public class MemberNode {
     }
 
 
-    public DistributedDataStore configDataStore() {
+    public AbstractDataStore configDataStore() {
         return configDataStore;
     }
 
 
-    public DistributedDataStore operDataStore() {
+    public AbstractDataStore operDataStore() {
         return operDataStore;
     }
 
@@ -82,14 +85,22 @@ public class MemberNode {
         return datastoreContextBuilder;
     }
 
-    public void waitForMembersUp(String... otherMembers) {
-        Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
+    public void waitForMembersUp(final String... otherMembers) {
+        kit.waitForMembersUp(otherMembers);
+    }
+
+    public void waitForMemberDown(final String member) {
         Stopwatch sw = Stopwatch.createStarted();
-        while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+        while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
             CurrentClusterState state = Cluster.get(kit.getSystem()).state();
-            for(Member m: state.getMembers()) {
-                if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
-                        otherMembersSet.isEmpty()) {
+            for (Member m : state.getUnreachable()) {
+                if (member.equals(m.getRoles().iterator().next())) {
+                    return;
+                }
+            }
+
+            for (Member m : state.getMembers()) {
+                if (m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
                     return;
                 }
             }
@@ -97,20 +108,30 @@ public class MemberNode {
             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
         }
 
-        fail("Member(s) " + otherMembersSet + " are not Up");
+        fail("Member " + member + " is now down");
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     public void cleanup() {
-        if(!cleanedUp) {
+        if (!cleanedUp) {
             cleanedUp = true;
-            kit.cleanup(configDataStore);
-            kit.cleanup(operDataStore);
-            kit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+            if (configDataStore != null) {
+                configDataStore.close();
+            }
+            if (operDataStore != null) {
+                operDataStore.close();
+            }
+
+            try {
+                IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+            } catch (RuntimeException e) {
+                LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e);
+            }
         }
     }
 
-    public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
-            throws Exception {
+    public static void verifyRaftState(final AbstractDataStore datastore, final String shardName,
+            final RaftStateVerifier verifier) throws Exception {
         ActorContext actorContext = datastore.getActorContext();
 
         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
@@ -118,9 +139,9 @@ public class MemberNode {
 
         AssertionError lastError = null;
         Stopwatch sw = Stopwatch.createStarted();
-        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
-            OnDemandRaftState raftState = (OnDemandRaftState)actorContext.
-                    executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
+        while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
+            OnDemandRaftState raftState = (OnDemandRaftState)actorContext
+                    .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
 
             try {
                 verifier.verify(raftState);
@@ -134,27 +155,23 @@ public class MemberNode {
         throw lastError;
     }
 
-    public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
-            String... peerMemberNames) throws Exception {
+    public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName,
+            final String... peerMemberNames) throws Exception {
         final Set<String> peerIds = Sets.newHashSet();
-        for(String p: peerMemberNames) {
-            peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName).
-                type(datastore.getActorContext().getDataStoreType()).build().toString());
+        for (String p: peerMemberNames) {
+            peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
+                datastore.getActorContext().getDataStoreName()).toString());
         }
 
-        verifyRaftState(datastore, shardName, new RaftStateVerifier() {
-            @Override
-            public void verify(OnDemandRaftState raftState) {
-                assertEquals("Peers for shard " + shardName, peerIds, raftState.getPeerAddresses().keySet());
-            }
-        });
+        verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
+            raftState.getPeerAddresses().keySet()));
     }
 
-    public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) {
+    public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) {
         Stopwatch sw = Stopwatch.createStarted();
-        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+        while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
             Optional<ActorRef> shardReply = datastore.getActorContext().findLocalShard(shardName);
-            if(!shardReply.isPresent()) {
+            if (!shardReply.isPresent()) {
                 return;
             }
 
@@ -168,14 +185,15 @@ public class MemberNode {
         private final List<MemberNode> members;
         private String moduleShardsConfig;
         private String akkaConfig;
+        private boolean useAkkaArtery = true;
         private String[] waitForshardLeader = new String[0];
         private String testName;
         private SchemaContext schemaContext;
         private boolean createOperDatastore = true;
-        private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
-                shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
+        private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
+                .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
 
-        Builder(List<MemberNode> members) {
+        Builder(final List<MemberNode> members) {
             this.members = members;
         }
 
@@ -184,8 +202,8 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder moduleShardsConfig(String moduleShardsConfig) {
-            this.moduleShardsConfig = moduleShardsConfig;
+        public Builder moduleShardsConfig(final String newModuleShardsConfig) {
+            this.moduleShardsConfig = newModuleShardsConfig;
             return this;
         }
 
@@ -194,8 +212,18 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder akkaConfig(String akkaConfig) {
-            this.akkaConfig = akkaConfig;
+        public Builder akkaConfig(final String newAkkaConfig) {
+            this.akkaConfig = newAkkaConfig;
+            return this;
+        }
+
+        /**
+         * Specifies whether or not to use akka artery for remoting. Default is true.
+         *
+         * @return this Builder
+         */
+        public Builder useAkkaArtery(final boolean newUseAkkaArtery) {
+            this.useAkkaArtery = newUseAkkaArtery;
             return this;
         }
 
@@ -204,8 +232,8 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder testName(String testName) {
-            this.testName = testName;
+        public Builder testName(final String newTestName) {
+            this.testName = newTestName;
             return this;
         }
 
@@ -214,7 +242,7 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder waitForShardLeader(String... shardNames) {
+        public Builder waitForShardLeader(final String... shardNames) {
             this.waitForshardLeader = shardNames;
             return this;
         }
@@ -224,7 +252,7 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder createOperDatastore(boolean value) {
+        public Builder createOperDatastore(final boolean value) {
             this.createOperDatastore = value;
             return this;
         }
@@ -234,8 +262,8 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder schemaContext(SchemaContext schemaContext) {
-            this.schemaContext = schemaContext;
+        public Builder schemaContext(final SchemaContext newSchemaContext) {
+            this.schemaContext = newSchemaContext;
             return this;
         }
 
@@ -244,7 +272,7 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
+        public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) {
             datastoreContextBuilder = builder;
             return this;
         }
@@ -254,24 +282,34 @@ public class MemberNode {
             Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
             Preconditions.checkNotNull(testName, "testName must be specified");
 
-            if(schemaContext == null) {
+            if (schemaContext == null) {
                 schemaContext = SchemaContextHelper.full();
             }
 
             MemberNode node = new MemberNode();
             node.datastoreContextBuilder = datastoreContextBuilder;
 
-            ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
-            Cluster.get(system).join(MEMBER_1_ADDRESS);
+            Config baseConfig = ConfigFactory.load();
+            Config config;
+            if (useAkkaArtery) {
+                config = baseConfig.getConfig(akkaConfig);
+            } else {
+                config = baseConfig.getConfig(akkaConfig + "-without-artery")
+                        .withFallback(baseConfig.getConfig(akkaConfig));
+            }
+
+            ActorSystem system = ActorSystem.create("cluster-test", config);
+            String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp");
+            Cluster.get(system).join(AddressFromURIString.parse(member1Address));
 
             node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
 
-            String memberName = new ClusterWrapperImpl(system).getCurrentMemberName();
+            String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
             node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
             node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
                     true, schemaContext, waitForshardLeader);
 
-            if(createOperDatastore) {
+            if (createOperDatastore) {
                 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
                 node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
                         true, schemaContext, waitForshardLeader);
@@ -282,7 +320,7 @@ public class MemberNode {
         }
     }
 
-    public static interface RaftStateVerifier {
+    public interface RaftStateVerifier {
         void verify(OnDemandRaftState raftState);
     }
-}
\ No newline at end of file
+}