When a snapshot is saved, we attempt to delete prior snapshots as only
the last one is recovered from persistence. For the maxSequenceNr in the
criteria, we use snapshot sequenceNr - snapshot batch count. However
this assumes every snapshot is based on snapshot batch count. We may
snapshot for other reasons, eg install snapshot on follower. This can
leave multiple prior snapshots behind and use up significant disk space.
We should only retain the last snapshot saved so I changed the criteria
to use: maxSequenceNr = the saved snapshot sequenceNr (it's possible the
sequenceNr hasn't changed from th elast snapshot) and maxTimeStamp =
saved snapshot timestamp -1.
Change-Id: I35b1d71ed433d52ecff79ca07a81616e393a7b7f
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
} else if (message instanceof CaptureSnapshotReply) {
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if (COMMIT_SNAPSHOT.equals(message)) {
} else if (message instanceof CaptureSnapshotReply) {
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if (COMMIT_SNAPSHOT.equals(message)) {
- context.getSnapshotManager().commit(-1);
+ context.getSnapshotManager().commit(-1, -1);
} else if (message instanceof GetSnapshot) {
onGetSnapshot(sender);
} else {
} else if (message instanceof GetSnapshot) {
onGetSnapshot(sender);
} else {
}
private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
}
private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
- log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
-
long sequenceNumber = success.metadata().sequenceNr();
long sequenceNumber = success.metadata().sequenceNr();
- context.getSnapshotManager().commit(sequenceNumber);
+ log.info("{}: SaveSnapshotSuccess received for snapshot, sequenceNr: {}", context.getId(), sequenceNumber);
+
+ context.getSnapshotManager().commit(sequenceNumber, success.metadata().timestamp());
}
private void onApplySnapshot(ApplySnapshot message) {
}
private void onApplySnapshot(ApplySnapshot message) {
- public void commit(final long sequenceNumber) {
- currentState.commit(sequenceNumber);
+ public void commit(final long sequenceNumber, long timeStamp) {
+ currentState.commit(sequenceNumber, timeStamp);
- public void commit(final long sequenceNumber) {
+ public void commit(final long sequenceNumber, long timeStamp) {
LOG.debug("commit should not be called in state {}", this);
}
LOG.debug("commit should not be called in state {}", this);
}
private class Persisting extends AbstractSnapshotState {
@Override
private class Persisting extends AbstractSnapshotState {
@Override
- public void commit(final long sequenceNumber) {
+ public void commit(final long sequenceNumber, long timeStamp) {
LOG.debug("{}: Snapshot success - sequence number: {}", persistenceId(), sequenceNumber);
if(applySnapshot != null) {
LOG.debug("{}: Snapshot success - sequence number: {}", persistenceId(), sequenceNumber);
if(applySnapshot != null) {
context.getReplicatedLog().snapshotCommit();
}
context.getReplicatedLog().snapshotCommit();
}
- context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
- sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), Long.MAX_VALUE, 0L, 0L));
+ context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(sequenceNumber,
+ timeStamp - 1, 0L, 0L));
context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
* Commit the snapshot by trimming the log
*
* @param sequenceNumber
* Commit the snapshot by trimming the log
*
* @param sequenceNumber
- void commit(long sequenceNumber);
+ void commit(long sequenceNumber, long timeStamp);
/**
* Rollback the snapshot
/**
* Rollback the snapshot
public void testOnSaveSnapshotSuccess() {
long sequenceNumber = 100;
public void testOnSaveSnapshotSuccess() {
long sequenceNumber = 100;
- sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L)));
+ long timeStamp = 1234L;
+ sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, timeStamp)));
- verify(mockSnapshotManager).commit(eq(sequenceNumber));
+ verify(mockSnapshotManager).commit(eq(sequenceNumber), eq(timeStamp));
sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
- verify(mockSnapshotManager).commit(eq(-1L));
+ verify(mockSnapshotManager).commit(eq(-1L), eq(-1L));
assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
+ followerActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
assertEquals(true, snapshotManager.isCapturing());
assertEquals(true, snapshotManager.isCapturing());
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 1234L);
assertEquals(false, snapshotManager.isCapturing());
assertEquals(false, snapshotManager.isCapturing());
verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
- assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
- // config snapShotBatchCount = 10
- // therefore maxSequenceNumber = 90
+ assertEquals(100L, criteriaCaptor.getValue().maxSequenceNr());
+ assertEquals(1233L, criteriaCaptor.getValue().maxTimestamp());
MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
}
MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
}
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, never()).snapshotCommit();
verify(mockReplicatedLog, never()).snapshotCommit();
@Test
public void testCommitBeforeCapture(){
@Test
public void testCommitBeforeCapture(){
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, never()).snapshotCommit();
verify(mockReplicatedLog, never()).snapshotCommit();
snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, times(1)).snapshotCommit();
verify(mockReplicatedLog, times(1)).snapshotCommit();