} else if (message instanceof CaptureSnapshotReply) {
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if (COMMIT_SNAPSHOT.equals(message)) {
- context.getSnapshotManager().commit(-1);
+ context.getSnapshotManager().commit(-1, -1);
} else if (message instanceof GetSnapshot) {
onGetSnapshot(sender);
} else {
}
private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
- log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
-
long sequenceNumber = success.metadata().sequenceNr();
- context.getSnapshotManager().commit(sequenceNumber);
+ log.info("{}: SaveSnapshotSuccess received for snapshot, sequenceNr: {}", context.getId(), sequenceNumber);
+
+ context.getSnapshotManager().commit(sequenceNumber, success.metadata().timestamp());
}
private void onApplySnapshot(ApplySnapshot message) {
}
@Override
- public void commit(final long sequenceNumber) {
- currentState.commit(sequenceNumber);
+ public void commit(final long sequenceNumber, long timeStamp) {
+ currentState.commit(sequenceNumber, timeStamp);
}
@Override
}
@Override
- public void commit(final long sequenceNumber) {
+ public void commit(final long sequenceNumber, long timeStamp) {
LOG.debug("commit should not be called in state {}", this);
}
private class Persisting extends AbstractSnapshotState {
@Override
- public void commit(final long sequenceNumber) {
+ public void commit(final long sequenceNumber, long timeStamp) {
LOG.debug("{}: Snapshot success - sequence number: {}", persistenceId(), sequenceNumber);
if(applySnapshot != null) {
context.getReplicatedLog().snapshotCommit();
}
- context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
- sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), Long.MAX_VALUE, 0L, 0L));
+ context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(sequenceNumber,
+ timeStamp - 1, 0L, 0L));
context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
* Commit the snapshot by trimming the log
*
* @param sequenceNumber
+ * @param timeStamp
*/
- void commit(long sequenceNumber);
+ void commit(long sequenceNumber, long timeStamp);
/**
* Rollback the snapshot
public void testOnSaveSnapshotSuccess() {
long sequenceNumber = 100;
- sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, 1234L)));
+ long timeStamp = 1234L;
+ sendMessageToSupport(new SaveSnapshotSuccess(new SnapshotMetadata("foo", sequenceNumber, timeStamp)));
- verify(mockSnapshotManager).commit(eq(sequenceNumber));
+ verify(mockSnapshotManager).commit(eq(sequenceNumber), eq(timeStamp));
}
@Test
sendMessageToSupport(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
- verify(mockSnapshotManager).commit(eq(-1L));
+ verify(mockSnapshotManager).commit(eq(-1L), eq(-1L));
}
@Test
assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
+ followerActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
assertEquals(true, snapshotManager.isCapturing());
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 1234L);
assertEquals(false, snapshotManager.isCapturing());
verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
- assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
- // config snapShotBatchCount = 10
- // therefore maxSequenceNumber = 90
+ assertEquals(100L, criteriaCaptor.getValue().maxSequenceNr());
+ assertEquals(1233L, criteriaCaptor.getValue().maxTimestamp());
MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
}
snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
new MockRaftActorContext.MockPayload()), -1, "follower-1");
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, never()).snapshotCommit();
@Test
public void testCommitBeforeCapture(){
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, never()).snapshotCommit();
snapshotManager.persist(new byte[]{}, Runtime.getRuntime().totalMemory());
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
- snapshotManager.commit(100L);
+ snapshotManager.commit(100L, 0);
verify(mockReplicatedLog, times(1)).snapshotCommit();