import org.apache.pekko.persistence.DeleteSnapshotsSuccess;
import org.apache.pekko.persistence.JournalProtocol;
import org.apache.pekko.persistence.SnapshotProtocol;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
}
@Override
- public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
- actor.deleteSnapshots(criteria);
+ public void deleteSnapshots(final long maxTimestamp) {
+ actor.deleteSnapshots(maxTimestamp);
}
@Override
import org.apache.pekko.japi.Procedure;
import org.apache.pekko.persistence.JournalProtocol;
import org.apache.pekko.persistence.SnapshotProtocol;
+import org.apache.pekko.persistence.SnapshotSelectionCriteria;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
super.persistAsync(entry, callback::accept);
}
+ @Override
+ @Deprecated(since = "11.0.0", forRemoval = true)
+ public final void loadSnapshot(final String persistenceId, final SnapshotSelectionCriteria criteria,
+ final long toSequenceNr) {
+ super.loadSnapshot(persistenceId, criteria, toSequenceNr);
+ }
+
+ @Override
+ @Deprecated(since = "11.0.0", forRemoval = true)
+ public final void saveSnapshot(final Object snapshot) {
+ throw new UnsupportedOperationException();
+ }
+
+ final void saveSnapshot(final Snapshot snapshot) {
+ super.saveSnapshot(snapshot);
+ }
+
+ @Override
+ @Deprecated(since = "11.0.0", forRemoval = true)
+ public final void deleteSnapshot(final long sequenceNr) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ @Deprecated(since = "11.0.0", forRemoval = true)
+ public final void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
+ throw new UnsupportedOperationException();
+ }
+
+ final void deleteSnapshots(final long maxTimestamp) {
+ super.deleteSnapshots(SnapshotSelectionCriteria.create(Long.MAX_VALUE, maxTimestamp));
+ }
+
/**
* A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
*/
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import org.apache.pekko.persistence.AbstractPersistentActor;
import org.apache.pekko.persistence.RecoveryCompleted;
import org.apache.pekko.persistence.SnapshotOffer;
import org.eclipse.jdt.annotation.NonNull;
}
}
- boolean handleRecoveryMessage(final AbstractPersistentActor actor, final Object message) {
+ boolean handleRecoveryMessage(final RaftActor actor, final Object message) {
LOG.trace("{}: handleRecoveryMessage: {}", memberId(), message);
anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
currentRecoveryBatchCount = 0;
}
- private void onRecoveryCompletedMessage(final AbstractPersistentActor raftActor) {
+ private void onRecoveryCompletedMessage(final RaftActor raftActor) {
if (currentRecoveryBatchCount > 0) {
endCurrentLogRecoveryBatch();
}
// messages. Either way, we persist a snapshot and delete all the messages from the akka journal
// to clean out unwanted messages.
- Snapshot snapshot = Snapshot.create(EmptyState.INSTANCE, List.of(), -1, -1, -1, -1, context.termInfo(),
- context.getPeerServerInfo(true));
-
- raftActor.saveSnapshot(snapshot);
+ raftActor.saveSnapshot(Snapshot.create(EmptyState.INSTANCE, List.of(), -1, -1, -1, -1, context.termInfo(),
+ context.getPeerServerInfo(true)));
raftActor.deleteMessages(raftActor.lastSequenceNr());
} else if (hasMigratedDataRecovered) {
LOG.info("{}: Snapshot capture initiated after recovery due to migrated messages", memberId());
import java.io.IOException;
import java.util.List;
import org.apache.pekko.dispatch.ControlMessage;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
* Commit the snapshot by trimming the log.
*
* @param sequenceNumber the sequence number of the persisted snapshot
- * @param timeStamp the time stamp of the persisted snapshot
+ * @param timestamp the time stamp of the persisted snapshot
*/
- void commit(final long sequenceNumber, final long timeStamp) {
+ void commit(final long sequenceNumber, final long timestamp) {
if (!(task instanceof Persist persist)) {
LOG.debug("{}: commit should not be called in state {}", memberId(), task);
return;
final var lastSequenceNumber = commit(persist);
final var persistence = context.getPersistenceProvider();
- persistence.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), timeStamp - 1, 0L, 0L));
+ persistence.deleteSnapshots(timestamp - 1);
persistence.deleteMessages(lastSequenceNumber);
snapshotComplete();
import java.util.function.Consumer;
import org.apache.pekko.persistence.JournalProtocol;
import org.apache.pekko.persistence.SnapshotProtocol;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
}
@Override
- public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
- delegate().deleteSnapshots(criteria);
+ public void deleteSnapshots(final long maxTimestamp) {
+ delegate().deleteSnapshots(maxTimestamp);
}
@Override
import java.util.function.Consumer;
import org.apache.pekko.persistence.JournalProtocol;
import org.apache.pekko.persistence.SnapshotProtocol;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
}
@Override
- default void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
+ default void deleteSnapshots(final long maxTimestamp) {
// no-op
}
import java.io.IOException;
import org.apache.pekko.persistence.SnapshotProtocol;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
* @param snapshot the snapshot object to save
*/
// FIXME: Callback<SnapshotFile> callback
+ // FIXME: imply async deletion of all other snapshots, only the last one will be reported
void saveSnapshot(@NonNull Snapshot snapshot);
// @NonNullByDefault
// Callback<SnapshotFile> callback);
/**
- * Deletes snapshots based on the given criteria.
+ * Deletes snapshots up to and including a time.
*
- * @param criteria the search criteria
+ * @param maxTimestamp the timestamp, in Epoch milliseconds
*/
- // FIXME: criteria == max size? max snapshots?
- // FIXME: throws IOException
- void deleteSnapshots(@NonNull SnapshotSelectionCriteria criteria);
+ // FIXME: integrate into saveSnapshot()
+ void deleteSnapshots(long maxTimestamp);
/**
* Receive and potentially handle a {@link SnapshotProtocol} response.
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
-import org.apache.pekko.persistence.AbstractPersistentActor;
import org.apache.pekko.persistence.RecoveryCompleted;
import org.apache.pekko.persistence.SnapshotMetadata;
import org.apache.pekko.persistence.SnapshotOffer;
@Mock
private RaftActorRecoveryCohort mockCohort;
@Mock
- private AbstractPersistentActor mockActor;
+ private RaftActor mockActor;
@Mock
private RaftActorSnapshotCohort<?> mockSnapshotCohort;
@TempDir
import org.apache.pekko.actor.PoisonPill;
import org.apache.pekko.actor.Terminated;
import org.apache.pekko.dispatch.Dispatchers;
-import org.apache.pekko.persistence.AbstractPersistentActor;
import org.apache.pekko.persistence.SaveSnapshotFailure;
import org.apache.pekko.persistence.SaveSnapshotSuccess;
import org.apache.pekko.persistence.SnapshotMetadata;
UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
mockRaftActor.handleRecover(updateElectionTerm);
- verify(mockSupport).handleRecoveryMessage(any(AbstractPersistentActor.class), same(snapshotOffer));
- verify(mockSupport).handleRecoveryMessage(any(AbstractPersistentActor.class), same(logEntry));
- verify(mockSupport).handleRecoveryMessage(any(AbstractPersistentActor.class), same(applyJournalEntries));
- verify(mockSupport).handleRecoveryMessage(any(AbstractPersistentActor.class), same(deleteEntries));
- verify(mockSupport).handleRecoveryMessage(any(AbstractPersistentActor.class), same(updateElectionTerm));
+ verify(mockSupport).handleRecoveryMessage(any(), same(snapshotOffer));
+ verify(mockSupport).handleRecoveryMessage(any(), same(logEntry));
+ verify(mockSupport).handleRecoveryMessage(any(), same(applyJournalEntries));
+ verify(mockSupport).handleRecoveryMessage(any(), same(deleteEntries));
+ verify(mockSupport).handleRecoveryMessage(any(), same(updateElectionTerm));
}
@Test
import java.io.OutputStream;
import java.util.List;
import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
import org.eclipse.jdt.annotation.NonNull;
import org.junit.After;
import org.junit.Before;
verify(mockDataPersistenceProvider).deleteMessages(50L);
- final var criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
-
- verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
-
- assertEquals(Long.MAX_VALUE, criteriaCaptor.getValue().maxSequenceNr());
- assertEquals(1233L, criteriaCaptor.getValue().maxTimestamp());
+ verify(mockDataPersistenceProvider).deleteSnapshots(1233L);
MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
}
verify(mockDataPersistenceProvider, never()).deleteMessages(100L);
- verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+ verify(mockDataPersistenceProvider, never()).deleteSnapshots(anyLong());
}
@Test
verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong());
- verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+ verify(mockDataPersistenceProvider, never()).deleteSnapshots(anyLong());
}
verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L);
- verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+ verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(anyLong());
}
@Test