import akka.persistence.UntypedPersistentActor;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
* This context should NOT be passed directly to any other actor it is
* only to be consumed by the RaftActorBehaviors
*/
- private RaftActorContext context;
+ protected RaftActorContext context;
/**
* The in-memory journal
@Override public void onReceiveRecover(Object message) {
if (message instanceof SnapshotOffer) {
- LOG.debug("SnapshotOffer called..");
+ LOG.info("SnapshotOffer called..");
SnapshotOffer offer = (SnapshotOffer) message;
Snapshot snapshot = (Snapshot) offer.snapshot();
context.setReplicatedLog(replicatedLog);
context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.setCommitIndex(snapshot.getLastAppliedIndex());
- LOG.debug("Applied snapshot to replicatedLog. " +
- "snapshotIndex={}, snapshotTerm={}, journal-size={}",
+ LOG.info("Applied snapshot to replicatedLog. " +
+ "snapshotIndex={}, snapshotTerm={}, journal-size={}",
replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
- replicatedLog.size());
+ replicatedLog.size()
+ );
// Apply the snapshot to the actors state
applySnapshot(ByteString.copyFrom(snapshot.getState()));
} else if (message instanceof ReplicatedLogEntry) {
ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
-
- // Apply State immediately
+ LOG.info("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
replicatedLog.append(logEntry);
- applyState(null, "recovery", logEntry.getData());
- context.setLastApplied(logEntry.getIndex());
- context.setCommitIndex(logEntry.getIndex());
+
+ } else if (message instanceof ApplyLogEntries) {
+ ApplyLogEntries ale = (ApplyLogEntries) message;
+
+ LOG.info("Received ApplyLogEntries for recovery, applying to state:{} to {}",
+ context.getLastApplied() + 1, ale.getToIndex());
+
+ for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+ applyState(null, "recovery", replicatedLog.get(i).getData());
+ }
+ context.setLastApplied(ale.getToIndex());
+ context.setCommitIndex(ale.getToIndex());
+
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+
} else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+ ((UpdateElectionTerm) message).getVotedFor());
+
} else if (message instanceof RecoveryCompleted) {
- LOG.debug(
+ LOG.info(
"RecoveryCompleted - Switching actor to Follower - " +
"Persistence Id = " + persistenceId() +
" Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
- LOG.debug("Applying state for log index {} data {}",
- applyState.getReplicatedLogEntry().getIndex(),
- applyState.getReplicatedLogEntry().getData());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Applying state for log index {} data {}",
+ applyState.getReplicatedLogEntry().getIndex(),
+ applyState.getReplicatedLogEntry().getData());
+ }
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
+ } else if (message instanceof ApplyLogEntries){
+ ApplyLogEntries ale = (ApplyLogEntries) message;
+ LOG.info("Persisting ApplyLogEntries with index={}", ale.getToIndex());
+ persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+ @Override
+ public void apply(ApplyLogEntries param) throws Exception {
+ }
+ });
+
} else if(message instanceof ApplySnapshot ) {
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
- LOG.debug("ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
- snapshot.getLastAppliedTerm());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("ApplySnapshot called on Follower Actor " +
+ "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
+ snapshot.getLastAppliedTerm()
+ );
+ }
applySnapshot(ByteString.copyFrom(snapshot.getState()));
//clears the followers log, sets the snapshot index to ensure adjusted-index works
context.removePeer(rrp.getName());
} else if (message instanceof CaptureSnapshot) {
- LOG.debug("CaptureSnapshot received by actor");
+ LOG.info("CaptureSnapshot received by actor");
CaptureSnapshot cs = (CaptureSnapshot)message;
captureSnapshot = cs;
createSnapshot();
} else if (message instanceof CaptureSnapshotReply){
- LOG.debug("CaptureSnapshotReply received by actor");
+ LOG.info("CaptureSnapshotReply received by actor");
CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
ByteString stateInBytes = csr.getSnapshot();
- LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+ LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
handleCaptureSnapshotReply(stateInBytes);
} else {
if (!(message instanceof AppendEntriesMessages.AppendEntries)
&& !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
- LOG.debug("onReceiveCommand: message:" + message.getClass());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("onReceiveCommand: message:" + message.getClass());
+ }
}
RaftState state =
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
- LOG.debug("Persist data {}", replicatedLogEntry);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Persist data {}", replicatedLogEntry);
+ }
replicatedLog
.appendAndPersist(clientActor, identifier, replicatedLogEntry);
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
- LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
- + peerAddress);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
+ + peerAddress);
+ }
return peerAddress;
}
lastAppliedTerm = lastAppliedEntry.getTerm();
}
- LOG.debug("Snapshot Capture logSize: {}", journal.size());
- LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied());
- LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
- LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot Capture logSize: {}", journal.size());
+ LOG.debug("Snapshot Capture lastApplied:{} ",
+ context.getLastApplied());
+ LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
+ LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+ }
// send a CaptureSnapshot to self to make the expensive operation async.
getSelf().tell(new CaptureSnapshot(
}
@Override public void update(long currentTerm, String votedFor) {
- LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
-
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+ }
this.currentTerm = currentTerm;
this.votedFor = votedFor;
}