X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=0f31c6a3a1fb4a5a9df34fe0abf891ad1868c46a;hb=f807611cb1efd307031d3bed10914d07c643e344;hp=4362047a32def9e5380585e68c6e9b543693f6eb;hpb=5c5c980e564d2b5f6cd26821ffd26997f59af260;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 4362047a32..0f31c6a3a1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -1,6 +1,15 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -11,7 +20,10 @@ import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.AddressFromURIString; +import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Status; +import akka.actor.Status.Failure; import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.dispatch.Dispatchers; @@ -24,34 +36,56 @@ import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; +import java.net.URI; +import java.util.AbstractMap; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; +import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider; +import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; +import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import org.opendaylight.controller.cluster.datastore.messages.PeerDown; +import org.opendaylight.controller.cluster.datastore.messages.PeerUp; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; +import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; @@ -60,7 +94,13 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.messages.AddServer; +import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -79,6 +119,8 @@ public class ShardManagerTest extends AbstractActorTest { private static TestActorRef mockShardActor; + private static String mockShardName; + private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6); @@ -97,8 +139,8 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.clear(); if(mockShardActor == null) { - String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); - mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name); + mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); + mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName); } mockShardActor.underlyingActor().clear(); @@ -109,9 +151,20 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.clear(); } - private Props newShardMgrProps(boolean persistent) { - return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache); + private Props newShardMgrProps() { + return newShardMgrProps(new MockConfiguration()); + } + + private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) { + DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class); + Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext(); + Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString()); + return mockFactory; + } + + private Props newShardMgrProps(Configuration config) { + return ShardManager.props(new MockClusterWrapper(), config, + newDatastoreContextFactory(datastoreContextBuilder.build()), ready, primaryShardInfoCache); } private Props newPropsShardMgrWithMockShardActor() { @@ -125,14 +178,102 @@ public class ShardManagerTest extends AbstractActorTest { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(), - ready, name, shardActor, primaryShardInfoCache); + return new ForwardingShardManager(clusterWrapper, config, newDatastoreContextFactory( + datastoreContextBuilder.build()), ready, name, shardActor, primaryShardInfoCache); } }; return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()); } + @Test + public void testPerShardDatastoreContext() throws Exception { + final DatastoreContextFactory mockFactory = newDatastoreContextFactory( + datastoreContextBuilder.shardElectionTimeoutFactor(5).build()); + + Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()). + shardElectionTimeoutFactor(6).build()).when(mockFactory).getShardDatastoreContext("default"); + + Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()). + shardElectionTimeoutFactor(7).build()).when(mockFactory).getShardDatastoreContext("topology"); + + final MockConfiguration mockConfig = new MockConfiguration() { + @Override + public Collection getMemberShardNames(String memberName) { + return Arrays.asList("default", "topology"); + } + + @Override + public Collection getMembersFromShardName(String shardName) { + return Arrays.asList("member-1"); + } + }; + + final TestActorRef defaultShardActor = TestActorRef.create(getSystem(), + Props.create(MessageCollectorActor.class), "default"); + final TestActorRef topologyShardActor = TestActorRef.create(getSystem(), + Props.create(MessageCollectorActor.class), "topology"); + + final Map> shardInfoMap = Collections.synchronizedMap( + new HashMap>()); + shardInfoMap.put("default", new AbstractMap.SimpleEntry(defaultShardActor, null)); + shardInfoMap.put("topology", new AbstractMap.SimpleEntry(topologyShardActor, null)); + + final CountDownLatch newShardActorLatch = new CountDownLatch(2); + final Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override + public ShardManager create() throws Exception { + return new ShardManager(new MockClusterWrapper(), mockConfig, mockFactory, ready, primaryShardInfoCache) { + @Override + protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + Entry entry = shardInfoMap.get(info.getShardName()); + ActorRef ref = null; + if(entry != null) { + ref = entry.getKey(); + entry.setValue(info.getDatastoreContext()); + } + + newShardActorLatch.countDown(); + return ref; + } + }; + } + }; + + JavaTestKit kit = new JavaTestKit(getSystem()); + + final ActorRef shardManager = getSystem().actorOf(Props.create(new DelegatingShardManagerCreator(creator)). + withDispatcher(Dispatchers.DefaultDispatcherId())); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef()); + + assertEquals("Shard actors created", true, newShardActorLatch.await(5, TimeUnit.SECONDS)); + assertEquals("getShardElectionTimeoutFactor", 6, shardInfoMap.get("default").getValue(). + getShardElectionTimeoutFactor()); + assertEquals("getShardElectionTimeoutFactor", 7, shardInfoMap.get("topology").getValue(). + getShardElectionTimeoutFactor()); + + DatastoreContextFactory newMockFactory = newDatastoreContextFactory( + datastoreContextBuilder.shardElectionTimeoutFactor(5).build()); + Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()). + shardElectionTimeoutFactor(66).build()).when(newMockFactory).getShardDatastoreContext("default"); + + Mockito.doReturn(DatastoreContext.newBuilderFrom(datastoreContextBuilder.build()). + shardElectionTimeoutFactor(77).build()).when(newMockFactory).getShardDatastoreContext("topology"); + + shardManager.tell(newMockFactory, kit.getRef()); + + DatastoreContext newContext = MessageCollectorActor.expectFirstMatching(defaultShardActor, DatastoreContext.class); + assertEquals("getShardElectionTimeoutFactor", 66, newContext.getShardElectionTimeoutFactor()); + + newContext = MessageCollectorActor.expectFirstMatching(topologyShardActor, DatastoreContext.class); + assertEquals("getShardElectionTimeoutFactor", 77, newContext.getShardElectionTimeoutFactor()); + + defaultShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + topologyShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + @Test public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ @@ -503,6 +644,15 @@ public class ShardManagerTest extends AbstractActorTest { shardManager1.underlyingActor().waitForUnreachableMember(); + PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); + assertEquals("getMemberName", "member-2", peerDown.getMemberName()); + MessageCollectorActor.clearMessages(mockShardActor1); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); + shardManager1.tell(new FindPrimary("default", true), getRef()); expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); @@ -512,12 +662,21 @@ public class ShardManagerTest extends AbstractActorTest { shardManager1.underlyingActor().waitForReachableMember(); + PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); + assertEquals("getMemberName", "member-2", peerUp.getMemberName()); + MessageCollectorActor.clearMessages(mockShardActor1); + shardManager1.tell(new FindPrimary("default", true), getRef()); RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); String path1 = found1.getPrimaryPath(); assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config")); + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); + }}; JavaTestKit.shutdownActorSystem(system1); @@ -701,7 +860,7 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( @@ -721,7 +880,7 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( @@ -744,7 +903,7 @@ public class ShardManagerTest extends AbstractActorTest { public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( @@ -767,7 +926,7 @@ public class ShardManagerTest extends AbstractActorTest { public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { new JavaTestKit(getSystem()) { { - TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true)); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( "unknown", RaftState.Candidate.name(), RaftState.Leader.name())); @@ -780,7 +939,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testByDefaultSyncStatusIsFalse() throws Exception{ - final Props persistentProps = newShardMgrProps(true); + final Props persistentProps = newShardMgrProps(); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -791,10 +950,9 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); + final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), + newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready, + primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -807,7 +965,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{ - final Props persistentProps = newShardMgrProps(true); + final Props persistentProps = newShardMgrProps(); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -825,10 +983,9 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); + final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), + newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready, + primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -853,15 +1010,15 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), + final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration() { @Override public List getMemberShardNames(String memberName) { return Arrays.asList("default", "astronauts"); } }, - DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); + newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready, + primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -899,13 +1056,272 @@ public class ShardManagerTest extends AbstractActorTest { } + @Test + public void testOnReceiveSwitchShardBehavior() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef()); + + SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class); + + assertEquals(RaftState.Leader, switchBehavior.getNewState()); + assertEquals(1000, switchBehavior.getNewTerm()); + }}; + } + + @Test + public void testOnReceiveCreateShard() { + new JavaTestKit(getSystem()) {{ + datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); + + ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + + SchemaContext schemaContext = TestModel.createTestContext(); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + + DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100). + persistent(false).build(); + Shard.Builder shardBuilder = Shard.builder(); + + ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + "foo", null, Arrays.asList("member-1", "member-5", "member-6")); + shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef()); + + expectMsgClass(duration("5 seconds"), CreateShardReply.class); + + shardManager.tell(new FindLocalShard("foo", true), getRef()); + + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertEquals("isRecoveryApplicable", false, shardBuilder.getDatastoreContext().isPersistent()); + assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig(). + getPeerAddressResolver() instanceof ShardPeerAddressResolver); + assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(), + new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()), + shardBuilder.getPeerAddresses().keySet()); + assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix), + shardBuilder.getId()); + assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); + + // Send CreateShard with same name - should fail. + + shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); + + expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + }}; + } + + @Test + public void testOnReceiveCreateShardWithNoInitialSchemaContext() { + new JavaTestKit(getSystem()) {{ + ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + + Shard.Builder shardBuilder = Shard.builder(); + + ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + "foo", null, Arrays.asList("member-1")); + shardManager.tell(new CreateShard(config, shardBuilder, null), getRef()); + + expectMsgClass(duration("5 seconds"), CreateShardReply.class); + + SchemaContext schemaContext = TestModel.createTestContext(); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + + shardManager.tell(new FindLocalShard("foo", true), getRef()); + + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext()); + assertNotNull("schemaContext is null", shardBuilder.getDatastoreContext()); + }}; + } + + @Test + public void testGetSnapshot() throws Throwable { + JavaTestKit kit = new JavaTestKit(getSystem()); + + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). + put("shard1", Arrays.asList("member-1")). + put("shard2", Arrays.asList("member-1")).build()); + + ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig).withDispatcher( + Dispatchers.DefaultDispatcherId())); + + shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); + Failure failure = kit.expectMsgClass(Failure.class); + assertEquals("Failure cause type", IllegalStateException.class, failure.cause().getClass()); + + kit = new JavaTestKit(getSystem()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender()); + + shardManager.tell(new FindLocalShard("shard1", true), kit.getRef()); + kit.expectMsgClass(LocalShardFound.class); + shardManager.tell(new FindLocalShard("shard2", true), kit.getRef()); + kit.expectMsgClass(LocalShardFound.class); + + shardManager.tell(GetSnapshot.INSTANCE, kit.getRef()); + + DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class); + + assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType()); + List shardSnapshots = datastoreSnapshot.getShardSnapshots(); + Set actualShardNames = new HashSet<>(); + for(ShardSnapshot s: shardSnapshots) { + actualShardNames.add(s.getName()); + } + + assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), actualShardNames); + + shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + @Test + public void testAddShardReplicaForNonExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + + shardManager.tell(new AddShardReplica("model-inventory"), getRef()); + Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class); + + assertEquals("Failure obtained", true, + (resp.cause() instanceof IllegalArgumentException)); + }}; + } + + @Test + public void testAddShardReplicaForAlreadyCreatedShard() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef shardManager = getSystem().actorOf(newShardMgrProps()); + shardManager.tell(new AddShardReplica("default"), getRef()); + Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class); + assertEquals("Failure obtained", true, + (resp.cause() instanceof IllegalArgumentException)); + }}; + } + + @Test + public void testAddShardReplica() throws Exception { + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); + + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + final TestActorRef newReplicaShardManager = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor, + new ClusterWrapperImpl(system1), mockConfig), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + final ActorSystem system2 = ActorSystem.create("cluster-test", + ConfigFactory.load().getConfig("Member2")); + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + String name = new ShardIdentifier("astronauts", "member-2", "config").toString(); + final TestActorRef mockShardLeaderActor = + TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + final TestActorRef leaderShardManager = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardLeaderActor, + new ClusterWrapperImpl(system2), mockConfig), shardManagerID); + + new JavaTestKit(system1) {{ + + newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + leaderShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + + leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); + + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2, + Optional.of(mock(DataTree.class)), leaderVersion), mockShardLeaderActor); + leaderShardManager.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardLeaderActor); + + newReplicaShardManager.underlyingActor().waitForMemberUp(); + leaderShardManager.underlyingActor().waitForMemberUp(); + + //construct a mock response message + AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2); + mockShardLeaderActor.underlyingActor().updateResponse(response); + newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); + AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, + AddServer.class); + String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix; + assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId()); + + expectMsgClass(duration("5 seconds"), Status.Success.class); + }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); + } + + @Test + public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception { + MockConfiguration mockConfig = + new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")). + put("astronauts", Arrays.asList("member-2")).build()); + + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + final TestActorRef newReplicaShardManager = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor, + new ClusterWrapperImpl(system1), mockConfig), shardManagerID); + + new JavaTestKit(system1) {{ + + newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString()); + newReplicaShardManager.underlyingActor().waitForMemberUp(); + + newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); + Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); + assertEquals("Failure obtained", true, + (resp.cause() instanceof RuntimeException)); + }}; + + JavaTestKit.shutdownActorSystem(system1); + } + + @Test + public void testRemoveShardReplicaForNonExistentShard() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + + shardManager.tell(new RemoveShardReplica("model-inventory"), getRef()); + Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class); + assertEquals("Failure obtained", true, + (resp.cause() instanceof IllegalArgumentException)); + }}; + + } + private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); TestShardManager(String shardMrgIDSuffix) { super(new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready, - new PrimaryShardInfoFutureCache()); + newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()), + ready, new PrimaryShardInfoFutureCache()); } @Override @@ -964,9 +1380,9 @@ public class ShardManagerTest extends AbstractActorTest { private final String name; protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name, + DatastoreContextFactory factory, CountDownLatch waitTillReadyCountdownLatch, String name, ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) { - super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache); + super(cluster, configuration, factory, waitTillReadyCountdownLatch, primaryShardInfoCache); this.shardActor = shardActor; this.name = name; } @@ -1043,4 +1459,24 @@ public class ShardManagerTest extends AbstractActorTest { findPrimaryMessageReceived = new CountDownLatch(1); } } + + private static class MockRespondActor extends MessageCollectorActor { + + private volatile Object responseMsg; + + public void updateResponse(Object response) { + responseMsg = response; + } + + @Override + public void onReceive(Object message) throws Exception { + super.onReceive(message); + if (message instanceof AddServer) { + if (responseMsg != null) { + getSender().tell(responseMsg, getSelf()); + responseMsg = null; + } + } + } + } }