import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import scala.concurrent.duration.FiniteDuration;
/**
private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
private Cancellable heartbeatSchedule = null;
- private Optional<SnapshotHolder> snapshot;
+ private Optional<SnapshotHolder> snapshot = Optional.absent();
private int minReplicationCount;
- protected AbstractLeader(RaftActorContext context, RaftState state) {
+ protected AbstractLeader(RaftActorContext context, RaftState state,
+ @Nullable AbstractLeader initializeFromLeader) {
super(context, state);
- for(PeerInfo peerInfo: context.getPeers()) {
- FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
- followerToLog.put(peerInfo.getId(), followerLogInformation);
+ if(initializeFromLeader != null) {
+ followerToLog.putAll(initializeFromLeader.followerToLog);
+ mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot);
+ snapshot = initializeFromLeader.snapshot;
+ trackers.addAll(initializeFromLeader.trackers);
+ } else {
+ for(PeerInfo peerInfo: context.getPeers()) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
+ followerToLog.put(peerInfo.getId(), followerLogInformation);
+ }
}
LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
updateMinReplicaCount();
- snapshot = Optional.absent();
-
// Immediately schedule a heartbeat
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
+ protected AbstractLeader(RaftActorContext context, RaftState state) {
+ this(context, state, null);
+ }
+
/**
* Return an immutable collection of follower identifiers.
*
LOG.trace("{}: checking Nth index {}", logName(), N);
for (FollowerLogInformation info : followerToLog.values()) {
final PeerInfo peerInfo = context.getPeerInfo(info.getId());
- if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
+ if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) {
replicatedCount++;
} else if(LOG.isTraceEnabled()) {
LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
protected void beforeSendHeartbeat(){}
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
Preconditions.checkNotNull(sender, "sender should not be null");
- Object message = fromSerializableMessage(originalMessage);
-
if (message instanceof RaftRPC) {
RaftRPC rpc = (RaftRPC) message;
// If RPC request or response contains term T > currentTerm:
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
- LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(),
- replicate.getIdentifier(), logIndex);
+ LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
+ replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
// Create a tracker entry we will use this later to notify the
// client actor
- trackers.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
+ if(replicate.getClientActor() != null) {
+ trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(),
+ logIndex));
+ }
boolean applyModificationToState = !context.anyVotingPeers()
|| context.getRaftPolicy().applyModificationToStateBeforeConsensus();
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
- return (nextIndex == -1 ||
+ return nextIndex == -1 ||
(!context.getReplicatedLog().isPresent(nextIndex)
- && context.getReplicatedLog().isInSnapshot(nextIndex)));
+ && context.getReplicatedLog().isInSnapshot(nextIndex));
}
}
}
}
- return (minPresent != 0);
+ return minPresent != 0;
}
/**
public FollowerToSnapshot(ByteString snapshotBytes) {
this.snapshotBytes = snapshotBytes;
int size = snapshotBytes.size();
- totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
- ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+ totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) +
+ (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 1 : 0);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
logName(), size, totalChunks);