import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.actor.Status;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
persistentProvider = new PersistentDataProvider(this);
delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
- context = new RaftActorContextImpl(this.getSelf(),
- this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
- -1, -1, peerAddresses,
+ context = new RaftActorContextImpl(getSelf(), getContext(), id,
+ new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
persistence().persistAsync(applyEntries, NoopProcedure.instance());
-
} else if (message instanceof FindLeader) {
- getSender().tell(
- new FindLeaderReply(getLeaderAddress()),
- getSelf()
- );
+ getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf());
} else if (message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
} else if (message instanceof InitiateCaptureSnapshot) {
} else if (message instanceof RequestLeadership) {
onRequestLeadership((RequestLeadership) message);
} else if (!possiblyHandleBehaviorMessage(message)) {
- handleNonRaftCommand(message);
+ if (message instanceof JournalProtocol.Response
+ && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) {
+ LOG.debug("{}: handled a journal response", persistenceId());
+ } else if (message instanceof SnapshotProtocol.Response
+ && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) {
+ LOG.debug("{}: handled a snapshot response", persistenceId());
+ } else {
+ handleNonRaftCommand(message);
+ }
}
}
package org.opendaylight.controller.cluster;
import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import akka.persistence.SnapshotSelectionCriteria;
+import org.eclipse.jdt.annotation.NonNull;
/**
* DataPersistenceProvider provides methods to persist data and is an abstraction of the akka-persistence persistence
* @return the last sequence number
*/
long getLastSequenceNumber();
+
+ /**
+ * Receive and potentially handle a {@link JournalProtocol} response.
+ *
+ * @param response A {@link JournalProtocol} response
+ * @return {@code true} if the response was handled
+ */
+ boolean handleJournalResponse(JournalProtocol.@NonNull Response response);
+
+ /**
+ * Receive and potentially handle a {@link SnapshotProtocol} response.
+ *
+ * @param response A {@link SnapshotProtocol} response
+ * @return {@code true} if the response was handled
+ */
+ boolean handleSnapshotResponse(SnapshotProtocol.@NonNull Response response);
}
package org.opendaylight.controller.cluster;
import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import akka.persistence.SnapshotSelectionCriteria;
/**
public class DelegatingPersistentDataProvider implements DataPersistenceProvider {
private DataPersistenceProvider delegate;
- public DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
+ public DelegatingPersistentDataProvider(final DataPersistenceProvider delegate) {
this.delegate = delegate;
}
- public void setDelegate(DataPersistenceProvider delegate) {
+ public void setDelegate(final DataPersistenceProvider delegate) {
this.delegate = delegate;
}
}
@Override
- public <T> void persist(T entry, Procedure<T> procedure) {
+ public <T> void persist(final T entry, final Procedure<T> procedure) {
delegate.persist(entry, procedure);
}
@Override
- public <T> void persistAsync(T entry, Procedure<T> procedure) {
+ public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
delegate.persistAsync(entry, procedure);
}
@Override
- public void saveSnapshot(Object entry) {
+ public void saveSnapshot(final Object entry) {
delegate.saveSnapshot(entry);
}
@Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
delegate.deleteSnapshots(criteria);
}
@Override
- public void deleteMessages(long sequenceNumber) {
+ public void deleteMessages(final long sequenceNumber) {
delegate.deleteMessages(sequenceNumber);
}
public long getLastSequenceNumber() {
return delegate.getLastSequenceNumber();
}
+
+ @Override
+ public boolean handleJournalResponse(final JournalProtocol.Response response) {
+ return delegate.handleJournalResponse(response);
+ }
+
+ @Override
+ public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+ return delegate.handleSnapshotResponse(response);
+ }
}
import static java.util.Objects.requireNonNull;
import akka.japi.Procedure;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import akka.persistence.SnapshotSelectionCriteria;
import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
import org.slf4j.Logger;
LOG.error("An unexpected error occurred", e);
}
}
+
+ @Override
+ public boolean handleJournalResponse(final JournalProtocol.Response response) {
+ return false;
+ }
+
+ @Override
+ public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+ return false;
+ }
}
import akka.japi.Procedure;
import akka.persistence.AbstractPersistentActor;
+import akka.persistence.DeleteMessagesSuccess;
+import akka.persistence.DeleteSnapshotsSuccess;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
import akka.persistence.SnapshotSelectionCriteria;
/**
* A DataPersistenceProvider implementation with persistence enabled.
*/
public class PersistentDataProvider implements DataPersistenceProvider {
-
private final AbstractPersistentActor persistentActor;
- public PersistentDataProvider(AbstractPersistentActor persistentActor) {
+ public PersistentDataProvider(final AbstractPersistentActor persistentActor) {
this.persistentActor = requireNonNull(persistentActor, "persistentActor can't be null");
}
}
@Override
- public <T> void persist(T entry, Procedure<T> procedure) {
+ public <T> void persist(final T entry, final Procedure<T> procedure) {
persistentActor.persist(entry, procedure);
}
@Override
- public <T> void persistAsync(T entry, Procedure<T> procedure) {
+ public <T> void persistAsync(final T entry, final Procedure<T> procedure) {
persistentActor.persistAsync(entry, procedure);
}
@Override
- public void saveSnapshot(Object snapshot) {
+ public void saveSnapshot(final Object snapshot) {
persistentActor.saveSnapshot(snapshot);
}
@Override
- public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
+ public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
persistentActor.deleteSnapshots(criteria);
}
@Override
- public void deleteMessages(long sequenceNumber) {
+ public void deleteMessages(final long sequenceNumber) {
persistentActor.deleteMessages(sequenceNumber);
}
public long getLastSequenceNumber() {
return persistentActor.lastSequenceNr();
}
+
+ @Override
+ public boolean handleJournalResponse(final JournalProtocol.Response response) {
+ return response instanceof DeleteMessagesSuccess;
+ }
+
+ @Override
+ public boolean handleSnapshotResponse(final SnapshotProtocol.Response response) {
+ return response instanceof DeleteSnapshotsSuccess;
+ }
}