X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=e95993de24bbe80b4c3f7bc7a6bc3fdf74816b8f;hp=578bf1d1725bc34053939f5be4a541c5df0156d2;hb=13ba9adfa24716a7b27bc4cfef198b3fa5c577b0;hpb=1fbf74e08377dd5e003274be1a43df42d1678845 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 578bf1d172..e95993de24 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -1,31 +1,34 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.AddressFromURIString; import akka.actor.Props; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.dispatch.Dispatchers; import akka.japi.Creator; import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; -import java.net.URI; +import com.typesafe.config.ConfigFactory; import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -33,21 +36,25 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; @@ -55,7 +62,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; @@ -75,6 +82,13 @@ public class ShardManagerTest extends AbstractActorTest { private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); + private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { + String name = new ShardIdentifier(shardName, memberName,"config").toString(); + return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); + } + + private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); + @Before public void setUp() { MockitoAnnotations.initMocks(this); @@ -94,27 +108,28 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.clear(); } - private Props newShardMgrProps() { + private Props newShardMgrProps(boolean persistent) { return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - datastoreContextBuilder.build(), ready); + datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache); } private Props newPropsShardMgrWithMockShardActor() { + return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(), + new MockConfiguration()); + } + + private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor, + final ClusterWrapper clusterWrapper, final Configuration config) { Creator creator = new Creator() { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), - datastoreContextBuilder.build(), ready) { - @Override - protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { - return mockShardActor; - } - }; + return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(), + ready, name, shardActor, primaryShardInfoCache); } }; - return Props.create(new DelegatingShardManagerCreator(creator)); + return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); } @Test @@ -124,9 +139,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary("non-existent", false), getRef()); - expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable()); + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); }}; } @@ -140,17 +155,40 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef()); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name())), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; } @@ -168,11 +206,12 @@ public class ShardManagerTest extends AbstractActorTest { String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.absent(), + DataStoreVersions.CURRENT_VERSION), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-2-shard-default")); }}; @@ -183,9 +222,9 @@ public class ShardManagerTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); }}; } @@ -197,7 +236,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; @@ -215,17 +254,20 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); - shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); }}; } @@ -238,7 +280,7 @@ public class ShardManagerTest extends AbstractActorTest { // We're passing waitUntilInitialized = true to FindPrimary so the response should be // delayed until we send ActorInitialized and RoleChangeNotification. - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); @@ -252,11 +294,14 @@ public class ShardManagerTest extends AbstractActorTest { expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); - shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), mockShardActor); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); }}; @@ -269,9 +314,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); - expectMsgClass(duration("2 seconds"), ActorNotInitialized.class); + expectMsgClass(duration("2 seconds"), NotInitializedException.class); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -289,7 +334,7 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, null, RaftState.Candidate.name()), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; @@ -303,302 +348,339 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef()); + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; } @Test - public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + public void testOnReceiveFindPrimaryForRemoteShard() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); - shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + // Create an ActorSystem ShardManager actor for member-1. - shardManager.tell(new FindLocalShard("non-existent", false), getRef()); + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - assertEquals("getShardName", "non-existent", notFound.getShardName()); - }}; - } + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); - @Test - public void testOnReceiveFindLocalShardForExistentShard() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + // Create an ActorSystem ShardManager actor for member-2. - shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shardManager.tell(new ActorInitialized(), mockShardActor); + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); - shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class); + final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2"); - assertTrue("Found path contains " + found.getPath().path().toString(), - found.getPath().path().toString().contains("member-1-shard-default-config")); - }}; - } + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); - @Test - public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); - shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + new JavaTestKit(system1) {{ - expectMsgClass(duration("5 seconds"), ActorNotInitialized.class); - }}; - } + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - @Test - public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + shardManager2.tell(new ActorInitialized(), mockShardActor2); - shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); - // We're passing waitUntilInitialized = true to FindLocalShard so the response should be - // delayed until we send ActorInitialized. - Future future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true), - new Timeout(5, TimeUnit.SECONDS)); + shardManager1.underlyingActor().waitForMemberUp(); - shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); - Object resp = Await.result(future, duration("5 seconds")); - assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound); - }}; - } + RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); - @Test - public void testOnReceiveMemberUp() throws Exception { - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + shardManager2.underlyingActor().verifyFindPrimary(); - MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); + Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); + shardManager1.underlyingActor().waitForMemberRemoved(); - PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"), - PrimaryFound.SERIALIZABLE_CLASS)); - String path = found.getPrimaryPath(); - assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config")); + shardManager1.tell(new FindPrimary("astronauts", false), getRef()); + + expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); } @Test - public void testOnReceiveMemberDown() throws Exception { + public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); - new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + // Create an ActorSystem ShardManager actor for member-1. - MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString()); + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); + final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS); + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); - MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString()); + // Create an ActorSystem ShardManager actor for member-2. - shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef()); + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); - expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS); - }}; - } + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - @Test - public void testOnRecoveryJournalIsCleaned() { - InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( - ImmutableSet.of("foo"))); - InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules( - ImmutableSet.of("bar"))); - InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID); + final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2"); - new JavaTestKit(getSystem()) {{ - TestActorRef shardManager = TestActorRef.create(getSystem(), - Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")).build()); - shardManager.underlyingActor().waitForRecoveryComplete(); - InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID); + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); - // Journal entries up to the last one should've been deleted - Map journal = InMemoryJournal.get(shardMgrID); - synchronized (journal) { - assertEquals("Journal size", 1, journal.size()); - assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next()); - } - }}; - } + new JavaTestKit(system1) {{ - @Test - public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception { - final ImmutableSet persistedModules = ImmutableSet.of("foo", "bar"); - InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( - persistedModules)); - new JavaTestKit(getSystem()) {{ - TestActorRef shardManager = TestActorRef.create(getSystem(), - Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager1.tell(new ActorInitialized(), mockShardActor1); + shardManager2.tell(new ActorInitialized(), mockShardActor2); - shardManager.underlyingActor().waitForRecoveryComplete(); + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1); + shardManager1.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), + mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); + shardManager1.underlyingActor().waitForMemberUp(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForUnreachableMember(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForReachableMember(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); - Collection knownModules = shardManager.underlyingActor().getKnownModules(); + RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + String path1 = found1.getPrimaryPath(); + assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config")); - assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules)); }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); } @Test - public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules() - throws Exception { - new JavaTestKit(getSystem()) {{ - final TestActorRef shardManager = - TestActorRef.create(getSystem(), newShardMgrProps()); + public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); - assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size()); + // Create an ActorSystem ShardManager actor for member-1. - ModuleIdentifier foo = mock(ModuleIdentifier.class); - when(foo.getNamespace()).thenReturn(new URI("foo")); + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - Set moduleIdentifierSet = new HashSet<>(); - moduleIdentifierSet.add(foo); + final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - SchemaContext schemaContext = mock(SchemaContext.class); - when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + // Create an ActorSystem ShardManager actor for member-2. - assertEquals("getKnownModules", Sets.newHashSet("foo"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); - ModuleIdentifier bar = mock(ModuleIdentifier.class); - when(bar.getNamespace()).thenReturn(new URI("bar")); + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - moduleIdentifierSet.add(bar); + final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2"); - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")).build()); - assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); - }}; - } + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); - @Test - public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules() - throws Exception { - new JavaTestKit(getSystem()) {{ - final TestActorRef shardManager = - TestActorRef.create(getSystem(), newShardMgrProps()); + new JavaTestKit(system1) {{ - SchemaContext schemaContext = mock(SchemaContext.class); - Set moduleIdentifierSet = new HashSet<>(); + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager1.tell(new ActorInitialized(), mockShardActor1); + shardManager2.tell(new ActorInitialized(), mockShardActor2); - ModuleIdentifier foo = mock(ModuleIdentifier.class); - when(foo.getNamespace()).thenReturn(new URI("foo")); + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1); + shardManager1.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), + mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); + shardManager1.underlyingActor().waitForMemberUp(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); + + primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection( + mockShardActor1.path()), Optional.absent())); - moduleIdentifierSet.add(foo); + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); - when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); + shardManager1.underlyingActor().waitForUnreachableMember(); - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + shardManager1.tell(new FindPrimary("default", true), getRef()); - assertEquals("getKnownModules", Sets.newHashSet("foo"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); - //Create a completely different SchemaContext with only the bar module in it - //schemaContext = mock(SchemaContext.class); - moduleIdentifierSet.clear(); - ModuleIdentifier bar = mock(ModuleIdentifier.class); - when(bar.getNamespace()).thenReturn(new URI("bar")); + assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default")); - moduleIdentifierSet.add(bar); + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), mockShardActor1); + shardManager1.tell(new RoleChangeNotification(memberId1, + RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1); - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + shardManager1.tell(new FindPrimary("default", true), getRef()); - assertEquals("getKnownModules", Sets.newHashSet("foo"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); + LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); + String path1 = found1.getPrimaryPath(); + assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config")); }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); } + @Test - public void testRecoveryApplicable(){ - new JavaTestKit(getSystem()) { - { - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); - final TestActorRef persistentShardManager = - TestActorRef.create(getSystem(), persistentProps); + public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider(); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable()); + shardManager.tell(new FindLocalShard("non-existent", false), getRef()); - final Props nonPersistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(false).build(), ready); - final TestActorRef nonPersistentShardManager = - TestActorRef.create(getSystem(), nonPersistentProps); + LocalShardNotFound notFound = expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); - DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider(); + assertEquals("getShardName", "non-existent", notFound.getShardName()); + }}; + } - assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable()); + @Test + public void testOnReceiveFindLocalShardForExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); - }}; + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + + LocalShardFound found = expectMsgClass(duration("5 seconds"), LocalShardFound.class); + assertTrue("Found path contains " + found.getPath().path().toString(), + found.getPath().path().toString().contains("member-1-shard-default-config")); + }}; } @Test - public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider() - throws Exception { - final CountDownLatch persistLatch = new CountDownLatch(1); - final Creator creator = new Creator() { - private static final long serialVersionUID = 1L; - @Override - public ShardManager create() throws Exception { - return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) { - @Override - protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { - DataPersistenceProviderMonitor dataPersistenceProviderMonitor - = new DataPersistenceProviderMonitor(); - dataPersistenceProviderMonitor.setPersistLatch(persistLatch); - return dataPersistenceProviderMonitor; - } - }; - } - }; + public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + }}; + } + @Test + public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); - final TestActorRef shardManager = - TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator))); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - ModuleIdentifier foo = mock(ModuleIdentifier.class); - when(foo.getNamespace()).thenReturn(new URI("foo")); + // We're passing waitUntilInitialized = true to FindLocalShard so the response should be + // delayed until we send ActorInitialized. + Future future = Patterns.ask(shardManager, new FindLocalShard(Shard.DEFAULT_NAME, true), + new Timeout(5, TimeUnit.SECONDS)); - Set moduleIdentifierSet = new HashSet<>(); - moduleIdentifierSet.add(foo); + shardManager.tell(new ActorInitialized(), mockShardActor); - SchemaContext schemaContext = mock(SchemaContext.class); - when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); + Object resp = Await.result(future, duration("5 seconds")); + assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound); + }}; + } - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); + @Test + public void testOnRecoveryJournalIsCleaned() { + InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( + ImmutableSet.of("foo"))); + InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules( + ImmutableSet.of("bar"))); + InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID); - assertEquals("Persisted", true, - Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS)); + new JavaTestKit(getSystem()) {{ + TestActorRef shardManager = TestActorRef.create(getSystem(), + Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); + + shardManager.underlyingActor().waitForRecoveryComplete(); + InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID); + // Journal entries up to the last one should've been deleted + Map journal = InMemoryJournal.get(shardMgrID); + synchronized (journal) { + assertEquals("Journal size", 0, journal.size()); + } }}; } @Test - public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception { + public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( @@ -606,7 +688,8 @@ public class ShardManagerTest extends AbstractActorTest { verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId)); + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION)); verify(ready, times(1)).countDown(); @@ -614,10 +697,10 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception { + public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( @@ -625,19 +708,45 @@ public class ShardManagerTest extends AbstractActorTest { verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix)); + shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); verify(ready, times(1)).countDown(); }}; } + @Test + public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception { + new JavaTestKit(getSystem()) { + { + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); + + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, null, RaftState.Follower.name())); + + verify(ready, never()).countDown(); + + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); + + shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + + verify(ready, times(1)).countDown(); + + }}; + } @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); @@ -650,10 +759,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testByDefaultSyncStatusIsFalse() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + final Props persistentProps = newShardMgrProps(true); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -667,7 +773,7 @@ public class ShardManagerTest extends AbstractActorTest { final Props persistentProps = ShardManager.props( new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -680,10 +786,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + final Props persistentProps = newShardMgrProps(true); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -704,7 +807,7 @@ public class ShardManagerTest extends AbstractActorTest { final Props persistentProps = ShardManager.props( new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -737,7 +840,7 @@ public class ShardManagerTest extends AbstractActorTest { return Arrays.asList("default", "astronauts"); } }, - DatastoreContext.newBuilder().persistent(true).build(), ready); + DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -780,7 +883,8 @@ public class ShardManagerTest extends AbstractActorTest { TestShardManager(String shardMrgIDSuffix) { super(new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready); + DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready, + new PrimaryShardInfoFutureCache()); } @Override @@ -828,4 +932,94 @@ public class ShardManagerTest extends AbstractActorTest { return delegate.create(); } } + + private static class ForwardingShardManager extends ShardManager { + private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); + private CountDownLatch memberUpReceived = new CountDownLatch(1); + private CountDownLatch memberRemovedReceived = new CountDownLatch(1); + private CountDownLatch memberUnreachableReceived = new CountDownLatch(1); + private CountDownLatch memberReachableReceived = new CountDownLatch(1); + private final ActorRef shardActor; + private final String name; + + protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration, + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name, + ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) { + super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache); + this.shardActor = shardActor; + this.name = name; + } + + @Override + public void handleCommand(Object message) throws Exception { + try{ + super.handleCommand(message); + } finally { + if(message instanceof FindPrimary) { + findPrimaryMessageReceived.countDown(); + } else if(message instanceof ClusterEvent.MemberUp) { + String role = ((ClusterEvent.MemberUp)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberUpReceived.countDown(); + } + } else if(message instanceof ClusterEvent.MemberRemoved) { + String role = ((ClusterEvent.MemberRemoved)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberRemovedReceived.countDown(); + } + } else if(message instanceof ClusterEvent.UnreachableMember) { + String role = ((ClusterEvent.UnreachableMember)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberUnreachableReceived.countDown(); + } + } else if(message instanceof ClusterEvent.ReachableMember) { + String role = ((ClusterEvent.ReachableMember)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberReachableReceived.countDown(); + } + } + } + } + + @Override + public String persistenceId() { + return name; + } + + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + return shardActor; + } + + void waitForMemberUp() { + assertEquals("MemberUp received", true, + Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS)); + memberUpReceived = new CountDownLatch(1); + } + + void waitForMemberRemoved() { + assertEquals("MemberRemoved received", true, + Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS)); + memberRemovedReceived = new CountDownLatch(1); + } + + void waitForUnreachableMember() { + assertEquals("UnreachableMember received", true, + Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS + )); + memberUnreachableReceived = new CountDownLatch(1); + } + + void waitForReachableMember() { + assertEquals("ReachableMember received", true, + Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS)); + memberReachableReceived = new CountDownLatch(1); + } + + void verifyFindPrimary() { + assertEquals("FindPrimary received", true, + Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS)); + findPrimaryMessageReceived = new CountDownLatch(1); + } + } }