From a8000ee3b6071fa3b83500a39fc60ab3a9c5f085 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 20 Apr 2016 11:41:25 -0400 Subject: [PATCH] Implement cluster admin RPCs to change member voting states Added 3 new RPCs for changing voting states: change-member-voting-states-for-shard change-member-voting-states-for-all-shards flip-member-voting-states-for-all-shards These replace the original ones added in Be that weren't implemented. They were added as placeholders based on how it was thought it would work at that time. New related ShardManager messages were added that are sent by the ClusterAdminRpcService. The flip-member-voting-states-for-all-shards RPC is a shortcut that obtains the current voting states via the GetOnDemandRaftState message to the RaftActor and inverts them. New fields were added to the OnDemandRaftState response to return the voting states. Modified the ShardStats JXM bean to report the new OnDemandRaftState fields. Added a check in RaftActorServerConfigurationSupport to ensure that there's at least 1 voting member otherwise one can end up with an unusable shard with no ability to elect a leader. Fixed a couple bugs in Leader and AbstractLeader that were found during testing. AbstractLeader needs to take into account the follower's voting state when determining if the leader is isolated. Change-Id: I58686e3ce94d58de7cf289e55bb717ba46bc1de5 Signed-off-by: Tom Pantelis --- .../controller/cluster/raft/RaftActor.java | 8 +- .../RaftActorServerConfigurationSupport.java | 29 +- .../raft/behaviors/AbstractLeader.java | 5 +- .../cluster/raft/behaviors/Leader.java | 4 +- .../client/messages/OnDemandRaftState.java | 21 + .../raft/messages/ServerChangeStatus.java | 5 + ...ftActorServerConfigurationSupportTest.java | 15 + .../cluster/raft/behaviors/LeaderTest.java | 19 + .../admin/ClusterAdminRpcService.java | 155 ++++--- .../src/main/yang/cluster-admin.yang | 74 +++- .../admin/ClusterAdminRpcServiceTest.java | 397 +++++++++++++++++- .../test/resources/simplelogger.properties | 10 + .../jmx/mbeans/shard/ShardStats.java | 26 +- .../jmx/mbeans/shard/ShardStatsMXBean.java | 4 + .../ChangeShardMembersVotingStatus.java | 43 ++ .../FlipShardMembersVotingStatus.java | 32 ++ .../datastore/shardmanager/ShardManager.java | 238 ++++++++--- .../shardmanager/ShardManagerTest.java | 109 +++-- 18 files changed, 992 insertions(+), 202 deletions(-) create mode 100644 opendaylight/md-sal/sal-cluster-admin/src/test/resources/simplelogger.properties create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ChangeShardMembersVotingStatus.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FlipShardMembersVotingStatus.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 350fa54c83..b207e0b725 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -409,8 +409,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // Debugging message to retrieve raft stats. Map peerAddresses = new HashMap<>(); - for(String peerId: context.getPeerIds()) { - peerAddresses.put(peerId, context.getPeerAddress(peerId)); + Map peerVotingStates = new HashMap<>(); + for(PeerInfo info: context.getPeers()) { + peerVotingStates.put(info.getId(), info.getVotingState() != VotingState.NON_VOTING); + peerAddresses.put(info.getId(), info.getAddress()); } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); @@ -429,7 +431,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { .snapshotIndex(replicatedLog().getSnapshotIndex()) .snapshotTerm(replicatedLog().getSnapshotTerm()) .votedFor(context.getTermInformation().getVotedFor()) + .isVoting(context.isVotingMember()) .peerAddresses(peerAddresses) + .peerVotingStates(peerVotingStates) .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass()); ReplicatedLogEntry lastLogEntry = replicatedLog().last(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 4cad0d0519..f76c7d70d5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -681,7 +681,8 @@ class RaftActorServerConfigurationSupport { boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation(). getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId())); if (succeeded && localServerChangedToNonVoting) { - raftActor.becomeNonVoting(); + LOG.debug("Leader changed to non-voting - trying leadership transfer"); + raftActor.becomeNonVoting(); } } @@ -707,9 +708,7 @@ class RaftActorServerConfigurationSupport { if(tryToElectLeader) { initiateLocalLeaderElection(); - } else { - updateLocalPeerInfo(); - + } else if(updateLocalPeerInfo()) { persistNewServerConfiguration(changeVotingStatusContext); } } @@ -718,21 +717,39 @@ class RaftActorServerConfigurationSupport { LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId()); ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true); - updateLocalPeerInfo(); + if(!updateLocalPeerInfo()) { + return; + } raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor()); currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig); } - private void updateLocalPeerInfo() { + private boolean updateLocalPeerInfo() { List newServerInfoList = newServerInfoList(); + // Check if new voting state would leave us with no voting members. + boolean atLeastOneVoting = false; + for(ServerInfo info: newServerInfoList) { + if(info.isVoting()) { + atLeastOneVoting = true; + break; + } + } + + if(!atLeastOneVoting) { + operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST); + return false; + } + raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList)); if(raftActor.getCurrentBehavior() instanceof AbstractLeader) { AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); leader.updateMinReplicaCount(); } + + return true; } private List newServerInfoList() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 6779414f8c..2fe3bfdc87 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -775,10 +775,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected boolean isLeaderIsolated() { int minPresent = getMinIsolatedLeaderPeerCount(); for (FollowerLogInformation followerLogInformation : followerToLog.values()) { - if (followerLogInformation.isFollowerActive()) { + final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId()); + if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) { --minPresent; if (minPresent == 0) { - break; + return false; } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 36e9b646e6..77853f38c8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -161,7 +161,9 @@ public class Leader extends AbstractLeader { @Override public void close() { if(leadershipTransferContext != null) { - leadershipTransferContext.transferCohort.abortTransfer(); + LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext; + leadershipTransferContext = null; + localLeadershipTransferContext.transferCohort.abortTransfer(); } super.close(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java index 57f8beb005..0bd85b1e6d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.raft.client.messages; +import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.List; import java.util.Map; @@ -34,9 +35,11 @@ public class OnDemandRaftState { private String votedFor; private boolean isSnapshotCaptureInitiated; private String customRaftPolicyClassName; + private boolean isVoting; private List followerInfoList = Collections.emptyList(); private Map peerAddresses = Collections.emptyMap(); + private Map peerVotingStates = Collections.emptyMap(); private OnDemandRaftState() { } @@ -109,6 +112,10 @@ public class OnDemandRaftState { return isSnapshotCaptureInitiated; } + public boolean isVoting() { + return isVoting; + } + public List getFollowerInfoList() { return followerInfoList; } @@ -117,6 +124,10 @@ public class OnDemandRaftState { return peerAddresses; } + public Map getPeerVotingStates() { + return peerVotingStates; + } + public String getCustomRaftPolicyClassName() { return customRaftPolicyClassName; } @@ -199,6 +210,11 @@ public class OnDemandRaftState { return this; } + public Builder isVoting(boolean isVoting) { + stats.isVoting = isVoting; + return this; + } + public Builder followerInfoList(List followerInfoList) { stats.followerInfoList = followerInfoList; return this; @@ -209,6 +225,11 @@ public class OnDemandRaftState { return this; } + public Builder peerVotingStates(Map peerVotingStates) { + stats.peerVotingStates = ImmutableMap.copyOf(peerVotingStates); + return this; + } + public Builder isSnapshotCaptureInitiated(boolean value) { stats.isSnapshotCaptureInitiated = value; return this; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java index 8f6a370a92..ca6f34c5c4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java @@ -48,4 +48,9 @@ public enum ServerChangeStatus { * An unsupported request, for example removing the leader in a single node cluster. */ NOT_SUPPORTED, + + /** + * Some part of the request is invalid. + */ + INVALID_REQUEST, } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index 02ba015319..1a43dfe829 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -1085,6 +1085,21 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { LOG.info("testChangeLeaderToNonVoting ending"); } + @Test + public void testChangeLeaderToNonVotingInSingleNode() { + LOG.info("testChangeLeaderToNonVotingInSingleNode starting"); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()). + withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID)); + + leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef()); + ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class); + assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus()); + + LOG.info("testChangeLeaderToNonVotingInSingleNode ending"); + } + @Test public void testChangeToVotingWithNoLeader() { LOG.info("testChangeToVotingWithNoLeader starting"); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 0937f65565..c99c85fbce 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1836,6 +1836,25 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue(behavior instanceof Leader); } + @Test + public void testIsolatedLeaderCheckNoVotingFollowers() { + logStart("testIsolatedLeaderCheckNoVotingFollowers"); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING); + + leader = new Leader(leaderActorContext); + leader.getFollower(FOLLOWER_ID).markFollowerActive(); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Expected Leader", behavior instanceof Leader); + } + private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){ ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1"); ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2"); diff --git a/opendaylight/md-sal/sal-cluster-admin/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index d52a15483c..617120a048 100644 --- a/opendaylight/md-sal/sal-cluster-admin/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -22,7 +22,9 @@ import com.google.common.util.concurrent.SettableFuture; import java.io.FileOutputStream; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Future; @@ -31,25 +33,30 @@ import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList; +import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; @@ -63,14 +70,13 @@ import org.slf4j.LoggerFactory; * * @author Thomas Pantelis */ -public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable { +public class ClusterAdminRpcService implements ClusterAdminService { private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES); private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class); private final DistributedDataStoreInterface configDataStore; private final DistributedDataStoreInterface operDataStore; - private RpcRegistration rpcRegistration; public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore, DistributedDataStoreInterface operDataStore) { @@ -78,19 +84,6 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl this.operDataStore = operDataStore; } - public void start(RpcProviderRegistry rpcProviderRegistry) { - LOG.debug("ClusterAdminRpcService starting"); - - rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterAdminService.class, this); - } - - @Override - public void close() { - if(rpcRegistration != null) { - rpcRegistration.close(); - } - } - @Override public Future> addShardReplica(final AddShardReplicaInput input) { final String shardName = input.getShardName(); @@ -168,22 +161,14 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl LOG.info("Adding replicas for all shards"); final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); - Function messageSupplier = new Function() { - @Override - public Object apply(String shardName) { - return new AddShardReplica(shardName); - } - }; + Function messageSupplier = shardName -> new AddShardReplica(shardName); sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); - return waitForShardResults(shardResultData, new Function, AddReplicasForAllShardsOutput>() { - @Override - public AddReplicasForAllShardsOutput apply(List shardResults) { - return new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(); - } - }, "Failed to add replica"); + return waitForShardResults(shardResultData, shardResults -> + new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(), + "Failed to add replica"); } @@ -197,37 +182,95 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl } final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); - Function messageSupplier = new Function() { - @Override - public Object apply(String shardName) { - return new RemoveShardReplica(shardName, MemberName.forName(memberName)); - } - }; + Function messageSupplier = shardName -> + new RemoveShardReplica(shardName, MemberName.forName(memberName)); sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); - return waitForShardResults(shardResultData, new Function, RemoveAllShardReplicasOutput>() { + return waitForShardResults(shardResultData, shardResults -> + new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(), + " Failed to remove replica"); + } + + @Override + public Future> changeMemberVotingStatesForShard(ChangeMemberVotingStatesForShardInput input) { + final String shardName = input.getShardName(); + if(Strings.isNullOrEmpty(shardName)) { + return newFailedRpcResultFuture("A valid shard name must be specified"); + } + + DataStoreType dataStoreType = input.getDataStoreType(); + if(dataStoreType == null) { + return newFailedRpcResultFuture("A valid DataStoreType must be specified"); + } + + List memberVotingStates = input.getMemberVotingState(); + if(memberVotingStates == null || memberVotingStates.isEmpty()) { + return newFailedRpcResultFuture("No member voting state input was specified"); + } + + ChangeShardMembersVotingStatus changeVotingStatus = toChangeShardMembersVotingStatus(shardName, + memberVotingStates); + + LOG.info("Change member voting states for shard {}: {}", shardName, + changeVotingStatus.getMeberVotingStatusMap()); + + final SettableFuture> returnFuture = SettableFuture.create(); + ListenableFuture future = sendMessageToShardManager(dataStoreType, changeVotingStatus); + Futures.addCallback(future, new FutureCallback() { @Override - public RemoveAllShardReplicasOutput apply(List shardResults) { - return new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(); + public void onSuccess(Success success) { + LOG.info("Successfully changed member voting states for shard {}", shardName); + returnFuture.set(newSuccessfulResult()); } - }, "Failed to add replica"); + + @Override + public void onFailure(Throwable failure) { + onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName), + returnFuture, failure); + } + }); + + return returnFuture; } @Override - public Future> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); + public Future> changeMemberVotingStatesForAllShards( + final ChangeMemberVotingStatesForAllShardsInput input) { + List memberVotingStates = input.getMemberVotingState(); + if(memberVotingStates == null || memberVotingStates.isEmpty()) { + return newFailedRpcResultFuture("No member voting state input was specified"); + } + + final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); + Function messageSupplier = shardName -> + toChangeShardMembersVotingStatus(shardName, memberVotingStates); + + LOG.info("Change member voting states for all shards"); + + sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); + sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); + + return waitForShardResults(shardResultData, shardResults -> + new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(), + "Failed to change member voting states"); } @Override - public Future> convertMembersToNonvotingForAllShards( - ConvertMembersToNonvotingForAllShardsInput input) { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); + public Future> flipMemberVotingStatesForAllShards() { + final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); + Function messageSupplier = shardName -> + new FlipShardMembersVotingStatus(shardName); + + LOG.info("Flip member voting states for all shards"); + + sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); + sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); + + return waitForShardResults(shardResultData, shardResults -> + new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(), + "Failed to change member voting states"); } @Override @@ -255,6 +298,18 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl return returnFuture; } + private ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName, + List memberVotingStatus) { + Map serverVotingStatusMap = new HashMap<>(); + for(MemberVotingState memberStatus: memberVotingStatus) { + serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting()); + } + + ChangeShardMembersVotingStatus changeVotingStatus = new ChangeShardMembersVotingStatus(shardName, + serverVotingStatusMap); + return changeVotingStatus; + } + private static SettableFuture> waitForShardResults( final List, ShardResultBuilder>> shardResultData, final Function, T> resultDataSupplier, diff --git a/opendaylight/md-sal/sal-cluster-admin/src/main/yang/cluster-admin.yang b/opendaylight/md-sal/sal-cluster-admin/src/main/yang/cluster-admin.yang index 2d81db7117..40fb5fbe71 100644 --- a/opendaylight/md-sal/sal-cluster-admin/src/main/yang/cluster-admin.yang +++ b/opendaylight/md-sal/sal-cluster-admin/src/main/yang/cluster-admin.yang @@ -47,7 +47,21 @@ module cluster-admin { description "The list of results, one per shard"; } } - + + grouping member-voting-states-input { + list member-voting-state { + leaf member-name { + type string; + } + + leaf voting { + type boolean; + } + + description "The list of member voting states"; + } + } + rpc add-shard-replica { input { leaf shard-name { @@ -112,40 +126,62 @@ module cluster-admin { description "The cluster member from which the shard replicas should be removed"; } } - + output { uses shard-result-output; } - + description "Removes replicas for all shards on this node. This is equivalent to issuing a remove-shard-replica for all shards and essentially removes this node from a cluster."; } - rpc convert-members-to-nonvoting-for-all-shards { + rpc change-member-voting-states-for-shard { input { - leaf-list member-names { + leaf shard-name { + mandatory true; type string; - description "The names of the cluster members to convert."; + description "The name of the shard for which to change voting state."; } + + leaf data-store-type { + mandatory true; + type data-store-type; + description "The type of the data store to which the shard belongs"; + } + + uses member-voting-states-input; } - - description "Converts the given cluster members to non-voting for all shards. The members will no - longer participate in leader elections and consensus but will be replicated. This is useful for - having a set of members serve as a backup cluster in case the primary voting cluster suffers - catastrophic failure. This RPC can be issued to any cluster member and will be forwarded - to the leader."; + + description "Changes the voting states, either voting or non-voting, of cluster members for a shard. + Non-voting members will no longer participate in leader elections and consensus but will be + replicated. This is useful for having a set of members serve as a backup cluster in case the + primary voting cluster suffers catastrophic failure. This RPC can be issued to any cluster member + and will be forwarded to the leader."; } - rpc convert-members-to-voting-for-all-shards { + rpc change-member-voting-states-for-all-shards { input { - leaf-list member-names { - type string; - description "The names of the cluster members to convert."; - } + uses member-voting-states-input; + } + + output { + uses shard-result-output; + } + + description "Changes the voting states, either voting or non-voting, of cluster members for all shards. + Non-voting members will no longer participate in leader elections and consensus but will be + replicated. This is useful for having a set of members serve as a backup cluster in case the + primary voting cluster suffers catastrophic failure. This RPC can be issued to any cluster member + and will be forwarded to the leader."; + } + + rpc flip-member-voting-states-for-all-shards { + output { + uses shard-result-output; } - description "Converts the given cluster members to voting for all shards. The members will - participate in leader elections and consensus."; + description "Flips the voting states of all cluster members for all shards, such that if a member + was voting it becomes non-voting and vice versa."; } rpc backup-datastore { diff --git a/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 1c78622d5e..8ec173d284 100644 --- a/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -13,6 +13,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent; import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent; @@ -22,18 +23,21 @@ import akka.actor.PoisonPill; import akka.actor.Status.Success; import akka.cluster.Cluster; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import java.io.File; import java.io.FileInputStream; import java.net.URI; +import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SerializationUtils; @@ -47,8 +51,14 @@ import org.opendaylight.controller.cluster.datastore.MemberNode; import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; +import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; @@ -58,10 +68,15 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; import org.opendaylight.yangtools.yang.common.RpcError; @@ -133,8 +148,6 @@ public class ClusterAdminRpcServiceTest { get(5, TimeUnit.SECONDS); assertEquals("isSuccessful", false, rpcResult.isSuccessful()); assertEquals("getErrors", 1, rpcResult.getErrors().size()); - - service.close(); } private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) { @@ -231,8 +244,6 @@ public class ClusterAdminRpcServiceTest { rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people"). setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS); verifyFailedRpcResult(rpcResult); - - service.close(); } private static NormalizedNode writeCarsNodeAndVerify(DistributedDataStore writeToStore, @@ -280,8 +291,6 @@ public class ClusterAdminRpcServiceTest { verifySuccessfulRpcResult(rpcResult); verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames); - - service.close(); } private static T verifySuccessfulRpcResult(RpcResult rpcResult) { @@ -333,7 +342,6 @@ public class ClusterAdminRpcServiceTest { setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()). get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); - service3.close(); verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2"); verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1"); @@ -358,7 +366,6 @@ public class ClusterAdminRpcServiceTest { setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()). get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); - service1.close(); verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars"); verifyNoShardPresent(replicaNode2.configDataStore(), "cars"); @@ -396,7 +403,6 @@ public class ClusterAdminRpcServiceTest { setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()). get(10, TimeUnit.SECONDS); verifySuccessfulRpcResult(rpcResult); - service1.close(); verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() { @Override @@ -458,8 +464,6 @@ public class ClusterAdminRpcServiceTest { verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1"); verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1"); verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1"); - - service.close(); } @Test @@ -520,18 +524,379 @@ public class ClusterAdminRpcServiceTest { verifyNoShardPresent(replicaNode3.configDataStore(), "cars"); verifyNoShardPresent(replicaNode3.configDataStore(), "people"); verifyNoShardPresent(replicaNode3.configDataStore(), "pets"); + } + + @Test + public void testChangeMemberVotingStatesForShard() throws Exception { + String name = "testChangeMemberVotingStatusForShard"; + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); - service3.close(); + // Invoke RPC service on member-3 to change voting status + + ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore()); + + RpcResult rpcResult = service3.changeMemberVotingStatesForShard( + new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars"). + setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of( + new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(), + new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()). + get(10, TimeUnit.SECONDS); + verifySuccessfulRpcResult(rpcResult); + + verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", false)); + verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", false)); + verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", false)); } @Test - public void testConvertMembersToVotingForAllShards() { - // TODO implement + public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception { + String name = "testChangeMemberVotingStatesForSingleNodeShard"; + String moduleShardsConfig = "module-shards-member1.conf"; + MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + leaderNode.configDataStore().waitTillReady(); + + // Invoke RPC service on member-3 to change voting status + + ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(), + leaderNode.operDataStore()); + + RpcResult rpcResult = service.changeMemberVotingStatesForShard( + new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars"). + setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of( + new MemberVotingStateBuilder().setMemberName("member-1").setVoting(false).build())).build()). + get(10, TimeUnit.SECONDS); + verifyFailedRpcResult(rpcResult); + + verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", true)); + } + + @Test + public void testChangeMemberVotingStatesForAllShards() throws Exception { + String name = "testChangeMemberVotingStatesForAllShards"; + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + leaderNode1.operDataStore().waitTillReady(); + verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3"); + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); + + // Invoke RPC service on member-3 to change voting status + + ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore()); + + RpcResult rpcResult = service3.changeMemberVotingStatesForAllShards( + new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of( + new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(), + new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()). + get(10, TimeUnit.SECONDS); + ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore()}, + new String[]{"cars", "people"}, new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", false)); } @Test - public void testConvertMembersToNonvotingForAllShards() { - // TODO implement + public void testFlipMemberVotingStates() throws Exception { + String name = "testFlipMemberVotingStates"; + + ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo("member-1", true), new ServerInfo("member-2", true), + new ServerInfo("member-3", false))); + + setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people"); + + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + leaderNode1.operDataStore().waitTillReady(); + verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true), + new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", false)); + + ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(), + replicaNode3.operDataStore()); + + RpcResult rpcResult = service3.flipMemberVotingStatesForAllShards(). + get(10, TimeUnit.SECONDS); + FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore()}, + new String[]{"cars", "people"}, + new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", true)); + + // Leadership should have transferred to member 3 since it is the only remaining voting member. + verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + raftState.getLeader().contains("member-3")); + }); + + verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + raftState.getLeader().contains("member-3")); + }); + + // Flip the voting states back to the original states. + + rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS); + result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore()}, + new String[]{"cars", "people"}, + new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true), + new SimpleEntry<>("member-3", false)); + + // Leadership should have transferred to member 1 or 2. + verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(), + raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2")); + }); + } + + @Test + public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception { + String name = "testFlipMemberVotingStatesWithNoInitialLeader"; + + // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially + // non-voting and simulated as down by not starting them up. + ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo("member-1", false), new ServerInfo("member-2", false), + new ServerInfo("member-3", false), new ServerInfo("member-4", true), + new ServerInfo("member-5", true), new ServerInfo("member-6", true))); + + setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people"); + + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + // Initially there won't be a leader b/c all the up nodes are non-voting. + + replicaNode1.waitForMembersUp("member-2", "member-3"); + + verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", false), + new SimpleEntry<>("member-2", false), new SimpleEntry<>("member-3", false), + new SimpleEntry<>("member-4", true), new SimpleEntry<>("member-5", true), + new SimpleEntry<>("member-6", true)); + + verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> + assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState())); + + ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(), + replicaNode1.operDataStore()); + + RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards(). + get(10, TimeUnit.SECONDS); + FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + verifyVotingStates(new DistributedDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore()}, + new String[]{"cars", "people"}, + new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true), + new SimpleEntry<>("member-3", true), new SimpleEntry<>("member-4", false), + new SimpleEntry<>("member-5", false), new SimpleEntry<>("member-6", false)); + + // Since member 1 was changed to voting and there was no leader, it should've started and election + // and become leader + verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + raftState.getLeader().contains("member-1")); + }); + + verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(), + raftState.getLeader().contains("member-1")); + }); + } + + @Test + public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception { + String name = "testFlipMemberVotingStatesWithVotingMembersDown"; + + // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up. + ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo("member-1", true), new ServerInfo("member-2", true), + new ServerInfo("member-3", true), new ServerInfo("member-4", false), + new ServerInfo("member-5", false), new ServerInfo("member-6", false))); + + setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people"); + setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people"); + + String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build(); + + MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.configDataStore().waitTillReady(); + leaderNode1.operDataStore().waitTillReady(); + verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true), + new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", true), + new SimpleEntry<>("member-4", false), new SimpleEntry<>("member-5", false), + new SimpleEntry<>("member-6", false)); + + ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(), + leaderNode1.operDataStore()); + + RpcResult rpcResult = service1.flipMemberVotingStatesForAllShards(). + get(10, TimeUnit.SECONDS); + FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult); + verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config), + successShardResult("people", DataStoreType.Config), + successShardResult("cars", DataStoreType.Operational), + successShardResult("people", DataStoreType.Operational)); + + // Members 2 and 3 are now non-voting but should get replicated with the new new server config. + verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(), + replicaNode2.configDataStore(), replicaNode2.operDataStore(), + replicaNode3.configDataStore(), replicaNode3.operDataStore()}, + new String[]{"cars", "people"}, + new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false), + new SimpleEntry<>("member-3", false), new SimpleEntry<>("member-4", true), + new SimpleEntry<>("member-5", true), new SimpleEntry<>("member-6", true)); + + // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet + // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader. + verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> { + assertNotNull("Expected non-null leader Id", raftState.getLeader()); + assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1")); + }); + } + + private void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig, + String member, String datastoreTypeSuffix, String... shards) { + String[] datastoreTypes = {"config_", "oper_"}; + for(String type: datastoreTypes) { + for(String shard: shards) { + List newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size()); + for(ServerInfo info: serverConfig.getServerConfig()) { + newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()), + type + datastoreTypeSuffix).toString(), info.isVoting())); + } + + String shardID = ShardIdentifier.create(shard, MemberName.forName(member), + type + datastoreTypeSuffix).toString(); + InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null)); + InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1, + new ServerConfigurationPayload(newServerInfo))); + } + } + } + + @SafeVarargs + private static void verifyVotingStates(DistributedDataStore[] datastores, String[] shards, + SimpleEntry... expStates) throws Exception { + for(DistributedDataStore datastore: datastores) { + for(String shard: shards) { + verifyVotingStates(datastore, shard, expStates); + } + } + } + + @SafeVarargs + private static void verifyVotingStates(DistributedDataStore datastore, String shardName, + SimpleEntry... expStates) throws Exception { + String localMemberName = datastore.getActorContext().getCurrentMemberName().getName(); + Map expStateMap = new HashMap<>(); + for(Entry e: expStates) { + expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()), + datastore.getActorContext().getDataStoreName()).toString(), e.getValue()); + } + + verifyRaftState(datastore, shardName, raftState -> { + String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName), + datastore.getActorContext().getDataStoreName()).toString(); + assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting()); + for(Entry e: raftState.getPeerVotingStates().entrySet()) { + assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue()); + } + }); } private static void verifyShardResults(List shardResults, ShardResult... expShardResults) { diff --git a/opendaylight/md-sal/sal-cluster-admin/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-cluster-admin/src/test/resources/simplelogger.properties new file mode 100644 index 0000000000..31736982b6 --- /dev/null +++ b/opendaylight/md-sal/sal-cluster-admin/src/test/resources/simplelogger.properties @@ -0,0 +1,10 @@ +org.slf4j.simpleLogger.showDateTime=true +org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a +org.slf4j.simpleLogger.logFile=System.out +org.slf4j.simpleLogger.showShortLogName=true +org.slf4j.simpleLogger.levelInBrackets=true +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.Shard=error +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.utils.ActorContext=error +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.raft.RaftActorServerConfigurationSupport=debug +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java index 0f491570a3..b01eb099a5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard; import akka.actor.ActorRef; import akka.pattern.Patterns; import akka.util.Timeout; +import com.google.common.base.Joiner; import com.google.common.base.Stopwatch; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -194,6 +195,15 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { public String getVotedFor() { return getOnDemandRaftState().getVotedFor(); } + @Override + public boolean isVoting() { + return getOnDemandRaftState().isVoting(); + } + + @Override + public String getPeerVotingStates() { + return toStringMap(getOnDemandRaftState().getPeerVotingStates()); + } @Override public boolean isSnapshotCaptureInitiated() { @@ -302,17 +312,11 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { @Override public String getPeerAddresses() { - StringBuilder builder = new StringBuilder(); - int i = 0; - for(Map.Entry e: getOnDemandRaftState().getPeerAddresses().entrySet()) { - if(i++ > 0) { - builder.append(", "); - } - - builder.append(e.getKey()).append(": ").append(e.getValue()); - } + return toStringMap(getOnDemandRaftState().getPeerAddresses()); + } - return builder.toString(); + private static String toStringMap(Map map) { + return Joiner.on(", ").withKeyValueSeparator(": ").join(map); } @Override @@ -347,6 +351,7 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { return shard.getPendingTxCommitQueueSize(); } + @Override public int getTxCohortCacheSize() { return shard.getCohortCacheSize(); } @@ -357,5 +362,4 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { shard.getSelf().tell(new InitiateCaptureSnapshot(), ActorRef.noSender()); } } - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java index 39cc22fc2a..1ae58e2fe8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java @@ -66,6 +66,8 @@ public interface ShardStatsMXBean { boolean isSnapshotCaptureInitiated(); + boolean isVoting(); + void resetTransactionCounters(); long getInMemoryJournalDataSize(); @@ -78,6 +80,8 @@ public interface ShardStatsMXBean { String getPeerAddresses(); + String getPeerVotingStates(); + long getLeadershipChangeCount(); String getLastLeadershipChangeTime(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ChangeShardMembersVotingStatus.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ChangeShardMembersVotingStatus.java new file mode 100644 index 0000000000..0846f7b1c9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ChangeShardMembersVotingStatus.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Map; + +/** + * A local message sent to the ShardManager to change the raft voting status for members of a shard. + * + * @author Thomas Pantelis + */ +public class ChangeShardMembersVotingStatus { + private final String shardName; + private final Map meberVotingStatusMap; + + public ChangeShardMembersVotingStatus(String shardName, Map meberVotingStatusMap) { + this.shardName = Preconditions.checkNotNull(shardName); + this.meberVotingStatusMap = ImmutableMap.copyOf(meberVotingStatusMap); + } + + public String getShardName() { + return shardName; + } + + public Map getMeberVotingStatusMap() { + return meberVotingStatusMap; + } + + @Override + public String toString() { + return "ChangeShardMembersVotingStatus [shardName=" + shardName + ", meberVotingStatusMap=" + + meberVotingStatusMap + "]"; + } + + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FlipShardMembersVotingStatus.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FlipShardMembersVotingStatus.java new file mode 100644 index 0000000000..24859a4d16 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FlipShardMembersVotingStatus.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Preconditions; + +/** + * A local message sent to the ShardManager to flip the raft voting states for members of a shard. + * + * @author Thomas Pantelis + */ +public class FlipShardMembersVotingStatus { + private final String shardName; + + public FlipShardMembersVotingStatus(String shardName) { + this.shardName = Preconditions.checkNotNull(shardName); + } + + public String getShardName() { + return shardName; + } + + @Override + public String toString() { + return "FlipShardMembersVotingStatus [shardName=" + shardName + "]"; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 04c64ddca7..0f6425e15d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -40,13 +40,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.function.Supplier; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; @@ -63,10 +63,12 @@ import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundE import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; @@ -81,12 +83,16 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -220,9 +226,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ForwardedAddServerFailure) { ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); - } else if(message instanceof PrimaryShardFoundForContext) { - PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message; - onPrimaryShardFoundContext(primaryShardFoundContext); } else if(message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); } else if(message instanceof WrappedShardResponse){ @@ -231,6 +234,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetSnapshot(); } else if(message instanceof ServerRemoved){ onShardReplicaRemoved((ServerRemoved) message); + } else if(message instanceof ChangeShardMembersVotingStatus){ + onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message); + } else if(message instanceof FlipShardMembersVotingStatus){ + onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message); } else if(message instanceof SaveSnapshotSuccess) { onSaveSnapshotSuccess((SaveSnapshotSuccess)message); } else if(message instanceof SaveSnapshotFailure) { @@ -240,6 +247,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onShutDown(); } else if (message instanceof GetLocalShardIds) { onGetLocalShardIds(); + } else if(message instanceof RunnableMessage) { + ((RunnableMessage)message).run(); } else { unknownMessage(message); } @@ -312,16 +321,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) { - if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) { - addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(), - getSender()); - } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){ - removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(), - primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender()); - } - } - private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if(isShardReplicaOperationInProgress(shardName, sender)) { @@ -347,6 +346,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void onComplete(Throwable failure, Object response) { if (failure != null) { + shardReplicaOperationsInProgress.add(shardName); String msg = String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName); @@ -1079,7 +1079,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + getSelf().tell(new RunnableMessage() { + @Override + public void run() { + addShard(getShardName(), response, getSender()); + } + }, getTargetActor()); } @Override @@ -1227,12 +1232,21 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardReplicaMsg.getShardName(), persistenceId(), getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + doRemoveShardReplicaAsync(response.getPrimaryPath()); } @Override public void onLocalPrimaryFound(LocalPrimaryShardFound response) { - getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor()); + doRemoveShardReplicaAsync(response.getPrimaryPath()); + } + + private void doRemoveShardReplicaAsync(final String primaryPath) { + getSelf().tell(new RunnableMessage() { + @Override + public void run() { + removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()); + } + }, getTargetActor()); } }); } @@ -1284,6 +1298,145 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { 0, 0)); } + private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) { + LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus); + + String shardName = changeMembersVotingStatus.getShardName(); + Map serverVotingStatusMap = new HashMap<>(); + for(Entry e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) { + serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(), + e.getValue()); + } + + ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap); + + findLocalShard(shardName, getSender(), + localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName, + localShardFound.getPath(), getSender())); + } + + private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) { + LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus); + + ActorRef sender = getSender(); + final String shardName = flipMembersVotingStatus.getShardName(); + findLocalShard(shardName, sender, localShardFound -> { + Future future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE, + Timeout.apply(30, TimeUnit.SECONDS)); + + future.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + sender.tell(new Status.Failure(new RuntimeException( + String.format("Failed to access local shard %s", shardName), failure)), self()); + return; + } + + OnDemandRaftState raftState = (OnDemandRaftState) response; + Map serverVotingStatusMap = new HashMap<>(); + for(Entry e: raftState.getPeerVotingStates().entrySet()) { + serverVotingStatusMap.put(e.getKey(), !e.getValue()); + } + + serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName). + toString(), !raftState.isVoting()); + + changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap), + shardName, localShardFound.getPath(), sender); + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + }); + + } + + private void findLocalShard(final String shardName, final ActorRef sender, + final Consumer onLocalShardFound) { + Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext(). + getShardInitializationTimeout().duration().$times(2)); + + Future futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure); + sender.tell(new Status.Failure(new RuntimeException( + String.format("Failed to find local shard %s", shardName), failure)), self()); + } else { + if(response instanceof LocalShardFound) { + getSelf().tell(new RunnableMessage() { + @Override + public void run() { + onLocalShardFound.accept((LocalShardFound) response); + } + }, sender); + } else if(response instanceof LocalShardNotFound) { + String msg = String.format("Local shard %s does not exist", shardName); + LOG.debug ("{}: {}", persistenceId, msg); + sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self()); + } else { + String msg = String.format("Failed to find local shard %s: received response: %s", + shardName, response); + LOG.debug ("{}: {}", persistenceId, msg); + sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : + new RuntimeException(msg)), self()); + } + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + + private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus, + final String shardName, final ActorRef shardActorRef, final ActorRef sender) { + if(isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + + LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(), + changeServersVotingStatus, shardActorRef.path()); + + Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2)); + Future futureObj = ask(shardActorRef, changeServersVotingStatus, timeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + shardReplicaOperationsInProgress.remove(shardName); + if (failure != null) { + String msg = String.format("ChangeServersVotingStatus request to local shard %s failed", + shardActorRef.path()); + LOG.debug ("{}: {}", persistenceId(), msg, failure); + sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + } else { + LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path()); + + ServerChangeReply replyMsg = (ServerChangeReply) response; + if(replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName); + sender.tell(new Status.Success(null), getSelf()); + } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) { + sender.tell(new Status.Failure(new IllegalArgumentException(String.format( + "The requested voting state change for shard %s is invalid. At least one member must be voting", + shardId.getShardName()))), getSelf()); + } else { + LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}", + persistenceId(), shardName, replyMsg.getStatus()); + + Exception error = getServerChangeException(ChangeServersVotingStatus.class, + replyMsg.getStatus(), shardActorRef.path().toString(), shardId); + sender.tell(new Status.Failure(error), getSelf()); + } + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + private static final class ForwardedAddServerReply { ShardInformation shardInfo; AddServerReply addServerReply; @@ -1365,6 +1518,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } + private static interface RunnableMessage extends Runnable { + } + /** * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the * a remote or local find primary message is processed @@ -1446,52 +1602,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - /** - * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be - * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received - * as a successful response to find primary. - */ - private static class PrimaryShardFoundForContext { - private final String shardName; - private final Object contextMessage; - private final RemotePrimaryShardFound remotePrimaryShardFound; - private final LocalPrimaryShardFound localPrimaryShardFound; - - public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, - @Nonnull Object primaryFoundMessage) { - this.shardName = Preconditions.checkNotNull(shardName); - this.contextMessage = Preconditions.checkNotNull(contextMessage); - Preconditions.checkNotNull(primaryFoundMessage); - this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? - (RemotePrimaryShardFound) primaryFoundMessage : null; - this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? - (LocalPrimaryShardFound) primaryFoundMessage : null; - } - - @Nonnull - String getPrimaryPath(){ - if(remotePrimaryShardFound != null) { - return remotePrimaryShardFound.getPrimaryPath(); - } - return localPrimaryShardFound.getPrimaryPath(); - } - - @Nonnull - Object getContextMessage() { - return contextMessage; - } - - @Nullable - RemotePrimaryShardFound getRemotePrimaryShardFound() { - return remotePrimaryShardFound; - } - - @Nonnull - String getShardName() { - return shardName; - } - } - /** * The WrappedShardResponse class wraps a response from a Shard. */ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index f0f051f163..3b81412bac 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -86,6 +86,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; +import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot; @@ -116,8 +117,10 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus; import org.opendaylight.controller.cluster.raft.messages.RemoveServer; import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -1442,9 +1445,11 @@ public class ShardManagerTest extends AbstractActorTest { final ActorSystem system2 = newActorSystem("Member2"); Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); + String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString(); final TestActorRef mockShardLeaderActor = - TestActorRef.create(system2, Props.create(MockRespondActor.class). + TestActorRef.create(system2, Props.create(MockRespondActor.class, AddServer.class, + new AddServerReply(ServerChangeStatus.OK, memberId2)). withDispatcher(Dispatchers.DefaultDispatcherId()), name); final TestActorRef leaderShardManager = TestActorRef.create(system2, newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster( @@ -1458,7 +1463,6 @@ public class ShardManagerTest extends AbstractActorTest { leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); - String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix; short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion), mockShardLeaderActor); @@ -1478,8 +1482,6 @@ public class ShardManagerTest extends AbstractActorTest { InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID); //construct a mock response message - AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2); - mockShardLeaderActor.underlyingActor().updateResponse(response); newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef()); AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, AddServer.class); @@ -1512,7 +1514,7 @@ public class ShardManagerTest extends AbstractActorTest { String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix; AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null); ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf( - Props.create(MockRespondActor.class, addServerReply), leaderId); + Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId); MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString()); @@ -1675,7 +1677,8 @@ public class ShardManagerTest extends AbstractActorTest { String memberId = "member-1-shard-default-" + shardMrgIDSuffix; final TestActorRef respondActor = - TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId); + actorFactory.createTestActor(Props.create(MockRespondActor.class, RemoveServer.class, + new RemoveServerReply(ServerChangeStatus.OK, null)), memberId); ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); @@ -1686,7 +1689,6 @@ public class ShardManagerTest extends AbstractActorTest { shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), RaftState.Leader.name())), respondActor); - respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null)); shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef()); final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class); assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(), @@ -1719,8 +1721,10 @@ public class ShardManagerTest extends AbstractActorTest { Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558")); String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString(); + String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; final TestActorRef mockShardLeaderActor = - TestActorRef.create(system2, Props.create(MockRespondActor.class), name); + TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class, + new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name); LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor); @@ -1754,7 +1758,6 @@ public class ShardManagerTest extends AbstractActorTest { leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor); newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor); - String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix; short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1; leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2, mock(DataTree.class), leaderVersion), mockShardLeaderActor); @@ -1771,8 +1774,6 @@ public class ShardManagerTest extends AbstractActorTest { leaderShardManager.underlyingActor().waitForMemberUp(); //construct a mock response message - RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2); - mockShardLeaderActor.underlyingActor().updateResponse(response); newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef()); RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor, RemoveServer.class); @@ -1977,6 +1978,62 @@ public class ShardManagerTest extends AbstractActorTest { LOG.info("testShutDown ending"); } + @Test + public void testChangeServersVotingStatus() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + TestActorRef respondActor = + actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, + new ServerChangeReply(ServerChangeStatus.OK, null)), memberId); + + ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class), + DataStoreVersions.CURRENT_VERSION), getRef()); + shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(), + RaftState.Leader.name())), respondActor); + + shardManager.tell(new ChangeShardMembersVotingStatus("default", + ImmutableMap.of("member-2", Boolean.TRUE)), getRef()); + + ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor.expectFirstMatching( + respondActor, ChangeServersVotingStatus.class); + assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(), + ImmutableMap.of(ShardIdentifier.create("default", MemberName.forName("member-2"), + shardMrgIDSuffix).toString(), Boolean.TRUE)); + + expectMsgClass(duration("5 seconds"), Success.class); + }}; + } + + @Test + public void testChangeServersVotingStatusWithNoLeader() throws Exception { + new JavaTestKit(getSystem()) {{ + String memberId = "member-1-shard-default-" + shardMrgIDSuffix; + + TestActorRef respondActor = + actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class, + new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId); + + ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor)); + + shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + shardManager.tell(new ActorInitialized(), respondActor); + shardManager.tell((new RoleChangeNotification(memberId, null, RaftState.Follower.name())), respondActor); + + shardManager.tell(new ChangeShardMembersVotingStatus("default", + ImmutableMap.of("member-2", Boolean.TRUE)), getRef()); + + MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class); + + Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class); + assertEquals("Failure resposnse", true, (resp.cause() instanceof NoShardLeaderException)); + }}; + } + private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final CountDownLatch snapshotPersist = new CountDownLatch(1); @@ -2184,35 +2241,25 @@ public class ShardManagerTest extends AbstractActorTest { private static class MockRespondActor extends MessageCollectorActor { static final String CLEAR_RESPONSE = "clear-response"; - static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class); - private volatile Object responseMsg; + private Object responseMsg; + private final Class requestClass; @SuppressWarnings("unused") - public MockRespondActor() { - } - - @SuppressWarnings("unused") - public MockRespondActor(Object responseMsg) { + public MockRespondActor(Class requestClass, Object responseMsg) { + this.requestClass = requestClass; this.responseMsg = responseMsg; } - public void updateResponse(Object response) { - responseMsg = response; - } - @Override public void onReceive(Object message) throws Exception { - if(!"get-all-messages".equals(message)) { - LOG.debug("Received message : {}", message); - } - super.onReceive(message); - if (message instanceof AddServer && responseMsg != null) { - getSender().tell(responseMsg, getSelf()); - } else if(message instanceof RemoveServer && responseMsg != null){ - getSender().tell(responseMsg, getSelf()); - } else if(message.equals(CLEAR_RESPONSE)) { + if(message.equals(CLEAR_RESPONSE)) { responseMsg = null; + } else { + super.onReceive(message); + if (message.getClass().equals(requestClass) && responseMsg != null) { + getSender().tell(responseMsg, getSelf()); + } } } } -- 2.36.6