X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=eaa005e3efdf6911a7beadce66e427d68a89161e;hb=d8d8f731bbe6c58fcbd0e616734e2e230aaf4ab4;hp=ec3f375bdeb0fb940e4d08d14397ebd03f957853;hpb=5cd6ac6819a1dcbab05c6ffbc3ac427ea41c658d;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index ec3f375bde..eaa005e3ef 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -87,6 +87,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis + private static final Procedure APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK = + new Procedure() { + @Override + public void apply(ApplyJournalEntries param) throws Exception { + } + }; + protected final Logger LOG = LoggerFactory.getLogger(getClass()); /** @@ -319,11 +326,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if(LOG.isDebugEnabled()) { LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex()); } - persistence().persist(applyEntries, new Procedure() { - @Override - public void apply(ApplyJournalEntries param) throws Exception { - } - }); + + persistence().persist(applyEntries, APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK); } else if(message instanceof ApplySnapshot ) { Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); @@ -371,7 +375,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getReplicatedLog().size()); } else if (message instanceof CaptureSnapshot) { - LOG.info("{}: CaptureSnapshot received by actor", persistenceId()); + LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message); if(captureSnapshot == null) { captureSnapshot = (CaptureSnapshot)message; @@ -675,7 +679,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } private void handleCaptureSnapshotReply(byte[] snapshotBytes) { - LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length); + LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length); // create a snapshot object from the state provided and save it // when snapshot is saved async, SaveSnapshotSuccess is raised. @@ -692,13 +696,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { long dataThreshold = Runtime.getRuntime().totalMemory() * getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100; if (context.getReplicatedLog().dataSize() > dataThreshold) { + + if(LOG.isDebugEnabled()) { + LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}", + persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold, + captureSnapshot.getLastAppliedIndex()); + } + // if memory is less, clear the log based on lastApplied. // this could/should only happen if one of the followers is down // as normally we keep removing from the log when its replicated to all. context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an + // install snapshot to a follower. + if(captureSnapshot.getReplicatedToAllIndex() >= 0) { + getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex()); + } } else if(captureSnapshot.getReplicatedToAllIndex() != -1){ // clear the log based on replicatedToAllIndex context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(), @@ -715,9 +730,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } - LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " + - "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(), - captureSnapshot.getLastAppliedTerm()); + LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " + + "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(), + replicatedLog.getSnapshotTerm()); if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) { // this would be call straight to the leader and won't initiate in serialization