X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=f9b67567d1ae4214b06981176e5e443ab1a01f00;hp=3257e8f910e0538ec99bbd0f608daf2b5056e1bd;hb=e65e1755bd770cf03d3ea15edda9b9cc7a79f3c0;hpb=4680d02510a884b3a893345f423cedcc8c5af0f4 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 3257e8f910..f9b67567d1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -20,7 +20,6 @@ import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.AddressFromURIString; -import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; import akka.actor.Status.Failure; @@ -38,18 +37,20 @@ import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.net.URI; import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -57,6 +58,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -91,6 +93,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.ForwardingActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; @@ -98,25 +101,32 @@ import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.RemoveServer; +import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; -import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; public class ShardManagerTest extends AbstractActorTest { + private static final Logger LOG = LoggerFactory.getLogger(ShardManagerTest.class); + private static int ID_COUNTER = 1; private final String shardMrgIDSuffix = "config" + ID_COUNTER++; @@ -130,15 +140,12 @@ public class ShardManagerTest extends AbstractActorTest { private static String mockShardName; private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). - dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) + dataStoreName(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6); - private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { - String name = new ShardIdentifier(shardName, memberName,"config").toString(); - return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); - } + private final Collection actorSystems = new ArrayList<>(); - private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); @Before public void setUp() { @@ -159,6 +166,27 @@ public class ShardManagerTest extends AbstractActorTest { public void tearDown() { InMemoryJournal.clear(); InMemorySnapshotStore.clear(); + + for(ActorSystem system: actorSystems) { + JavaTestKit.shutdownActorSystem(system, null, Boolean.TRUE); + } + + actorFactory.close(); + } + + private ActorSystem newActorSystem(String config) { + ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(config)); + actorSystems.add(system); + return system; + } + + private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { + String name = new ShardIdentifier(shardName, memberName,"config").toString(); + if(system == getSystem()) { + return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name); + } + + return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); } private Props newShardMgrProps() { @@ -172,43 +200,77 @@ public class ShardManagerTest extends AbstractActorTest { return mockFactory; } + private TestShardManager.Builder newTestShardMgrBuilder() { + return TestShardManager.builder(datastoreContextBuilder); + } + + private TestShardManager.Builder newTestShardMgrBuilder(Configuration config) { + return TestShardManager.builder(datastoreContextBuilder).configuration(config); + } + private Props newShardMgrProps(Configuration config) { - return TestShardManager.builder(datastoreContextBuilder).configuration(config).props(); + return newTestShardMgrBuilder(config).props(); } - private Props newPropsShardMgrWithMockShardActor() { - return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(), - new MockConfiguration()); + private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor() { + return newTestShardMgrBuilderWithMockShardActor(mockShardActor); } - private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor, - final ClusterWrapper clusterWrapper, final Configuration config) { - Creator creator = new Creator() { - private static final long serialVersionUID = 1L; - @Override - public ShardManager create() throws Exception { - return new ForwardingShardManager(ShardManager.builder().cluster(clusterWrapper).configuration(config). - datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())). - waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache), name, shardActor); - } - }; + private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) { + return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor); + } - return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); + + private Props newPropsShardMgrWithMockShardActor() { + return newTestShardMgrBuilderWithMockShardActor().props(); } + private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) { + return newTestShardMgrBuilderWithMockShardActor(shardActor).props(); + } + + private TestShardManager newTestShardManager() { return newTestShardManager(newShardMgrProps()); } private TestShardManager newTestShardManager(Props props) { - TestActorRef shardManagerActor = TestActorRef.create(getSystem(), props); + TestActorRef shardManagerActor = actorFactory.createTestActor(props); TestShardManager shardManager = shardManagerActor.underlyingActor(); shardManager.waitForRecoveryComplete(); return shardManager; } + private void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) { + AssertionError last = null; + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 5) { + try { + shardManager.tell(new FindLocalShard(shardName, true), kit.getRef()); + kit.expectMsgClass(LocalShardFound.class); + return; + } catch(AssertionError e) { + last = e; + } + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + throw last; + } + + private T expectMsgClassOrFailure(Class msgClass, JavaTestKit kit, String msg) { + Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class); + if(reply instanceof Failure) { + throw new AssertionError(msg + " failed", ((Failure)reply).cause()); + } + + return (T)reply; + } + @Test public void testPerShardDatastoreContext() throws Exception { + LOG.info("testPerShardDatastoreContext starting"); final DatastoreContextFactory mockFactory = newDatastoreContextFactory( datastoreContextBuilder.shardElectionTimeoutFactor(5).build()); @@ -230,44 +292,51 @@ public class ShardManagerTest extends AbstractActorTest { } }; - final TestActorRef defaultShardActor = TestActorRef.create(getSystem(), - Props.create(MessageCollectorActor.class), "default"); - final TestActorRef topologyShardActor = TestActorRef.create(getSystem(), - Props.create(MessageCollectorActor.class), "topology"); + final TestActorRef defaultShardActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId("default")); + final TestActorRef topologyShardActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId("topology")); final Map> shardInfoMap = Collections.synchronizedMap( new HashMap>()); shardInfoMap.put("default", new AbstractMap.SimpleEntry(defaultShardActor, null)); shardInfoMap.put("topology", new AbstractMap.SimpleEntry(topologyShardActor, null)); + final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); final CountDownLatch newShardActorLatch = new CountDownLatch(2); + class LocalShardManager extends ShardManager { + public LocalShardManager(AbstractBuilder builder) { + super(builder); + } + + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + Entry entry = shardInfoMap.get(info.getShardName()); + ActorRef ref = null; + if(entry != null) { + ref = entry.getKey(); + entry.setValue(info.getDatastoreContext()); + } + + newShardActorLatch.countDown(); + return ref; + } + } + final Creator creator = new Creator() { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ShardManager(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(mockConfig). - datastoreContextFactory(mockFactory).waitTillReadyCountdownLatch(ready). - primaryShardInfoCache(primaryShardInfoCache)) { - @Override - protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { - Entry entry = shardInfoMap.get(info.getShardName()); - ActorRef ref = null; - if(entry != null) { - ref = entry.getKey(); - entry.setValue(info.getDatastoreContext()); - } - - newShardActorLatch.countDown(); - return ref; - } - }; + return new LocalShardManager(new GenericBuilder(LocalShardManager.class). + datastoreContextFactory(mockFactory).primaryShardInfoCache(primaryShardInfoCache). + configuration(mockConfig)); } }; JavaTestKit kit = new JavaTestKit(getSystem()); - final ActorRef shardManager = getSystem().actorOf(Props.create(new DelegatingShardManagerCreator(creator)). - withDispatcher(Dispatchers.DefaultDispatcherId())); + final ActorRef shardManager = actorFactory.createActor(Props.create( + new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef()); @@ -293,14 +362,13 @@ public class ShardManagerTest extends AbstractActorTest { newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class); assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor()); - defaultShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - topologyShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + LOG.info("testPerShardDatastoreContext ending"); } @Test public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -312,10 +380,11 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryForLocalLeaderShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard starting"); new JavaTestKit(getSystem()) {{ String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -335,12 +404,15 @@ public class ShardManagerTest extends AbstractActorTest { primaryFound.getPrimaryPath().contains("member-1-shard-default")); assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); }}; + + LOG.info("testOnReceiveFindPrimaryForLocalLeaderShard ending"); } @Test public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception { + LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp starting"); new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -355,12 +427,15 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; + + LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp ending"); } @Test public void testOnReceiveFindPrimaryForNonLocalLeaderShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard starting"); new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -382,12 +457,14 @@ public class ShardManagerTest extends AbstractActorTest { primaryFound.getPrimaryPath().contains("member-2-shard-default")); assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion()); }}; + + LOG.info("testOnReceiveFindPrimaryForNonLocalLeaderShard ending"); } @Test public void testOnReceiveFindPrimaryForUninitializedShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); @@ -398,7 +475,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryForInitializedShardWithNoRole() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -411,8 +488,9 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId() throws Exception { + LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting"); new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -436,12 +514,15 @@ public class ShardManagerTest extends AbstractActorTest { primaryFound.getPrimaryPath().contains("member-1-shard-default")); assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree()); }}; + + LOG.info("testOnReceiveFindPrimaryForFollowerShardWithNoInitialLeaderId starting"); } @Test public void testOnReceiveFindPrimaryWaitForShardLeader() throws Exception { + LOG.info("testOnReceiveFindPrimaryWaitForShardLeader starting"); new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -472,12 +553,15 @@ public class ShardManagerTest extends AbstractActorTest { expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); }}; + + LOG.info("testOnReceiveFindPrimaryWaitForShardLeader ending"); } @Test public void testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard starting"); new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -489,12 +573,15 @@ public class ShardManagerTest extends AbstractActorTest { expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); }}; + + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithUninitializedShard ending"); } @Test public void testOnReceiveFindPrimaryWaitForReadyWithCandidateShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard starting"); new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -505,12 +592,15 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; + + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithCandidateShard ending"); } @Test public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard starting"); new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -521,12 +611,15 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; + + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard ending"); } @Test public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard starting"); new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -535,26 +628,27 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); }}; + + LOG.info("testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard ending"); } @Test public void testOnReceiveFindPrimaryForRemoteShard() throws Exception { + LOG.info("testOnReceiveFindPrimaryForRemoteShard starting"); String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); // Create an ActorSystem ShardManager actor for member-1. - final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + final ActorSystem system1 = newActorSystem("Member1"); Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); - ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - - final TestActorRef shardManager1 = TestActorRef.create(system1, - newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), - new MockConfiguration()), shardManagerID); + final TestActorRef shardManager1 = TestActorRef.create(system1, + newTestShardMgrBuilderWithMockShardActor().cluster( + new ClusterWrapperImpl(system1)).props(), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. - final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); @@ -564,9 +658,9 @@ public class ShardManagerTest extends AbstractActorTest { put("default", Arrays.asList("member-1", "member-2")). put("astronauts", Arrays.asList("member-2")).build()); - final TestActorRef shardManager2 = TestActorRef.create(system2, - newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), - mockConfig2), shardManagerID); + final TestActorRef shardManager2 = TestActorRef.create(system2, + newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( + new ClusterWrapperImpl(system2)).props(), shardManagerID); new JavaTestKit(system1) {{ @@ -602,28 +696,28 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class); }}; - JavaTestKit.shutdownActorSystem(system1); - JavaTestKit.shutdownActorSystem(system2); + LOG.info("testOnReceiveFindPrimaryForRemoteShard ending"); } @Test public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception { + LOG.info("testShardAvailabilityOnChangeOfMemberReachability starting"); String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); // Create an ActorSystem ShardManager actor for member-1. - final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + final ActorSystem system1 = newActorSystem("Member1"); Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - final TestActorRef shardManager1 = TestActorRef.create(system1, - newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), - new MockConfiguration()), shardManagerID); + final TestActorRef shardManager1 = TestActorRef.create(system1, + newTestShardMgrBuilder().shardActor(mockShardActor1).cluster( + new ClusterWrapperImpl(system1)).props(), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. - final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); @@ -632,9 +726,9 @@ public class ShardManagerTest extends AbstractActorTest { MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). put("default", Arrays.asList("member-1", "member-2")).build()); - final TestActorRef shardManager2 = TestActorRef.create(system2, - newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), - mockConfig2), shardManagerID); + final TestActorRef shardManager2 = TestActorRef.create(system2, + newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( + new ClusterWrapperImpl(system2)).props(), shardManagerID); new JavaTestKit(system1) {{ @@ -702,28 +796,30 @@ public class ShardManagerTest extends AbstractActorTest { }}; - JavaTestKit.shutdownActorSystem(system1); - JavaTestKit.shutdownActorSystem(system2); + LOG.info("testShardAvailabilityOnChangeOfMemberReachability ending"); } @Test public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception { + LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange starting"); String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); // Create an ActorSystem ShardManager actor for member-1. - final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + final ActorSystem system1 = newActorSystem("Member1"); Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - final TestActorRef shardManager1 = TestActorRef.create(system1, - newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), - new MockConfiguration()), shardManagerID); + final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); + final TestActorRef shardManager1 = TestActorRef.create(system1, + newTestShardMgrBuilder().shardActor(mockShardActor1).cluster( + new ClusterWrapperImpl(system1)).primaryShardInfoCache(primaryShardInfoCache).props(), + shardManagerID); // Create an ActorSystem ShardManager actor for member-2. - final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); @@ -732,12 +828,11 @@ public class ShardManagerTest extends AbstractActorTest { MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). put("default", Arrays.asList("member-1", "member-2")).build()); - final TestActorRef shardManager2 = TestActorRef.create(system2, - newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), - mockConfig2), shardManagerID); + final TestActorRef shardManager2 = TestActorRef.create(system2, + newTestShardMgrBuilder(mockConfig2).shardActor(mockShardActor2).cluster( + new ClusterWrapperImpl(system2)).props(), shardManagerID); new JavaTestKit(system1) {{ - shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager1.tell(new ActorInitialized(), mockShardActor1); @@ -789,15 +884,14 @@ public class ShardManagerTest extends AbstractActorTest { }}; - JavaTestKit.shutdownActorSystem(system1); - JavaTestKit.shutdownActorSystem(system2); + LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange ending"); } @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -812,7 +906,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardForExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -829,7 +923,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardForNotInitializedShard() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); @@ -839,8 +933,9 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnReceiveFindLocalShardWaitForShardInitialized() throws Exception { + LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting"); new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -854,6 +949,8 @@ public class ShardManagerTest extends AbstractActorTest { Object resp = Await.result(future, duration("5 seconds")); assertTrue("Expected: LocalShardFound, Actual: " + resp, resp instanceof LocalShardFound); }}; + + LOG.info("testOnReceiveFindLocalShardWaitForShardInitialized starting"); } @Test @@ -1002,6 +1099,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{ + LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting"); TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() { @Override public List getMemberShardNames(String memberName) { @@ -1041,12 +1139,13 @@ public class ShardManagerTest extends AbstractActorTest { // Sync status is now true assertEquals(true, shardManager.getMBean().getSyncStatus()); + LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards ending"); } @Test public void testOnReceiveSwitchShardBehavior() throws Exception { new JavaTestKit(getSystem()) {{ - final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -1062,10 +1161,11 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testOnCreateShard() { + LOG.info("testOnCreateShard starting"); new JavaTestKit(getSystem()) {{ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); - ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); SchemaContext schemaContext = TestModel.createTestContext(); @@ -1102,14 +1202,17 @@ public class ShardManagerTest extends AbstractActorTest { Success success = expectMsgClass(duration("5 seconds"), Success.class); assertNotNull("Success status is null", success.status()); }}; + + LOG.info("testOnCreateShard ending"); } @Test public void testOnCreateShardWithLocalMemberNotInShardConfig() { + LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig starting"); new JavaTestKit(getSystem()) {{ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); - ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); @@ -1128,12 +1231,15 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(), shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass()); }}; + + LOG.info("testOnCreateShardWithLocalMemberNotInShardConfig ending"); } @Test public void testOnCreateShardWithNoInitialSchemaContext() { + LOG.info("testOnCreateShardWithNoInitialSchemaContext starting"); new JavaTestKit(getSystem()) {{ - ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); Shard.Builder shardBuilder = Shard.builder(); @@ -1154,52 +1260,124 @@ public class ShardManagerTest extends AbstractActorTest { assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext()); }}; + + LOG.info("testOnCreateShardWithNoInitialSchemaContext ending"); } @Test public void testGetSnapshot() throws Throwable { + LOG.info("testGetSnapshot starting"); JavaTestKit kit = new JavaTestKit(getSystem()); MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). put("shard1", Arrays.asList("member-1")). - put("shard2", Arrays.asList("member-1")).build()); + put("shard2", Arrays.asList("member-1")). + put("astronauts", Collections.emptyList()).build()); - ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig).withDispatcher( - Dispatchers.DefaultDispatcherId())); + TestActorRef shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig). + withDispatcher(Dispatchers.DefaultDispatcherId())); shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); Failure failure = kit.expectMsgClass(Failure.class); assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass()); - kit = new JavaTestKit(getSystem()); + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); + + waitForShardInitialized(shardManager, "shard1", kit); + waitForShardInitialized(shardManager, "shard2", kit); + + shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); + + DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot"); + + assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); + assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot()); + + Function shardNameTransformer = new Function() { + @Override + public String apply(ShardSnapshot s) { + return s.getName(); + } + }; + + assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet( + Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer))); + + // Add a new replica + + JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); + + TestShardManager shardManagerInstance = shardManager.underlyingActor(); + shardManagerInstance.setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); + + shardManager.tell(new AddShardReplica("astronauts"), kit.getRef()); + mockShardLeaderKit.expectMsgClass(AddServer.class); + mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.OK, "")); + kit.expectMsgClass(Status.Success.class); + waitForShardInitialized(shardManager, "astronauts", kit); + + // Send another GetSnapshot and verify + + shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); + datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot"); + + assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), Sets.newHashSet( + Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer))); + + byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot(); + assertNotNull("Expected ShardManagerSnapshot", snapshotBytes); + ShardManagerSnapshot snapshot = SerializationUtils.deserialize(snapshotBytes); + assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), + Sets.newHashSet(snapshot.getShardList())); + + LOG.info("testGetSnapshot ending"); + } + + @Test + public void testRestoreFromSnapshot() throws Throwable { + LOG.info("testRestoreFromSnapshot starting"); + + JavaTestKit kit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). + put("shard1", Collections.emptyList()). + put("shard2", Collections.emptyList()). + put("astronauts", Collections.emptyList()).build()); + + + ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList("shard1", "shard2", "astronauts")); + DatastoreSnapshot restoreFromSnapshot = new DatastoreSnapshot(shardMrgIDSuffix, + SerializationUtils.serialize(snapshot), Collections.emptyList()); + TestActorRef shardManager = actorFactory.createTestActor(newTestShardMgrBuilder(mockConfig). + restoreFromSnapshot(restoreFromSnapshot).props().withDispatcher(Dispatchers.DefaultDispatcherId())); + + shardManager.underlyingActor().waitForRecoveryComplete(); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); - shardManager.tell(new FindLocalShard("shard1", true), kit.getRef()); - kit.expectMsgClass(LocalShardFound.class); - shardManager.tell(new FindLocalShard("shard2", true), kit.getRef()); - kit.expectMsgClass(LocalShardFound.class); + waitForShardInitialized(shardManager, "shard1", kit); + waitForShardInitialized(shardManager, "shard2", kit); + waitForShardInitialized(shardManager, "astronauts", kit); shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); - DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class); + DatastoreSnapshot datastoreSnapshot = expectMsgClassOrFailure(DatastoreSnapshot.class, kit, "GetSnapshot"); assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); - List shardSnapshots = datastoreSnapshot.getShardSnapshots(); - Set actualShardNames = new HashSet<>(); - for(ShardSnapshot s: shardSnapshots) { - actualShardNames.add(s.getName()); - } - assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), actualShardNames); + byte[] snapshotBytes = datastoreSnapshot.getShardManagerSnapshot(); + assertNotNull("Expected ShardManagerSnapshot", snapshotBytes); + snapshot = SerializationUtils.deserialize(snapshotBytes); + assertEquals("Shard names", Sets.newHashSet("shard1", "shard2", "astronauts"), + Sets.newHashSet(snapshot.getShardList())); - shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + LOG.info("testRestoreFromSnapshot ending"); } @Test public void testAddShardReplicaForNonExistentShardConfig() throws Exception { new JavaTestKit(getSystem()) {{ - ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); shardManager.tell(new AddShardReplica("model-inventory"), getRef()); @@ -1217,27 +1395,27 @@ public class ShardManagerTest extends AbstractActorTest { put("default", Arrays.asList("member-1", "member-2")). put("astronauts", Arrays.asList("member-2")).build()); - String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + datastoreContextBuilder.shardManagerPersistenceId(shardManagerID); // Create an ActorSystem ShardManager actor for member-1. - final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + final ActorSystem system1 = newActorSystem("Member1"); Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); - final TestActorRef newReplicaShardManager = TestActorRef.create(system1, - newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor, - new ClusterWrapperImpl(system1), mockConfig), shardManagerID); + final TestActorRef newReplicaShardManager = TestActorRef.create(system1, + newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor).cluster( + new ClusterWrapperImpl(system1)).props(), shardManagerID); // Create an ActorSystem ShardManager actor for member-2. - final ActorSystem system2 = ActorSystem.create("cluster-test", - ConfigFactory.load().getConfig("Member2")); + final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); String name = new ShardIdentifier("astronauts", "member-2", "config").toString(); final TestActorRef mockShardLeaderActor = - TestActorRef.create(system2, Props.create(MockRespondActor.class), name); - final TestActorRef leaderShardManager = TestActorRef.create(system2, - newPropsShardMgrWithMockShardActor("shardManager2", mockShardLeaderActor, - new ClusterWrapperImpl(system2), mockConfig), shardManagerID); + TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + final TestActorRef leaderShardManager = TestActorRef.create(system2, + newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster( + new ClusterWrapperImpl(system2)).props(), shardManagerID); new JavaTestKit(system1) {{ @@ -1256,6 +1434,15 @@ public class ShardManagerTest extends AbstractActorTest { newReplicaShardManager.underlyingActor().waitForMemberUp(); leaderShardManager.underlyingActor().waitForMemberUp(); + //Have a dummy snapshot to be overwritten by the new data persisted. + String[] restoredShards = {"default", "people"}; + ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards)); + InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot); + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS); + + InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID); + InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID); + //construct a mock response message AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2); mockShardLeaderActor.underlyingActor().updateResponse(response); @@ -1264,19 +1451,26 @@ public class ShardManagerTest extends AbstractActorTest { AddServer.class); String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix; assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId()); - newReplicaShardManager.underlyingActor() - .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts")); expectMsgClass(duration("5 seconds"), Status.Success.class); + + InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class); + InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID); + List persistedSnapshots = + InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class); + assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size()); + ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0); + assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"), + Sets.newHashSet(shardManagerSnapshot.getShardList())); }}; - JavaTestKit.shutdownActorSystem(system1); - JavaTestKit.shutdownActorSystem(system2); + LOG.info("testAddShardReplica ending"); } @Test public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception { + LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader starting"); new JavaTestKit(getSystem()) {{ - TestActorRef shardManager = TestActorRef.create(getSystem(), + TestActorRef shardManager = actorFactory.createTestActor( newPropsShardMgrWithMockShardActor(), shardMgrID); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -1321,16 +1515,17 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), LocalShardFound.class); - - leaderShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; + + LOG.info("testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader ending"); } @Test public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception { + LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader starting"); new JavaTestKit(getSystem()) {{ String memberId = "member-1-shard-default-" + shardMrgIDSuffix; - ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor()); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); @@ -1346,10 +1541,13 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef()); expectMsgClass(duration("5 seconds"), LocalShardFound.class); }}; + + LOG.info("testAddShardReplicaWithPreExistingLocalReplicaLeader ending"); } @Test public void testAddShardReplicaWithAddServerReplyFailure() throws Exception { + LOG.info("testAddShardReplicaWithAddServerReplyFailure starting"); new JavaTestKit(getSystem()) {{ JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); @@ -1358,9 +1556,8 @@ public class ShardManagerTest extends AbstractActorTest { put("astronauts", Arrays.asList("member-2")).build()); ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1"); - TestActorRef shardManager = TestActorRef.create(getSystem(), - newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockNewReplicaShardActor, - new MockClusterWrapper(), mockConfig), shardMgrID); + final TestActorRef shardManager = actorFactory.createTestActor( + newTestShardMgrBuilder(mockConfig).shardActor(mockNewReplicaShardActor).props(), shardMgrID); shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); @@ -1389,43 +1586,25 @@ public class ShardManagerTest extends AbstractActorTest { failure = expectMsgClass(duration("5 seconds"), Failure.class); assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass()); }}; + + LOG.info("testAddShardReplicaWithAddServerReplyFailure ending"); } @Test public void testAddShardReplicaWithAlreadyInProgress() throws Exception { - new JavaTestKit(getSystem()) {{ - JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); - JavaTestKit secondRequestKit = new JavaTestKit(getSystem()); - - MockConfiguration mockConfig = - new MockConfiguration(ImmutableMap.>builder(). - put("astronauts", Arrays.asList("member-2")).build()); - - TestActorRef shardManager = TestActorRef.create(getSystem(), - newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockShardActor, - new MockClusterWrapper(), mockConfig), shardMgrID); - shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); - - shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - - shardManager.tell(new AddShardReplica("astronauts"), getRef()); - - mockShardLeaderKit.expectMsgClass(AddServer.class); - - shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef()); - - secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class); - }}; + testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"), + AddServer.class, new AddShardReplica("astronauts")); } @Test public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception { + LOG.info("testAddShardReplicaWithFindPrimaryTimeout starting"); new JavaTestKit(getSystem()) {{ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). put("astronauts", Arrays.asList("member-2")).build()); - ActorRef newReplicaShardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor( - "shardManager", mockShardActor, new MockClusterWrapper(), mockConfig)); + final ActorRef newReplicaShardManager = actorFactory.createActor(newTestShardMgrBuilder(mockConfig). + shardActor(mockShardActor).props(), shardMgrID); newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString()); @@ -1435,24 +1614,186 @@ public class ShardManagerTest extends AbstractActorTest { assertEquals("Failure obtained", true, (resp.cause() instanceof RuntimeException)); }}; + + LOG.info("testAddShardReplicaWithFindPrimaryTimeout ending"); } @Test public void testRemoveShardReplicaForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ - ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + ActorRef shardManager = actorFactory.createActor(newShardMgrProps( new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); - shardManager.tell(new RemoveShardReplica("model-inventory"), getRef()); - Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class); + shardManager.tell(new RemoveShardReplica("model-inventory", "member-1"), getRef()); + Status.Failure resp = expectMsgClass(duration("10 seconds"), Status.Failure.class); assertEquals("Failure obtained", true, - (resp.cause() instanceof IllegalArgumentException)); + (resp.cause() instanceof PrimaryNotFoundException)); }}; + } + @Test + /** + * Primary is Local + */ + public void testRemoveShardReplicaLocal() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + final TestActorRef respondActor = + TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId); + + ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), getRef()); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), respondActor); + + respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null)); + shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, "member-1"), getRef()); + final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class); + assertEquals(new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(), + removeServer.getServerId()); + expectMsgClass(duration("5 seconds"), Success.class); + }}; + } + + @Test + public void testRemoveShardReplicaRemote() throws Exception { + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-1")).build()); + + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + final ActorSystem system1 = newActorSystem("Member1"); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef newReplicaShardManager = TestActorRef.create(system1, + newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockDefaultShardActor).cluster( + new ClusterWrapperImpl(system1)).props(), + shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + final ActorSystem system2 = newActorSystem("Member2"); + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + String name = new ShardIdentifier("default", "member-2", shardMrgIDSuffix).toString(); + final TestActorRef mockShardLeaderActor = + TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + + LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor); + + final TestActorRef leaderShardManager = TestActorRef.create(system2, + newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardLeaderActor).cluster( + new ClusterWrapperImpl(system2)).props(), + shardManagerID); + + // Because mockShardLeaderActor is created at the top level of the actor system it has an address like so, + // akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1 + // However when a shard manager has a local shard which is a follower and a leader that is remote it will + // try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will + // look like so, + // akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1 + // In this specific case if we did a FindPrimary for shard default from member-1 we would come up + // with the address of an actor which does not exist, therefore any message sent to that actor would go to + // dead letters. + // To work around this problem we create a ForwardingActor with the right address and pass to it the + // mockShardLeaderActor. The ForwardingActor simply forwards all messages to the mockShardLeaderActor and every + // thing works as expected + final ActorRef actorRef = leaderShardManager.underlyingActor().context() + .actorOf(Props.create(ForwardingActor.class, mockShardLeaderActor), "member-2-shard-default-" + shardMrgIDSuffix); + + LOG.error("Forwarding actor : {}", actorRef); + + new JavaTestKit(system1) {{ + + newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2, + Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor); + leaderShardManager.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor); + + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + newReplicaShardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, + Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor); + newReplicaShardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + + newReplicaShardManager.underlyingActor().waitForMemberUp(); + leaderShardManager.underlyingActor().waitForMemberUp(); + + //construct a mock response message + RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2); + mockShardLeaderActor.underlyingActor().updateResponse(response); + newReplicaShardManager.tell(new RemoveShardReplica("default", "member-1"), getRef()); + RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, + RemoveServer.class); + String removeServerId = new ShardIdentifier("default", "member-1", shardMrgIDSuffix).toString(); + assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId()); + expectMsgClass(duration("5 seconds"), Status.Success.class); + }}; + + } + + @Test + public void testRemoveShardReplicaWhenAnotherRemoveShardReplicaAlreadyInProgress() throws Exception { + testServerChangeWhenAlreadyInProgress("astronauts", new RemoveShardReplica("astronauts", "member-2"), + RemoveServer.class, new RemoveShardReplica("astronauts", "member-3")); + } + + @Test + public void testRemoveShardReplicaWhenAddShardReplicaAlreadyInProgress() throws Exception { + testServerChangeWhenAlreadyInProgress("astronauts", new AddShardReplica("astronauts"), + AddServer.class, new RemoveShardReplica("astronauts", "member-2")); + } + + + public void testServerChangeWhenAlreadyInProgress(final String shardName, final Object firstServerChange, + final Class firstForwardedServerChangeClass, + final Object secondServerChange) throws Exception { + new JavaTestKit(getSystem()) {{ + JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem()); + JavaTestKit secondRequestKit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put(shardName, Arrays.asList("member-2")).build()); + + final TestActorRef shardManager = TestActorRef.create(getSystem(), + newTestShardMgrBuilder().configuration(mockConfig).shardActor(mockShardActor).cluster( + new MockClusterWrapper()).props(), + shardMgrID); + + shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef())); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + shardManager.tell(firstServerChange, getRef()); + + mockShardLeaderKit.expectMsgClass(firstForwardedServerChangeClass); + + shardManager.tell(secondServerChange, secondRequestKit.getRef()); + + secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class); + }}; } @Test public void testServerRemovedShardActorNotRunning() throws Exception { + LOG.info("testServerRemovedShardActorNotRunning starting"); new JavaTestKit(getSystem()) {{ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). @@ -1460,11 +1801,9 @@ public class ShardManagerTest extends AbstractActorTest { put("astronauts", Arrays.asList("member-2")). put("people", Arrays.asList("member-1", "member-2")).build()); - TestActorRef shardManager = TestActorRef.create(getSystem(), - newShardMgrProps(mockConfig)); + TestActorRef shardManager = actorFactory.createTestActor(newShardMgrProps(mockConfig)); shardManager.underlyingActor().waitForRecoveryComplete(); - shardManager.tell(new FindLocalShard("people", false), getRef()); expectMsgClass(duration("5 seconds"), NotInitializedException.class); @@ -1473,15 +1812,18 @@ public class ShardManagerTest extends AbstractActorTest { // Removed the default shard replica from member-1 ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); - final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build(); + ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type(shardMrgIDSuffix).build(); shardManager.tell(new ServerRemoved(shardId.toString()), getRef()); shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); }}; + + LOG.info("testServerRemovedShardActorNotRunning ending"); } @Test public void testServerRemovedShardActorRunning() throws Exception { + LOG.info("testServerRemovedShardActorRunning starting"); new JavaTestKit(getSystem()) {{ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). @@ -1489,39 +1831,39 @@ public class ShardManagerTest extends AbstractActorTest { put("astronauts", Arrays.asList("member-2")). put("people", Arrays.asList("member-1", "member-2")).build()); - TestActorRef shardManager = TestActorRef.create(getSystem(), - newShardMgrProps(mockConfig)); + String shardId = ShardIdentifier.builder().shardName("default").memberName("member-1"). + type(shardMrgIDSuffix).build().toString(); + TestActorRef shard = actorFactory.createTestActor( + MessageCollectorActor.props(), shardId); - TestActorRef shard = TestActorRef.create(getSystem(), MessageCollectorActor.props()); + TestActorRef shardManager = actorFactory.createTestActor( + newTestShardMgrBuilder(mockConfig).addShardActor("default", shard).props()); watch(shard); shardManager.underlyingActor().waitForRecoveryComplete(); - shardManager.underlyingActor().addShardActor("default", shard); - shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), shard); - shardManager.tell(new FindLocalShard("people", false), getRef()); - expectMsgClass(duration("5 seconds"), NotInitializedException.class); - - shardManager.tell(new FindLocalShard("default", false), getRef()); - expectMsgClass(duration("5 seconds"), NotInitializedException.class); + waitForShardInitialized(shardManager, "people", this); + waitForShardInitialized(shardManager, "default", this); // Removed the default shard replica from member-1 - ShardIdentifier.Builder builder = new ShardIdentifier.Builder(); - final ShardIdentifier shardId = builder.shardName("default").memberName("member-1").type("config1").build(); - shardManager.tell(new ServerRemoved(shardId.toString()), getRef()); + shardManager.tell(new ServerRemoved(shardId), getRef()); shardManager.underlyingActor().verifySnapshotPersisted(Sets.newHashSet("people")); expectMsgClass(duration("5 seconds"), Terminated.class); }}; + + LOG.info("testServerRemovedShardActorRunning ending"); } @Test public void testShardPersistenceWithRestoredData() throws Exception { + LOG.info("testShardPersistenceWithRestoredData starting"); new JavaTestKit(getSystem()) {{ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). @@ -1533,7 +1875,7 @@ public class ShardManagerTest extends AbstractActorTest { InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot); //create shardManager to come up with restored data - TestActorRef newRestoredShardManager = TestActorRef.create(getSystem(), + TestActorRef newRestoredShardManager = actorFactory.createTestActor( newShardMgrProps(mockConfig)); newRestoredShardManager.underlyingActor().waitForRecoveryComplete(); @@ -1551,6 +1893,8 @@ public class ShardManagerTest extends AbstractActorTest { newRestoredShardManager.tell(new FindLocalShard("astronauts", false), getRef()); expectMsgClass(duration("5 seconds"), NotInitializedException.class); }}; + + LOG.info("testShardPersistenceWithRestoredData ending"); } @@ -1558,10 +1902,19 @@ public class ShardManagerTest extends AbstractActorTest { private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final CountDownLatch snapshotPersist = new CountDownLatch(1); private ShardManagerSnapshot snapshot; - private Map shardActors = new HashMap<>(); + private final Map shardActors; + private final ActorRef shardActor; + private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); + private CountDownLatch memberUpReceived = new CountDownLatch(1); + private CountDownLatch memberRemovedReceived = new CountDownLatch(1); + private CountDownLatch memberUnreachableReceived = new CountDownLatch(1); + private CountDownLatch memberReachableReceived = new CountDownLatch(1); + private volatile MessageInterceptor messageInterceptor; private TestShardManager(Builder builder) { super(builder); + shardActor = builder.shardActor; + shardActors = builder.shardActors; } @Override @@ -1575,109 +1928,6 @@ public class ShardManagerTest extends AbstractActorTest { } } - void waitForRecoveryComplete() { - assertEquals("Recovery complete", true, - Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); - } - - public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) { - return new Builder(datastoreContextBuilder); - } - - private static class Builder extends ShardManager.Builder { - Builder(DatastoreContext.Builder datastoreContextBuilder) { - cluster(new MockClusterWrapper()).configuration(new MockConfiguration()); - datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())); - waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()); - } - - @Override - public Props props() { - verify(); - return Props.create(TestShardManager.class, this); - } - } - - @Override - public void saveSnapshot(Object obj) { - snapshot = (ShardManagerSnapshot) obj; - snapshotPersist.countDown(); - } - - void verifySnapshotPersisted(Set shardList) { - assertEquals("saveSnapshot invoked", true, - Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS)); - assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList())); - } - - @Override - protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { - if(shardActors.get(info.getShardName()) != null){ - return shardActors.get(info.getShardName()); - } - return super.newShardActor(schemaContext, info); - } - - public void addShardActor(String shardName, ActorRef actorRef){ - shardActors.put(shardName, actorRef); - } - } - - private static class DelegatingShardManagerCreator implements Creator { - private static final long serialVersionUID = 1L; - private final Creator delegate; - - public DelegatingShardManagerCreator(Creator delegate) { - this.delegate = delegate; - } - - @Override - public ShardManager create() throws Exception { - return delegate.create(); - } - } - - interface MessageInterceptor extends Function { - boolean canIntercept(Object message); - } - - private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) { - return new MessageInterceptor(){ - @Override - public Object apply(Object message) { - return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1); - } - - @Override - public boolean canIntercept(Object message) { - return message instanceof FindPrimary; - } - }; - } - - private static class ForwardingShardManager extends ShardManager { - private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); - private CountDownLatch memberUpReceived = new CountDownLatch(1); - private CountDownLatch memberRemovedReceived = new CountDownLatch(1); - private CountDownLatch memberUnreachableReceived = new CountDownLatch(1); - private CountDownLatch memberReachableReceived = new CountDownLatch(1); - private final ActorRef shardActor; - private final String name; - private final CountDownLatch snapshotPersist = new CountDownLatch(1); - private ShardManagerSnapshot snapshot; - private volatile MessageInterceptor messageInterceptor; - - public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) { - super(builder); - this.shardActor = shardActor; - this.name = name; - } - - void setMessageInterceptor(MessageInterceptor messageInterceptor) { - this.messageInterceptor = messageInterceptor; - } - - @Override public void handleCommand(Object message) throws Exception { try{ @@ -1713,14 +1963,13 @@ public class ShardManagerTest extends AbstractActorTest { } } - @Override - public String persistenceId() { - return name; + void setMessageInterceptor(MessageInterceptor messageInterceptor) { + this.messageInterceptor = messageInterceptor; } - @Override - protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { - return shardActor; + void waitForRecoveryComplete() { + assertEquals("Recovery complete", true, + Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); } void waitForMemberUp() { @@ -1754,21 +2003,115 @@ public class ShardManagerTest extends AbstractActorTest { findPrimaryMessageReceived = new CountDownLatch(1); } + public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) { + return new Builder(datastoreContextBuilder); + } + + private static class Builder extends AbstractGenericBuilder { + private ActorRef shardActor; + private final Map shardActors = new HashMap<>(); + + Builder(DatastoreContext.Builder datastoreContextBuilder) { + super(TestShardManager.class); + datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())); + } + + Builder shardActor(ActorRef shardActor) { + this.shardActor = shardActor; + return this; + } + + Builder addShardActor(String shardName, ActorRef actorRef){ + shardActors.put(shardName, actorRef); + return this; + } + } + @Override public void saveSnapshot(Object obj) { snapshot = (ShardManagerSnapshot) obj; snapshotPersist.countDown(); + super.saveSnapshot(obj); } void verifySnapshotPersisted(Set shardList) { assertEquals("saveSnapshot invoked", true, - Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS)); + Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS)); assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList())); } + + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + if(shardActors.get(info.getShardName()) != null){ + return shardActors.get(info.getShardName()); + } + + if(shardActor != null) { + return shardActor; + } + + return super.newShardActor(schemaContext, info); + } + } + + private static abstract class AbstractGenericBuilder, C extends ShardManager> + extends ShardManager.AbstractBuilder { + private final Class shardManagerClass; + + AbstractGenericBuilder(Class shardManagerClass) { + this.shardManagerClass = shardManagerClass; + cluster(new MockClusterWrapper()).configuration(new MockConfiguration()). + waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()); + } + + @Override + public Props props() { + verify(); + return Props.create(shardManagerClass, this); + } + } + + private static class GenericBuilder extends AbstractGenericBuilder, C> { + GenericBuilder(Class shardManagerClass) { + super(shardManagerClass); + } + } + + private static class DelegatingShardManagerCreator implements Creator { + private static final long serialVersionUID = 1L; + private final Creator delegate; + + public DelegatingShardManagerCreator(Creator delegate) { + this.delegate = delegate; + } + + @Override + public ShardManager create() throws Exception { + return delegate.create(); + } + } + + interface MessageInterceptor extends Function { + boolean canIntercept(Object message); + } + + private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) { + return new MessageInterceptor(){ + @Override + public Object apply(Object message) { + return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1); + } + + @Override + public boolean canIntercept(Object message) { + return message instanceof FindPrimary; + } + }; } private static class MockRespondActor extends MessageCollectorActor { static final String CLEAR_RESPONSE = "clear-response"; + static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class); private volatile Object responseMsg; @@ -1787,12 +2130,15 @@ public class ShardManagerTest extends AbstractActorTest { @Override public void onReceive(Object message) throws Exception { + if(!"get-all-messages".equals(message)) { + LOG.debug("Received message : {}", message); + } super.onReceive(message); - if (message instanceof AddServer) { - if (responseMsg != null) { - getSender().tell(responseMsg, getSelf()); - } - } if(message.equals(CLEAR_RESPONSE)) { + if (message instanceof AddServer && responseMsg != null) { + getSender().tell(responseMsg, getSelf()); + } else if(message instanceof RemoveServer && responseMsg != null){ + getSender().tell(responseMsg, getSelf()); + } else if(message.equals(CLEAR_RESPONSE)) { responseMsg = null; } }