Route {Journal,Snapshot}Protocol messages to DataPersistenceProvider 69/100769/1
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 24 Apr 2022 11:17:15 +0000 (13:17 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 24 Apr 2022 11:17:53 +0000 (13:17 +0200)
The way we wire our messages ends up being not exactly nice, as the
baseline RaftActor is not cognizant of all persistence operations. This
means it cannot correctly ignore responses to deletion of journal and
snapshot entries -- and hence we get dead letters logged.

Route JournalProtocol and SnapshotProtocol messages to
DataPersistenceProvider, which can then handle them and correctly
tell us whether to log them or not.

JIRA: CONTROLLER-2042
Change-Id: I47b8c3ae67d0a0ea0aad9f0a64e1bb8dc11400fc
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java

index 6fd0693db22c77ae75ec68f7110ff0d427d6fcba..bc4c77af566c44a799a9c4ce8f0489d5912d3c37 100644 (file)
@@ -15,6 +15,8 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.actor.Status;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import com.google.common.annotations.VisibleForTesting;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
@@ -131,9 +133,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         persistentProvider = new PersistentDataProvider(this);
         delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
 
-        context = new RaftActorContextImpl(this.getSelf(),
-            this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-            -1, -1, peerAddresses,
+        context = new RaftActorContextImpl(getSelf(), getContext(), id,
+            new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
             configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
             delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
 
@@ -246,12 +247,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
 
             persistence().persistAsync(applyEntries, NoopProcedure.instance());
-
         } else if (message instanceof FindLeader) {
-            getSender().tell(
-                new FindLeaderReply(getLeaderAddress()),
-                getSelf()
-            );
+            getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf());
         } else if (message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
         } else if (message instanceof InitiateCaptureSnapshot) {
@@ -269,7 +266,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof RequestLeadership) {
             onRequestLeadership((RequestLeadership) message);
         } else if (!possiblyHandleBehaviorMessage(message)) {
-            handleNonRaftCommand(message);
+            if (message instanceof JournalProtocol.Response
+                && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) {
+                LOG.debug("{}: handled a journal response", persistenceId());
+            } else if (message instanceof SnapshotProtocol.Response
+                && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) {
+                LOG.debug("{}: handled a snapshot response", persistenceId());
+            } else {
+                handleNonRaftCommand(message);
+            }
         }
     }
 
index c655dcdb891488b52f1f42741046594e9651a5e6..44afa634ccfc0b812761e9bb9fe97c6be9061900 100644 (file)
@@ -9,7 +9,10 @@
 package org.opendaylight.controller.cluster;
 
 import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import akka.persistence.SnapshotSelectionCriteria;
+import org.eclipse.jdt.annotation.NonNull;
 
 /**
  * DataPersistenceProvider provides methods to persist data and is an abstraction of the akka-persistence persistence
@@ -70,4 +73,20 @@ public interface DataPersistenceProvider {
      * @return the last sequence number
      */
     long getLastSequenceNumber();
+
+    /**
+     * Receive and potentially handle a {@link JournalProtocol} response.
+     *
+     * @param response A {@link JournalProtocol} response
+     * @return {@code true} if the response was handled
+     */
+    boolean handleJournalResponse(JournalProtocol.@NonNull Response response);
+
+    /**
+     * Receive and potentially handle a {@link SnapshotProtocol} response.
+     *
+     * @param response A {@link SnapshotProtocol} response
+     * @return {@code true} if the response was handled
+     */
+    boolean handleSnapshotResponse(SnapshotProtocol.@NonNull Response response);
 }
index f1a20fcc8e54f4e2f4908a8ec032477a6bd89b0f..3210819225b11e7b349772b8fc6a2735800a5bee 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.cluster;
 
 import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import akka.persistence.SnapshotSelectionCriteria;
 
 /**
@@ -18,11 +20,11 @@ import akka.persistence.SnapshotSelectionCriteria;
 public class DelegatingPersistentDataProvider implements DataPersistenceProvider {
     private DataPersistenceProvider delegate;
 
-    public DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
+    public DelegatingPersistentDataProvider(final DataPersistenceProvider delegate) {
         this.delegate = delegate;
     }
 
-    public void setDelegate(DataPersistenceProvider delegate) {
+    public void setDelegate(final DataPersistenceProvider delegate) {
         this.delegate = delegate;
     }
 
@@ -36,27 +38,27 @@ public class DelegatingPersistentDataProvider implements DataPersistenceProvider
     }
 
     @Override
-    public <T> void persist(T entry, Procedure<T> procedure) {
+    public <T> void persist(final T entry, final Procedure<T> procedure) {
         delegate.persist(entry, procedure);
     }
 
     @Override
-    public <T> void persistAsync(T entry, Procedure<T> procedure) {
+    public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
         delegate.persistAsync(entry, procedure);
     }
 
     @Override
-    public void saveSnapshot(Object entry) {
+    public void saveSnapshot(final Object entry) {
         delegate.saveSnapshot(entry);
     }
 
     @Override
-    public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+    public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
         delegate.deleteSnapshots(criteria);
     }
 
     @Override
-    public void deleteMessages(long sequenceNumber) {
+    public void deleteMessages(final long sequenceNumber) {
         delegate.deleteMessages(sequenceNumber);
     }
 
@@ -64,4 +66,14 @@ public class DelegatingPersistentDataProvider implements DataPersistenceProvider
     public long getLastSequenceNumber() {
         return delegate.getLastSequenceNumber();
     }
+
+    @Override
+    public boolean handleJournalResponse(final JournalProtocol.Response response) {
+        return delegate.handleJournalResponse(response);
+    }
+
+    @Override
+    public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+        return delegate.handleSnapshotResponse(response);
+    }
 }
index 9a4a34cf596d00dbf1d1eca8dd1973b88f3d236d..5461689d2aebc84739b165657db6330e5feba59f 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster;
 import static java.util.Objects.requireNonNull;
 
 import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import akka.persistence.SnapshotSelectionCriteria;
 import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
 import org.slf4j.Logger;
@@ -70,4 +72,14 @@ public class NonPersistentDataProvider implements DataPersistenceProvider {
             LOG.error("An unexpected error occurred", e);
         }
     }
+
+    @Override
+    public boolean handleJournalResponse(final JournalProtocol.Response response) {
+        return false;
+    }
+
+    @Override
+    public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+        return false;
+    }
 }
index 21102f1f0e368a504bef81e526762ebd79464672..1faee47f526ac119d384ab41a4b29362a236c89f 100644 (file)
@@ -11,16 +11,19 @@ import static java.util.Objects.requireNonNull;
 
 import akka.japi.Procedure;
 import akka.persistence.AbstractPersistentActor;
+import akka.persistence.DeleteMessagesSuccess;
+import akka.persistence.DeleteSnapshotsSuccess;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import akka.persistence.SnapshotSelectionCriteria;
 
 /**
  * A DataPersistenceProvider implementation with persistence enabled.
  */
 public class PersistentDataProvider implements DataPersistenceProvider {
-
     private final AbstractPersistentActor persistentActor;
 
-    public PersistentDataProvider(AbstractPersistentActor persistentActor) {
+    public PersistentDataProvider(final AbstractPersistentActor persistentActor) {
         this.persistentActor = requireNonNull(persistentActor, "persistentActor can't be null");
     }
 
@@ -30,27 +33,27 @@ public class PersistentDataProvider implements DataPersistenceProvider {
     }
 
     @Override
-    public <T> void persist(T entry, Procedure<T> procedure) {
+    public <T> void persist(final T entry, final Procedure<T> procedure) {
         persistentActor.persist(entry, procedure);
     }
 
     @Override
-    public <T> void persistAsync(T entry, Procedure<T> procedure) {
+    public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
         persistentActor.persistAsync(entry, procedure);
     }
 
     @Override
-    public void saveSnapshot(Object snapshot) {
+    public void saveSnapshot(final Object snapshot) {
         persistentActor.saveSnapshot(snapshot);
     }
 
     @Override
-    public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+    public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
         persistentActor.deleteSnapshots(criteria);
     }
 
     @Override
-    public void deleteMessages(long sequenceNumber) {
+    public void deleteMessages(final long sequenceNumber) {
         persistentActor.deleteMessages(sequenceNumber);
     }
 
@@ -58,4 +61,14 @@ public class PersistentDataProvider implements DataPersistenceProvider {
     public long getLastSequenceNumber() {
         return persistentActor.lastSequenceNr();
     }
+
+    @Override
+    public boolean handleJournalResponse(final JournalProtocol.Response response) {
+        return response instanceof DeleteMessagesSuccess;
+    }
+
+    @Override
+    public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+        return response instanceof DeleteSnapshotsSuccess;
+    }
 }