*/
public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
+ protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout();
+
/**
* Information about the RaftActor whose behavior this class represents
*/
*/
protected String leaderId = null;
+ private short leaderPayloadVersion = -1;
+
private long replicatedToAllIndex = -1;
private final String logName;
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
- lastIndex(), lastTerm()), actor()
+ lastIndex(), lastTerm(), context.getPayloadVersion()), actor()
);
return this;
}
// message is sent to itself
electionCancel =
context.getActorSystem().scheduler().scheduleOnce(interval,
- context.getActor(), new ElectionTimeout(),
+ context.getActor(), ELECTION_TIMEOUT,
context.getActorSystem().dispatcher(), context.getActor());
}
return leaderId;
}
+ @Override
+ public short getLeaderPayloadVersion() {
+ return leaderPayloadVersion;
+ }
+
+ public void setLeaderPayloadVersion(short leaderPayloadVersion) {
+ this.leaderPayloadVersion = leaderPayloadVersion;
+ }
+
protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state());
try {
* @param snapshotCapturedIndex
*/
protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
- // we would want to keep the lastApplied as its used while capturing snapshots
- long lastApplied = context.getLastApplied();
- long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
-
- if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
- LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
- context.getTermInformation().getCurrentTerm());
-
- //use the term of the temp-min, since we check for isPresent, entry will not be null
- ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
- context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
- context.getReplicatedLog().snapshotCommit();
- setReplicatedToAllIndex(tempMin);
+ long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
+
+ if(actualIndex != -1){
+ setReplicatedToAllIndex(actualIndex);
}
}
+ protected String getId(){
+ return context.getId();
+ }
+
}