X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManagerTest.java;h=1ffa6790d8477f30755c1ce4513e02ef48321e18;hb=02f738dec4a31bdad04e42b2c19ecf09aacc0b87;hp=ae8db2b5f5d2a4727dd063a51fe90cafbc609460;hpb=6d9b1057a07bd6efc29be852721070e1fec5a496;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index ae8db2b5f5..1ffa6790d8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -1,6 +1,15 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -24,9 +33,11 @@ import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -36,22 +47,27 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; +import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import org.opendaylight.controller.cluster.datastore.messages.PeerDown; +import org.opendaylight.controller.cluster.datastore.messages.PeerUp; import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; +import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; @@ -60,7 +76,9 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -79,8 +97,11 @@ public class ShardManagerTest extends AbstractActorTest { private static TestActorRef mockShardActor; + private static String mockShardName; + private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). - dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS); + dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS) + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(6); private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { String name = new ShardIdentifier(shardName, memberName,"config").toString(); @@ -96,8 +117,8 @@ public class ShardManagerTest extends AbstractActorTest { InMemoryJournal.clear(); if(mockShardActor == null) { - String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); - mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), name); + mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1", "config").toString(); + mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), mockShardName); } mockShardActor.underlyingActor().clear(); @@ -156,7 +177,8 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell(new ActorInitialized(), mockShardActor); DataTree mockDataTree = mock(DataTree.class); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), getRef()); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), getRef()); MessageCollectorActor.expectFirstMatching(mockShardActor, RegisterRoleChangeListener.class); shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), @@ -183,7 +205,7 @@ public class ShardManagerTest extends AbstractActorTest { String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor); + shardManager.tell(new LeaderStateChanged(memberId1, memberId2, DataStoreVersions.CURRENT_VERSION), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); @@ -205,13 +227,16 @@ public class ShardManagerTest extends AbstractActorTest { String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor); - shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.absent()), mockShardActor); + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; + shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, Optional.absent(), + leaderVersion), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); RemotePrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), primaryFound.getPrimaryPath().contains("member-2-shard-default")); + assertEquals("getPrimaryVersion", leaderVersion, primaryFound.getPrimaryVersion()); }}; } @@ -257,7 +282,8 @@ public class ShardManagerTest extends AbstractActorTest { expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); DataTree mockDataTree = mock(DataTree.class); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), mockShardActor); shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef()); @@ -292,7 +318,8 @@ public class ShardManagerTest extends AbstractActorTest { expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS)); DataTree mockDataTree = mock(DataTree.class); - shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree)), mockShardActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mockDataTree), + DataStoreVersions.CURRENT_VERSION), mockShardActor); LocalPrimaryShardFound primaryFound = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); assertTrue("Unexpected primary path " + primaryFound.getPrimaryPath(), @@ -336,6 +363,22 @@ public class ShardManagerTest extends AbstractActorTest { }}; } + @Test + public void testOnReceiveFindPrimaryWaitForReadyWithIsolatedLeaderShard() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix, + null, RaftState.IsolatedLeader.name()), mockShardActor); + + shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef()); + + expectMsgClass(duration("2 seconds"), NoShardLeaderException.class); + }}; + } + @Test public void testOnReceiveFindPrimaryWaitForReadyWithNoRoleShard() throws Exception { new JavaTestKit(getSystem()) {{ @@ -389,8 +432,9 @@ public class ShardManagerTest extends AbstractActorTest { shardManager2.tell(new ActorInitialized(), mockShardActor2); String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; + short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, - Optional.of(mock(DataTree.class))), mockShardActor2); + Optional.of(mock(DataTree.class)), leaderVersion), mockShardActor2); shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); @@ -401,6 +445,7 @@ public class ShardManagerTest extends AbstractActorTest { RemotePrimaryShardFound found = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); String path = found.getPrimaryPath(); assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config")); + assertEquals("getPrimaryVersion", leaderVersion, found.getPrimaryVersion()); shardManager2.underlyingActor().verifyFindPrimary(); @@ -457,10 +502,11 @@ public class ShardManagerTest extends AbstractActorTest { String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, - Optional.of(mock(DataTree.class))), mockShardActor1); + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1); shardManager1.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1); - shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))), + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), mockShardActor2); shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); @@ -477,6 +523,15 @@ public class ShardManagerTest extends AbstractActorTest { shardManager1.underlyingActor().waitForUnreachableMember(); + PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); + assertEquals("getMemberName", "member-2", peerDown.getMemberName()); + MessageCollectorActor.clearMessages(mockShardActor1); + + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class); + shardManager1.tell(new FindPrimary("default", true), getRef()); expectMsgClass(duration("5 seconds"), NoShardLeaderException.class); @@ -486,12 +541,21 @@ public class ShardManagerTest extends AbstractActorTest { shardManager1.underlyingActor().waitForReachableMember(); + PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); + assertEquals("getMemberName", "member-2", peerUp.getMemberName()); + MessageCollectorActor.clearMessages(mockShardActor1); + shardManager1.tell(new FindPrimary("default", true), getRef()); RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class); String path1 = found1.getPrimaryPath(); assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config")); + shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. + createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); + + MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class); + }}; JavaTestKit.shutdownActorSystem(system1); @@ -538,10 +602,11 @@ public class ShardManagerTest extends AbstractActorTest { String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix; shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId2, - Optional.of(mock(DataTree.class))), mockShardActor1); + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION), mockShardActor1); shardManager1.tell(new RoleChangeNotification(memberId1, RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor1); - shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class))), + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId2, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), mockShardActor2); shardManager2.tell(new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2); @@ -554,7 +619,7 @@ public class ShardManagerTest extends AbstractActorTest { assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config")); primaryShardInfoCache.putSuccessful("default", new PrimaryShardInfo(system1.actorSelection( - mockShardActor1.path()), Optional.absent())); + mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION, Optional.absent())); shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper. createUnreachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558")); @@ -567,8 +632,8 @@ public class ShardManagerTest extends AbstractActorTest { assertNull("Expected primaryShardInfoCache entry removed", primaryShardInfoCache.getIfPresent("default")); - shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class))), - mockShardActor1); + shardManager1.tell(new ShardLeaderStateChanged(memberId1, memberId1, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION), mockShardActor1); shardManager1.tell(new RoleChangeNotification(memberId1, RaftState.Follower.name(), RaftState.Leader.name()), mockShardActor1); @@ -683,7 +748,7 @@ public class ShardManagerTest extends AbstractActorTest { verify(ready, never()).countDown(); shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, memberId, - Optional.of(mock(DataTree.class)))); + Optional.of(mock(DataTree.class)), DataStoreVersions.CURRENT_VERSION)); verify(ready, times(1)).countDown(); @@ -705,7 +770,8 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, - "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)))); + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); verify(ready, times(1)).countDown(); @@ -725,7 +791,8 @@ public class ShardManagerTest extends AbstractActorTest { verify(ready, never()).countDown(); shardManager.underlyingActor().onReceiveCommand(new ShardLeaderStateChanged(memberId, - "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)))); + "member-2-shard-default-" + shardMrgIDSuffix, Optional.of(mock(DataTree.class)), + DataStoreVersions.CURRENT_VERSION)); shardManager.underlyingActor().onReceiveCommand(MockClusterWrapper.createMemberUp("member-2", getRef().path().toString())); @@ -870,6 +937,102 @@ public class ShardManagerTest extends AbstractActorTest { } + @Test + public void testOnReceiveSwitchShardBehavior() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor()); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), mockShardActor); + + shardManager.tell(new SwitchShardBehavior(mockShardName, "Leader", 1000), getRef()); + + SwitchBehavior switchBehavior = MessageCollectorActor.expectFirstMatching(mockShardActor, SwitchBehavior.class); + + assertEquals(RaftState.Leader, switchBehavior.getNewState()); + assertEquals(1000, switchBehavior.getNewTerm()); + }}; + } + + public void testOnReceiveCreateShard() { + new JavaTestKit(getSystem()) {{ + datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); + + ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false)); + + SchemaContext schemaContext = TestModel.createTestContext(); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + + DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100). + persistent(false).build(); + TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator(); + + shardManager.tell(new CreateShard("foo", Arrays.asList("member-1", "member-5", "member-6"), shardPropsCreator, + datastoreContext), getRef()); + + expectMsgClass(duration("5 seconds"), CreateShardReply.class); + + shardManager.tell(new FindLocalShard("foo", true), getRef()); + + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent()); + assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(), + new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()), + shardPropsCreator.peerAddresses.keySet()); + assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix), + shardPropsCreator.shardId); + assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext); + + // Send CreateShard with same name - should fail. + + shardManager.tell(new CreateShard("foo", Collections.emptyList(), shardPropsCreator, null), getRef()); + + expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + }}; + } + + @Test + public void testOnReceiveCreateShardWithNoInitialSchemaContext() { + new JavaTestKit(getSystem()) {{ + ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false)); + + TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator(); + + shardManager.tell(new CreateShard("foo", Arrays.asList("member-1"), shardPropsCreator, null), getRef()); + + expectMsgClass(duration("5 seconds"), CreateShardReply.class); + + SchemaContext schemaContext = TestModel.createTestContext(); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + + shardManager.tell(new FindLocalShard("foo", true), getRef()); + + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext); + assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext); + }}; + } + + private static class TestShardPropsCreator implements ShardPropsCreator { + ShardIdentifier shardId; + Map peerAddresses; + SchemaContext schemaContext; + DatastoreContext datastoreContext; + + @Override + public Props newProps(ShardIdentifier shardId, Map peerAddresses, + DatastoreContext datastoreContext, SchemaContext schemaContext) { + this.shardId = shardId; + this.peerAddresses = peerAddresses; + this.schemaContext = schemaContext; + this.datastoreContext = datastoreContext; + return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext); + } + + } + private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1);