Notify listeners on applySnapshot
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / IntegrationTestKit.java
index c3d216be9ca9d144c51741194c06bb0550919549..afffa99e0cdfb61c8f96f55d4dd6db0de046ea96 100644 (file)
@@ -12,11 +12,16 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.mockito.Mockito;
@@ -112,6 +117,24 @@ public class IntegrationTestKit extends ShardTestKit {
         }
     }
 
+    public void waitForMembersUp(String... otherMembers) {
+        Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+            CurrentClusterState state = Cluster.get(getSystem()).state();
+            for(Member m: state.getMembers()) {
+                if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
+                        otherMembersSet.isEmpty()) {
+                    return;
+                }
+            }
+
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
+
+        fail("Member(s) " + otherMembersSet + " are not Up");
+    }
+
     public static ActorRef findLocalShard(ActorContext actorContext, String shardName) {
         ActorRef shard = null;
         for(int i = 0; i < 20 * 5 && shard == null; i++) {
@@ -192,12 +215,6 @@ public class IntegrationTestKit extends ShardTestKit {
         cohort.commit().get(5, TimeUnit.SECONDS);
     }
 
-    public void cleanup(DistributedDataStore dataStore) {
-        if(dataStore != null) {
-            dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
-        }
-    }
-
     void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
             throws Exception {
         try {