// Debugging message to retrieve raft stats.
Map<String, String> peerAddresses = new HashMap<>();
- for(String peerId: context.getPeerIds()) {
- peerAddresses.put(peerId, context.getPeerAddress(peerId));
+ Map<String, Boolean> peerVotingStates = new HashMap<>();
+ for(PeerInfo info: context.getPeers()) {
+ peerVotingStates.put(info.getId(), info.getVotingState() != VotingState.NON_VOTING);
+ peerAddresses.put(info.getId(), info.getAddress());
}
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
.snapshotIndex(replicatedLog().getSnapshotIndex())
.snapshotTerm(replicatedLog().getSnapshotTerm())
.votedFor(context.getTermInformation().getVotedFor())
+ .isVoting(context.isVotingMember())
.peerAddresses(peerAddresses)
+ .peerVotingStates(peerVotingStates)
.customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
ReplicatedLogEntry lastLogEntry = replicatedLog().last();
boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
if (succeeded && localServerChangedToNonVoting) {
- raftActor.becomeNonVoting();
+ LOG.debug("Leader changed to non-voting - trying leadership transfer");
+ raftActor.becomeNonVoting();
}
}
if(tryToElectLeader) {
initiateLocalLeaderElection();
- } else {
- updateLocalPeerInfo();
-
+ } else if(updateLocalPeerInfo()) {
persistNewServerConfiguration(changeVotingStatusContext);
}
}
LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
- updateLocalPeerInfo();
+ if(!updateLocalPeerInfo()) {
+ return;
+ }
raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor());
currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
}
- private void updateLocalPeerInfo() {
+ private boolean updateLocalPeerInfo() {
List<ServerInfo> newServerInfoList = newServerInfoList();
+ // Check if new voting state would leave us with no voting members.
+ boolean atLeastOneVoting = false;
+ for(ServerInfo info: newServerInfoList) {
+ if(info.isVoting()) {
+ atLeastOneVoting = true;
+ break;
+ }
+ }
+
+ if(!atLeastOneVoting) {
+ operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST);
+ return false;
+ }
+
raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
if(raftActor.getCurrentBehavior() instanceof AbstractLeader) {
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
leader.updateMinReplicaCount();
}
+
+ return true;
}
private List<ServerInfo> newServerInfoList() {
protected boolean isLeaderIsolated() {
int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
- if (followerLogInformation.isFollowerActive()) {
+ final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId());
+ if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) {
--minPresent;
if (minPresent == 0) {
- break;
+ return false;
}
}
}
@Override
public void close() {
if(leadershipTransferContext != null) {
- leadershipTransferContext.transferCohort.abortTransfer();
+ LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
+ leadershipTransferContext = null;
+ localLeadershipTransferContext.transferCohort.abortTransfer();
}
super.close();
*/
package org.opendaylight.controller.cluster.raft.client.messages;
+import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
private String votedFor;
private boolean isSnapshotCaptureInitiated;
private String customRaftPolicyClassName;
+ private boolean isVoting;
private List<FollowerInfo> followerInfoList = Collections.emptyList();
private Map<String, String> peerAddresses = Collections.emptyMap();
+ private Map<String, Boolean> peerVotingStates = Collections.emptyMap();
private OnDemandRaftState() {
}
return isSnapshotCaptureInitiated;
}
+ public boolean isVoting() {
+ return isVoting;
+ }
+
public List<FollowerInfo> getFollowerInfoList() {
return followerInfoList;
}
return peerAddresses;
}
+ public Map<String, Boolean> getPeerVotingStates() {
+ return peerVotingStates;
+ }
+
public String getCustomRaftPolicyClassName() {
return customRaftPolicyClassName;
}
return this;
}
+ public Builder isVoting(boolean isVoting) {
+ stats.isVoting = isVoting;
+ return this;
+ }
+
public Builder followerInfoList(List<FollowerInfo> followerInfoList) {
stats.followerInfoList = followerInfoList;
return this;
return this;
}
+ public Builder peerVotingStates(Map<String, Boolean> peerVotingStates) {
+ stats.peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
+ return this;
+ }
+
public Builder isSnapshotCaptureInitiated(boolean value) {
stats.isSnapshotCaptureInitiated = value;
return this;
* An unsupported request, for example removing the leader in a single node cluster.
*/
NOT_SUPPORTED,
+
+ /**
+ * Some part of the request is invalid.
+ */
+ INVALID_REQUEST,
}
LOG.info("testChangeLeaderToNonVoting ending");
}
+ @Test
+ public void testChangeLeaderToNonVotingInSingleNode() {
+ LOG.info("testChangeLeaderToNonVotingInSingleNode starting");
+
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.of(), new MockRaftActorContext()).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
+
+ leaderActor.tell(new ChangeServersVotingStatus(ImmutableMap.of(LEADER_ID, false)), testKit.getRef());
+ ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+ assertEquals("getStatus", ServerChangeStatus.INVALID_REQUEST, reply.getStatus());
+
+ LOG.info("testChangeLeaderToNonVotingInSingleNode ending");
+ }
+
@Test
public void testChangeToVotingWithNoLeader() {
LOG.info("testChangeToVotingWithNoLeader starting");
assertTrue(behavior instanceof Leader);
}
+ @Test
+ public void testIsolatedLeaderCheckNoVotingFollowers() {
+ logStart("testIsolatedLeaderCheckNoVotingFollowers");
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
+
+ leader = new Leader(leaderActorContext);
+ leader.getFollower(FOLLOWER_ID).markFollowerActive();
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+ assertTrue("Expected Leader", behavior instanceof Leader);
+ }
+
private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
import java.io.FileOutputStream;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Future;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
*
* @author Thomas Pantelis
*/
-public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable {
+public class ClusterAdminRpcService implements ClusterAdminService {
private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
private final DistributedDataStoreInterface configDataStore;
private final DistributedDataStoreInterface operDataStore;
- private RpcRegistration<ClusterAdminService> rpcRegistration;
public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore,
DistributedDataStoreInterface operDataStore) {
this.operDataStore = operDataStore;
}
- public void start(RpcProviderRegistry rpcProviderRegistry) {
- LOG.debug("ClusterAdminRpcService starting");
-
- rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterAdminService.class, this);
- }
-
- @Override
- public void close() {
- if(rpcRegistration != null) {
- rpcRegistration.close();
- }
- }
-
@Override
public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
final String shardName = input.getShardName();
LOG.info("Adding replicas for all shards");
final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
- Function<String, Object> messageSupplier = new Function<String, Object>() {
- @Override
- public Object apply(String shardName) {
- return new AddShardReplica(shardName);
- }
- };
+ Function<String, Object> messageSupplier = shardName -> new AddShardReplica(shardName);
sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
- return waitForShardResults(shardResultData, new Function<List<ShardResult>, AddReplicasForAllShardsOutput>() {
- @Override
- public AddReplicasForAllShardsOutput apply(List<ShardResult> shardResults) {
- return new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build();
- }
- }, "Failed to add replica");
+ return waitForShardResults(shardResultData, shardResults ->
+ new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+ "Failed to add replica");
}
}
final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
- Function<String, Object> messageSupplier = new Function<String, Object>() {
- @Override
- public Object apply(String shardName) {
- return new RemoveShardReplica(shardName, MemberName.forName(memberName));
- }
- };
+ Function<String, Object> messageSupplier = shardName ->
+ new RemoveShardReplica(shardName, MemberName.forName(memberName));
sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
- return waitForShardResults(shardResultData, new Function<List<ShardResult>, RemoveAllShardReplicasOutput>() {
+ return waitForShardResults(shardResultData, shardResults ->
+ new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(),
+ " Failed to remove replica");
+ }
+
+ @Override
+ public Future<RpcResult<Void>> changeMemberVotingStatesForShard(ChangeMemberVotingStatesForShardInput input) {
+ final String shardName = input.getShardName();
+ if(Strings.isNullOrEmpty(shardName)) {
+ return newFailedRpcResultFuture("A valid shard name must be specified");
+ }
+
+ DataStoreType dataStoreType = input.getDataStoreType();
+ if(dataStoreType == null) {
+ return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+ }
+
+ List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+ if(memberVotingStates == null || memberVotingStates.isEmpty()) {
+ return newFailedRpcResultFuture("No member voting state input was specified");
+ }
+
+ ChangeShardMembersVotingStatus changeVotingStatus = toChangeShardMembersVotingStatus(shardName,
+ memberVotingStates);
+
+ LOG.info("Change member voting states for shard {}: {}", shardName,
+ changeVotingStatus.getMeberVotingStatusMap());
+
+ final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, changeVotingStatus);
+ Futures.addCallback(future, new FutureCallback<Success>() {
@Override
- public RemoveAllShardReplicasOutput apply(List<ShardResult> shardResults) {
- return new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build();
+ public void onSuccess(Success success) {
+ LOG.info("Successfully changed member voting states for shard {}", shardName);
+ returnFuture.set(newSuccessfulResult());
}
- }, "Failed to add replica");
+
+ @Override
+ public void onFailure(Throwable failure) {
+ onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName),
+ returnFuture, failure);
+ }
+ });
+
+ return returnFuture;
}
@Override
- public Future<RpcResult<Void>> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) {
- // TODO implement
- return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
- "Not implemented yet").buildFuture();
+ public Future<RpcResult<ChangeMemberVotingStatesForAllShardsOutput>> changeMemberVotingStatesForAllShards(
+ final ChangeMemberVotingStatesForAllShardsInput input) {
+ List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+ if(memberVotingStates == null || memberVotingStates.isEmpty()) {
+ return newFailedRpcResultFuture("No member voting state input was specified");
+ }
+
+ final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+ Function<String, Object> messageSupplier = shardName ->
+ toChangeShardMembersVotingStatus(shardName, memberVotingStates);
+
+ LOG.info("Change member voting states for all shards");
+
+ sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+ sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+ return waitForShardResults(shardResultData, shardResults ->
+ new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+ "Failed to change member voting states");
}
@Override
- public Future<RpcResult<Void>> convertMembersToNonvotingForAllShards(
- ConvertMembersToNonvotingForAllShardsInput input) {
- // TODO implement
- return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
- "Not implemented yet").buildFuture();
+ public Future<RpcResult<FlipMemberVotingStatesForAllShardsOutput>> flipMemberVotingStatesForAllShards() {
+ final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+ Function<String, Object> messageSupplier = shardName ->
+ new FlipShardMembersVotingStatus(shardName);
+
+ LOG.info("Flip member voting states for all shards");
+
+ sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+ sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+ return waitForShardResults(shardResultData, shardResults ->
+ new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+ "Failed to change member voting states");
}
@Override
return returnFuture;
}
+ private ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName,
+ List<MemberVotingState> memberVotingStatus) {
+ Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+ for(MemberVotingState memberStatus: memberVotingStatus) {
+ serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting());
+ }
+
+ ChangeShardMembersVotingStatus changeVotingStatus = new ChangeShardMembersVotingStatus(shardName,
+ serverVotingStatusMap);
+ return changeVotingStatus;
+ }
+
private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
final Function<List<ShardResult>, T> resultDataSupplier,
description "The list of results, one per shard";
}
}
-
+
+ grouping member-voting-states-input {
+ list member-voting-state {
+ leaf member-name {
+ type string;
+ }
+
+ leaf voting {
+ type boolean;
+ }
+
+ description "The list of member voting states";
+ }
+ }
+
rpc add-shard-replica {
input {
leaf shard-name {
description "The cluster member from which the shard replicas should be removed";
}
}
-
+
output {
uses shard-result-output;
}
-
+
description "Removes replicas for all shards on this node. This is equivalent to issuing
a remove-shard-replica for all shards and essentially removes this node from a cluster.";
}
- rpc convert-members-to-nonvoting-for-all-shards {
+ rpc change-member-voting-states-for-shard {
input {
- leaf-list member-names {
+ leaf shard-name {
+ mandatory true;
type string;
- description "The names of the cluster members to convert.";
+ description "The name of the shard for which to change voting state.";
}
+
+ leaf data-store-type {
+ mandatory true;
+ type data-store-type;
+ description "The type of the data store to which the shard belongs";
+ }
+
+ uses member-voting-states-input;
}
-
- description "Converts the given cluster members to non-voting for all shards. The members will no
- longer participate in leader elections and consensus but will be replicated. This is useful for
- having a set of members serve as a backup cluster in case the primary voting cluster suffers
- catastrophic failure. This RPC can be issued to any cluster member and will be forwarded
- to the leader.";
+
+ description "Changes the voting states, either voting or non-voting, of cluster members for a shard.
+ Non-voting members will no longer participate in leader elections and consensus but will be
+ replicated. This is useful for having a set of members serve as a backup cluster in case the
+ primary voting cluster suffers catastrophic failure. This RPC can be issued to any cluster member
+ and will be forwarded to the leader.";
}
- rpc convert-members-to-voting-for-all-shards {
+ rpc change-member-voting-states-for-all-shards {
input {
- leaf-list member-names {
- type string;
- description "The names of the cluster members to convert.";
- }
+ uses member-voting-states-input;
+ }
+
+ output {
+ uses shard-result-output;
+ }
+
+ description "Changes the voting states, either voting or non-voting, of cluster members for all shards.
+ Non-voting members will no longer participate in leader elections and consensus but will be
+ replicated. This is useful for having a set of members serve as a backup cluster in case the
+ primary voting cluster suffers catastrophic failure. This RPC can be issued to any cluster member
+ and will be forwarded to the leader.";
+ }
+
+ rpc flip-member-voting-states-for-all-shards {
+ output {
+ uses shard-result-output;
}
- description "Converts the given cluster members to voting for all shards. The members will
- participate in leader elections and consensus.";
+ description "Flips the voting states of all cluster members for all shards, such that if a member
+ was voting it becomes non-voting and vice versa.";
}
rpc backup-datastore {
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
import akka.actor.Status.Success;
import akka.cluster.Cluster;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileInputStream;
import java.net.URI;
+import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
import org.opendaylight.yangtools.yang.common.RpcError;
get(5, TimeUnit.SECONDS);
assertEquals("isSuccessful", false, rpcResult.isSuccessful());
assertEquals("getErrors", 1, rpcResult.getErrors().size());
-
- service.close();
}
private static void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName("people").
setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
verifyFailedRpcResult(rpcResult);
-
- service.close();
}
private static NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
verifySuccessfulRpcResult(rpcResult);
verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
-
- service.close();
}
private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()).
get(10, TimeUnit.SECONDS);
verifySuccessfulRpcResult(rpcResult);
- service3.close();
verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).
get(10, TimeUnit.SECONDS);
verifySuccessfulRpcResult(rpcResult);
- service1.close();
verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build()).
get(10, TimeUnit.SECONDS);
verifySuccessfulRpcResult(rpcResult);
- service1.close();
verifyRaftState(replicaNode2.configDataStore(), "cars", new RaftStateVerifier() {
@Override
verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "pets", "member-1");
verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1");
verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "people", "member-1");
-
- service.close();
}
@Test
verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
verifyNoShardPresent(replicaNode3.configDataStore(), "people");
verifyNoShardPresent(replicaNode3.configDataStore(), "pets");
+ }
+
+ @Test
+ public void testChangeMemberVotingStatesForShard() throws Exception {
+ String name = "testChangeMemberVotingStatusForShard";
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+ verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
- service3.close();
+ // Invoke RPC service on member-3 to change voting status
+
+ ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore());
+
+ RpcResult<Void> rpcResult = service3.changeMemberVotingStatesForShard(
+ new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars").
+ setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of(
+ new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(),
+ new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()).
+ get(10, TimeUnit.SECONDS);
+ verifySuccessfulRpcResult(rpcResult);
+
+ verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", false));
+ verifyVotingStates(replicaNode2.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", false));
+ verifyVotingStates(replicaNode3.configDataStore(), "cars", new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", false));
}
@Test
- public void testConvertMembersToVotingForAllShards() {
- // TODO implement
+ public void testChangeMemberVotingStatesForSingleNodeShard() throws Exception {
+ String name = "testChangeMemberVotingStatesForSingleNodeShard";
+ String moduleShardsConfig = "module-shards-member1.conf";
+ MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ leaderNode.configDataStore().waitTillReady();
+
+ // Invoke RPC service on member-3 to change voting status
+
+ ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
+ leaderNode.operDataStore());
+
+ RpcResult<Void> rpcResult = service.changeMemberVotingStatesForShard(
+ new ChangeMemberVotingStatesForShardInputBuilder().setShardName("cars").
+ setDataStoreType(DataStoreType.Config).setMemberVotingState(ImmutableList.of(
+ new MemberVotingStateBuilder().setMemberName("member-1").setVoting(false).build())).build()).
+ get(10, TimeUnit.SECONDS);
+ verifyFailedRpcResult(rpcResult);
+
+ verifyVotingStates(leaderNode.configDataStore(), "cars", new SimpleEntry<>("member-1", true));
+ }
+
+ @Test
+ public void testChangeMemberVotingStatesForAllShards() throws Exception {
+ String name = "testChangeMemberVotingStatesForAllShards";
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ leaderNode1.operDataStore().waitTillReady();
+ verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+ verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
+ // Invoke RPC service on member-3 to change voting status
+
+ ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore());
+
+ RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
+ new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
+ new MemberVotingStateBuilder().setMemberName("member-2").setVoting(false).build(),
+ new MemberVotingStateBuilder().setMemberName("member-3").setVoting(false).build())).build()).
+ get(10, TimeUnit.SECONDS);
+ ChangeMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"}, new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", false));
}
@Test
- public void testConvertMembersToNonvotingForAllShards() {
- // TODO implement
+ public void testFlipMemberVotingStates() throws Exception {
+ String name = "testFlipMemberVotingStates";
+
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+ new ServerInfo("member-3", false)));
+
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ leaderNode1.operDataStore().waitTillReady();
+ verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+ new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", false));
+
+ ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+ replicaNode3.operDataStore());
+
+ RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards().
+ get(10, TimeUnit.SECONDS);
+ FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", true));
+
+ // Leadership should have transferred to member 3 since it is the only remaining voting member.
+ verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-3"));
+ });
+
+ verifyRaftState(leaderNode1.operDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-3"));
+ });
+
+ // Flip the voting states back to the original states.
+
+ rpcResult = service3.flipMemberVotingStatesForAllShards(). get(10, TimeUnit.SECONDS);
+ result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+ new SimpleEntry<>("member-3", false));
+
+ // Leadership should have transferred to member 1 or 2.
+ verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1 or member-2. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-1") || raftState.getLeader().contains("member-2"));
+ });
+ }
+
+ @Test
+ public void testFlipMemberVotingStatesWithNoInitialLeader() throws Exception {
+ String name = "testFlipMemberVotingStatesWithNoInitialLeader";
+
+ // Members 1, 2, and 3 are initially started up as non-voting. Members 4, 5, and 6 are initially
+ // non-voting and simulated as down by not starting them up.
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo("member-1", false), new ServerInfo("member-2", false),
+ new ServerInfo("member-3", false), new ServerInfo("member-4", true),
+ new ServerInfo("member-5", true), new ServerInfo("member-6", true)));
+
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode replicaNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ // Initially there won't be a leader b/c all the up nodes are non-voting.
+
+ replicaNode1.waitForMembersUp("member-2", "member-3");
+
+ verifyVotingStates(replicaNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", false),
+ new SimpleEntry<>("member-2", false), new SimpleEntry<>("member-3", false),
+ new SimpleEntry<>("member-4", true), new SimpleEntry<>("member-5", true),
+ new SimpleEntry<>("member-6", true));
+
+ verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
+ assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
+
+ ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
+ replicaNode1.operDataStore());
+
+ RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+ get(10, TimeUnit.SECONDS);
+ FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ verifyVotingStates(new DistributedDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", true), new SimpleEntry<>("member-2", true),
+ new SimpleEntry<>("member-3", true), new SimpleEntry<>("member-4", false),
+ new SimpleEntry<>("member-5", false), new SimpleEntry<>("member-6", false));
+
+ // Since member 1 was changed to voting and there was no leader, it should've started and election
+ // and become leader
+ verifyRaftState(replicaNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-1"));
+ });
+
+ verifyRaftState(replicaNode1.operDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1. Actual: " + raftState.getLeader(),
+ raftState.getLeader().contains("member-1"));
+ });
+ }
+
+ @Test
+ public void testFlipMemberVotingStatesWithVotingMembersDown() throws Exception {
+ String name = "testFlipMemberVotingStatesWithVotingMembersDown";
+
+ // Members 4, 5, and 6 are initially non-voting and simulated as down by not starting them up.
+ ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+ new ServerInfo("member-1", true), new ServerInfo("member-2", true),
+ new ServerInfo("member-3", true), new ServerInfo("member-4", false),
+ new ServerInfo("member-5", false), new ServerInfo("member-6", false)));
+
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-1", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-2", name, "cars", "people");
+ setupPersistedServerConfigPayload(persistedServerConfig, "member-3", name, "cars", "people");
+
+ String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+ MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).datastoreContextBuilder(DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+ MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(moduleShardsConfig).build();
+
+ leaderNode1.configDataStore().waitTillReady();
+ leaderNode1.operDataStore().waitTillReady();
+ verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", true),
+ new SimpleEntry<>("member-2", true), new SimpleEntry<>("member-3", true),
+ new SimpleEntry<>("member-4", false), new SimpleEntry<>("member-5", false),
+ new SimpleEntry<>("member-6", false));
+
+ ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+ leaderNode1.operDataStore());
+
+ RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards().
+ get(10, TimeUnit.SECONDS);
+ FlipMemberVotingStatesForAllShardsOutput result = verifySuccessfulRpcResult(rpcResult);
+ verifyShardResults(result.getShardResult(), successShardResult("cars", DataStoreType.Config),
+ successShardResult("people", DataStoreType.Config),
+ successShardResult("cars", DataStoreType.Operational),
+ successShardResult("people", DataStoreType.Operational));
+
+ // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
+ verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+ replicaNode2.configDataStore(), replicaNode2.operDataStore(),
+ replicaNode3.configDataStore(), replicaNode3.operDataStore()},
+ new String[]{"cars", "people"},
+ new SimpleEntry<>("member-1", false), new SimpleEntry<>("member-2", false),
+ new SimpleEntry<>("member-3", false), new SimpleEntry<>("member-4", true),
+ new SimpleEntry<>("member-5", true), new SimpleEntry<>("member-6", true));
+
+ // The leader (member 1) was changed to non-voting but it shouldn't be able to step down as leader yet
+ // b/c it can't get a majority consensus with all voting members down. So verify it remains the leader.
+ verifyRaftState(leaderNode1.configDataStore(), "cars", raftState -> {
+ assertNotNull("Expected non-null leader Id", raftState.getLeader());
+ assertTrue("Expected leader member-1", raftState.getLeader().contains("member-1"));
+ });
+ }
+
+ private void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
+ String member, String datastoreTypeSuffix, String... shards) {
+ String[] datastoreTypes = {"config_", "oper_"};
+ for(String type: datastoreTypes) {
+ for(String shard: shards) {
+ List<ServerInfo> newServerInfo = new ArrayList<>(serverConfig.getServerConfig().size());
+ for(ServerInfo info: serverConfig.getServerConfig()) {
+ newServerInfo.add(new ServerInfo(ShardIdentifier.create(shard, MemberName.forName(info.getId()),
+ type + datastoreTypeSuffix).toString(), info.isVoting()));
+ }
+
+ String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
+ type + datastoreTypeSuffix).toString();
+ InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
+ InMemoryJournal.addEntry(shardID, 2, new ReplicatedLogImplEntry(0, 1,
+ new ServerConfigurationPayload(newServerInfo)));
+ }
+ }
+ }
+
+ @SafeVarargs
+ private static void verifyVotingStates(DistributedDataStore[] datastores, String[] shards,
+ SimpleEntry<String, Boolean>... expStates) throws Exception {
+ for(DistributedDataStore datastore: datastores) {
+ for(String shard: shards) {
+ verifyVotingStates(datastore, shard, expStates);
+ }
+ }
+ }
+
+ @SafeVarargs
+ private static void verifyVotingStates(DistributedDataStore datastore, String shardName,
+ SimpleEntry<String, Boolean>... expStates) throws Exception {
+ String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
+ Map<String, Boolean> expStateMap = new HashMap<>();
+ for(Entry<String, Boolean> e: expStates) {
+ expStateMap.put(ShardIdentifier.create(shardName, MemberName.forName(e.getKey()),
+ datastore.getActorContext().getDataStoreName()).toString(), e.getValue());
+ }
+
+ verifyRaftState(datastore, shardName, raftState -> {
+ String localPeerId = ShardIdentifier.create(shardName, MemberName.forName(localMemberName),
+ datastore.getActorContext().getDataStoreName()).toString();
+ assertEquals("Voting state for " + localPeerId, expStateMap.get(localPeerId), raftState.isVoting());
+ for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+ assertEquals("Voting state for " + e.getKey(), expStateMap.get(e.getKey()), e.getValue());
+ }
+ });
}
private static void verifyShardResults(List<ShardResult> shardResults, ShardResult... expShardResults) {
--- /dev/null
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.showShortLogName=true
+org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.Shard=error
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.utils.ActorContext=error
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.raft.RaftActorServerConfigurationSupport=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
\ No newline at end of file
import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
+import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
public String getVotedFor() {
return getOnDemandRaftState().getVotedFor();
}
+ @Override
+ public boolean isVoting() {
+ return getOnDemandRaftState().isVoting();
+ }
+
+ @Override
+ public String getPeerVotingStates() {
+ return toStringMap(getOnDemandRaftState().getPeerVotingStates());
+ }
@Override
public boolean isSnapshotCaptureInitiated() {
@Override
public String getPeerAddresses() {
- StringBuilder builder = new StringBuilder();
- int i = 0;
- for(Map.Entry<String, String> e: getOnDemandRaftState().getPeerAddresses().entrySet()) {
- if(i++ > 0) {
- builder.append(", ");
- }
-
- builder.append(e.getKey()).append(": ").append(e.getValue());
- }
+ return toStringMap(getOnDemandRaftState().getPeerAddresses());
+ }
- return builder.toString();
+ private static String toStringMap(Map<?, ?> map) {
+ return Joiner.on(", ").withKeyValueSeparator(": ").join(map);
}
@Override
return shard.getPendingTxCommitQueueSize();
}
+ @Override
public int getTxCohortCacheSize() {
return shard.getCohortCacheSize();
}
shard.getSelf().tell(new InitiateCaptureSnapshot(), ActorRef.noSender());
}
}
-
}
boolean isSnapshotCaptureInitiated();
+ boolean isVoting();
+
void resetTransactionCounters();
long getInMemoryJournalDataSize();
String getPeerAddresses();
+ String getPeerVotingStates();
+
long getLeadershipChangeCount();
String getLastLeadershipChangeTime();
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+
+/**
+ * A local message sent to the ShardManager to change the raft voting status for members of a shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class ChangeShardMembersVotingStatus {
+ private final String shardName;
+ private final Map<String, Boolean> meberVotingStatusMap;
+
+ public ChangeShardMembersVotingStatus(String shardName, Map<String, Boolean> meberVotingStatusMap) {
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.meberVotingStatusMap = ImmutableMap.copyOf(meberVotingStatusMap);
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ public Map<String, Boolean> getMeberVotingStatusMap() {
+ return meberVotingStatusMap;
+ }
+
+ @Override
+ public String toString() {
+ return "ChangeShardMembersVotingStatus [shardName=" + shardName + ", meberVotingStatusMap="
+ + meberVotingStatusMap + "]";
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A local message sent to the ShardManager to flip the raft voting states for members of a shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class FlipShardMembersVotingStatus {
+ private final String shardName;
+
+ public FlipShardMembersVotingStatus(String shardName) {
+ this.shardName = Preconditions.checkNotNull(shardName);
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ @Override
+ public String toString() {
+ return "FlipShardMembersVotingStatus [shardName=" + shardName + "]";
+ }
+}
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
import java.util.function.Supplier;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
} else if(message instanceof ForwardedAddServerFailure) {
ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if(message instanceof PrimaryShardFoundForContext) {
- PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
- onPrimaryShardFoundContext(primaryShardFoundContext);
} else if(message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
} else if(message instanceof WrappedShardResponse){
onGetSnapshot();
} else if(message instanceof ServerRemoved){
onShardReplicaRemoved((ServerRemoved) message);
+ } else if(message instanceof ChangeShardMembersVotingStatus){
+ onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+ } else if(message instanceof FlipShardMembersVotingStatus){
+ onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
} else if(message instanceof SaveSnapshotSuccess) {
onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
} else if(message instanceof SaveSnapshotFailure) {
onShutDown();
} else if (message instanceof GetLocalShardIds) {
onGetLocalShardIds();
+ } else if(message instanceof RunnableMessage) {
+ ((RunnableMessage)message).run();
} else {
unknownMessage(message);
}
}
}
- private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
- if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
- addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
- getSender());
- } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
- removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
- primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
- }
- }
-
private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
final ActorRef sender) {
if(isShardReplicaOperationInProgress(shardName, sender)) {
@Override
public void onComplete(Throwable failure, Object response) {
if (failure != null) {
+ shardReplicaOperationsInProgress.add(shardName);
String msg = String.format("RemoveServer request to leader %s for shard %s failed",
primaryPath, shardName);
findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ addShard(getShardName(), response, getSender());
+ }
+ }, getTargetActor());
}
@Override
shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
}
@Override
public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ private void doRemoveShardReplicaAsync(final String primaryPath) {
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender());
+ }
+ }, getTargetActor());
}
});
}
0, 0));
}
+ private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
+ LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
+
+ String shardName = changeMembersVotingStatus.getShardName();
+ Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+ for(Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+ serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
+ e.getValue());
+ }
+
+ ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
+
+ findLocalShard(shardName, getSender(),
+ localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+ localShardFound.getPath(), getSender()));
+ }
+
+ private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
+ LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
+
+ ActorRef sender = getSender();
+ final String shardName = flipMembersVotingStatus.getShardName();
+ findLocalShard(shardName, sender, localShardFound -> {
+ Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
+ Timeout.apply(30, TimeUnit.SECONDS));
+
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("Failed to access local shard %s", shardName), failure)), self());
+ return;
+ }
+
+ OnDemandRaftState raftState = (OnDemandRaftState) response;
+ Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+ for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+ serverVotingStatusMap.put(e.getKey(), !e.getValue());
+ }
+
+ serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName).
+ toString(), !raftState.isVoting());
+
+ changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
+ shardName, localShardFound.getPath(), sender);
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ });
+
+ }
+
+ private void findLocalShard(final String shardName, final ActorRef sender,
+ final Consumer<LocalShardFound> onLocalShardFound) {
+ Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+ getShardInitializationTimeout().duration().$times(2));
+
+ Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure);
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("Failed to find local shard %s", shardName), failure)), self());
+ } else {
+ if(response instanceof LocalShardFound) {
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ onLocalShardFound.accept((LocalShardFound) response);
+ }
+ }, sender);
+ } else if(response instanceof LocalShardNotFound) {
+ String msg = String.format("Local shard %s does not exist", shardName);
+ LOG.debug ("{}: {}", persistenceId, msg);
+ sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+ } else {
+ String msg = String.format("Failed to find local shard %s: received response: %s",
+ shardName, response);
+ LOG.debug ("{}: {}", persistenceId, msg);
+ sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
+ new RuntimeException(msg)), self());
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
+ final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
+ if(isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+ LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
+ changeServersVotingStatus, shardActorRef.path());
+
+ Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
+ Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ shardReplicaOperationsInProgress.remove(shardName);
+ if (failure != null) {
+ String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
+ shardActorRef.path());
+ LOG.debug ("{}: {}", persistenceId(), msg, failure);
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+
+ ServerChangeReply replyMsg = (ServerChangeReply) response;
+ if(replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+ sender.tell(new Status.Success(null), getSelf());
+ } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+ sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+ "The requested voting state change for shard %s is invalid. At least one member must be voting",
+ shardId.getShardName()))), getSelf());
+ } else {
+ LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+ persistenceId(), shardName, replyMsg.getStatus());
+
+ Exception error = getServerChangeException(ChangeServersVotingStatus.class,
+ replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+ sender.tell(new Status.Failure(error), getSelf());
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
private static final class ForwardedAddServerReply {
ShardInformation shardInfo;
AddServerReply addServerReply;
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
+ private static interface RunnableMessage extends Runnable {
+ }
+
/**
* The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
* a remote or local find primary message is processed
}
}
- /**
- * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
- * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
- * as a successful response to find primary.
- */
- private static class PrimaryShardFoundForContext {
- private final String shardName;
- private final Object contextMessage;
- private final RemotePrimaryShardFound remotePrimaryShardFound;
- private final LocalPrimaryShardFound localPrimaryShardFound;
-
- public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
- @Nonnull Object primaryFoundMessage) {
- this.shardName = Preconditions.checkNotNull(shardName);
- this.contextMessage = Preconditions.checkNotNull(contextMessage);
- Preconditions.checkNotNull(primaryFoundMessage);
- this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
- (RemotePrimaryShardFound) primaryFoundMessage : null;
- this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
- (LocalPrimaryShardFound) primaryFoundMessage : null;
- }
-
- @Nonnull
- String getPrimaryPath(){
- if(remotePrimaryShardFound != null) {
- return remotePrimaryShardFound.getPrimaryPath();
- }
- return localPrimaryShardFound.getPrimaryPath();
- }
-
- @Nonnull
- Object getContextMessage() {
- return contextMessage;
- }
-
- @Nullable
- RemotePrimaryShardFound getRemotePrimaryShardFound() {
- return remotePrimaryShardFound;
- }
-
- @Nonnull
- String getShardName() {
- return shardName;
- }
- }
-
/**
* The WrappedShardResponse class wraps a response from a Shard.
*/
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
final TestActorRef<MockRespondActor> mockShardLeaderActor =
- TestActorRef.create(system2, Props.create(MockRespondActor.class).
+ TestActorRef.create(system2, Props.create(MockRespondActor.class, AddServer.class,
+ new AddServerReply(ServerChangeStatus.OK, memberId2)).
withDispatcher(Dispatchers.DefaultDispatcherId()), name);
final TestActorRef<TestShardManager> leaderShardManager = TestActorRef.create(system2,
newTestShardMgrBuilder(mockConfig).shardActor(mockShardLeaderActor).cluster(
leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
- String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
mock(DataTree.class), leaderVersion), mockShardLeaderActor);
InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
//construct a mock response message
- AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
- mockShardLeaderActor.underlyingActor().updateResponse(response);
newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
AddServer addServerMsg = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
AddServer.class);
String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
- Props.create(MockRespondActor.class, addServerReply), leaderId);
+ Props.create(MockRespondActor.class, AddServer.class, addServerReply), leaderId);
MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
final TestActorRef<MockRespondActor> respondActor =
- TestActorRef.create(getSystem(), Props.create(MockRespondActor.class), memberId);
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, RemoveServer.class,
+ new RemoveServerReply(ServerChangeStatus.OK, null)), memberId);
ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
RaftState.Leader.name())), respondActor);
- respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
+ String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
final TestActorRef<MockRespondActor> mockShardLeaderActor =
- TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
+ TestActorRef.create(system2, Props.create(MockRespondActor.class, RemoveServer.class,
+ new RemoveServerReply(ServerChangeStatus.OK, memberId2)), name);
LOG.error("Mock Shard Leader Actor : {}", mockShardLeaderActor);
leaderShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
newReplicaShardManager.tell(new ActorInitialized(), mockShardLeaderActor);
- String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
short leaderVersion = DataStoreVersions.CURRENT_VERSION - 1;
leaderShardManager.tell(new ShardLeaderStateChanged(memberId2, memberId2,
mock(DataTree.class), leaderVersion), mockShardLeaderActor);
leaderShardManager.underlyingActor().waitForMemberUp();
//construct a mock response message
- RemoveServerReply response = new RemoveServerReply(ServerChangeStatus.OK, memberId2);
- mockShardLeaderActor.underlyingActor().updateResponse(response);
newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
RemoveServer.class);
LOG.info("testShutDown ending");
}
+ @Test
+ public void testChangeServersVotingStatus() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ TestActorRef<MockRespondActor> respondActor =
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ new ServerChangeReply(ServerChangeStatus.OK, null)), memberId);
+
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), respondActor);
+ shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, mock(DataTree.class),
+ DataStoreVersions.CURRENT_VERSION), getRef());
+ shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+ RaftState.Leader.name())), respondActor);
+
+ shardManager.tell(new ChangeShardMembersVotingStatus("default",
+ ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+ ChangeServersVotingStatus actualChangeStatusMsg = MessageCollectorActor.expectFirstMatching(
+ respondActor, ChangeServersVotingStatus.class);
+ assertEquals("ChangeServersVotingStatus map", actualChangeStatusMsg.getServerVotingStatusMap(),
+ ImmutableMap.of(ShardIdentifier.create("default", MemberName.forName("member-2"),
+ shardMrgIDSuffix).toString(), Boolean.TRUE));
+
+ expectMsgClass(duration("5 seconds"), Success.class);
+ }};
+ }
+
+ @Test
+ public void testChangeServersVotingStatusWithNoLeader() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+
+ TestActorRef<MockRespondActor> respondActor =
+ actorFactory.createTestActor(Props.create(MockRespondActor.class, ChangeServersVotingStatus.class,
+ new ServerChangeReply(ServerChangeStatus.NO_LEADER, null)), memberId);
+
+ ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(respondActor));
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ shardManager.tell(new ActorInitialized(), respondActor);
+ shardManager.tell((new RoleChangeNotification(memberId, null, RaftState.Follower.name())), respondActor);
+
+ shardManager.tell(new ChangeShardMembersVotingStatus("default",
+ ImmutableMap.of("member-2", Boolean.TRUE)), getRef());
+
+ MessageCollectorActor.expectFirstMatching(respondActor, ChangeServersVotingStatus.class);
+
+ Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
+ assertEquals("Failure resposnse", true, (resp.cause() instanceof NoShardLeaderException));
+ }};
+ }
+
private static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final CountDownLatch snapshotPersist = new CountDownLatch(1);
private static class MockRespondActor extends MessageCollectorActor {
static final String CLEAR_RESPONSE = "clear-response";
- static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MockRespondActor.class);
- private volatile Object responseMsg;
+ private Object responseMsg;
+ private final Class<?> requestClass;
@SuppressWarnings("unused")
- public MockRespondActor() {
- }
-
- @SuppressWarnings("unused")
- public MockRespondActor(Object responseMsg) {
+ public MockRespondActor(Class<?> requestClass, Object responseMsg) {
+ this.requestClass = requestClass;
this.responseMsg = responseMsg;
}
- public void updateResponse(Object response) {
- responseMsg = response;
- }
-
@Override
public void onReceive(Object message) throws Exception {
- if(!"get-all-messages".equals(message)) {
- LOG.debug("Received message : {}", message);
- }
- super.onReceive(message);
- if (message instanceof AddServer && responseMsg != null) {
- getSender().tell(responseMsg, getSelf());
- } else if(message instanceof RemoveServer && responseMsg != null){
- getSender().tell(responseMsg, getSelf());
- } else if(message.equals(CLEAR_RESPONSE)) {
+ if(message.equals(CLEAR_RESPONSE)) {
responseMsg = null;
+ } else {
+ super.onReceive(message);
+ if (message.getClass().equals(requestClass) && responseMsg != null) {
+ getSender().tell(responseMsg, getSelf());
+ }
}
}
}