import java.util.Queue;
import java.util.UUID;
import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.AbstractUUIDIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
LOG.debug("{}: onRemoveServer: {}, state: {}", raftContext.getId(), removeServer, currentOperationState);
- boolean isSelf = removeServer.getServerId().equals(raftActor.getId());
+ boolean isSelf = removeServer.getServerId().equals(raftContext.getId());
if(isSelf && !raftContext.hasFollowers()) {
sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()),
raftActor.getSelf());
// Sanity check - we could get an ApplyState from a previous operation that timed out so make
// sure it's meant for us.
if(operationContext.getContextId().equals(applyState.getIdentifier())) {
- LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
+ LOG.info("{}: {} has been successfully replicated to a majority of followers", raftContext.getId(),
applyState.getReplicatedLogEntry().getData());
timer.cancel();
}
}
+ private static final class ServerOperationContextIdentifier extends AbstractUUIDIdentifier<ServerOperationContextIdentifier> {
+ private static final long serialVersionUID = 1L;
+
+ ServerOperationContextIdentifier() {
+ super(UUID.randomUUID());
+ }
+ }
+
/**
* Stores context information for a server operation.
*
private static abstract class ServerOperationContext<T> {
private final T operation;
private final ActorRef clientRequestor;
- private final String contextId;
+ private final Identifier contextId;
ServerOperationContext(T operation, ActorRef clientRequestor){
this.operation = operation;
this.clientRequestor = clientRequestor;
- contextId = UUID.randomUUID().toString();
+ contextId = new ServerOperationContextIdentifier();
}
- String getContextId() {
+ Identifier getContextId() {
return contextId;
}
// leadership.
boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
- if(succeeded && localServerChangedToNonVoting && raftActor.isLeader()) {
- raftActor.initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
- @Override
- public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
- LOG.debug("{}: leader transfer succeeded after change to non-voting", raftActor.persistenceId());
- ensureFollowerState(raftActor);
- }
-
- @Override
- public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
- LOG.debug("{}: leader transfer failed after change to non-voting", raftActor.persistenceId());
- ensureFollowerState(raftActor);
- }
-
- private void ensureFollowerState(RaftActor raftActor) {
- // Whether or not leadership transfer succeeded, we have to step down as leader and
- // switch to Follower so ensure that.
- if(raftActor.getRaftState() != RaftState.Follower) {
- raftActor.initializeBehavior();
- }
- }
- });
+ if (succeeded && localServerChangedToNonVoting) {
+ LOG.debug("Leader changed to non-voting - trying leadership transfer");
+ raftActor.becomeNonVoting();
}
}
if(tryToElectLeader) {
initiateLocalLeaderElection();
- } else {
- updateLocalPeerInfo();
-
+ } else if(updateLocalPeerInfo()) {
persistNewServerConfiguration(changeVotingStatusContext);
}
}
LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
- updateLocalPeerInfo();
+ if(!updateLocalPeerInfo()) {
+ return;
+ }
- raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor());
+ raftContext.getActor().tell(TimeoutNow.INSTANCE, raftContext.getActor());
currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
}
- private void updateLocalPeerInfo() {
+ private boolean updateLocalPeerInfo() {
List<ServerInfo> newServerInfoList = newServerInfoList();
+ // Check if new voting state would leave us with no voting members.
+ boolean atLeastOneVoting = false;
+ for(ServerInfo info: newServerInfoList) {
+ if(info.isVoting()) {
+ atLeastOneVoting = true;
+ break;
+ }
+ }
+
+ if(!atLeastOneVoting) {
+ operationComplete(changeVotingStatusContext, ServerChangeStatus.INVALID_REQUEST);
+ return false;
+ }
+
raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
if(raftActor.getCurrentBehavior() instanceof AbstractLeader) {
AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
leader.updateMinReplicaCount();
}
+
+ return true;
}
private List<ServerInfo> newServerInfoList() {
@Override
void onNewLeader(String newLeader) {
+ if(newLeader == null) {
+ return;
+ }
+
LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader);
timer.cancel();