The way we wire our messages ends up being not exactly nice, as the
baseline RaftActor is not cognizant of all persistence operations. This
means it cannot correctly ignore responses to deletion of journal and
snapshot entries -- and hence we get dead letters logged.
Route JournalProtocol and SnapshotProtocol messages to
DataPersistenceProvider, which can then handle them and correctly
tell us whether to log them or not.
JIRA: CONTROLLER-2042
Change-Id: I47b8c3ae67d0a0ea0aad9f0a64e1bb8dc11400fc
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.actor.Status;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.actor.Status;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
persistentProvider = new PersistentDataProvider(this);
delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
persistentProvider = new PersistentDataProvider(this);
delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
- context = new RaftActorContextImpl(this.getSelf(),
- this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
- -1, -1, peerAddresses,
+ context = new RaftActorContextImpl(getSelf(), getContext(), id,
+ new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
persistence().persistAsync(applyEntries, NoopProcedure.instance());
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
persistence().persistAsync(applyEntries, NoopProcedure.instance());
} else if (message instanceof FindLeader) {
} else if (message instanceof FindLeader) {
- getSender().tell(
- new FindLeaderReply(getLeaderAddress()),
- getSelf()
- );
+ getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf());
} else if (message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
} else if (message instanceof InitiateCaptureSnapshot) {
} else if (message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
} else if (message instanceof InitiateCaptureSnapshot) {
} else if (message instanceof RequestLeadership) {
onRequestLeadership((RequestLeadership) message);
} else if (!possiblyHandleBehaviorMessage(message)) {
} else if (message instanceof RequestLeadership) {
onRequestLeadership((RequestLeadership) message);
} else if (!possiblyHandleBehaviorMessage(message)) {
- handleNonRaftCommand(message);
+ if (message instanceof JournalProtocol.Response
+ && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) {
+ LOG.debug("{}: handled a journal response", persistenceId());
+ } else if (message instanceof SnapshotProtocol.Response
+ && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) {
+ LOG.debug("{}: handled a snapshot response", persistenceId());
+ } else {
+ handleNonRaftCommand(message);
+ }
package org.opendaylight.controller.cluster;
import akka.japi.Procedure;
package org.opendaylight.controller.cluster;
import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.SnapshotSelectionCriteria;
+import org.eclipse.jdt.annotation.NonNull;
/**
* DataPersistenceProvider provides methods to persist data and is an abstraction of the akka-persistence persistence
/**
* DataPersistenceProvider provides methods to persist data and is an abstraction of the akka-persistence persistence
* @return the last sequence number
*/
long getLastSequenceNumber();
* @return the last sequence number
*/
long getLastSequenceNumber();
+
+ /**
+ * Receive and potentially handle a {@link JournalProtocol} response.
+ *
+ * @param response A {@link JournalProtocol} response
+ * @return {@code true} if the response was handled
+ */
+ boolean handleJournalResponse(JournalProtocol.@NonNull Response response);
+
+ /**
+ * Receive and potentially handle a {@link SnapshotProtocol} response.
+ *
+ * @param response A {@link SnapshotProtocol} response
+ * @return {@code true} if the response was handled
+ */
+ boolean handleSnapshotResponse(SnapshotProtocol.@NonNull Response response);
package org.opendaylight.controller.cluster;
import akka.japi.Procedure;
package org.opendaylight.controller.cluster;
import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import akka.persistence.SnapshotSelectionCriteria;
/**
import akka.persistence.SnapshotSelectionCriteria;
/**
public class DelegatingPersistentDataProvider implements DataPersistenceProvider {
private DataPersistenceProvider delegate;
public class DelegatingPersistentDataProvider implements DataPersistenceProvider {
private DataPersistenceProvider delegate;
- public DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
+ public DelegatingPersistentDataProvider(final DataPersistenceProvider delegate) {
this.delegate = delegate;
}
this.delegate = delegate;
}
- public void setDelegate(DataPersistenceProvider delegate) {
+ public void setDelegate(final DataPersistenceProvider delegate) {
this.delegate = delegate;
}
this.delegate = delegate;
}
- public <T> void persist(T entry, Procedure<T> procedure) {
+ public <T> void persist(final T entry, final Procedure<T> procedure) {
delegate.persist(entry, procedure);
}
@Override
delegate.persist(entry, procedure);
}
@Override
- public <T> void persistAsync(T entry, Procedure<T> procedure) {
+ public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
delegate.persistAsync(entry, procedure);
}
@Override
delegate.persistAsync(entry, procedure);
}
@Override
- public void saveSnapshot(Object entry) {
+ public void saveSnapshot(final Object entry) {
delegate.saveSnapshot(entry);
}
@Override
delegate.saveSnapshot(entry);
}
@Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
delegate.deleteSnapshots(criteria);
}
@Override
delegate.deleteSnapshots(criteria);
}
@Override
- public void deleteMessages(long sequenceNumber) {
+ public void deleteMessages(final long sequenceNumber) {
delegate.deleteMessages(sequenceNumber);
}
delegate.deleteMessages(sequenceNumber);
}
public long getLastSequenceNumber() {
return delegate.getLastSequenceNumber();
}
public long getLastSequenceNumber() {
return delegate.getLastSequenceNumber();
}
+
+ @Override
+ public boolean handleJournalResponse(final JournalProtocol.Response response) {
+ return delegate.handleJournalResponse(response);
+ }
+
+ @Override
+ public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+ return delegate.handleSnapshotResponse(response);
+ }
import static java.util.Objects.requireNonNull;
import akka.japi.Procedure;
import static java.util.Objects.requireNonNull;
import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import akka.persistence.SnapshotSelectionCriteria;
import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
import org.slf4j.Logger;
import akka.persistence.SnapshotSelectionCriteria;
import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
import org.slf4j.Logger;
LOG.error("An unexpected error occurred", e);
}
}
LOG.error("An unexpected error occurred", e);
}
}
+
+ @Override
+ public boolean handleJournalResponse(final JournalProtocol.Response response) {
+ return false;
+ }
+
+ @Override
+ public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+ return false;
+ }
import akka.japi.Procedure;
import akka.persistence.AbstractPersistentActor;
import akka.japi.Procedure;
import akka.persistence.AbstractPersistentActor;
+import akka.persistence.DeleteMessagesSuccess;
+import akka.persistence.DeleteSnapshotsSuccess;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import akka.persistence.SnapshotSelectionCriteria;
/**
* A DataPersistenceProvider implementation with persistence enabled.
*/
public class PersistentDataProvider implements DataPersistenceProvider {
import akka.persistence.SnapshotSelectionCriteria;
/**
* A DataPersistenceProvider implementation with persistence enabled.
*/
public class PersistentDataProvider implements DataPersistenceProvider {
private final AbstractPersistentActor persistentActor;
private final AbstractPersistentActor persistentActor;
- public PersistentDataProvider(AbstractPersistentActor persistentActor) {
+ public PersistentDataProvider(final AbstractPersistentActor persistentActor) {
this.persistentActor = requireNonNull(persistentActor, "persistentActor can't be null");
}
this.persistentActor = requireNonNull(persistentActor, "persistentActor can't be null");
}
- public <T> void persist(T entry, Procedure<T> procedure) {
+ public <T> void persist(final T entry, final Procedure<T> procedure) {
persistentActor.persist(entry, procedure);
}
@Override
persistentActor.persist(entry, procedure);
}
@Override
- public <T> void persistAsync(T entry, Procedure<T> procedure) {
+ public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
persistentActor.persistAsync(entry, procedure);
}
@Override
persistentActor.persistAsync(entry, procedure);
}
@Override
- public void saveSnapshot(Object snapshot) {
+ public void saveSnapshot(final Object snapshot) {
persistentActor.saveSnapshot(snapshot);
}
@Override
persistentActor.saveSnapshot(snapshot);
}
@Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
persistentActor.deleteSnapshots(criteria);
}
@Override
persistentActor.deleteSnapshots(criteria);
}
@Override
- public void deleteMessages(long sequenceNumber) {
+ public void deleteMessages(final long sequenceNumber) {
persistentActor.deleteMessages(sequenceNumber);
}
persistentActor.deleteMessages(sequenceNumber);
}
public long getLastSequenceNumber() {
return persistentActor.lastSequenceNr();
}
public long getLastSequenceNumber() {
return persistentActor.lastSequenceNr();
}
+
+ @Override
+ public boolean handleJournalResponse(final JournalProtocol.Response response) {
+ return response instanceof DeleteMessagesSuccess;
+ }
+
+ @Override
+ public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+ return response instanceof DeleteSnapshotsSuccess;
+ }