Route {Journal,Snapshot}Protocol messages to DataPersistenceProvider
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 6fd0693db22c77ae75ec68f7110ff0d427d6fcba..bc4c77af566c44a799a9c4ce8f0489d5912d3c37 100644 (file)
@@ -15,6 +15,8 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.actor.Status;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import com.google.common.annotations.VisibleForTesting;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
@@ -131,9 +133,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         persistentProvider = new PersistentDataProvider(this);
         delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
 
-        context = new RaftActorContextImpl(this.getSelf(),
-            this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-            -1, -1, peerAddresses,
+        context = new RaftActorContextImpl(getSelf(), getContext(), id,
+            new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
             configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
             delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
 
@@ -246,12 +247,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
 
             persistence().persistAsync(applyEntries, NoopProcedure.instance());
-
         } else if (message instanceof FindLeader) {
-            getSender().tell(
-                new FindLeaderReply(getLeaderAddress()),
-                getSelf()
-            );
+            getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf());
         } else if (message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
         } else if (message instanceof InitiateCaptureSnapshot) {
@@ -269,7 +266,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof RequestLeadership) {
             onRequestLeadership((RequestLeadership) message);
         } else if (!possiblyHandleBehaviorMessage(message)) {
-            handleNonRaftCommand(message);
+            if (message instanceof JournalProtocol.Response
+                && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) {
+                LOG.debug("{}: handled a journal response", persistenceId());
+            } else if (message instanceof SnapshotProtocol.Response
+                && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) {
+                LOG.debug("{}: handled a snapshot response", persistenceId());
+            } else {
+                handleNonRaftCommand(message);
+            }
         }
     }