X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=bc4c77af566c44a799a9c4ce8f0489d5912d3c37;hp=dc97336a49d1e1dfdb7067a9aeaed37d2ebc6f67;hb=d0f46920468c8e4b67c68bd9058572b2d10d75f1;hpb=20733d0406fb31b701e32ee74ee13dc8769a256c diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java old mode 100755 new mode 100644 index dc97336a49..bc4c77af56 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -15,8 +15,11 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Status; +import akka.persistence.JournalProtocol; +import akka.persistence.SnapshotProtocol; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -32,6 +35,7 @@ import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -46,7 +50,6 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; @@ -123,15 +126,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private boolean shuttingDown; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") protected RaftActor(final String id, final Map peerAddresses, final Optional configParams, final short payloadVersion) { persistentProvider = new PersistentDataProvider(this); delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider); - context = new RaftActorContextImpl(this.getSelf(), - this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), - -1, -1, peerAddresses, + context = new RaftActorContextImpl(getSelf(), getContext(), id, + new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf); @@ -151,7 +154,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @Override - public void postStop() { + public void postStop() throws Exception { context.close(); super.postStop(); } @@ -213,7 +216,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * Handles a message. * * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override - * {@link #handleNonRaftCommand(Object)} instead. + * {@link #handleNonRaftCommand(Object)} instead. */ @Deprecated @Override @@ -244,12 +247,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); persistence().persistAsync(applyEntries, NoopProcedure.instance()); - } else if (message instanceof FindLeader) { - getSender().tell( - new FindLeaderReply(getLeaderAddress()), - getSelf() - ); + getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf()); } else if (message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); } else if (message instanceof InitiateCaptureSnapshot) { @@ -267,7 +266,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof RequestLeadership) { onRequestLeadership((RequestLeadership) message); } else if (!possiblyHandleBehaviorMessage(message)) { - handleNonRaftCommand(message); + if (message instanceof JournalProtocol.Response + && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) { + LOG.debug("{}: handled a journal response", persistenceId()); + } else if (message instanceof SnapshotProtocol.Response + && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) { + LOG.debug("{}: handled a snapshot response", persistenceId()); + } else { + handleNonRaftCommand(message); + } } } @@ -480,7 +487,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (getCurrentBehavior() instanceof AbstractLeader) { AbstractLeader leader = (AbstractLeader)getCurrentBehavior(); Collection followerIds = leader.getFollowerIds(); - List followerInfoList = Lists.newArrayListWithCapacity(followerIds.size()); + List followerInfoList = new ArrayList<>(followerIds.size()); for (String id: followerIds) { final FollowerLogInformation info = leader.getFollower(id); followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), @@ -902,7 +909,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}", replicatedLog().last(), idx); - snapshotManager.capture(replicatedLog().last(), idx); + snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx); } } @@ -963,7 +970,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { this.lastValidLeaderId = lastValidLeaderId; this.lastLeaderId = lastLeaderId; this.behavior = requireNonNull(behavior); - this.leaderPayloadVersion = behavior.getLeaderPayloadVersion(); + leaderPayloadVersion = behavior.getLeaderPayloadVersion(); } @Override