Route {Journal,Snapshot}Protocol messages to DataPersistenceProvider 69/100769/1
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 24 Apr 2022 11:17:15 +0000 (13:17 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 24 Apr 2022 11:17:53 +0000 (13:17 +0200)
The way we wire our messages ends up being not exactly nice, as the
baseline RaftActor is not cognizant of all persistence operations. This
means it cannot correctly ignore responses to deletion of journal and
snapshot entries -- and hence we get dead letters logged.

Route JournalProtocol and SnapshotProtocol messages to
DataPersistenceProvider, which can then handle them and correctly
tell us whether to log them or not.

JIRA: CONTROLLER-2042
Change-Id: I47b8c3ae67d0a0ea0aad9f0a64e1bb8dc11400fc
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java

index 6fd0693..bc4c77a 100644 (file)
@@ -15,6 +15,8 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.actor.Status;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import com.google.common.annotations.VisibleForTesting;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
@@ -131,9 +133,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         persistentProvider = new PersistentDataProvider(this);
         delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
 
-        context = new RaftActorContextImpl(this.getSelf(),
-            this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-            -1, -1, peerAddresses,
+        context = new RaftActorContextImpl(getSelf(), getContext(), id,
+            new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
             configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
             delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
 
@@ -246,12 +247,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
 
             persistence().persistAsync(applyEntries, NoopProcedure.instance());
-
         } else if (message instanceof FindLeader) {
-            getSender().tell(
-                new FindLeaderReply(getLeaderAddress()),
-                getSelf()
-            );
+            getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf());
         } else if (message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
         } else if (message instanceof InitiateCaptureSnapshot) {
@@ -269,7 +266,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof RequestLeadership) {
             onRequestLeadership((RequestLeadership) message);
         } else if (!possiblyHandleBehaviorMessage(message)) {
-            handleNonRaftCommand(message);
+            if (message instanceof JournalProtocol.Response
+                && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) {
+                LOG.debug("{}: handled a journal response", persistenceId());
+            } else if (message instanceof SnapshotProtocol.Response
+                && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) {
+                LOG.debug("{}: handled a snapshot response", persistenceId());
+            } else {
+                handleNonRaftCommand(message);
+            }
         }
     }
 
index c655dcd..44afa63 100644 (file)
@@ -9,7 +9,10 @@
 package org.opendaylight.controller.cluster;
 
 import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import akka.persistence.SnapshotSelectionCriteria;
+import org.eclipse.jdt.annotation.NonNull;
 
 /**
  * DataPersistenceProvider provides methods to persist data and is an abstraction of the akka-persistence persistence
@@ -70,4 +73,20 @@ public interface DataPersistenceProvider {
      * @return the last sequence number
      */
     long getLastSequenceNumber();
+
+    /**
+     * Receive and potentially handle a {@link JournalProtocol} response.
+     *
+     * @param response A {@link JournalProtocol} response
+     * @return {@code true} if the response was handled
+     */
+    boolean handleJournalResponse(JournalProtocol.@NonNull Response response);
+
+    /**
+     * Receive and potentially handle a {@link SnapshotProtocol} response.
+     *
+     * @param response A {@link SnapshotProtocol} response
+     * @return {@code true} if the response was handled
+     */
+    boolean handleSnapshotResponse(SnapshotProtocol.@NonNull Response response);
 }
index f1a20fc..3210819 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.cluster;
 
 import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import akka.persistence.SnapshotSelectionCriteria;
 
 /**
@@ -18,11 +20,11 @@ import akka.persistence.SnapshotSelectionCriteria;
 public class DelegatingPersistentDataProvider implements DataPersistenceProvider {
     private DataPersistenceProvider delegate;
 
-    public DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
+    public DelegatingPersistentDataProvider(final DataPersistenceProvider delegate) {
         this.delegate = delegate;
     }
 
-    public void setDelegate(DataPersistenceProvider delegate) {
+    public void setDelegate(final DataPersistenceProvider delegate) {
         this.delegate = delegate;
     }
 
@@ -36,27 +38,27 @@ public class DelegatingPersistentDataProvider implements DataPersistenceProvider
     }
 
     @Override
-    public <T> void persist(T entry, Procedure<T> procedure) {
+    public <T> void persist(final T entry, final Procedure<T> procedure) {
         delegate.persist(entry, procedure);
     }
 
     @Override
-    public <T> void persistAsync(T entry, Procedure<T> procedure) {
+    public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
         delegate.persistAsync(entry, procedure);
     }
 
     @Override
-    public void saveSnapshot(Object entry) {
+    public void saveSnapshot(final Object entry) {
         delegate.saveSnapshot(entry);
     }
 
     @Override
-    public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+    public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
         delegate.deleteSnapshots(criteria);
     }
 
     @Override
-    public void deleteMessages(long sequenceNumber) {
+    public void deleteMessages(final long sequenceNumber) {
         delegate.deleteMessages(sequenceNumber);
     }
 
@@ -64,4 +66,14 @@ public class DelegatingPersistentDataProvider implements DataPersistenceProvider
     public long getLastSequenceNumber() {
         return delegate.getLastSequenceNumber();
     }
+
+    @Override
+    public boolean handleJournalResponse(final JournalProtocol.Response response) {
+        return delegate.handleJournalResponse(response);
+    }
+
+    @Override
+    public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+        return delegate.handleSnapshotResponse(response);
+    }
 }
index 9a4a34c..5461689 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster;
 import static java.util.Objects.requireNonNull;
 
 import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import akka.persistence.SnapshotSelectionCriteria;
 import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
 import org.slf4j.Logger;
@@ -70,4 +72,14 @@ public class NonPersistentDataProvider implements DataPersistenceProvider {
             LOG.error("An unexpected error occurred", e);
         }
     }
+
+    @Override
+    public boolean handleJournalResponse(final JournalProtocol.Response response) {
+        return false;
+    }
+
+    @Override
+    public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+        return false;
+    }
 }
index 21102f1..1faee47 100644 (file)
@@ -11,16 +11,19 @@ import static java.util.Objects.requireNonNull;
 
 import akka.japi.Procedure;
 import akka.persistence.AbstractPersistentActor;
+import akka.persistence.DeleteMessagesSuccess;
+import akka.persistence.DeleteSnapshotsSuccess;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import akka.persistence.SnapshotSelectionCriteria;
 
 /**
  * A DataPersistenceProvider implementation with persistence enabled.
  */
 public class PersistentDataProvider implements DataPersistenceProvider {
-
     private final AbstractPersistentActor persistentActor;
 
-    public PersistentDataProvider(AbstractPersistentActor persistentActor) {
+    public PersistentDataProvider(final AbstractPersistentActor persistentActor) {
         this.persistentActor = requireNonNull(persistentActor, "persistentActor can't be null");
     }
 
@@ -30,27 +33,27 @@ public class PersistentDataProvider implements DataPersistenceProvider {
     }
 
     @Override
-    public <T> void persist(T entry, Procedure<T> procedure) {
+    public <T> void persist(final T entry, final Procedure<T> procedure) {
         persistentActor.persist(entry, procedure);
     }
 
     @Override
-    public <T> void persistAsync(T entry, Procedure<T> procedure) {
+    public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
         persistentActor.persistAsync(entry, procedure);
     }
 
     @Override
-    public void saveSnapshot(Object snapshot) {
+    public void saveSnapshot(final Object snapshot) {
         persistentActor.saveSnapshot(snapshot);
     }
 
     @Override
-    public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+    public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
         persistentActor.deleteSnapshots(criteria);
     }
 
     @Override
-    public void deleteMessages(long sequenceNumber) {
+    public void deleteMessages(final long sequenceNumber) {
         persistentActor.deleteMessages(sequenceNumber);
     }
 
@@ -58,4 +61,14 @@ public class PersistentDataProvider implements DataPersistenceProvider {
     public long getLastSequenceNumber() {
         return persistentActor.lastSequenceNr();
     }
+
+    @Override
+    public boolean handleJournalResponse(final JournalProtocol.Response response) {
+        return response instanceof DeleteMessagesSuccess;
+    }
+
+    @Override
+    public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+        return response instanceof DeleteSnapshotsSuccess;
+    }
 }