Isolate Pekko Snapshotter methods in RaftActor 45/116245/1
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 13 Apr 2025 13:19:31 +0000 (15:19 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 13 Apr 2025 13:58:44 +0000 (15:58 +0200)
RaftActor is an abstract class, which leaks Snapshotter interface to
users. Override methods to mark them as deprecated and provide internal
methods to route to Pekko.

JIRA: CONTROLLER-2134
Change-Id: Iad9240562c44dceec9028b79f0b1c886bdca8f83
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PekkoRaftStorage.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/ForwardingDataPersistenceProvider.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/ImmediateDataPersistenceProvider.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/SnapshotStore.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java

index cd3dd29242e85688f59be7ff0b8e1923fb0736a0..fbbbfa98995cf6c51432e798bb58bbdc085960d7 100644 (file)
@@ -23,7 +23,6 @@ import org.apache.pekko.persistence.DeleteMessagesSuccess;
 import org.apache.pekko.persistence.DeleteSnapshotsSuccess;
 import org.apache.pekko.persistence.JournalProtocol;
 import org.apache.pekko.persistence.SnapshotProtocol;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
@@ -195,8 +194,8 @@ final class PekkoRaftStorage extends EnabledRaftStorage {
     }
 
     @Override
-    public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
-        actor.deleteSnapshots(criteria);
+    public void deleteSnapshots(final long maxTimestamp) {
+        actor.deleteSnapshots(maxTimestamp);
     }
 
     @Override
index cae345cc3c67859fd96ce575a2fc80b15ebe7edc..8c97ad40ec4d8494eadb7f083937decc2e6f9bf2 100644 (file)
@@ -32,6 +32,7 @@ import org.apache.pekko.actor.Status;
 import org.apache.pekko.japi.Procedure;
 import org.apache.pekko.persistence.JournalProtocol;
 import org.apache.pekko.persistence.SnapshotProtocol;
+import org.apache.pekko.persistence.SnapshotSelectionCriteria;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
@@ -1024,6 +1025,39 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         super.persistAsync(entry, callback::accept);
     }
 
+    @Override
+    @Deprecated(since = "11.0.0", forRemoval = true)
+    public final void loadSnapshot(final String persistenceId, final SnapshotSelectionCriteria criteria,
+            final long toSequenceNr) {
+        super.loadSnapshot(persistenceId, criteria, toSequenceNr);
+    }
+
+    @Override
+    @Deprecated(since = "11.0.0", forRemoval = true)
+    public final void saveSnapshot(final Object snapshot) {
+        throw new UnsupportedOperationException();
+    }
+
+    final void saveSnapshot(final Snapshot snapshot) {
+        super.saveSnapshot(snapshot);
+    }
+
+    @Override
+    @Deprecated(since = "11.0.0", forRemoval = true)
+    public final void deleteSnapshot(final long sequenceNr) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    @Deprecated(since = "11.0.0", forRemoval = true)
+    public final void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
+        throw new UnsupportedOperationException();
+    }
+
+    final void deleteSnapshots(final long maxTimestamp) {
+        super.deleteSnapshots(SnapshotSelectionCriteria.create(Long.MAX_VALUE, maxTimestamp));
+    }
+
     /**
      * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
      */
index b85f34d1c4bc605ce0c3669f6a79a6b2bfb17b2d..8a7eee88df392b6432e3435687c63d97f63f0d0b 100644 (file)
@@ -14,7 +14,6 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import org.apache.pekko.persistence.AbstractPersistentActor;
 import org.apache.pekko.persistence.RecoveryCompleted;
 import org.apache.pekko.persistence.SnapshotOffer;
 import org.eclipse.jdt.annotation.NonNull;
@@ -65,7 +64,7 @@ class RaftActorRecoverySupport {
         }
     }
 
-    boolean handleRecoveryMessage(final AbstractPersistentActor actor, final Object message) {
+    boolean handleRecoveryMessage(final RaftActor actor, final Object message) {
         LOG.trace("{}: handleRecoveryMessage: {}", memberId(), message);
 
         anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
@@ -287,7 +286,7 @@ class RaftActorRecoverySupport {
         currentRecoveryBatchCount = 0;
     }
 
-    private void onRecoveryCompletedMessage(final AbstractPersistentActor raftActor) {
+    private void onRecoveryCompletedMessage(final RaftActor raftActor) {
         if (currentRecoveryBatchCount > 0) {
             endCurrentLogRecoveryBatch();
         }
@@ -345,10 +344,8 @@ class RaftActorRecoverySupport {
             // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
             // to clean out unwanted messages.
 
-            Snapshot snapshot = Snapshot.create(EmptyState.INSTANCE, List.of(), -1, -1, -1, -1, context.termInfo(),
-                context.getPeerServerInfo(true));
-
-            raftActor.saveSnapshot(snapshot);
+            raftActor.saveSnapshot(Snapshot.create(EmptyState.INSTANCE, List.of(), -1, -1, -1, -1, context.termInfo(),
+                context.getPeerServerInfo(true)));
             raftActor.deleteMessages(raftActor.lastSequenceNr());
         } else if (hasMigratedDataRecovered) {
             LOG.info("{}: Snapshot capture initiated after recovery due to migrated messages", memberId());
index 69e63e6e7f34e0371979aca09ae90d7dc1dc1e9d..318a362de50a53b3bb1f6ef80145613f3e908803 100644 (file)
@@ -13,7 +13,6 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.List;
 import org.apache.pekko.dispatch.ControlMessage;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
@@ -489,9 +488,9 @@ public final class SnapshotManager {
      * Commit the snapshot by trimming the log.
      *
      * @param sequenceNumber the sequence number of the persisted snapshot
-     * @param timeStamp the time stamp of the persisted snapshot
+     * @param timestamp the time stamp of the persisted snapshot
      */
-    void commit(final long sequenceNumber, final long timeStamp) {
+    void commit(final long sequenceNumber, final long timestamp) {
         if (!(task instanceof Persist persist)) {
             LOG.debug("{}: commit should not be called in state {}", memberId(), task);
             return;
@@ -501,7 +500,7 @@ public final class SnapshotManager {
         final var lastSequenceNumber = commit(persist);
 
         final var persistence = context.getPersistenceProvider();
-        persistence.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), timeStamp - 1, 0L, 0L));
+        persistence.deleteSnapshots(timestamp - 1);
         persistence.deleteMessages(lastSequenceNumber);
 
         snapshotComplete();
index d8c498cff5c534a285d561e5a971987d2ba51de3..e8e1a4be4e3d3329c22059c9f45053f25fde1ef5 100644 (file)
@@ -12,7 +12,6 @@ import java.io.IOException;
 import java.util.function.Consumer;
 import org.apache.pekko.persistence.JournalProtocol;
 import org.apache.pekko.persistence.SnapshotProtocol;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
@@ -56,8 +55,8 @@ public abstract class ForwardingDataPersistenceProvider implements DataPersisten
     }
 
     @Override
-    public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
-        delegate().deleteSnapshots(criteria);
+    public void deleteSnapshots(final long maxTimestamp) {
+        delegate().deleteSnapshots(maxTimestamp);
     }
 
     @Override
index 8aa4d61a83c24b53a4b54a25f6bbe1df07095625..0609512be55787485b1ec713316f32cfa83f2027 100644 (file)
@@ -13,7 +13,6 @@ import java.io.IOException;
 import java.util.function.Consumer;
 import org.apache.pekko.persistence.JournalProtocol;
 import org.apache.pekko.persistence.SnapshotProtocol;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
@@ -50,7 +49,7 @@ public interface ImmediateDataPersistenceProvider extends DataPersistenceProvide
     }
 
     @Override
-    default void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
+    default void deleteSnapshots(final long maxTimestamp) {
         // no-op
     }
 
index 74bfe9b275ce4d34b20d7df0c88cc43019943b85..418397fa0bcf96564a07cc7d0b22b8701d29b2e0 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.raft.spi;
 
 import java.io.IOException;
 import org.apache.pekko.persistence.SnapshotProtocol;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
@@ -62,6 +61,7 @@ public interface SnapshotStore {
      * @param snapshot the snapshot object to save
      */
     // FIXME: Callback<SnapshotFile> callback
+    // FIXME: imply async deletion of all other snapshots, only the last one will be reported
     void saveSnapshot(@NonNull Snapshot snapshot);
 
     //  @NonNullByDefault
@@ -69,13 +69,12 @@ public interface SnapshotStore {
     //      Callback<SnapshotFile> callback);
 
     /**
-     * Deletes snapshots based on the given criteria.
+     * Deletes snapshots up to and including a time.
      *
-     * @param criteria the search criteria
+     * @param maxTimestamp the timestamp, in Epoch milliseconds
      */
-    // FIXME: criteria == max size? max snapshots?
-    // FIXME: throws IOException
-    void deleteSnapshots(@NonNull SnapshotSelectionCriteria criteria);
+    // FIXME: integrate into saveSnapshot()
+    void deleteSnapshots(long maxTimestamp);
 
     /**
      * Receive and potentially handle a {@link SnapshotProtocol} response.
index a546b953ac4ad23a561889e5afebdbffc8f19bc8..6ff1d02c9557631d6e44dc268f7b905f9ac475cf 100644 (file)
@@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pekko.actor.ActorRef;
 import org.apache.pekko.actor.ActorSystem;
 import org.apache.pekko.actor.Props;
-import org.apache.pekko.persistence.AbstractPersistentActor;
 import org.apache.pekko.persistence.RecoveryCompleted;
 import org.apache.pekko.persistence.SnapshotMetadata;
 import org.apache.pekko.persistence.SnapshotOffer;
@@ -74,7 +73,7 @@ class RaftActorRecoverySupportTest {
     @Mock
     private RaftActorRecoveryCohort mockCohort;
     @Mock
-    private AbstractPersistentActor mockActor;
+    private RaftActor mockActor;
     @Mock
     private RaftActorSnapshotCohort<?> mockSnapshotCohort;
     @TempDir
index 4c9019bf18f5f8263565f3a2ce6a731746682e9f..e41190d91cbb8dcf31e757990163bd9fab280d45 100644 (file)
@@ -44,7 +44,6 @@ import org.apache.pekko.actor.ActorRef;
 import org.apache.pekko.actor.PoisonPill;
 import org.apache.pekko.actor.Terminated;
 import org.apache.pekko.dispatch.Dispatchers;
-import org.apache.pekko.persistence.AbstractPersistentActor;
 import org.apache.pekko.persistence.SaveSnapshotFailure;
 import org.apache.pekko.persistence.SaveSnapshotSuccess;
 import org.apache.pekko.persistence.SnapshotMetadata;
@@ -296,11 +295,11 @@ public class RaftActorTest extends AbstractActorTest {
         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
         mockRaftActor.handleRecover(updateElectionTerm);
 
-        verify(mockSupport).handleRecoveryMessage(any(AbstractPersistentActor.class), same(snapshotOffer));
-        verify(mockSupport).handleRecoveryMessage(any(AbstractPersistentActor.class), same(logEntry));
-        verify(mockSupport).handleRecoveryMessage(any(AbstractPersistentActor.class), same(applyJournalEntries));
-        verify(mockSupport).handleRecoveryMessage(any(AbstractPersistentActor.class), same(deleteEntries));
-        verify(mockSupport).handleRecoveryMessage(any(AbstractPersistentActor.class), same(updateElectionTerm));
+        verify(mockSupport).handleRecoveryMessage(any(), same(snapshotOffer));
+        verify(mockSupport).handleRecoveryMessage(any(), same(logEntry));
+        verify(mockSupport).handleRecoveryMessage(any(), same(applyJournalEntries));
+        verify(mockSupport).handleRecoveryMessage(any(), same(deleteEntries));
+        verify(mockSupport).handleRecoveryMessage(any(), same(updateElectionTerm));
     }
 
     @Test
index da1813cb47d1b867a4bb0e72aeb2a1ef1941b78b..edf0189326cf0b4a893f4b7c85bde31eb8a65b51 100644 (file)
@@ -25,7 +25,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.OutputStream;
 import java.util.List;
 import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.persistence.SnapshotSelectionCriteria;
 import org.eclipse.jdt.annotation.NonNull;
 import org.junit.After;
 import org.junit.Before;
@@ -397,12 +396,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockDataPersistenceProvider).deleteMessages(50L);
 
-        final var criteriaCaptor = ArgumentCaptor.forClass(SnapshotSelectionCriteria.class);
-
-        verify(mockDataPersistenceProvider).deleteSnapshots(criteriaCaptor.capture());
-
-        assertEquals(Long.MAX_VALUE, criteriaCaptor.getValue().maxSequenceNr());
-        assertEquals(1233L, criteriaCaptor.getValue().maxTimestamp());
+        verify(mockDataPersistenceProvider).deleteSnapshots(1233L);
 
         MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
     }
@@ -418,7 +412,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockDataPersistenceProvider, never()).deleteMessages(100L);
 
-        verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+        verify(mockDataPersistenceProvider, never()).deleteSnapshots(anyLong());
     }
 
     @Test
@@ -429,7 +423,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockDataPersistenceProvider, never()).deleteMessages(anyLong());
 
-        verify(mockDataPersistenceProvider, never()).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+        verify(mockDataPersistenceProvider, never()).deleteSnapshots(anyLong());
 
     }
 
@@ -449,7 +443,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockDataPersistenceProvider, times(1)).deleteMessages(50L);
 
-        verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+        verify(mockDataPersistenceProvider, times(1)).deleteSnapshots(anyLong());
     }
 
     @Test