Route {Journal,Snapshot}Protocol messages to DataPersistenceProvider
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 7148986877330cccdbccf4b46856b7b987fb8c26..bc4c77af566c44a799a9c4ce8f0489d5912d3c37 100644 (file)
@@ -15,7 +15,10 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
 import akka.actor.Status;
+import akka.persistence.JournalProtocol;
+import akka.persistence.SnapshotProtocol;
 import com.google.common.annotations.VisibleForTesting;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -32,6 +35,7 @@ import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -46,7 +50,6 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
@@ -123,15 +126,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private boolean shuttingDown;
 
+    @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design")
     protected RaftActor(final String id, final Map<String, String> peerAddresses,
          final Optional<ConfigParams> configParams, final short payloadVersion) {
 
         persistentProvider = new PersistentDataProvider(this);
         delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
 
-        context = new RaftActorContextImpl(this.getSelf(),
-            this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
-            -1, -1, peerAddresses,
+        context = new RaftActorContextImpl(getSelf(), getContext(), id,
+            new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
             configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
             delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
 
@@ -213,7 +216,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * Handles a message.
      *
      * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
-     * {@link #handleNonRaftCommand(Object)} instead.
+     *             {@link #handleNonRaftCommand(Object)} instead.
      */
     @Deprecated
     @Override
@@ -244,12 +247,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
 
             persistence().persistAsync(applyEntries, NoopProcedure.instance());
-
         } else if (message instanceof FindLeader) {
-            getSender().tell(
-                new FindLeaderReply(getLeaderAddress()),
-                getSelf()
-            );
+            getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf());
         } else if (message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
         } else if (message instanceof InitiateCaptureSnapshot) {
@@ -267,7 +266,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof RequestLeadership) {
             onRequestLeadership((RequestLeadership) message);
         } else if (!possiblyHandleBehaviorMessage(message)) {
-            handleNonRaftCommand(message);
+            if (message instanceof JournalProtocol.Response
+                && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) {
+                LOG.debug("{}: handled a journal response", persistenceId());
+            } else if (message instanceof SnapshotProtocol.Response
+                && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) {
+                LOG.debug("{}: handled a snapshot response", persistenceId());
+            } else {
+                handleNonRaftCommand(message);
+            }
         }
     }
 
@@ -963,7 +970,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             this.lastValidLeaderId = lastValidLeaderId;
             this.lastLeaderId = lastLeaderId;
             this.behavior = requireNonNull(behavior);
-            this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
+            leaderPayloadVersion = behavior.getLeaderPayloadVersion();
         }
 
         @Override