import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
*/
public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
+ protected static final ElectionTimeout ELECTION_TIMEOUT = new ElectionTimeout();
+
/**
* Information about the RaftActor whose behavior this class represents
*/
private long replicatedToAllIndex = -1;
- protected AbstractRaftActorBehavior(RaftActorContext context) {
+ private final String logName;
+
+ private final RaftState state;
+
+ protected AbstractRaftActorBehavior(RaftActorContext context, RaftState state) {
this.context = context;
+ this.state = state;
this.LOG = context.getLogger();
+
+ logName = String.format("%s (%s)", context.getId(), state);
+ }
+
+ @Override
+ public RaftState state() {
+ return state;
+ }
+
+ public String logName() {
+ return logName;
}
@Override
if (appendEntries.getTerm() < currentTerm()) {
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
- context.getId(), appendEntries.getTerm(), currentTerm());
+ logName(), appendEntries.getTerm(), currentTerm());
}
sender.tell(
new AppendEntriesReply(context.getId(), currentTerm(), false,
- lastIndex(), lastTerm()), actor()
+ lastIndex(), lastTerm(), context.getPayloadVersion()), actor()
);
return this;
}
* @param requestVote
* @return
*/
- protected RaftActorBehavior requestVote(ActorRef sender,
- RequestVote requestVote) {
+ protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received {}", context.getId(), requestVote);
- }
+ LOG.debug("{}: In requestVote: {}", logName(), requestVote);
boolean grantVote = false;
}
}
- sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
+ RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote);
+
+ LOG.debug("{}: requestVote returning: {}", logName(), reply);
+
+ sender.tell(reply, actor());
return this;
}
// message is sent to itself
electionCancel =
context.getActorSystem().scheduler().scheduleOnce(interval,
- context.getActor(), new ElectionTimeout(),
+ context.getActor(), ELECTION_TIMEOUT,
context.getActorSystem().dispatcher(), context.getActor());
}
// around as the rest wont be present either
LOG.warn(
"{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
- context.getId(), i, i, index);
+ logName(), i, i, index);
break;
}
}
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Setting last applied to {}", context.getId(), newLastApplied);
+ LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied);
}
context.setLastApplied(newLastApplied);
// will be used during recovery
//in case if the above code throws an error and this message is not sent, it would be fine
// as the append entries received later would initiate add this message to the journal
- actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor());
+ actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
}
protected Object fromSerializableMessage(Object serializable){
}
protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
- LOG.info("{} :- Switching from behavior {} to {}", context.getId(), this.state(), behavior.state());
+ LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state());
try {
close();
} catch (Exception e) {
- LOG.error("{}: Failed to close behavior : {}", context.getId(), this.state(), e);
+ LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
}
return behavior;
* @param snapshotCapturedIndex
*/
protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
- // we would want to keep the lastApplied as its used while capturing snapshots
- long lastApplied = context.getLastApplied();
- long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
-
- if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
- //use the term of the temp-min, since we check for isPresent, entry will not be null
- ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
- context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
- context.getReplicatedLog().snapshotCommit();
- setReplicatedToAllIndex(tempMin);
+ long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
+
+ if(actualIndex != -1){
+ setReplicatedToAllIndex(actualIndex);
}
}
+ protected String getId(){
+ return context.getId();
+ }
+
}