From d6d516aa953924121c3cf2a5bf9fd992b9c2b326 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 24 Apr 2022 13:17:15 +0200 Subject: [PATCH] Route {Journal,Snapshot}Protocol messages to DataPersistenceProvider The way we wire our messages ends up being not exactly nice, as the baseline RaftActor is not cognizant of all persistence operations. This means it cannot correctly ignore responses to deletion of journal and snapshot entries -- and hence we get dead letters logged. Route JournalProtocol and SnapshotProtocol messages to DataPersistenceProvider, which can then handle them and correctly tell us whether to log them or not. JIRA: CONTROLLER-2042 Change-Id: I47b8c3ae67d0a0ea0aad9f0a64e1bb8dc11400fc Signed-off-by: Robert Varga --- .../controller/cluster/raft/RaftActor.java | 23 +++++++++------- .../cluster/DataPersistenceProvider.java | 19 +++++++++++++ .../DelegatingPersistentDataProvider.java | 26 +++++++++++++----- .../cluster/NonPersistentDataProvider.java | 12 +++++++++ .../cluster/PersistentDataProvider.java | 27 ++++++++++++++----- 5 files changed, 84 insertions(+), 23 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 6fd0693db2..bc4c77af56 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -15,6 +15,8 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Status; +import akka.persistence.JournalProtocol; +import akka.persistence.SnapshotProtocol; import com.google.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; @@ -131,9 +133,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { persistentProvider = new PersistentDataProvider(this); delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider); - context = new RaftActorContextImpl(this.getSelf(), - this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), - -1, -1, peerAddresses, + context = new RaftActorContextImpl(getSelf(), getContext(), id, + new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf); @@ -246,12 +247,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); persistence().persistAsync(applyEntries, NoopProcedure.instance()); - } else if (message instanceof FindLeader) { - getSender().tell( - new FindLeaderReply(getLeaderAddress()), - getSelf() - ); + getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf()); } else if (message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); } else if (message instanceof InitiateCaptureSnapshot) { @@ -269,7 +266,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof RequestLeadership) { onRequestLeadership((RequestLeadership) message); } else if (!possiblyHandleBehaviorMessage(message)) { - handleNonRaftCommand(message); + if (message instanceof JournalProtocol.Response + && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) { + LOG.debug("{}: handled a journal response", persistenceId()); + } else if (message instanceof SnapshotProtocol.Response + && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) { + LOG.debug("{}: handled a snapshot response", persistenceId()); + } else { + handleNonRaftCommand(message); + } } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java index c655dcdb89..44afa634cc 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DataPersistenceProvider.java @@ -9,7 +9,10 @@ package org.opendaylight.controller.cluster; import akka.japi.Procedure; +import akka.persistence.JournalProtocol; +import akka.persistence.SnapshotProtocol; import akka.persistence.SnapshotSelectionCriteria; +import org.eclipse.jdt.annotation.NonNull; /** * DataPersistenceProvider provides methods to persist data and is an abstraction of the akka-persistence persistence @@ -70,4 +73,20 @@ public interface DataPersistenceProvider { * @return the last sequence number */ long getLastSequenceNumber(); + + /** + * Receive and potentially handle a {@link JournalProtocol} response. + * + * @param response A {@link JournalProtocol} response + * @return {@code true} if the response was handled + */ + boolean handleJournalResponse(JournalProtocol.@NonNull Response response); + + /** + * Receive and potentially handle a {@link SnapshotProtocol} response. + * + * @param response A {@link SnapshotProtocol} response + * @return {@code true} if the response was handled + */ + boolean handleSnapshotResponse(SnapshotProtocol.@NonNull Response response); } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java index f1a20fcc8e..3210819225 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/DelegatingPersistentDataProvider.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster; import akka.japi.Procedure; +import akka.persistence.JournalProtocol; +import akka.persistence.SnapshotProtocol; import akka.persistence.SnapshotSelectionCriteria; /** @@ -18,11 +20,11 @@ import akka.persistence.SnapshotSelectionCriteria; public class DelegatingPersistentDataProvider implements DataPersistenceProvider { private DataPersistenceProvider delegate; - public DelegatingPersistentDataProvider(DataPersistenceProvider delegate) { + public DelegatingPersistentDataProvider(final DataPersistenceProvider delegate) { this.delegate = delegate; } - public void setDelegate(DataPersistenceProvider delegate) { + public void setDelegate(final DataPersistenceProvider delegate) { this.delegate = delegate; } @@ -36,27 +38,27 @@ public class DelegatingPersistentDataProvider implements DataPersistenceProvider } @Override - public void persist(T entry, Procedure procedure) { + public void persist(final T entry, final Procedure procedure) { delegate.persist(entry, procedure); } @Override - public void persistAsync(T entry, Procedure procedure) { + public void persistAsync(final T entry, final Procedure procedure) { delegate.persistAsync(entry, procedure); } @Override - public void saveSnapshot(Object entry) { + public void saveSnapshot(final Object entry) { delegate.saveSnapshot(entry); } @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + public void deleteSnapshots(final SnapshotSelectionCriteria criteria) { delegate.deleteSnapshots(criteria); } @Override - public void deleteMessages(long sequenceNumber) { + public void deleteMessages(final long sequenceNumber) { delegate.deleteMessages(sequenceNumber); } @@ -64,4 +66,14 @@ public class DelegatingPersistentDataProvider implements DataPersistenceProvider public long getLastSequenceNumber() { return delegate.getLastSequenceNumber(); } + + @Override + public boolean handleJournalResponse(final JournalProtocol.Response response) { + return delegate.handleJournalResponse(response); + } + + @Override + public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) { + return delegate.handleSnapshotResponse(response); + } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java index 9a4a34cf59..5461689d2a 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/NonPersistentDataProvider.java @@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster; import static java.util.Objects.requireNonNull; import akka.japi.Procedure; +import akka.persistence.JournalProtocol; +import akka.persistence.SnapshotProtocol; import akka.persistence.SnapshotSelectionCriteria; import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor; import org.slf4j.Logger; @@ -70,4 +72,14 @@ public class NonPersistentDataProvider implements DataPersistenceProvider { LOG.error("An unexpected error occurred", e); } } + + @Override + public boolean handleJournalResponse(final JournalProtocol.Response response) { + return false; + } + + @Override + public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) { + return false; + } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java index 21102f1f0e..1faee47f52 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/PersistentDataProvider.java @@ -11,16 +11,19 @@ import static java.util.Objects.requireNonNull; import akka.japi.Procedure; import akka.persistence.AbstractPersistentActor; +import akka.persistence.DeleteMessagesSuccess; +import akka.persistence.DeleteSnapshotsSuccess; +import akka.persistence.JournalProtocol; +import akka.persistence.SnapshotProtocol; import akka.persistence.SnapshotSelectionCriteria; /** * A DataPersistenceProvider implementation with persistence enabled. */ public class PersistentDataProvider implements DataPersistenceProvider { - private final AbstractPersistentActor persistentActor; - public PersistentDataProvider(AbstractPersistentActor persistentActor) { + public PersistentDataProvider(final AbstractPersistentActor persistentActor) { this.persistentActor = requireNonNull(persistentActor, "persistentActor can't be null"); } @@ -30,27 +33,27 @@ public class PersistentDataProvider implements DataPersistenceProvider { } @Override - public void persist(T entry, Procedure procedure) { + public void persist(final T entry, final Procedure procedure) { persistentActor.persist(entry, procedure); } @Override - public void persistAsync(T entry, Procedure procedure) { + public void persistAsync(final T entry, final Procedure procedure) { persistentActor.persistAsync(entry, procedure); } @Override - public void saveSnapshot(Object snapshot) { + public void saveSnapshot(final Object snapshot) { persistentActor.saveSnapshot(snapshot); } @Override - public void deleteSnapshots(SnapshotSelectionCriteria criteria) { + public void deleteSnapshots(final SnapshotSelectionCriteria criteria) { persistentActor.deleteSnapshots(criteria); } @Override - public void deleteMessages(long sequenceNumber) { + public void deleteMessages(final long sequenceNumber) { persistentActor.deleteMessages(sequenceNumber); } @@ -58,4 +61,14 @@ public class PersistentDataProvider implements DataPersistenceProvider { public long getLastSequenceNumber() { return persistentActor.lastSequenceNr(); } + + @Override + public boolean handleJournalResponse(final JournalProtocol.Response response) { + return response instanceof DeleteMessagesSuccess; + } + + @Override + public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) { + return response instanceof DeleteSnapshotsSuccess; + } } -- 2.36.6