import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
*/
protected String leaderId = null;
+ private long replicatedToAllIndex = -1;
- protected AbstractRaftActorBehavior(RaftActorContext context) {
+ private final String logName;
+
+ private final RaftState state;
+
+ protected AbstractRaftActorBehavior(RaftActorContext context, RaftState state) {
this.context = context;
+ this.state = state;
this.LOG = context.getLogger();
+
+ logName = String.format("%s (%s)", context.getId(), state);
+ }
+
+ @Override
+ public RaftState state() {
+ return state;
+ }
+
+ public String logName() {
+ return logName;
+ }
+
+ @Override
+ public void setReplicatedToAllIndex(long replicatedToAllIndex) {
+ this.replicatedToAllIndex = replicatedToAllIndex;
+ }
+
+ @Override
+ public long getReplicatedToAllIndex() {
+ return replicatedToAllIndex;
}
/**
if (appendEntries.getTerm() < currentTerm()) {
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
- context.getId(), appendEntries.getTerm(), currentTerm());
+ logName(), appendEntries.getTerm(), currentTerm());
}
sender.tell(
* @param requestVote
* @return
*/
- protected RaftActorBehavior requestVote(ActorRef sender,
- RequestVote requestVote) {
+ protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received {}", context.getId(), requestVote);
- }
+ LOG.debug("{}: In requestVote: {}", logName(), requestVote);
boolean grantVote = false;
}
}
- sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
+ RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote);
+
+ LOG.debug("{}: requestVote returning: {}", logName(), reply);
+
+ sender.tell(reply, actor());
return this;
}
// around as the rest wont be present either
LOG.warn(
"{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
- context.getId(), i, i, index);
+ logName(), i, i, index);
break;
}
}
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Setting last applied to {}", context.getId(), newLastApplied);
+ LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied);
}
context.setLastApplied(newLastApplied);
}
protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
- LOG.info("{} :- Switching from behavior {} to {}", context.getId(), this.state(), behavior.state());
+ LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state());
try {
close();
} catch (Exception e) {
- LOG.error("{}: Failed to close behavior : {}", context.getId(), this.state(), e);
+ LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
}
return behavior;
}
- protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) {
+ /**
+ * Performs a snapshot with no capture on the replicated log.
+ * It clears the log from the supplied index or last-applied-1 which ever is minimum.
+ *
+ * @param snapshotCapturedIndex
+ */
+ protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
// we would want to keep the lastApplied as its used while capturing snapshots
- long tempMin = Math.min(minReplicatedToAllIndex,
- (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1));
+ long lastApplied = context.getLastApplied();
+ long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
- context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm());
+ LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
+ context.getTermInformation().getCurrentTerm());
+
+ //use the term of the temp-min, since we check for isPresent, entry will not be null
+ ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+ context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
context.getReplicatedLog().snapshotCommit();
- return tempMin;
+ setReplicatedToAllIndex(tempMin);
}
- return currentReplicatedIndex;
}
+
}