protected AbstractLeader(RaftActorContext context, RaftState state) {
super(context, state);
- setLeaderPayloadVersion(context.getPayloadVersion());
-
for(PeerInfo peerInfo: context.getPeers()) {
FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
followerToLog.put(peerInfo.getId(), followerLogInformation);
}
- leaderId = context.getId();
-
LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
updateMinReplicaCount();
}
@Override
- public String getLeaderId() {
+ public final String getLeaderId() {
return context.getId();
}
+ @Override
+ public final short getLeaderPayloadVersion() {
+ return context.getPayloadVersion();
+ }
+
protected boolean isLeaderIsolated() {
int minPresent = getMinIsolatedLeaderPeerCount();
for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
*/
private Cancellable electionCancel = null;
- /**
- *
- */
- protected String leaderId = null;
-
- private short leaderPayloadVersion = -1;
-
private long replicatedToAllIndex = -1;
private final String logName;
return state;
}
- public String logName() {
+ protected final String logName() {
return logName;
}
return this;
}
- @Override
- public String getLeaderId() {
- return leaderId;
- }
-
- @Override
- public short getLeaderPayloadVersion() {
- return leaderPayloadVersion;
- }
-
- public void setLeaderPayloadVersion(short leaderPayloadVersion) {
- this.leaderPayloadVersion = leaderPayloadVersion;
- }
-
@Override
public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
return internalSwitchBehavior(behavior);
} else {
scheduleElection(electionDuration());
}
+ }
+ @Override
+ public final String getLeaderId() {
+ return null;
+ }
+ @Override
+ public final short getLeaderPayloadVersion() {
+ return -1;
}
- @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+ @Override
+ protected RaftActorBehavior handleAppendEntries(ActorRef sender,
AppendEntries appendEntries) {
if(LOG.isDebugEnabled()) {
return this;
}
- @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply) {
-
+ @Override
+ protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
return this;
}
- @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply) {
-
- LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply,
- voteCount);
+ @Override
+ protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) {
+ LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
if (requestVoteReply.isVoteGranted()) {
voteCount++;
import akka.actor.ActorRef;
import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
};
private SnapshotTracker snapshotTracker = null;
+ private String leaderId;
+ private short leaderPayloadVersion;
public Follower(RaftActorContext context) {
this(context, null, (short)-1);
public Follower(RaftActorContext context, String initialLeaderId, short initialLeaderPayloadVersion) {
super(context, RaftState.Follower);
- leaderId = initialLeaderId;
- setLeaderPayloadVersion(initialLeaderPayloadVersion);
+ this.leaderId = initialLeaderId;
+ this.leaderPayloadVersion = initialLeaderPayloadVersion;
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
scheduleElection(electionDuration());
}
}
+ }
+
+ @Override
+ public final String getLeaderId() {
+ return leaderId;
+ }
+
+ @VisibleForTesting
+ protected final void setLeaderId(final String leaderId) {
+ this.leaderId = Preconditions.checkNotNull(leaderId);
+ }
+
+ @Override
+ public short getLeaderPayloadVersion() {
+ return leaderPayloadVersion;
+ }
+ @VisibleForTesting
+ protected final void setLeaderPayloadVersion(short leaderPayloadVersion) {
+ this.leaderPayloadVersion = leaderPayloadVersion;
}
private boolean isLogEntryPresent(long index){
initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex());
}
- @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries) {
+ @Override
+ protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0;
if(LOG.isTraceEnabled()) {
// If we got here then we do appear to be talking to the leader
leaderId = appendEntries.getLeaderId();
-
- setLeaderPayloadVersion(appendEntries.getPayloadVersion());
+ leaderPayloadVersion = appendEntries.getPayloadVersion();
updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
// First check if the logs are in sync or not
return outOfSync;
}
- @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
+ @Override
+ protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
return this;
}
- @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
+ @Override
+ protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
return this;
}
- @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ @Override
+ public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Object message = fromSerializableMessage(originalMessage);