import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
* </ul>
*/
public abstract class RaftActor extends AbstractUntypedPersistentActor {
+
+ private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
+
protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
} else if (message instanceof ReplicatedLogEntry) {
onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
} else if (message instanceof ApplyLogEntries) {
- onRecoveredApplyLogEntries((ApplyLogEntries) message);
+ // Handle this message for backwards compatibility with pre-Lithium versions.
+ onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
+ } else if (message instanceof ApplyJournalEntries) {
+ onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
} else if (message instanceof DeleteEntries) {
replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
} else if (message instanceof UpdateElectionTerm) {
replicatedLog.append(logEntry);
}
- private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+ private void onRecoveredApplyLogEntries(long toIndex) { // Recovery-time handler: batches the replicated log entries from lastApplied + 1 through toIndex (inclusive), then moves lastApplied and commitIndex to toIndex.
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", // The text still names ApplyLogEntries, although the recovery dispatch also routes ApplyJournalEntries here.
- persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
+ persistenceId(), context.getLastApplied() + 1, toIndex);
}
- for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+ for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
batchRecoveredLogEntry(replicatedLog.get(i)); // NOTE(review): the result of replicatedLog.get(i) is passed on unchecked - confirm it cannot be null for any index up to toIndex.
}
- context.setLastApplied(ale.getToIndex());
- context.setCommitIndex(ale.getToIndex());
+ context.setLastApplied(toIndex); // Set unconditionally after the loop, including when the loop body never ran.
+ context.setCommitIndex(toIndex);
}
private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
+ long elapsedTime = (System.nanoTime() - applyState.getStartTime());
+ if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
+ LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+ TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+ }
+
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Applying state for log index {} data {}",
persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
- } else if (message instanceof ApplyLogEntries){
- ApplyLogEntries ale = (ApplyLogEntries) message;
+ } else if (message instanceof ApplyJournalEntries){
+ ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
+ LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
}
- persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
+ persistence().persist(applyEntries, new Procedure<ApplyJournalEntries>() {
@Override
- public void apply(ApplyLogEntries param) throws Exception {
+ public void apply(ApplyJournalEntries param) throws Exception {
}
});
// Apply the state immediately
applyState(clientActor, identifier, data);
- // Send a ApplyLogEntries message so that we write the fact that we applied
+ // Send an ApplyJournalEntries message so that we write the fact that we applied
// the state to durable storage
- self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
+ self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
// Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
if(!context.isSnapshotCaptureInitiated()){
context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
- } else {
+ getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
// clear the log based on replicatedToAllIndex
context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
captureSnapshot.getReplicatedToAllTerm());
+
+ getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ } else {
+ // The replicatedToAllIndex was not found in the log
+ // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+ // In this scenario we may need to save the snapshot to the akka persistence
+ // snapshot for recovery but we do not need to do the replicated log trimming.
+ context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
+ replicatedLog.getSnapshotTerm());
}
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+
LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
"and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
dataSizeSinceLastSnapshot = 0;
- LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
+ LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
+ " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+
long lastAppliedIndex = -1;
long lastAppliedTerm = -1;