From 282a8afa481508d1c749d87502c6224932c8b36a Mon Sep 17 00:00:00 2001 From: YIN Kangqian Date: Thu, 21 Dec 2017 13:44:54 +0800 Subject: [PATCH] Compare member names from the leader identifier and the member event instead of checking whether the leader identifier contains the member name. ShardManager#markMemberUnavailable/markMemberAvailable previously decided whether a leader shard resides on the unavailable/available member by checking whether the leader identifier contains the member name. This makes the wrong decision when the name of the member on which the leader shard resides contains the name of the member that becomes unavailable/available (for example, a leader on member-256 when member-2 becomes unreachable). The newly added UT ShardManagerTest#testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable captures this corner case, and the two-line change in ShardManager makes this UT pass. Change-Id: Ib7cd02316c192aeaa8cdac14ff9ece0f872386a6 Signed-off-by: YIN Kangqian --- .../datastore/shardmanager/ShardManager.java | 8 +- .../shardmanager/ShardManagerTest.java | 101 ++++++++++++++++++ .../src/test/resources/application.conf | 63 +++++++++++ 3 files changed, 166 insertions(+), 6 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 430b3b7a85..0b3768f395 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -1044,11 +1044,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void markMemberUnavailable(final MemberName memberName) { - final String memberStr = memberName.getName(); for (ShardInformation info : localShards.values()) { String leaderId = 
info.getLeaderId(); - // XXX: why are we using String#contains() here? - if (leaderId != null && leaderId.contains(memberStr)) { + if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) { LOG.debug("Marking Leader {} as unavailable.", leaderId); info.setLeaderAvailable(false); @@ -1060,11 +1058,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void markMemberAvailable(final MemberName memberName) { - final String memberStr = memberName.getName(); for (ShardInformation info : localShards.values()) { String leaderId = info.getLeaderId(); - // XXX: why are we using String#contains() here? - if (leaderId != null && leaderId.contains(memberStr)) { + if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index ee0f91ee27..df9d76433c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -30,6 +30,7 @@ import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.cluster.Member; import akka.dispatch.Dispatchers; +import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.pattern.Patterns; import akka.persistence.RecoveryCompleted; @@ -912,6 +913,106 @@ public class ShardManagerTest extends AbstractShardManagerTest { LOG.info("testShardAvailabilityChangeOnMemberUnreachableAndLeadershipChange 
ending"); } + @Test + public void testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable() throws Exception { + LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable starting"); + String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + + MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder() + .put("default", Arrays.asList("member-256", "member-2")).build()); + + // Create an ActorSystem, ShardManager and actor for member-256. + + final ActorSystem system256 = newActorSystem("Member256"); + // 2562 is the tcp port of Member256 in src/test/resources/application.conf. + Cluster.get(system256).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562")); + + final ActorRef mockShardActor256 = newMockShardActor(system256, Shard.DEFAULT_NAME, "member-256"); + + final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); + + // ShardManager must be created with shard configuration to let its localShards has shards. + final TestActorRef shardManager256 = TestActorRef.create(system256, + newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor256) + .cluster(new ClusterWrapperImpl(system256)) + .primaryShardInfoCache(primaryShardInfoCache).props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), + shardManagerID); + + // Create an ActorSystem, ShardManager and actor for member-2 whose name is contained in member-256. + + final ActorSystem system2 = newActorSystem("Member2"); + + // Join member-2 into the cluster of member-256. 
+ Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2562")); + + final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2"); + + final TestActorRef shardManager2 = TestActorRef.create(system2, + newTestShardMgrBuilder(mockConfig).shardActor(mockShardActor2).cluster( + new ClusterWrapperImpl(system2)).props().withDispatcher( + Dispatchers.DefaultDispatcherId()), shardManagerID); + + new JavaTestKit(system256) { + { + shardManager256.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager256.tell(new ActorInitialized(), mockShardActor256); + shardManager2.tell(new ActorInitialized(), mockShardActor2); + + String memberId256 = "member-256-shard-default-" + shardMrgIDSuffix; + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; + shardManager256.tell(new ShardLeaderStateChanged(memberId256, memberId256, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), mockShardActor256); + shardManager256.tell( + new RoleChangeNotification(memberId256, RaftState.Candidate.name(), RaftState.Leader.name()), + mockShardActor256); + shardManager2.tell(new ShardLeaderStateChanged(memberId2, memberId256, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), mockShardActor2); + shardManager2.tell( + new RoleChangeNotification(memberId2, RaftState.Candidate.name(), RaftState.Follower.name()), + mockShardActor2); + shardManager256.underlyingActor().waitForMemberUp(); + + shardManager256.tell(new FindPrimary("default", true), getRef()); + + LocalPrimaryShardFound found = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); + String path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path + " which must on member-256", + path.contains("member-256-shard-default-config")); + + PrimaryShardInfo primaryShardInfo = new PrimaryShardInfo( + 
system256.actorSelection(mockShardActor256.path()), DataStoreVersions.CURRENT_VERSION); + primaryShardInfoCache.putSuccessful("default", primaryShardInfo); + + // Simulate member-2 become unreachable. + shardManager256.tell(MockClusterWrapper.createUnreachableMember("member-2", + "akka://cluster-test@127.0.0.1:2558"), getRef()); + shardManager256.underlyingActor().waitForUnreachableMember(); + + // Make sure leader shard on member-256 is still leader and still in the cache. + shardManager256.tell(new FindPrimary("default", true), getRef()); + found = expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); + path = found.getPrimaryPath(); + assertTrue("Unexpected primary path " + path + " which must still not on member-256", + path.contains("member-256-shard-default-config")); + Future futurePrimaryShard = primaryShardInfoCache.getIfPresent("default"); + futurePrimaryShard.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final PrimaryShardInfo futurePrimaryShardInfo) { + if (failure != null) { + assertTrue("Primary shard info is unexpectedly removed from primaryShardInfoCache", false); + } else { + assertEquals("Expected primaryShardInfoCache entry", + primaryShardInfo, futurePrimaryShardInfo); + } + } + }, system256.dispatchers().defaultGlobalDispatcher()); + } + }; + + LOG.info("testShardAvailabilityChangeOnMemberWithNameContainedInLeaderIdUnreachable ending"); + } @Test public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index 5e61f4defc..b4dd5c3405 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -402,6 +402,69 @@ Member5 { } } +Member256 { + bounded-mailbox { + 
mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } + + in-memory-journal { + class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal" + } + + in-memory-snapshot-store { + class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore" + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } + + shard-dispatcher { + type = Dispatcher + executor = "default-executor" + mailbox-type = "org.opendaylight.controller.cluster.common.actor.UnboundedDequeBasedControlAwareMailbox" + } + + akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + persistence.journal.plugin = "in-memory-journal" + + loglevel = "INFO" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + + serializers { + readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer" + } + + serialization-bindings { + "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal + } + } + remote { + log-remote-lifecycle-events = off + artery { + enabled = on + canonical.hostname = "127.0.0.1" + canonical.port = 2562 + } + + netty.tcp { + hostname = "127.0.0.1" + port = 2562 + } + } + + cluster { + retry-unsuccessful-join-after = 100ms + + roles = [ + "member-256" + ] + } + } +} + Member1-without-artery { akka.remote.artery.enabled = off } -- 2.36.6