X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=b676cf225c801039d1e679298e6d7cb23d2808ac;hp=ed7b6866bf5fffff21cae1f61379a3126257ea07;hb=cbf0c2ef5f3fb7ea6367ac98f580337840952a05;hpb=95d1074f7f446784f76dc41525cecdb65688df6c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index ed7b6866bf..b676cf225c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -1,347 +1,538 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.AddressFromURIString; import akka.actor.Props; -import akka.dispatch.Futures; -import akka.japi.Procedure; -import akka.persistence.PersistentConfirmation; -import akka.persistence.PersistentId; -import akka.persistence.PersistentImpl; -import akka.persistence.PersistentRepr; -import akka.persistence.journal.japi.AsyncWriteJournal; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.dispatch.Dispatchers; +import akka.japi.Creator; +import akka.pattern.Patterns; +import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import akka.util.Timeout; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValueFactory; -import org.junit.AfterClass; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; -import java.net.URI; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; +public class ShardManagerTest extends AbstractActorTest { + private static int ID_COUNTER = 1; -import static junit.framework.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; + private final String shardMrgIDSuffix = "config" + ID_COUNTER++; + private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix; -public class ShardManagerTest { - private static ActorSystem system; - Configuration mockConfig = new MockConfiguration(); - private static ActorRef defaultShardMockActor; + @Mock + private static CountDownLatch ready; - @BeforeClass - public static void setUpClass() { - Map myJournal = new HashMap<>(); - myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal"); - myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher"); - Config config = ConfigFactory.load() - .withValue("akka.persistence.journal.plugin", - ConfigValueFactory.fromAnyRef("my-journal")) - .withValue("my-journal", ConfigValueFactory.fromMap(myJournal)); + private static TestActorRef mockShardActor; - MyJournal.clear(); + private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). + dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); - system = ActorSystem.create("test", config); + private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { + String name = new ShardIdentifier(shardName, memberName,"config").toString(); + return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); + } - String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString(); - defaultShardMockActor = system.actorOf(Props.create(DoNothingActor.class), name); + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + InMemoryJournal.clear(); + if(mockShardActor == null) { + String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); + mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name); + } + + mockShardActor.underlyingActor().clear(); } - @AfterClass - public static void tearDown() { - JavaTestKit.shutdownActorSystem(system); - system = null; + @After + public void tearDown() { + InMemoryJournal.clear(); } - @Before - public void setUpTest(){ - MyJournal.clear(); + private Props newShardMgrProps() { + return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), + datastoreContextBuilder.build(), ready); + } + + private Props newPropsShardMgrWithMockShardActor() { + return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(), + new MockConfiguration()); + } + + private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor, + final ClusterWrapper clusterWrapper, final Configuration config) { + Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override + public ShardManager create() throws Exception { + return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(), + ready, name, shardActor); + } + }; + + return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); } @Test public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - new JavaTestKit(system) { - { - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - final ActorRef subject = getSystem().actorOf(props); + shardManager.tell(new FindPrimary("non-existent", false), getRef()); - subject.tell(new FindPrimary("inventory").toSerializable(), getRef()); + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); + }}; + } - expectMsgEquals(duration("2 seconds"), - new PrimaryNotFound("inventory").toSerializable()); - }}; + @Test + public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef()); + + MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + }}; } @Test - public void testOnReceiveFindPrimaryForExistentShard() throws Exception { + public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - final ActorRef subject = getSystem().actorOf(props); + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); - subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - subject.tell(new ActorInitialized(), defaultShardMockActor); + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); - subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS); - } - }; + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-2-shard-default")); + }}; } @Test - public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { + public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - final ActorRef subject = getSystem().actorOf(props); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + }}; + } - subject.tell(new FindLocalShard("inventory"), getRef()); + @Test + public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - final String out = new ExpectMsg(duration("3 seconds"), "find local") { - @Override - protected String match(Object in) { - if (in instanceof LocalShardNotFound) { - return ((LocalShardNotFound) in).getShardName(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - assertEquals("inventory", out); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; } @Test - public void testOnReceiveFindLocalShardForExistentShard() throws Exception { + public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - final MockClusterWrapper mockClusterWrapper = new MockClusterWrapper(); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", mockClusterWrapper, - new MockConfiguration(), new DatastoreContext()); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - final ActorRef subject = getSystem().actorOf(props); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - subject.tell(new ActorInitialized(), defaultShardMockActor); + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); - subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef()); + shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); - final ActorRef out = new ExpectMsg(duration("3 seconds"), "find local") { - @Override - protected ActorRef match(Object in) { - if (in instanceof LocalShardFound) { - return ((LocalShardFound) in).getPath(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - assertTrue(out.path().toString(), - out.path().toString().contains("member-1-shard-default-config")); + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); }}; } @Test - public void testOnReceiveMemberUp() throws Exception { + public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - final ActorRef subject = getSystem().actorOf(props); + // We're passing waitUntilInitialized = true to FindPrimary so the response should be + // delayed until we send ActorInitialized and RoleChangeNotification. + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); - MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString()); + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); - subject.tell(new FindPrimary("astronauts").toSerializable(), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - final String out = new ExpectMsg(duration("3 seconds"), "primary found") { - // do not put code outside this method, will run afterwards - @Override - protected String match(Object in) { - if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { - PrimaryFound f = PrimaryFound.fromSerializable(in); - return f.getPrimaryPath(); - } else { - throw noMatch(); - } - } - }.get(); // this extracts the received message + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); - assertTrue(out, out.contains("member-2-shard-astronauts-config")); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor); + + expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); + + shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + + PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), + primaryFound.getPrimaryPath().contains("member-1-shard-default")); + + expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); }}; } @Test - public void testOnReceiveMemberDown() throws Exception { + public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - final ActorRef subject = getSystem().actorOf(props); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); - MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString()); + expectMsgClass(duration("2 seconds"), NotInitializedException.class); - subject.tell(new FindPrimary("astronauts").toSerializable(), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - expectMsgClass(duration("3 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); + }}; + } + + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, + null, RaftState.Candidate.name()), mockShardActor); - subject.tell(new FindPrimary("astronauts").toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); - expectMsgClass(duration("1 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; } @Test - public void testOnRecoveryJournalIsEmptied(){ - MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules( - ImmutableSet.of("foo"))); + public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForRemoteShard() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2"); + + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); + + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); + + new JavaTestKit(system1) {{ + + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager2.tell(new ActorInitialized(), mockShardActor2); + + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); + + shardManager1.underlyingActor().waitForMemberUp(); + + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + + PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); + + shardManager2.underlyingActor().verifyFindPrimary(); + + Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForMemberRemoved(); + + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); + }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); + } - assertEquals(1, MyJournal.get().size()); + @Test + public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + shardManager.tell(new FindLocalShard("non-existent", false), getRef()); - final ActorRef subject = getSystem().actorOf(props); + LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + + assertEquals("getShardName", "non-existent", notFound.getShardName()); + }}; + } + + @Test + public void testOnReceiveFindLocalShardForExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - // Send message to check that ShardManager is ready - subject.tell(new FindPrimary("unknown").toSerializable(), getRef()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); - assertEquals(0, MyJournal.get().size()); + LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertTrue("Found path contains " + found.getPath().path().toString(), + found.getPath().path().toString().contains("member-1-shard-default-config")); + }}; + } + + @Test + public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + }}; + } + + @Test + public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + // We're passing waitUntilInitialized = true to FindLocalShard so the response should be + // delayed until we send ActorInitialized. + Future future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true), + new Timeout(5, TimeUnit.SECONDS)); + + shardManager.tell(new ActorInitialized(), mockShardActor); + + Object resp = Await.result(future, duration("5 seconds")); + assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound); + }}; + } + + @Test + public void testOnRecoveryJournalIsCleaned() { + InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( + ImmutableSet.of("foo"))); + InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules( + ImmutableSet.of("bar"))); + InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID); + + new JavaTestKit(getSystem()) {{ + TestActorRef shardManager = TestActorRef.create(getSystem(), + Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); + + shardManager.underlyingActor().waitForRecoveryComplete(); + InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID); + + // Journal entries up to the last one should've been deleted + Map journal = InMemoryJournal.get(shardMgrID); + synchronized (journal) { + assertEquals("Journal size", 1, journal.size()); + assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next()); + } }}; } @Test public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception { - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); - final TestActorRef subject = - TestActorRef.create(system, props); + final ImmutableSet persistedModules = ImmutableSet.of("foo", "bar"); + InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( + persistedModules)); + new JavaTestKit(getSystem()) {{ + TestActorRef shardManager = TestActorRef.create(getSystem(), + Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); - subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo"))); + shardManager.underlyingActor().waitForRecoveryComplete(); - Collection knownModules = subject.underlyingActor().getKnownModules(); + Collection knownModules = shardManager.underlyingActor().getKnownModules(); - assertTrue(knownModules.contains("foo")); + assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules)); }}; } @Test public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules() throws Exception { - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); - final TestActorRef subject = - TestActorRef.create(system, props); - - Collection knownModules = subject.underlyingActor().getKnownModules(); + new JavaTestKit(getSystem()) {{ + final TestActorRef shardManager = + TestActorRef.create(getSystem(), newShardMgrProps()); - assertEquals(0, knownModules.size()); - - SchemaContext schemaContext = mock(SchemaContext.class); - Set moduleIdentifierSet = new HashSet<>(); + assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size()); ModuleIdentifier foo = mock(ModuleIdentifier.class); when(foo.getNamespace()).thenReturn(new URI("foo")); + Set moduleIdentifierSet = new HashSet<>(); moduleIdentifierSet.add(foo); + SchemaContext schemaContext = mock(SchemaContext.class); when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); - subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - assertTrue(knownModules.contains("foo")); - - assertEquals(1, knownModules.size()); + assertEquals("getKnownModules", Sets.newHashSet("foo"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); ModuleIdentifier bar = mock(ModuleIdentifier.class); when(bar.getNamespace()).thenReturn(new URI("bar")); moduleIdentifierSet.add(bar); - subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertTrue(knownModules.contains("bar")); - - assertEquals(2, knownModules.size()); + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); }}; - } - @Test public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules() throws Exception { - new JavaTestKit(system) {{ - final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); - final TestActorRef subject = - TestActorRef.create(system, props); - - Collection knownModules = subject.underlyingActor().getKnownModules(); - - assertEquals(0, knownModules.size()); + new JavaTestKit(getSystem()) {{ + final TestActorRef shardManager = + TestActorRef.create(getSystem(), newShardMgrProps()); SchemaContext schemaContext = mock(SchemaContext.class); Set moduleIdentifierSet = new HashSet<>(); @@ -353,103 +544,397 @@ public class ShardManagerTest { when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); - subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertTrue(knownModules.contains("foo")); + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - assertEquals(1, knownModules.size()); + assertEquals("getKnownModules", Sets.newHashSet("foo"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); //Create a completely different SchemaContext with only the bar module in it - schemaContext = mock(SchemaContext.class); - moduleIdentifierSet = new HashSet<>(); + //schemaContext = mock(SchemaContext.class); + moduleIdentifierSet.clear(); ModuleIdentifier bar = mock(ModuleIdentifier.class); when(bar.getNamespace()).thenReturn(new URI("bar")); moduleIdentifierSet.add(bar); - subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + + assertEquals("getKnownModules", Sets.newHashSet("foo"), + Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); + + }}; + } + + @Test + public void testRecoveryApplicable(){ + new JavaTestKit(getSystem()) { + { + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef persistentShardManager = + TestActorRef.create(getSystem(), persistentProps); + + DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider(); + + assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable()); + + final Props nonPersistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(false).build(), ready); + final TestActorRef nonPersistentShardManager = + TestActorRef.create(getSystem(), nonPersistentProps); + + DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider(); + + assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable()); + + + }}; + + } + + @Test + public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider() + throws Exception { + final CountDownLatch persistLatch = new CountDownLatch(1); + final Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override + public ShardManager create() throws Exception { + return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) { + @Override + protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { + DataPersistenceProviderMonitor dataPersistenceProviderMonitor + = new DataPersistenceProviderMonitor(); + dataPersistenceProviderMonitor.setPersistLatch(persistLatch); + return dataPersistenceProviderMonitor; + } + }; + } + }; - assertFalse(knownModules.contains("bar")); + new JavaTestKit(getSystem()) {{ - assertEquals(1, knownModules.size()); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator))); + + ModuleIdentifier foo = mock(ModuleIdentifier.class); + when(foo.getNamespace()).thenReturn(new URI("foo")); + + Set moduleIdentifierSet = new HashSet<>(); + moduleIdentifierSet.add(foo); + + SchemaContext schemaContext = mock(SchemaContext.class); + when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); + + shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + + assertEquals("Persisted", true, + Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS)); }}; + } + + @Test + public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception { + new JavaTestKit(getSystem()) { + { + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, RaftState.Candidate.name(), RaftState.Leader.name())); + + verify(ready, never()).countDown(); + shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId)); + + verify(ready, times(1)).countDown(); + + }}; } + @Test + public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception { + new JavaTestKit(getSystem()) { + { + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, null, RaftState.Follower.name())); + + verify(ready, never()).countDown(); + + shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix)); - private void sleep(long period){ - Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS); + verify(ready, times(1)).countDown(); + + }}; } - public static class MyJournal extends AsyncWriteJournal { - private static Map journal = Maps.newTreeMap(); + @Test + public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { + new JavaTestKit(getSystem()) { + { + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + + verify(ready, never()).countDown(); - public static void addToJournal(Long sequenceNr, Object value){ - journal.put(sequenceNr, value); + }}; + } + + + @Test + public void testByDefaultSyncStatusIsFalse() throws Exception{ + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + ShardManager shardManagerActor = shardManager.underlyingActor(); + + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + } + + @Test + public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{ + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + ShardManager shardManagerActor = shardManager.underlyingActor(); + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + RaftState.Follower.name(), RaftState.Leader.name())); + + assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + } + + @Test + public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{ + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + ShardManager shardManagerActor = shardManager.underlyingActor(); + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + RaftState.Follower.name(), RaftState.Candidate.name())); + + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate + shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown")); + + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + } + + @Test + public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{ + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + ShardManager shardManagerActor = shardManager.underlyingActor(); + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + RaftState.Candidate.name(), RaftState.Follower.name())); + + // Initially will be false + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + // Send status true will make sync status true + shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown")); + + assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + + // Send status false will make sync status false + shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown")); + + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + } + + @Test + public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{ + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration() { + @Override + public List getMemberShardNames(String memberName) { + return Arrays.asList("default", "astronauts"); + } + }, + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + ShardManager shardManagerActor = shardManager.underlyingActor(); + + // Initially will be false + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + // Make default shard leader + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", + RaftState.Follower.name(), RaftState.Leader.name())); + + // default = Leader, astronauts is unknown so sync status remains false + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + // Make astronauts shard leader as well + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown", + RaftState.Follower.name(), RaftState.Leader.name())); + + // Now sync status should be true + assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + + // Make astronauts a Follower + shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown", + RaftState.Leader.name(), RaftState.Follower.name())); + + // Sync status is not true + assertEquals(false, shardManagerActor.getMBean().getSyncStatus()); + + // Make the astronauts follower sync status true + shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown")); + + // Sync status is now true + assertEquals(true, shardManagerActor.getMBean().getSyncStatus()); + + } + + private static class TestShardManager extends ShardManager { + private final CountDownLatch recoveryComplete = new CountDownLatch(1); + + TestShardManager(String shardMrgIDSuffix) { + super(new MockClusterWrapper(), new MockConfiguration(), + DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready); } - public static Map get(){ - return journal; + @Override + public void handleRecover(Object message) throws Exception { + try { + super.handleRecover(message); + } finally { + if(message instanceof RecoveryCompleted) { + recoveryComplete.countDown(); + } + } } - public static void clear(){ - journal.clear(); + void waitForRecoveryComplete() { + assertEquals("Recovery complete", true, + Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); } + } - @Override public Future doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max, - final Procedure replayCallback) { - if(journal.size() == 0){ - return Futures.successful(null); - } - return Futures.future(new Callable() { - @Override - public Void call() throws Exception { - for (Map.Entry entry : journal.entrySet()) { - PersistentRepr persistentMessage = - new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, - false, null, null); - replayCallback.apply(persistentMessage); - } - return null; - } - }, context().dispatcher()); + @SuppressWarnings("serial") + static class TestShardManagerCreator implements Creator { + String shardMrgIDSuffix; + + TestShardManagerCreator(String shardMrgIDSuffix) { + this.shardMrgIDSuffix = shardMrgIDSuffix; } - @Override public Future doAsyncReadHighestSequenceNr(String s, long l) { - return Futures.successful(-1L); + @Override + public TestShardManager create() throws Exception { + return new TestShardManager(shardMrgIDSuffix); } - @Override public Future doAsyncWriteMessages( - final Iterable persistentReprs) { - return Futures.future(new Callable() { - @Override - public Void call() throws Exception { - for (PersistentRepr repr : persistentReprs){ - if(repr.payload() instanceof ShardManager.SchemaContextModules) { - journal.put(repr.sequenceNr(), repr.payload()); - } + } + + private static class DelegatingShardManagerCreator implements Creator { + private static final long serialVersionUID = 1L; + private final Creator delegate; + + public DelegatingShardManagerCreator(Creator delegate) { + this.delegate = delegate; + } + + @Override + public ShardManager create() throws Exception { + return delegate.create(); + } + } + + private static class ForwardingShardManager extends ShardManager { + private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); + private CountDownLatch memberUpReceived = new CountDownLatch(1); + private CountDownLatch memberRemovedReceived = new CountDownLatch(1); + private final ActorRef shardActor; + private final String name; + + protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration, + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name, + ActorRef shardActor) { + super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); + this.shardActor = shardActor; + this.name = name; + } + + @Override + public void handleCommand(Object message) throws Exception { + try{ + super.handleCommand(message); + } finally { + if(message instanceof FindPrimary) { + findPrimaryMessageReceived.countDown(); + } else if(message instanceof ClusterEvent.MemberUp) { + String role = ((ClusterEvent.MemberUp)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberUpReceived.countDown(); + } + } else if(message instanceof ClusterEvent.MemberRemoved) { + String role = ((ClusterEvent.MemberRemoved)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberRemovedReceived.countDown(); } - return null; } - }, context().dispatcher()); + } + } + + @Override + public String persistenceId() { + return name; + } + + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + return shardActor; } - @Override public Future doAsyncWriteConfirmations( - Iterable persistentConfirmations) { - return Futures.successful(null); + void waitForMemberUp() { + assertEquals("MemberUp received", true, + Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS)); + memberUpReceived = new CountDownLatch(1); } - @Override public Future doAsyncDeleteMessages(Iterable persistentIds, - boolean b) { - clear(); - return Futures.successful(null); + void waitForMemberRemoved() { + assertEquals("MemberRemoved received", true, + Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS)); + memberRemovedReceived = new CountDownLatch(1); } - @Override public Future doAsyncDeleteMessagesTo(String s, long l, boolean b) { - clear(); - return Futures.successful(null); + void verifyFindPrimary() { + assertEquals("FindPrimary received", true, + Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS)); + findPrimaryMessageReceived = new CountDownLatch(1); } } }