IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS);
+ followerTestKit.waitForMembersUp("member-1", "member-3");
+ follower2TestKit.waitForMembersUp("member-1", "member-2");
+
// Do an initial read to get the primary shard info cached.
DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.mockito.Mockito;
}
}
+ public void waitForMembersUp(String... otherMembers) {
+ Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ CurrentClusterState state = Cluster.get(getSystem()).state();
+ for(Member m: state.getMembers()) {
+ if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
+ otherMembersSet.isEmpty()) {
+ return;
+ }
+ }
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+
+ fail("Member(s) " + otherMembersSet + " are not Up");
+ }
+
public static ActorRef findLocalShard(ActorContext actorContext, String shardName) {
ActorRef shard = null;
for(int i = 0; i < 20 * 5 && shard == null; i++) {
}
public void waitForMembersUp(String... otherMembers) {
- Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
- Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
- CurrentClusterState state = Cluster.get(kit.getSystem()).state();
- for(Member m: state.getMembers()) {
- if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
- otherMembersSet.isEmpty()) {
- return;
- }
- }
-
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- }
-
- fail("Member(s) " + otherMembersSet + " are not Up");
+ kit.waitForMembersUp(otherMembers);
}
public void waitForMemberDown(String member) {