X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=74746ebb0a86666fe341c2997dc4e14fe7af8397;hb=refs%2Fchanges%2F37%2F30037%2F4;hp=b6ea14ff95e75653d6c6d2ae4ce559261a75674f;hpb=60dbe8adeda3af724255231af9400341b17953b9;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index b6ea14ff95..74746ebb0a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -20,38 +20,43 @@ import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.AddressFromURIString; -import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; +import akka.actor.Status.Success; +import akka.actor.Terminated; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.dispatch.Dispatchers; import akka.japi.Creator; import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; +import akka.serialization.Serialization; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.net.URI; import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -62,6 +67,7 @@ import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; +import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -70,7 +76,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; -import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; @@ -93,12 +98,15 @@ import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; @@ -113,7 +121,7 @@ public class ShardManagerTest extends AbstractActorTest { private static int ID_COUNTER = 1; private final String shardMrgIDSuffix = "config" + ID_COUNTER++; - private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix; + private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); @Mock private static CountDownLatch ready; @@ -126,12 +134,9 @@ public class ShardManagerTest extends AbstractActorTest { dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6); - private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { - String name = new ShardIdentifier(shardName, memberName,"config").toString(); - return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); - } + private final Collection actorSystems = new ArrayList<>(); - private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); @Before public void setUp() { @@ -152,6 +157,27 @@ public class ShardManagerTest extends AbstractActorTest { public void tearDown() { InMemoryJournal.clear(); InMemorySnapshotStore.clear(); + + for(ActorSystem system: actorSystems) { + JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE); + } + + actorFactory.close(); + } + + private ActorSystem newActorSystem(String config) { + ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config)); + actorSystems.add(system); + return system; + } + + private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { + String name = new ShardIdentifier(shardName, memberName,"config").toString(); + if(system == getSystem()) { + return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name); + } + + return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); } private Props newShardMgrProps() { @@ -165,28 +191,24 @@ public class ShardManagerTest extends AbstractActorTest { return mockFactory; } - private Props newShardMgrProps(Configuration config) { - return TestShardManager.builder(datastoreContextBuilder).configuration(config).props(); + private TestShardManager.Builder newTestShardMgrBuilder() { + return TestShardManager.builder(datastoreContextBuilder); } - private Props newPropsShardMgrWithMockShardActor() { - return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(), - new MockConfiguration()); + private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) { + return TestShardManager.builder(datastoreContextBuilder).configuration(config); } - private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor, - final ClusterWrapper clusterWrapper, final Configuration config) { - Creator creator = new Creator() { - private static final long serialVersionUID = 1L; - @Override - public ShardManager create() throws Exception { - return new ForwardingShardManager(ShardManager.builder().cluster(clusterWrapper).configuration(config). - datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())). - waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache), name, shardActor); - } - }; + private Props newShardMgrProps(Configuration config) { + return newTestShardMgrBuilder(config).props(); + } + + private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() { + return TestShardManager.builder(datastoreContextBuilder).shardActor(mockShardActor); + } - return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); + private Props newPropsShardMgrWithMockShardActor() { + return newTestShardMgrBuilderWithMockShardActor().props(); } private TestShardManager newTestShardManager() { @@ -194,7 +216,7 @@ public class ShardManagerTest extends AbstractActorTest { } private TestShardManager newTestShardManager(Props props) { - TestActorRef shardManagerActor = TestActorRef.create(getSystem(), props); + TestActorRef shardManagerActor = actorFactory.createTestActor(props); TestShardManager shardManager = shardManagerActor.underlyingActor(); shardManager.waitForRecoveryComplete(); return shardManager; @@ -223,44 +245,51 @@ public class ShardManagerTest extends AbstractActorTest { } }; - final TestActorRef defaultShardActor = TestActorRef.create(getSystem(), - Props.create(MessageCollectorActor.class), "default"); - final TestActorRef topologyShardActor = TestActorRef.create(getSystem(), - Props.create(MessageCollectorActor.class), "topology"); + final TestActorRef defaultShardActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default")); + final TestActorRef topologyShardActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology")); final Map> shardInfoMap = Collections.synchronizedMap( new HashMap>()); shardInfoMap.put("default", new AbstractMap.SimpleEntry(defaultShardActor, null)); shardInfoMap.put("topology", new AbstractMap.SimpleEntry(topologyShardActor, null)); + final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); final CountDownLatch newShardActorLatch = new CountDownLatch(2); + class LocalShardManager extends ShardManager { + public LocalShardManager(AbstractBuilder builder) { + super(builder); + } + + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + Entry entry = shardInfoMap.get(info.getShardName()); + ActorRef ref = null; + if(entry != null) { + ref = entry.getKey(); + entry.setValue(info.getDatastoreContext()); + } + + newShardActorLatch.countDown(); + return ref; + } + } + final Creator creator = new Creator() { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ShardManager(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(mockConfig). - datastoreContextFactory(mockFactory).waitTillReadyCountdownLatch(ready). - primaryShardInfoCache(primaryShardInfoCache)) { - @Override - protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { - Entry entry = shardInfoMap.get(info.getShardName()); - ActorRef ref = null; - if(entry != null) { - ref = entry.getKey(); - entry.setValue(info.getDatastoreContext()); - } - - newShardActorLatch.countDown(); - return ref; - } - }; + return new LocalShardManager(new GenericBuilder(LocalShardManager.class). + datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache). + configuration(mockConfig)); } }; JavaTestKit kit = new JavaTestKit(getSystem()); - final ActorRef shardManager = getSystem().actorOf(Props.create(new DelegatingShardManagerCreator(creator)). - withDispatcher(Dispatchers.DefaultDispatcherId())); + final ActorRef shardManager = actorFactory.createActor(Props.create( + new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef()); @@ -285,15 +314,12 @@ public class ShardManagerTest extends AbstractActorTest { newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class); assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor()); - - defaultShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - topologyShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); } @Test public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -308,7 +334,7 @@ public class ShardManagerTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -326,14 +352,14 @@ public class ShardManagerTest extends AbstractActorTest { LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); - assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); }}; } @Test public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -353,7 +379,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -380,7 +406,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); @@ -391,7 +417,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -405,7 +431,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -427,14 +453,14 @@ public class ShardManagerTest extends AbstractActorTest { LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); - assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); }}; } @Test public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -470,7 +496,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -487,7 +513,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -503,7 +529,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -519,7 +545,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -536,18 +562,16 @@ public class ShardManagerTest extends AbstractActorTest { // Create an ActorSystem ShardManager actor for member-1. - final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + final ActorSystem system1 = newActorSystem("Member1"); Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - - final TestActorRef shardManager1 = TestActorRef.create(system1, - newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), - new MockConfiguration()), shardManagerID); + final TestActorRef shardManager1 = TestActorRef.create(system1, + newTestShardMgrBuilderWithMockShardActor().cluster( + new ClusterWrapperImpl(system1)).props(), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. - final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); @@ -557,9 +581,9 @@ public class ShardManagerTest extends AbstractActorTest { put("default", Arrays.asList("member-1", "member-2")). put("astronauts", Arrays.asList("member-2")).build()); - final TestActorRef shardManager2 = TestActorRef.create(system2, - newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), - mockConfig2), shardManagerID); + final TestActorRef shardManager2 = TestActorRef.create(system2, + newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( + new ClusterWrapperImpl(system2)).props(), shardManagerID); new JavaTestKit(system1) {{ @@ -594,9 +618,6 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); }}; - - JavaTestKit.shutdownActorSystem(system1); - JavaTestKit.shutdownActorSystem(system2); } @Test @@ -605,18 +626,18 @@ public class ShardManagerTest extends AbstractActorTest { // Create an ActorSystem ShardManager actor for member-1. - final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + final ActorSystem system1 = newActorSystem("Member1"); Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - final TestActorRef shardManager1 = TestActorRef.create(system1, - newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), - new MockConfiguration()), shardManagerID); + final TestActorRef shardManager1 = TestActorRef.create(system1, + newTestShardMgrBuilder().shardActor(mockShardActor1).cluster( + new ClusterWrapperImpl(system1)).props(), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. - final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); @@ -625,9 +646,9 @@ public class ShardManagerTest extends AbstractActorTest { MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). put("default", Arrays.asList("member-1", "member-2")).build()); - final TestActorRef shardManager2 = TestActorRef.create(system2, - newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), - mockConfig2), shardManagerID); + final TestActorRef shardManager2 = TestActorRef.create(system2, + newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( + new ClusterWrapperImpl(system2)).props(), shardManagerID); new JavaTestKit(system1) {{ @@ -694,9 +715,6 @@ public class ShardManagerTest extends AbstractActorTest { MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); }}; - - JavaTestKit.shutdownActorSystem(system1); - JavaTestKit.shutdownActorSystem(system2); } @Test @@ -705,18 +723,20 @@ public class ShardManagerTest extends AbstractActorTest { // Create an ActorSystem ShardManager actor for member-1. - final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + final ActorSystem system1 = newActorSystem("Member1"); Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - final TestActorRef shardManager1 = TestActorRef.create(system1, - newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), - new MockConfiguration()), shardManagerID); + final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); + final TestActorRef shardManager1 = TestActorRef.create(system1, + newTestShardMgrBuilder().shardActor(mockShardActor1).cluster( + new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(), + shardManagerID); // Create an ActorSystem ShardManager actor for member-2. - final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); @@ -725,12 +745,11 @@ public class ShardManagerTest extends AbstractActorTest { MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). put("default", Arrays.asList("member-1", "member-2")).build()); - final TestActorRef shardManager2 = TestActorRef.create(system2, - newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), - mockConfig2), shardManagerID); + final TestActorRef shardManager2 = TestActorRef.create(system2, + newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( + new ClusterWrapperImpl(system2)).props(), shardManagerID); new JavaTestKit(system1) {{ - shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager1.tell(new ActorInitialized(), mockShardActor1); @@ -781,16 +800,13 @@ public class ShardManagerTest extends AbstractActorTest { assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config")); }}; - - JavaTestKit.shutdownActorSystem(system1); - JavaTestKit.shutdownActorSystem(system2); } @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -805,7 +821,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardForExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -822,7 +838,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); @@ -833,7 +849,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -851,24 +867,22 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnRecoveryJournalIsCleaned() { - InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( + String persistenceID = "shard-manager-" + shardMrgIDSuffix; + InMemoryJournal.addEntry(persistenceID, 1L, new ShardManager.SchemaContextModules( ImmutableSet.of("foo"))); - InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules( + InMemoryJournal.addEntry(persistenceID, 2L, new ShardManager.SchemaContextModules( ImmutableSet.of("bar"))); - InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID); + InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID); - new JavaTestKit(getSystem()) {{ - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); + TestShardManager shardManager = newTestShardManager(); - shardManager.underlyingActor().waitForRecoveryComplete(); - InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID); + InMemoryJournal.waitForDeleteMessagesComplete(persistenceID); - // Journal entries up to the last one should've been deleted - Map journal = InMemoryJournal.get(shardMgrID); - synchronized (journal) { - assertEquals("Journal size", 0, journal.size()); - } - }}; + // Journal entries up to the last one should've been deleted + Map journal = InMemoryJournal.get(persistenceID); + synchronized (journal) { + assertEquals("Journal size", 0, journal.size()); + } } @Test @@ -1041,7 +1055,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveSwitchShardBehavior() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -1056,11 +1070,11 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testOnReceiveCreateShard() { + public void testOnCreateShard() { new JavaTestKit(getSystem()) {{ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); - ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); SchemaContext schemaContext = TestModel.createTestContext(); @@ -1074,7 +1088,7 @@ public class ShardManagerTest extends AbstractActorTest { "foo", null, Arrays.asList("member-1", "member-5", "member-6")); shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef()); - expectMsgClass(duration("5 seconds"), CreateShardReply.class); + expectMsgClass(duration("5 seconds"), Success.class); shardManager.tell(new FindLocalShard("foo", true), getRef()); @@ -1090,18 +1104,45 @@ public class ShardManagerTest extends AbstractActorTest { shardBuilder.getId()); assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); - // Send CreateShard with same name - should fail. + // Send CreateShard with same name - should return Success with a message. + + shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); + + Success success = expectMsgClass(duration("5 seconds"), Success.class); + assertNotNull("Success status is null", success.status()); + }}; + } + + @Test + public void testOnCreateShardWithLocalMemberNotInShardConfig() { + new JavaTestKit(getSystem()) {{ + datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); + + ActorRef shardManager = actorFactory.createActor(newShardMgrProps( + new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); + + Shard.Builder shardBuilder = Shard.builder(); + ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + "foo", null, Arrays.asList("member-5", "member-6")); shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); + expectMsgClass(duration("5 seconds"), Success.class); + + shardManager.tell(new FindLocalShard("foo", true), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardFound.class); - expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size()); + assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), + shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass()); }}; } @Test - public void testOnReceiveCreateShardWithNoInitialSchemaContext() { + public void testOnCreateShardWithNoInitialSchemaContext() { new JavaTestKit(getSystem()) {{ - ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); Shard.Builder shardBuilder = Shard.builder(); @@ -1110,7 +1151,7 @@ public class ShardManagerTest extends AbstractActorTest { "foo", null, Arrays.asList("member-1")); shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); - expectMsgClass(duration("5 seconds"), CreateShardReply.class); + expectMsgClass(duration("5 seconds"), Success.class); SchemaContext schemaContext = TestModel.createTestContext(); shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); @@ -1132,7 +1173,7 @@ public class ShardManagerTest extends AbstractActorTest { put("shard1", Arrays.asList("member-1")). put("shard2", Arrays.asList("member-1")).build()); - ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig).withDispatcher( + ActorRef shardManager = actorFactory.createActor(newShardMgrProps(mockConfig).withDispatcher( Dispatchers.DefaultDispatcherId())); shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); @@ -1153,21 +1194,23 @@ public class ShardManagerTest extends AbstractActorTest { DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class); assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); - List shardSnapshots = datastoreSnapshot.getShardSnapshots(); - Set actualShardNames = new HashSet<>(); - for(ShardSnapshot s: shardSnapshots) { - actualShardNames.add(s.getName()); - } + assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot()); - assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), actualShardNames); + Function shardNameTransformer = new Function() { + @Override + public String apply(ShardSnapshot s) { + return s.getName(); + } + }; - shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet( + Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer))); } @Test - public void testAddShardReplicaForNonExistentShard() throws Exception { + public void testAddShardReplicaForNonExistentShardConfig() throws Exception { new JavaTestKit(getSystem()) {{ - ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); shardManager.tell(new AddShardReplica("model-inventory"), getRef()); @@ -1178,17 +1221,6 @@ public class ShardManagerTest extends AbstractActorTest { }}; } - @Test - public void testAddShardReplicaForAlreadyCreatedShard() throws Exception { - new JavaTestKit(getSystem()) {{ - ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); - shardManager.tell(new AddShardReplica("default"), getRef()); - Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class); - assertEquals("Failure obtained", true, - (resp.cause() instanceof IllegalArgumentException)); - }}; - } - @Test public void testAddShardReplica() throws Exception { MockConfiguration mockConfig = @@ -1199,24 +1231,23 @@ public class ShardManagerTest extends AbstractActorTest { String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); // Create an ActorSystem ShardManager actor for member-1. - final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + final ActorSystem system1 = newActorSystem("Member1"); Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - final TestActorRef newReplicaShardManager = TestActorRef.create(system1, - newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor, - new ClusterWrapperImpl(system1), mockConfig), shardManagerID); + final TestActorRef newReplicaShardManager = TestActorRef.create(system1, + newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster( + new ClusterWrapperImpl(system1)).props(), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. - final ActorSystem system2 = ActorSystem.create("cluster-test", - ConfigFactory.load().getConfig("Member2")); + final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); String name = new ShardIdentifier("astronauts", "member-2", "config").toString(); final TestActorRef mockShardLeaderActor = - TestActorRef.create(system2, Props.create(MockRespondActor.class), name); - final TestActorRef leaderShardManager = TestActorRef.create(system2, - newPropsShardMgrWithMockShardActor("shardManager2", mockShardLeaderActor, - new ClusterWrapperImpl(system2), mockConfig), shardManagerID); + TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + final TestActorRef leaderShardManager = TestActorRef.create(system2, + newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster( + new ClusterWrapperImpl(system2)).props(), shardManagerID); new JavaTestKit(system1) {{ @@ -1247,47 +1278,172 @@ public class ShardManagerTest extends AbstractActorTest { .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts")); expectMsgClass(duration("5 seconds"), Status.Success.class); }}; + } - JavaTestKit.shutdownActorSystem(system1); - JavaTestKit.shutdownActorSystem(system2); + @Test + public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception { + new JavaTestKit(getSystem()) {{ + TestActorRef shardManager = actorFactory.createTestActor( + newPropsShardMgrWithMockShardActor(), shardMgrID); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix; + AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null); + ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf( + Props.create(MockRespondActor.class, addServerReply), leaderId); + + MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString()); + + String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(newReplicaId, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.absent(), + DataStoreVersions.CURRENT_VERSION), mockShardActor); + + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); + + MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class); + + Failure resp = expectMsgClass(duration("5 seconds"), Failure.class); + assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); + + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + // Send message again to verify previous in progress state is cleared + + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); + resp = expectMsgClass(duration("5 seconds"), Failure.class); + assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); + + // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated. + + shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder. + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef()); + leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender()); + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); + expectMsgClass(duration("5 seconds"), Failure.class); + + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + }}; } @Test - public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception { - MockConfiguration mockConfig = - new MockConfiguration(ImmutableMap.>builder(). - put("default", Arrays.asList("member-1", "member-2")). - put("astronauts", Arrays.asList("member-2")).build()); + public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); - String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), getRef()); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), mockShardActor); - // Create an ActorSystem ShardManager actor for member-1. - final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); - Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - final TestActorRef newReplicaShardManager = TestActorRef.create(system1, - newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor, - new ClusterWrapperImpl(system1), mockConfig), shardManagerID); + shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef()); + Failure resp = expectMsgClass(duration("5 seconds"), Failure.class); + assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass()); - new JavaTestKit(system1) {{ + shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + }}; + } + + @Test + public void testAddShardReplicaWithAddServerReplyFailure() throws Exception { + new JavaTestKit(getSystem()) {{ + JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("astronauts", Arrays.asList("member-2")).build()); + + ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1"); + final TestActorRef shardManager = actorFactory.createTestActor( + newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID); + shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + JavaTestKit terminateWatcher = new JavaTestKit(getSystem()); + terminateWatcher.watch(mockNewReplicaShardActor); + + shardManager.tell(new AddShardReplica("astronauts"), getRef()); + + AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class); + assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix, + addServerMsg.getNewServerId()); + mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null)); + + Failure failure = expectMsgClass(duration("5 seconds"), Failure.class); + assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass()); + + shardManager.tell(new FindLocalShard("astronauts", false), getRef()); + expectMsgClass(duration("5 seconds"), LocalShardNotFound.class); + + terminateWatcher.expectTerminated(mockNewReplicaShardActor); + + shardManager.tell(new AddShardReplica("astronauts"), getRef()); + mockShardLeaderKit.expectMsgClass(AddServer.class); + mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null)); + failure = expectMsgClass(duration("5 seconds"), Failure.class); + assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass()); + }}; + } + + @Test + public void testAddShardReplicaWithAlreadyInProgress() throws Exception { + new JavaTestKit(getSystem()) {{ + JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); + JavaTestKit secondRequestKit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("astronauts", Arrays.asList("member-2")).build()); + + final TestActorRef shardManager = actorFactory.createTestActor( + newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor).props(), shardMgrID); + shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(new AddShardReplica("astronauts"), getRef()); + + mockShardLeaderKit.expectMsgClass(AddServer.class); + + shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef()); + + secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class); + }}; + } + + @Test + public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception { + new JavaTestKit(getSystem()) {{ + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). + put("astronauts", Arrays.asList("member-2")).build()); + + final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig). + shardActor(mockShardActor).props(), shardMgrID); newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString()); - newReplicaShardManager.underlyingActor().waitForMemberUp(); newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); assertEquals("Failure obtained", true, (resp.cause() instanceof RuntimeException)); }}; - - JavaTestKit.shutdownActorSystem(system1); } @Test public void testRemoveShardReplicaForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); shardManager.tell(new RemoveShardReplica("model-inventory"), getRef()); @@ -1298,6 +1454,72 @@ public class ShardManagerTest extends AbstractActorTest { } + @Test + public void testServerRemovedShardActorNotRunning() throws Exception { + new JavaTestKit(getSystem()) {{ + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")). + put("people", Arrays.asList("member-1", "member-2")).build()); + + TestActorRef shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)); + + shardManager.underlyingActor().waitForRecoveryComplete(); + + shardManager.tell(new FindLocalShard("people", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + + shardManager.tell(new FindLocalShard("default", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + + // Removed the default shard replica from member-1 + ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); + final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build(); + shardManager.tell(new ServerRemoved(shardId.toString()), getRef()); + + shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); + }}; + } + + @Test + public void testServerRemovedShardActorRunning() throws Exception { + new JavaTestKit(getSystem()) {{ + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")). + put("people", Arrays.asList("member-1", "member-2")).build()); + + TestActorRef shard = actorFactory.createTestActor(MessageCollectorActor.props()); + + TestActorRef shardManager = actorFactory.createTestActor( + newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()); + + watch(shard); + + shardManager.underlyingActor().waitForRecoveryComplete(); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(new FindLocalShard("people", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + + shardManager.tell(new FindLocalShard("default", false), getRef()); + expectMsgClass(duration("5 seconds"), NotInitializedException.class); + + // Removed the default shard replica from member-1 + ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); + final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build(); + shardManager.tell(new ServerRemoved(shardId.toString()), getRef()); + + shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); + + expectMsgClass(duration("5 seconds"), Terminated.class); + }}; + } + + @Test public void testShardPersistenceWithRestoredData() throws Exception { new JavaTestKit(getSystem()) {{ @@ -1308,10 +1530,10 @@ public class ShardManagerTest extends AbstractActorTest { put("people", Arrays.asList("member-1", "member-2")).build()); String[] restoredShards = {"default", "astronauts"}; ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards)); - InMemorySnapshotStore.addSnapshot(shardMgrID, snapshot); + InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot); //create shardManager to come up with restored data - TestActorRef newRestoredShardManager = TestActorRef.create(getSystem(), + TestActorRef newRestoredShardManager = actorFactory.createTestActor( newShardMgrProps(mockConfig)); newRestoredShardManager.underlyingActor().waitForRecoveryComplete(); @@ -1334,9 +1556,21 @@ public class ShardManagerTest extends AbstractActorTest { private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); + private final CountDownLatch snapshotPersist = new CountDownLatch(1); + private ShardManagerSnapshot snapshot; + private final Map shardActors; + private final ActorRef shardActor; + private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); + private CountDownLatch memberUpReceived = new CountDownLatch(1); + private CountDownLatch memberRemovedReceived = new CountDownLatch(1); + private CountDownLatch memberUnreachableReceived = new CountDownLatch(1); + private CountDownLatch memberReachableReceived = new CountDownLatch(1); + private volatile MessageInterceptor messageInterceptor; private TestShardManager(Builder builder) { super(builder); + shardActor = builder.shardActor; + shardActors = builder.shardActors; } @Override @@ -1350,65 +1584,14 @@ public class ShardManagerTest extends AbstractActorTest { } } - void waitForRecoveryComplete() { - assertEquals("Recovery complete", true, - Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); - } - - public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) { - return new Builder(datastoreContextBuilder); - } - - private static class Builder extends ShardManager.Builder { - Builder(DatastoreContext.Builder datastoreContextBuilder) { - cluster(new MockClusterWrapper()).configuration(new MockConfiguration()); - datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())); - waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()); - } - - @Override - public Props props() { - verify(); - return Props.create(TestShardManager.class, this); - } - } - } - - private static class DelegatingShardManagerCreator implements Creator { - private static final long serialVersionUID = 1L; - private final Creator delegate; - - public DelegatingShardManagerCreator(Creator delegate) { - this.delegate = delegate; - } - - @Override - public ShardManager create() throws Exception { - return delegate.create(); - } - } - - private static class ForwardingShardManager extends ShardManager { - private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); - private CountDownLatch memberUpReceived = new CountDownLatch(1); - private CountDownLatch memberRemovedReceived = new CountDownLatch(1); - private CountDownLatch memberUnreachableReceived = new CountDownLatch(1); - private CountDownLatch memberReachableReceived = new CountDownLatch(1); - private final ActorRef shardActor; - private final String name; - private final CountDownLatch snapshotPersist = new CountDownLatch(1); - private ShardManagerSnapshot snapshot; - - public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) { - super(builder); - this.shardActor = shardActor; - this.name = name; - } - @Override public void handleCommand(Object message) throws Exception { try{ - super.handleCommand(message); + if(messageInterceptor != null && messageInterceptor.canIntercept(message)) { + getSender().tell(messageInterceptor.apply(message), getSelf()); + } else { + super.handleCommand(message); + } } finally { if(message instanceof FindPrimary) { findPrimaryMessageReceived.countDown(); @@ -1436,14 +1619,13 @@ public class ShardManagerTest extends AbstractActorTest { } } - @Override - public String persistenceId() { - return name; + void setMessageInterceptor(MessageInterceptor messageInterceptor) { + this.messageInterceptor = messageInterceptor; } - @Override - protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { - return shardActor; + void waitForRecoveryComplete() { + assertEquals("Recovery complete", true, + Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); } void waitForMemberUp() { @@ -1477,6 +1659,30 @@ public class ShardManagerTest extends AbstractActorTest { findPrimaryMessageReceived = new CountDownLatch(1); } + public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) { + return new Builder(datastoreContextBuilder); + } + + private static class Builder extends AbstractGenericBuilder { + private ActorRef shardActor; + private final Map shardActors = new HashMap<>(); + + Builder(DatastoreContext.Builder datastoreContextBuilder) { + super(TestShardManager.class); + datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())); + } + + Builder shardActor(ActorRef shardActor) { + this.shardActor = shardActor; + return this; + } + + Builder addShardActor(String shardName, ActorRef actorRef){ + shardActors.put(shardName, actorRef); + return this; + } + } + @Override public void saveSnapshot(Object obj) { snapshot = (ShardManagerSnapshot) obj; @@ -1485,15 +1691,93 @@ public class ShardManagerTest extends AbstractActorTest { void verifySnapshotPersisted(Set shardList) { assertEquals("saveSnapshot invoked", true, - Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS)); + Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS)); assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList())); } + + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + if(shardActors.get(info.getShardName()) != null){ + return shardActors.get(info.getShardName()); + } + + if(shardActor != null) { + return shardActor; + } + + return super.newShardActor(schemaContext, info); + } + } + + private static abstract class AbstractGenericBuilder, C extends ShardManager> + extends ShardManager.AbstractBuilder { + private final Class shardManagerClass; + + AbstractGenericBuilder(Class shardManagerClass) { + this.shardManagerClass = shardManagerClass; + cluster(new MockClusterWrapper()).configuration(new MockConfiguration()). + waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()); + } + + @Override + public Props props() { + verify(); + return Props.create(shardManagerClass, this); + } + } + + private static class GenericBuilder extends AbstractGenericBuilder, C> { + GenericBuilder(Class shardManagerClass) { + super(shardManagerClass); + } + } + + private static class DelegatingShardManagerCreator implements Creator { + private static final long serialVersionUID = 1L; + private final Creator delegate; + + public DelegatingShardManagerCreator(Creator delegate) { + this.delegate = delegate; + } + + @Override + public ShardManager create() throws Exception { + return delegate.create(); + } + } + + interface MessageInterceptor extends Function { + boolean canIntercept(Object message); + } + + private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) { + return new MessageInterceptor(){ + @Override + public Object apply(Object message) { + return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1); + } + + @Override + public boolean canIntercept(Object message) { + return message instanceof FindPrimary; + } + }; } private static class MockRespondActor extends MessageCollectorActor { + static final String CLEAR_RESPONSE = "clear-response"; private volatile Object responseMsg; + @SuppressWarnings("unused") + public MockRespondActor() { + } + + @SuppressWarnings("unused") + public MockRespondActor(Object responseMsg) { + this.responseMsg = responseMsg; + } + public void updateResponse(Object response) { responseMsg = response; } @@ -1504,8 +1788,9 @@ public class ShardManagerTest extends AbstractActorTest { if (message instanceof AddServer) { if (responseMsg != null) { getSender().tell(responseMsg, getSelf()); - responseMsg = null; } + } if(message.equals(CLEAR_RESPONSE)) { + responseMsg = null; } } }