import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.protobuf.ByteString;
+import java.util.List;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
public class SnapshotManager implements SnapshotState {
-
private final SnapshotState IDLE = new Idle();
private final SnapshotState CAPTURING = new Capturing();
private final SnapshotState PERSISTING = new Persisting();
private SnapshotState currentState = IDLE;
private CaptureSnapshot captureSnapshot;
+ private long lastSequenceNumber = -1;
public SnapshotManager(RaftActorContext context, Logger logger) {
this.context = context;
long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
// send a CaptureSnapshot to self to make the expensive operation async.
+
+ List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
- newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+ newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
SnapshotManager.this.currentState = CAPTURING;
- if(targetFollower != null){
- LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
- } else {
+ if(captureSnapshot.isInstallSnapshotInitiated()) {
LOG.info("{}: Initiating snapshot capture {} to install on {}",
persistenceId(), captureSnapshot, targetFollower);
+ } else {
+ LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
}
+ lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
+
+ LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+
context.getActor().tell(captureSnapshot, context.getActor());
return true;
// when snapshot is saved async, SaveSnapshotSuccess is raised.
Snapshot sn = Snapshot.create(snapshotBytes,
- context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+ captureSnapshot.getUnAppliedEntries(),
captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
- persistenceProvider.deleteMessages(sequenceNumber);
+ persistenceProvider.deleteMessages(lastSequenceNumber);
+ lastSequenceNumber = -1;
SnapshotManager.this.currentState = IDLE;
}