X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=82f4b3ccf9edde056646cf823755570006001c48;hp=b676cf225c801039d1e679298e6d7cb23d2808ac;hb=30507b196fa240a4176ba12102ac0469280feff9;hpb=a681e6bec3bbd7b536302ee9e083ae04b7f5ebdd diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index b676cf225c..82f4b3ccf9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -1,13 +1,22 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.AddressFromURIString; @@ -21,6 +30,7 @@ import akka.persistence.RecoveryCompleted; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.util.Timeout; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; @@ -28,11 +38,8 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.net.URI; import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -40,30 +47,43 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; +import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider; +import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; +import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.PeerDown; +import org.opendaylight.controller.cluster.datastore.messages.PeerUp; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; +import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; @@ -80,14 +100,19 @@ public class ShardManagerTest extends AbstractActorTest { private static TestActorRef mockShardActor; + private static String mockShardName; + private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). - dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); + dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6); private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { String name = new ShardIdentifier(shardName, memberName,"config").toString(); return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name); } + private final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); + @Before public void setUp() { MockitoAnnotations.initMocks(this); @@ -95,8 +120,8 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.clear(); if(mockShardActor == null) { - String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); - mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name); + mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); + mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName); } mockShardActor.underlyingActor().clear(); @@ -108,8 +133,12 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newShardMgrProps() { - return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - datastoreContextBuilder.build(), ready); + return newShardMgrProps(new MockConfiguration()); + } + + private Props newShardMgrProps(Configuration config) { + return ShardManager.props(new MockClusterWrapper(), config, datastoreContextBuilder.build(), ready, + primaryShardInfoCache); } private Props newPropsShardMgrWithMockShardActor() { @@ -124,7 +153,7 @@ public class ShardManagerTest extends AbstractActorTest { @Override public ShardManager create() throws Exception { return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(), - ready, name, shardActor); + ready, name, shardActor, primaryShardInfoCache); } }; @@ -154,7 +183,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); shardManager.tell(new ActorInitialized(), mockShardActor); - shardManager.tell(new LeaderStateChanged(memberId, memberId), getRef()); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), @@ -162,9 +193,30 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); + }}; + } + + @Test + public void testOnReceiveFindPrimaryForNonLocalLeaderShardBeforeMemberUp() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); }}; } @@ -182,13 +234,16 @@ public class ShardManagerTest extends AbstractActorTest { String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.absent(), + leaderVersion), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-2-shard-default")); + assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion()); }}; } @@ -233,13 +288,16 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); - shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); }}; } @@ -266,11 +324,14 @@ public class ShardManagerTest extends AbstractActorTest { expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); - shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor); + DataTree mockDataTree = mock(DataTree.class); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), mockShardActor); - PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-1-shard-default")); + assertSame("getLocalShardDataTree", mockDataTree, primaryFound.getLocalShardDataTree() ); expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); }}; @@ -309,6 +370,22 @@ public class ShardManagerTest extends AbstractActorTest { }}; } + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, + null, RaftState.IsolatedLeader.name()), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); + }}; + } + @Test public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception { new JavaTestKit(getSystem()) {{ @@ -362,7 +439,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager2.tell(new ActorInitialized(), mockShardActor2); String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; - shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2); + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, + Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2); shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); @@ -370,9 +449,10 @@ public class ShardManagerTest extends AbstractActorTest { shardManager1.tell(new FindPrimary("astronauts", false), getRef()); - PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class); + RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); String path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); + assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion()); shardManager2.underlyingActor().verifyFindPrimary(); @@ -389,6 +469,194 @@ public class ShardManagerTest extends AbstractActorTest { JavaTestKit.shutdownActorSystem(system2); } + @Test + public void testShardAvailabilityOnChangeOfMemberReachability() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2"); + + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")).build()); + + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); + + new JavaTestKit(system1) {{ + + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager1.tell(new ActorInitialized(), mockShardActor1); + shardManager2.tell(new ActorInitialized(), mockShardActor2); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1); + shardManager1.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), + mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); + shardManager1.underlyingActor().waitForMemberUp(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForUnreachableMember(); + + PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); + assertEquals("getMemberName", "member-2", peerDown.getMemberName()); + MessageCollectorActor.clearMessages(mockShardActor1); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForReachableMember(); + + PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); + assertEquals("getMemberName", "member-2", peerUp.getMemberName()); + MessageCollectorActor.clearMessages(mockShardActor1); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + String path1 = found1.getPrimaryPath(); + assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config")); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); + + }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); + } + + @Test + public void testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange() throws Exception { + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + // Create an ActorSystem ShardManager actor for member-1. + + final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1"); + + final TestActorRef shardManager1 = TestActorRef.create(system1, + newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1), + new MockConfiguration()), shardManagerID); + + // Create an ActorSystem ShardManager actor for member-2. + + final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2")); + + Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + + final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2"); + + MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.>builder(). + put("default", Arrays.asList("member-1", "member-2")).build()); + + final TestActorRef shardManager2 = TestActorRef.create(system2, + newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2), + mockConfig2), shardManagerID); + + new JavaTestKit(system1) {{ + + shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager1.tell(new ActorInitialized(), mockShardActor1); + shardManager2.tell(new ActorInitialized(), mockShardActor2); + + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1); + shardManager1.tell(new RoleChangeNotification(memberId1, + RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), + mockShardActor2); + shardManager2.tell(new RoleChangeNotification(memberId2, + RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); + shardManager1.underlyingActor().waitForMemberUp(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); + + primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection( + mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.absent())); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + shardManager1.underlyingActor().waitForUnreachableMember(); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); + + assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default")); + + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), mockShardActor1); + shardManager1.tell(new RoleChangeNotification(memberId1, + RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1); + + shardManager1.tell(new FindPrimary("default", true), getRef()); + + LocalPrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); + String path1 = found1.getPrimaryPath(); + assertTrue("Unexpected primary path " + path1, path1.contains("member-1-shard-default-config")); + + }}; + + JavaTestKit.shutdownActorSystem(system1); + JavaTestKit.shutdownActorSystem(system2); + } + + @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { new JavaTestKit(getSystem()) {{ @@ -469,188 +737,48 @@ public class ShardManagerTest extends AbstractActorTest { // Journal entries up to the last one should've been deleted Map journal = InMemoryJournal.get(shardMgrID); synchronized (journal) { - assertEquals("Journal size", 1, journal.size()); - assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next()); + assertEquals("Journal size", 0, journal.size()); } }}; } @Test - public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception { - final ImmutableSet persistedModules = ImmutableSet.of("foo", "bar"); - InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules( - persistedModules)); - new JavaTestKit(getSystem()) {{ - TestActorRef shardManager = TestActorRef.create(getSystem(), - Props.create(new TestShardManagerCreator(shardMrgIDSuffix))); - - shardManager.underlyingActor().waitForRecoveryComplete(); - - Collection knownModules = shardManager.underlyingActor().getKnownModules(); - - assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules)); - }}; - } - - @Test - public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules() - throws Exception { - new JavaTestKit(getSystem()) {{ - final TestActorRef shardManager = - TestActorRef.create(getSystem(), newShardMgrProps()); - - assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size()); - - ModuleIdentifier foo = mock(ModuleIdentifier.class); - when(foo.getNamespace()).thenReturn(new URI("foo")); - - Set moduleIdentifierSet = new HashSet<>(); - moduleIdentifierSet.add(foo); - - SchemaContext schemaContext = mock(SchemaContext.class); - when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); - - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertEquals("getKnownModules", Sets.newHashSet("foo"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); - - ModuleIdentifier bar = mock(ModuleIdentifier.class); - when(bar.getNamespace()).thenReturn(new URI("bar")); - - moduleIdentifierSet.add(bar); - - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); - }}; - } - - @Test - public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules() - throws Exception { - new JavaTestKit(getSystem()) {{ - final TestActorRef shardManager = - TestActorRef.create(getSystem(), newShardMgrProps()); - - SchemaContext schemaContext = mock(SchemaContext.class); - Set moduleIdentifierSet = new HashSet<>(); - - ModuleIdentifier foo = mock(ModuleIdentifier.class); - when(foo.getNamespace()).thenReturn(new URI("foo")); - - moduleIdentifierSet.add(foo); - - when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); - - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertEquals("getKnownModules", Sets.newHashSet("foo"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); - - //Create a completely different SchemaContext with only the bar module in it - //schemaContext = mock(SchemaContext.class); - moduleIdentifierSet.clear(); - ModuleIdentifier bar = mock(ModuleIdentifier.class); - when(bar.getNamespace()).thenReturn(new URI("bar")); - - moduleIdentifierSet.add(bar); - - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertEquals("getKnownModules", Sets.newHashSet("foo"), - Sets.newHashSet(shardManager.underlyingActor().getKnownModules())); - - }}; - } - - @Test - public void testRecoveryApplicable(){ + public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); - final TestActorRef persistentShardManager = - TestActorRef.create(getSystem(), persistentProps); - - DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider(); - - assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable()); + TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); - final Props nonPersistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(false).build(), ready); - final TestActorRef nonPersistentShardManager = - TestActorRef.create(getSystem(), nonPersistentProps); + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( + memberId, RaftState.Candidate.name(), RaftState.Leader.name())); - DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider(); + verify(ready, never()).countDown(); - assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable()); + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION)); + verify(ready, times(1)).countDown(); }}; - } @Test - public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider() - throws Exception { - final CountDownLatch persistLatch = new CountDownLatch(1); - final Creator creator = new Creator() { - private static final long serialVersionUID = 1L; - @Override - public ShardManager create() throws Exception { - return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) { - @Override - protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { - DataPersistenceProviderMonitor dataPersistenceProviderMonitor - = new DataPersistenceProviderMonitor(); - dataPersistenceProviderMonitor.setPersistLatch(persistLatch); - return dataPersistenceProviderMonitor; - } - }; - } - }; - - new JavaTestKit(getSystem()) {{ - - final TestActorRef shardManager = - TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator))); - - ModuleIdentifier foo = mock(ModuleIdentifier.class); - when(foo.getNamespace()).thenReturn(new URI("foo")); - - Set moduleIdentifierSet = new HashSet<>(); - moduleIdentifierSet.add(foo); - - SchemaContext schemaContext = mock(SchemaContext.class); - when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet); - - shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext)); - - assertEquals("Persisted", true, - Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS)); - - }}; - } - - @Test - public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception { + public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception { new JavaTestKit(getSystem()) { { TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); String memberId = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification( - memberId, RaftState.Candidate.name(), RaftState.Leader.name())); + memberId, null, RaftState.Follower.name())); verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId)); + shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); + + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); verify(ready, times(1)).countDown(); @@ -658,7 +786,7 @@ public class ShardManagerTest extends AbstractActorTest { } @Test - public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception { + public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception { new JavaTestKit(getSystem()) { { TestActorRef shardManager = TestActorRef.create(getSystem(), newShardMgrProps()); @@ -669,14 +797,17 @@ public class ShardManagerTest extends AbstractActorTest { verify(ready, never()).countDown(); - shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix)); + shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); + + shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); verify(ready, times(1)).countDown(); }}; } - @Test public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { new JavaTestKit(getSystem()) { @@ -694,10 +825,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testByDefaultSyncStatusIsFalse() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + final Props persistentProps = newShardMgrProps(); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -711,7 +839,7 @@ public class ShardManagerTest extends AbstractActorTest { final Props persistentProps = ShardManager.props( new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -724,10 +852,7 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{ - final Props persistentProps = ShardManager.props( - new MockClusterWrapper(), - new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + final Props persistentProps = newShardMgrProps(); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -748,7 +873,7 @@ public class ShardManagerTest extends AbstractActorTest { final Props persistentProps = ShardManager.props( new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).build(), ready); + DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -781,7 +906,7 @@ public class ShardManagerTest extends AbstractActorTest { return Arrays.asList("default", "astronauts"); } }, - DatastoreContext.newBuilder().persistent(true).build(), ready); + DatastoreContext.newBuilder().persistent(true).build(), ready, primaryShardInfoCache); final TestActorRef shardManager = TestActorRef.create(getSystem(), persistentProps); @@ -819,12 +944,114 @@ public class ShardManagerTest extends AbstractActorTest { } + @Test + public void testOnReceiveSwitchShardBehavior() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef()); + + SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class); + + assertEquals(RaftState.Leader, switchBehavior.getNewState()); + assertEquals(1000, switchBehavior.getNewTerm()); + }}; + } + + public void testOnReceiveCreateShard() { + new JavaTestKit(getSystem()) {{ + datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); + + ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + + SchemaContext schemaContext = TestModel.createTestContext(); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + + DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100). + persistent(false).build(); + TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator(); + + ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + "foo", null, Arrays.asList("member-1", "member-5", "member-6")); + shardManager.tell(new CreateShard(config, shardPropsCreator, datastoreContext), getRef()); + + expectMsgClass(duration("5 seconds"), CreateShardReply.class); + + shardManager.tell(new FindLocalShard("foo", true), getRef()); + + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent()); + assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(), + new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()), + shardPropsCreator.peerAddresses.keySet()); + assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix), + shardPropsCreator.shardId); + assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext); + + // Send CreateShard with same name - should fail. + + shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef()); + + expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + }}; + } + + @Test + public void testOnReceiveCreateShardWithNoInitialSchemaContext() { + new JavaTestKit(getSystem()) {{ + ActorRef shardManager = getSystem().actorOf(newShardMgrProps( + new ConfigurationImpl(new EmptyModuleShardConfigProvider()))); + + TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator(); + + ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module", + "foo", null, Arrays.asList("member-1")); + shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef()); + + expectMsgClass(duration("5 seconds"), CreateShardReply.class); + + SchemaContext schemaContext = TestModel.createTestContext(); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + + shardManager.tell(new FindLocalShard("foo", true), getRef()); + + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext); + assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext); + }}; + } + + private static class TestShardPropsCreator implements ShardPropsCreator { + ShardIdentifier shardId; + Map peerAddresses; + SchemaContext schemaContext; + DatastoreContext datastoreContext; + + @Override + public Props newProps(ShardIdentifier shardId, Map peerAddresses, + DatastoreContext datastoreContext, SchemaContext schemaContext) { + this.shardId = shardId; + this.peerAddresses = peerAddresses; + this.schemaContext = schemaContext; + this.datastoreContext = datastoreContext; + return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext); + } + + } + private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); TestShardManager(String shardMrgIDSuffix) { super(new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready); + DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready, + new PrimaryShardInfoFutureCache()); } @Override @@ -877,13 +1104,15 @@ public class ShardManagerTest extends AbstractActorTest { private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1); private CountDownLatch memberUpReceived = new CountDownLatch(1); private CountDownLatch memberRemovedReceived = new CountDownLatch(1); + private CountDownLatch memberUnreachableReceived = new CountDownLatch(1); + private CountDownLatch memberReachableReceived = new CountDownLatch(1); private final ActorRef shardActor; private final String name; protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name, - ActorRef shardActor) { - super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); + ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) { + super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch, primaryShardInfoCache); this.shardActor = shardActor; this.name = name; } @@ -905,6 +1134,16 @@ public class ShardManagerTest extends AbstractActorTest { if(!getCluster().getCurrentMemberName().equals(role)) { memberRemovedReceived.countDown(); } + } else if(message instanceof ClusterEvent.UnreachableMember) { + String role = ((ClusterEvent.UnreachableMember)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberUnreachableReceived.countDown(); + } + } else if(message instanceof ClusterEvent.ReachableMember) { + String role = ((ClusterEvent.ReachableMember)message).member().roles().head(); + if(!getCluster().getCurrentMemberName().equals(role)) { + memberReachableReceived.countDown(); + } } } } @@ -931,6 +1170,19 @@ public class ShardManagerTest extends AbstractActorTest { memberRemovedReceived = new CountDownLatch(1); } + void waitForUnreachableMember() { + assertEquals("UnreachableMember received", true, + Uninterruptibles.awaitUninterruptibly(memberUnreachableReceived, 5, TimeUnit.SECONDS + )); + memberUnreachableReceived = new CountDownLatch(1); + } + + void waitForReachableMember() { + assertEquals("ReachableMember received", true, + Uninterruptibles.awaitUninterruptibly(memberReachableReceived, 5, TimeUnit.SECONDS)); + memberReachableReceived = new CountDownLatch(1); + } + void verifyFindPrimary() { assertEquals("FindPrimary received", true, Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));