import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
} else if (message instanceof ReplicatedLogEntry) {
onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
} else if (message instanceof ApplyLogEntries) {
- onRecoveredApplyLogEntries((ApplyLogEntries) message);
+ // Handle this message for backwards compatibility with pre-Lithium versions.
+ onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
+ } else if (message instanceof ApplyJournalEntries) {
+ onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
} else if (message instanceof UpdateElectionTerm) {
replicatedLog.append(logEntry);
}
- private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+ private void onRecoveredApplyLogEntries(long toIndex) {
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
- persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
+ persistenceId(), context.getLastApplied() + 1, toIndex);
}
- for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+ for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
batchRecoveredLogEntry(replicatedLog.get(i));
}
- context.setLastApplied(ale.getToIndex());
- context.setCommitIndex(ale.getToIndex());
+ context.setLastApplied(toIndex);
+ context.setCommitIndex(toIndex);
}
private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
- } else if (message instanceof ApplyLogEntries){
- ApplyLogEntries ale = (ApplyLogEntries) message;
+ } else if (message instanceof ApplyJournalEntries){
+ ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
+ LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
}
- persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+ persistence().persist(applyEntries, new Procedure<ApplyJournalEntries>() {
@Override
- public void apply(ApplyLogEntries param) throws Exception {
+ public void apply(ApplyJournalEntries param) throws Exception {
}
});
// Apply the state immediately
applyState(clientActor, identifier, data);
- // Send a ApplyLogEntries message so that we write the fact that we applied
+ // Send a ApplyJournalEntries message so that we write the fact that we applied
// the state to durable storage
- self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
+ self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
// Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
if(!context.isSnapshotCaptureInitiated()){