/* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.raft; import static com.google.common.base.Verify.verify; import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Status; import akka.persistence.JournalProtocol; import akka.persistence.SnapshotProtocol; import com.google.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.time.DurationFormatUtils; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import 
org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.messages.Payload; import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; /** * RaftActor encapsulates a state machine that needs to be kept synchronized * in a cluster. It implements the RAFT algorithm as described in the paper * * In Search of an Understandable Consensus Algorithm * *
* RaftActor has 3 states and each state has a certain behavior associated * with it. A Raft actor can behave as, *
* A RaftActor MUST be a Leader in order to accept requests from clients to * change the state of its encapsulated state machine. Once a RaftActor becomes * a Leader it is also responsible for ensuring that all followers ultimately * have the same log and therefore the same state machine as itself. * *
* The current behavior of a RaftActor determines how election for leadership * is initiated and how peer RaftActors react to request for votes. * *
* Each RaftActor also needs to know the current election term. It uses this * information for a couple of things. One is to simply figure out who it * voted for in the last election. Another is to figure out if the message * it received to update its state is stale. * *
* The RaftActor uses akka-persistence to store its replicated log. * Furthermore through its behaviors a Raft Actor determines
* This is to account for situations where we know that a peer * exists but we do not know its address up-front. This may also be used in * situations where a known peer starts off in a different location and we * need to change its address * *
* Note that if the peerId does not match the list of peers passed to
* this actor during construction an IllegalStateException will be thrown.
*/
protected void setPeerAddress(final String peerId, final String peerAddress) {
    // Peer addresses are held by the context, which is also where getLeaderAddress() looks them up.
    context.setPeerAddress(peerId, peerAddress);
}
/**
 * Called by the RaftActor when a piece of persisted data needs to be applied to the derived actor's state.
 *
 * @param clientActor A reference to the client who sent this message. This is the same reference that was
 *                    passed to persistData by the derived actor. {@code clientActor} may be null when the
 *                    RaftActor is behaving as a follower or during recovery.
 * @param identifier  The identifier of the persisted data. This is also the same identifier that was passed to
 *                    persistData by the derived actor. {@code identifier} may be null when the RaftActor is
 *                    behaving as a follower or during recovery.
 * @param data        A piece of data that was persisted by the persistData call. This should NEVER be null.
 */
protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data);
/**
 * Returns the RaftActorRecoveryCohort that participates in persistence recovery.
 *
 * @return the recovery cohort; never null
 */
protected abstract @NonNull RaftActorRecoveryCohort getRaftActorRecoveryCohort();
/**
 * Callback invoked once persistence recovery has completed.
 */
protected abstract void onRecoveryComplete();
/**
 * Returns the RaftActorSnapshotCohort that participates in snapshot captures.
 *
 * @return the snapshot cohort; never null
 */
protected abstract @NonNull RaftActorSnapshotCohort getRaftActorSnapshotCohort();
/**
 * Callback invoked by the RaftActor when its state changes. The derived actor can react here, for example by
 * querying isLeader or getLeader to decide what to do next.
 */
protected abstract void onStateChanged();
/**
* Notifier Actor for this RaftActor to notify when a role change happens.
*
* @return the notifier's ActorRef, or an empty Optional if there is none.
*/
protected abstract Optional
* The default implementation immediately runs the operation.
*
* @param operation the operation to run
*/
protected void pauseLeader(final Runnable operation) {
    // Nothing needs to be paused by default, so the operation runs synchronously on the caller's thread.
    operation.run();
}
/**
 * Hook invoked when the actions associated with pausing this leader failed to execute, meaning the leader
 * should resume normal operations.
 *
 * <p>
 * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked.
 * The default implementation does nothing.
 */
protected void unpauseLeader() {
}
/**
 * Hook for subclasses to react to a change of leader. The default implementation does nothing.
 *
 * @param oldLeader the previous leader (presumably its member ID, possibly null; confirm against the caller)
 * @param newLeader the new leader (presumably its member ID, possibly null; confirm against the caller)
 */
protected void onLeaderChanged(final String oldLeader, final String newLeader) {
}
/**
 * Resolves the address of the current leader.
 *
 * @return this actor's own path when it is the leader; otherwise the peer address registered for the known
 *         leader ID, or {@code null} when no leader is known
 */
private String getLeaderAddress() {
    if (isLeader()) {
        // We are the leader, so our own actor path is the answer.
        return getSelf().path().toString();
    }

    final String currentLeaderId = getLeaderId();
    String resolvedAddress = null;
    if (currentLeaderId != null) {
        resolvedAddress = context.getPeerAddress(currentLeaderId);
        LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), currentLeaderId,
            resolvedAddress);
    }
    return resolvedAddress;
}
/**
 * Checks whether this actor currently has any followers, as reported by the Raft actor context.
 *
 * @return {@code true} if the context reports at least one follower
 */
protected boolean hasFollowers() {
    return getRaftActorContext().hasFollowers();
}
/**
 * Requests a snapshot of the current state via {@code captureWithForcedTrim}, unless the snapshot manager is
 * already capturing one.
 */
private void captureSnapshot() {
    final SnapshotManager snapshotManager = context.getSnapshotManager();
    if (snapshotManager.isCapturing()) {
        // Only one capture at a time: leave the in-flight one alone.
        return;
    }

    final long idx = getCurrentBehavior().getReplicatedToAllIndex();
    // Prefix with persistenceId() like every other log statement so the message can be attributed to a member.
    LOG.debug("{}: Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
        persistenceId(), replicatedLog().last(), idx);
    snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx);
}
/**
 * Switches this member to non-voting status. Only a leader has anything to do here: it attempts a leadership
 * transfer and then, whatever the outcome, steps down to Follower. For every other behavior this is a no-op.
 */
void becomeNonVoting() {
    if (!isLeader()) {
        return;
    }

    // Shared by both transfer outcomes: a non-voting member must not remain leader.
    final Runnable stepDownToFollower = () -> {
        if (getRaftState() != RaftState.Follower) {
            initializeBehavior();
        }
    };

    initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
        @Override
        public void onSuccess(final ActorRef raftActorRef) {
            LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
            stepDownToFollower.run();
        }

        @Override
        public void onFailure(final ActorRef raftActorRef) {
            LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
            stepDownToFollower.run();
        }
    }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
}
/**
 * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
 */
private abstract static class BehaviorState implements Immutable {
    // The captured behavior; null only for the capture taken before any behavior exists.
    @Nullable abstract RaftActorBehavior getBehavior();

    // The most recent non-null leader ID observed up to the time of capture.
    @Nullable abstract String getLastValidLeaderId();

    // The leader ID reported by the behavior at the time of capture (may be null).
    @Nullable abstract String getLastLeaderId();

    // The behavior's leader payload version at capture time; the null-behavior capture reports -1.
    abstract short getLeaderPayloadVersion();
}
/**
 * Immutable {@link BehaviorState} taken from a live, non-null {@link RaftActorBehavior}. The leader payload
 * version is read once in the constructor rather than on demand.
 */
private static final class SimpleBehaviorState extends BehaviorState {
    private final RaftActorBehavior behavior;
    private final short leaderPayloadVersion;
    private final String lastValidLeaderId;
    private final String lastLeaderId;

    SimpleBehaviorState(final String lastValidLeaderId, final String lastLeaderId,
            final RaftActorBehavior behavior) {
        // A missing behavior is represented by a dedicated null-behavior state, never by this class.
        this.behavior = requireNonNull(behavior);
        this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
        this.lastValidLeaderId = lastValidLeaderId;
        this.lastLeaderId = lastLeaderId;
    }

    @Override
    RaftActorBehavior getBehavior() {
        return this.behavior;
    }

    @Override
    String getLastValidLeaderId() {
        return this.lastValidLeaderId;
    }

    @Override
    String getLastLeaderId() {
        return this.lastLeaderId;
    }

    @Override
    short getLeaderPayloadVersion() {
        return this.leaderPayloadVersion;
    }
}
/**
 * Per-RaftActor tracker of behavior-related information that must be carried across behavior switches. It
 * remembers both the leader ID most recently reported by a behavior (possibly null) and the most recent non-null
 * one, and bundles them with the behavior into a {@link BehaviorState} capture for the next transition.
 */
private static final class BehaviorStateTracker {
    /**
     * Capture for the absence of a behavior. A null behavior is only permitted before the first message is
     * received, so both leader IDs are necessarily null; the payload version is reported as -1.
     */
    private static final BehaviorState NULL_BEHAVIOR_STATE = new BehaviorState() {
        @Override
        RaftActorBehavior getBehavior() {
            return null;
        }

        @Override
        String getLastValidLeaderId() {
            return null;
        }

        @Override
        String getLastLeaderId() {
            return null;
        }

        @Override
        short getLeaderPayloadVersion() {
            return -1;
        }
    };

    // Leader ID reported by the most recently captured behavior; may be null.
    private String lastLeaderId;
    // Most recent non-null leader ID reported by any captured behavior.
    private String lastValidLeaderId;

    BehaviorState capture(final RaftActorBehavior behavior) {
        if (behavior != null) {
            final String observedLeaderId = behavior.getLeaderId();
            lastLeaderId = observedLeaderId;
            if (observedLeaderId != null) {
                lastValidLeaderId = observedLeaderId;
            }
            return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
        }

        // No behavior yet implies no leader can have been observed.
        verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
        return NULL_BEHAVIOR_STATE;
    }
}
}