import static org.junit.Assert.fail;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.mockito.Mockito;
}
}
+ public void waitForMembersUp(String... otherMembers) {
+ Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ CurrentClusterState state = Cluster.get(getSystem()).state();
+ for(Member m: state.getMembers()) {
+ if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
+ otherMembersSet.isEmpty()) {
+ return;
+ }
+ }
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+
+ fail("Member(s) " + otherMembersSet + " are not Up");
+ }
+
public static ActorRef findLocalShard(ActorContext actorContext, String shardName) {
ActorRef shard = null;
for(int i = 0; i < 20 * 5 && shard == null; i++) {
cohort.commit().get(5, TimeUnit.SECONDS);
}
- public void cleanup(DistributedDataStore dataStore) {
- if(dataStore != null) {
- dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
- }
- }
-
void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
throws Exception {
try {