X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=350fa54c8302c800e21ed065734bd10ab0e7badd;hp=1c1f9114ae89e655a50bd5beb9a5885bc80609df;hb=348a37f613ef444b10a0e65b400390396552fc48;hpb=57d7e4788a488d992b9868d44ebc392b06e317c5 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1c1f9114ae..350fa54c83 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -12,7 +12,6 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; -import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -52,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftS import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -205,7 +205,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { handleBehaviorChange(state, newBehavior); } + /** + * Method exposed for subclasses to plug-in their logic. This method is invoked by {@link #handleCommand(Object)} + * for messages which are not handled by this class. Subclasses overriding this class should fall back to this + * implementation for messages which they do not handle + * + * @param message Incoming command message + */ + protected void handleNonRaftCommand(final Object message) { + unhandled(message); + } + + /** + * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override + * {@link #handleNonRaftCommand(Object)} instead. + */ + @Deprecated @Override + // FIXME: make this method final once our unit tests do not need to override it protected void handleCommand(final Object message) { if (serverConfigurationSupport.handleMessage(message, getSender())) { return; @@ -273,8 +290,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // Processing the message may affect the state, hence we need to capture it final RaftActorBehavior currentBehavior = getCurrentBehavior(); final BehaviorState state = behaviorStateTracker.capture(currentBehavior); + + // A behavior indicates that it processed the change by returning a reference to the next behavior + // to be used. A null return indicates it has not processed the message and we should be passing it to + // the subclass for handling. final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message); - switchBehavior(state, nextBehavior); + if (nextBehavior != null) { + switchBehavior(state, nextBehavior); + } else { + handleNonRaftCommand(message); + } } } @@ -487,8 +512,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param identifier * @param data */ - protected void persistData(final ActorRef clientActor, final String identifier, - final Payload data) { + protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) { ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( context.getReplicatedLog().lastIndex() + 1, @@ -500,28 +524,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { final RaftActorContext raftContext = getRaftActorContext(); - replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure() { - @Override - public void apply(ReplicatedLogEntry replicatedLogEntry) { - if (!hasFollowers()){ - // Increment the Commit Index and the Last Applied values - raftContext.setCommitIndex(replicatedLogEntry.getIndex()); - raftContext.setLastApplied(replicatedLogEntry.getIndex()); + replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> { + if (!hasFollowers()){ + // Increment the Commit Index and the Last Applied values + raftContext.setCommitIndex(replicatedLogEntry1.getIndex()); + raftContext.setLastApplied(replicatedLogEntry1.getIndex()); - // Apply the state immediately. - self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self()); + // Apply the state immediately. + self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self()); - // Send a ApplyJournalEntries message so that we write the fact that we applied - // the state to durable storage - self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self()); + // Send a ApplyJournalEntries message so that we write the fact that we applied + // the state to durable storage + self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self()); - } else if (clientActor != null) { - context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry); + } else if (clientActor != null) { + context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1); - // Send message for replication - getCurrentBehavior().handleMessage(getSelf(), - new Replicate(clientActor, identifier, replicatedLogEntry)); - } + // Send message for replication + getCurrentBehavior().handleMessage(getSelf(), + new Replicate(clientActor, identifier, replicatedLogEntry1)); } }); } @@ -699,8 +720,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param data A piece of data that was persisted by the persistData call. * This should NEVER be null. */ - protected abstract void applyState(ActorRef clientActor, String identifier, - Object data); + protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data); /** * Returns the RaftActorRecoveryCohort to participate in persistence recovery.