X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorSnapshotMessageSupport.java;h=5173dc89f7c6b114ab857eb1970a2793b3c2f2e8;hb=95d3c7975a423951dcbdecfbfa4cb6b7a23591cc;hp=8b6871174180d8d372bcc2eb075ebef045ba6c3d;hpb=24ace09aacc620fd9768e0a7004e802f9385bcfc;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index 8b68711741..5173dc89f7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; -import akka.japi.Procedure; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import com.google.common.annotations.VisibleForTesting; @@ -29,25 +28,16 @@ import scala.concurrent.duration.Duration; * @author Thomas Pantelis */ class RaftActorSnapshotMessageSupport { - static final String COMMIT_SNAPSHOT = "commit_snapshot"; - - private final RaftActorContext context; - private final RaftActorSnapshotCohort cohort; - private final Logger log; - - private final Procedure createSnapshotProcedure = new Procedure() { + static final Object COMMIT_SNAPSHOT = new Object() { @Override - public void apply(Void notUsed) { - cohort.createSnapshot(context.getActor()); + public String toString() { + return "commit_snapshot"; } }; - private final Procedure applySnapshotProcedure = new Procedure() { - @Override - public void apply(byte[] state) { - cohort.applySnapshot(state); - } - }; + private final RaftActorContext context; + private final RaftActorSnapshotCohort cohort; + private final Logger log; private Duration snapshotReplyActorTimeout = Duration.create(30, TimeUnit.SECONDS); @@ -56,8 +46,8 @@ class RaftActorSnapshotMessageSupport { this.cohort = cohort; this.log = context.getLogger(); - context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure); - context.getSnapshotManager().setApplySnapshotProcedure(applySnapshotProcedure); + context.getSnapshotManager().setCreateSnapshotRunnable(() -> cohort.createSnapshot(context.getActor())); + context.getSnapshotManager().setApplySnapshotConsumer(cohort::applySnapshot); } boolean handleSnapshotMessage(Object message, ActorRef sender) { @@ -69,8 +59,8 @@ class RaftActorSnapshotMessageSupport { onSaveSnapshotFailure((SaveSnapshotFailure) message); } else if (message instanceof CaptureSnapshotReply) { onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); - } else if (message.equals(COMMIT_SNAPSHOT)) { - context.getSnapshotManager().commit(-1); + } else if (COMMIT_SNAPSHOT.equals(message)) { + context.getSnapshotManager().commit(-1, -1); } else if (message instanceof GetSnapshot) { onGetSnapshot(sender); } else { @@ -94,16 +84,15 @@ class RaftActorSnapshotMessageSupport { } private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) { - log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId()); - long sequenceNumber = success.metadata().sequenceNr(); - context.getSnapshotManager().commit(sequenceNumber); + log.info("{}: SaveSnapshotSuccess received for snapshot, sequenceNr: {}", context.getId(), sequenceNumber); + + context.getSnapshotManager().commit(sequenceNumber, success.metadata().timestamp()); } private void onApplySnapshot(ApplySnapshot message) { - log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(), - message.getSnapshot().getLastAppliedIndex(), message.getSnapshot().getLastAppliedTerm()); + log.info("{}: Applying snapshot on follower: {}", context.getId(), message.getSnapshot()); context.getSnapshotManager().apply(message); }