X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=bc4c77af566c44a799a9c4ce8f0489d5912d3c37;hb=refs%2Fchanges%2F69%2F100769%2F1;hp=6cbeda6098d2026225767d1564aae76109a1fa00;hpb=634dfac8eead60f443bf75e749c70d1f2bb29198;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java old mode 100755 new mode 100644 index 6cbeda6098..bc4c77af56 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -6,32 +6,36 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Status; +import akka.persistence.JournalProtocol; +import akka.persistence.SnapshotProtocol; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; -import com.google.common.collect.Lists; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.apache.commons.lang3.time.DurationFormatUtils; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -46,7 +50,6 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; @@ -123,17 +126,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private boolean shuttingDown; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") protected RaftActor(final String id, final Map peerAddresses, final Optional configParams, final short payloadVersion) { persistentProvider = new PersistentDataProvider(this); delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider); - context = new RaftActorContextImpl(this.getSelf(), - this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), - -1, -1, peerAddresses, + context = new RaftActorContextImpl(getSelf(), getContext(), id, + new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), - delegatingPersistenceProvider, this::handleApplyState, LOG); + delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf); context.setPayloadVersion(payloadVersion); context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); @@ -151,7 +154,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @Override - public void postStop() { + public void postStop() throws Exception { context.close(); super.postStop(); } @@ -213,7 +216,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * Handles a message. * * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override - * {@link #handleNonRaftCommand(Object)} instead. + * {@link #handleNonRaftCommand(Object)} instead. */ @Deprecated @Override @@ -244,12 +247,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); persistence().persistAsync(applyEntries, NoopProcedure.instance()); - } else if (message instanceof FindLeader) { - getSender().tell( - new FindLeaderReply(getLeaderAddress()), - getSelf() - ); + getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf()); } else if (message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); } else if (message instanceof InitiateCaptureSnapshot) { @@ -267,7 +266,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof RequestLeadership) { onRequestLeadership((RequestLeadership) message); } else if (!possiblyHandleBehaviorMessage(message)) { - handleNonRaftCommand(message); + if (message instanceof JournalProtocol.Response + && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) { + LOG.debug("{}: handled a journal response", persistenceId()); + } else if (message instanceof SnapshotProtocol.Response + && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) { + LOG.debug("{}: handled a snapshot response", persistenceId()); + } else { + handleNonRaftCommand(message); + } } } @@ -327,7 +334,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete, - @Nullable final String followerId, final long newLeaderTimeoutInMillis) { + final @Nullable String followerId, final long newLeaderTimeoutInMillis) { LOG.debug("{}: Initiating leader transfer", persistenceId()); RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort(); @@ -480,7 +487,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (getCurrentBehavior() instanceof AbstractLeader) { AbstractLeader leader = (AbstractLeader)getCurrentBehavior(); Collection followerIds = leader.getFollowerIds(); - List followerInfoList = Lists.newArrayListWithCapacity(followerIds.size()); + List followerInfoList = new ArrayList<>(followerIds.size()); for (String id: followerIds) { final FollowerLogInformation info = leader.getFollower(id); followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), @@ -814,8 +821,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * Returns the RaftActorRecoveryCohort to participate in persistence recovery. */ - @Nonnull - protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort(); + protected abstract @NonNull RaftActorRecoveryCohort getRaftActorRecoveryCohort(); /** * This method is called when recovery is complete. @@ -825,8 +831,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * Returns the RaftActorSnapshotCohort to participate in snapshot captures. */ - @Nonnull - protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort(); + protected abstract @NonNull RaftActorSnapshotCohort getRaftActorSnapshotCohort(); /** * This method will be called by the RaftActor when the state of the @@ -904,7 +909,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}", replicatedLog().last(), idx); - snapshotManager.capture(replicatedLog().last(), idx); + snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx); } } @@ -948,7 +953,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Nullable abstract String getLastLeaderId(); - @Nullable abstract short getLeaderPayloadVersion(); + abstract short getLeaderPayloadVersion(); } /** @@ -964,8 +969,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { final RaftActorBehavior behavior) { this.lastValidLeaderId = lastValidLeaderId; this.lastLeaderId = lastLeaderId; - this.behavior = Preconditions.checkNotNull(behavior); - this.leaderPayloadVersion = behavior.getLeaderPayloadVersion(); + this.behavior = requireNonNull(behavior); + leaderPayloadVersion = behavior.getLeaderPayloadVersion(); } @Override @@ -1027,7 +1032,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { BehaviorState capture(final RaftActorBehavior behavior) { if (behavior == null) { - Verify.verify(lastValidLeaderId == null, "Null behavior with non-null last leader"); + verify(lastValidLeaderId == null, "Null behavior with non-null last leader"); return NULL_BEHAVIOR_STATE; }