X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=d10cfae21960750fcdf6af5cdbae14656f14ada6;hb=537c44438b1e8854b719903ac2a4a0d5e12d6606;hp=494ec98b8f0300951aa92d470cda7b1ce142ef81;hpb=614e6974b6e79c0eb21f4b114139ad5d07e5c96c;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java old mode 100755 new mode 100644 index 494ec98b8f..d10cfae219 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -15,7 +15,10 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Status; +import akka.persistence.JournalProtocol; +import akka.persistence.SnapshotProtocol; import com.google.common.annotations.VisibleForTesting; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -32,6 +35,7 @@ import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -46,16 +50,15 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.Payload; import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; @@ -100,8 +103,7 @@ import org.opendaylight.yangtools.concepts.Immutable; * */ public abstract class RaftActor extends AbstractUntypedPersistentActor { - - private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis + private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50); /** * This context should NOT be passed directly to any other actor it is @@ -123,15 +125,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private boolean shuttingDown; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") protected RaftActor(final String id, final Map peerAddresses, final Optional configParams, final short payloadVersion) { persistentProvider = new PersistentDataProvider(this); delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider); - context = new RaftActorContextImpl(this.getSelf(), - this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), - -1, -1, peerAddresses, + context = new RaftActorContextImpl(getSelf(), getContext(), id, + new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf); @@ -213,7 +215,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * Handles a message. * * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override - * {@link #handleNonRaftCommand(Object)} instead. + * {@link #handleNonRaftCommand(Object)} instead. */ @Deprecated @Override @@ -225,9 +227,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (snapshotSupport.handleSnapshotMessage(message, getSender())) { return; } - if (message instanceof ApplyState) { - ApplyState applyState = (ApplyState) message; - + if (message instanceof ApplyState applyState) { if (!hasFollowers()) { // for single node, the capture should happen after the apply state // as we delete messages from the persistent journal which have made it to the snapshot @@ -239,35 +239,38 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } possiblyHandleBehaviorMessage(message); - } else if (message instanceof ApplyJournalEntries) { - ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; + } else if (message instanceof ApplyJournalEntries applyEntries) { LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); persistence().persistAsync(applyEntries, NoopProcedure.instance()); - } else if (message instanceof FindLeader) { - getSender().tell( - new FindLeaderReply(getLeaderAddress()), - getSelf() - ); + getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf()); } else if (message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); } else if (message instanceof InitiateCaptureSnapshot) { captureSnapshot(); - } else if (message instanceof SwitchBehavior) { - switchBehavior((SwitchBehavior) message); - } else if (message instanceof LeaderTransitioning) { - onLeaderTransitioning((LeaderTransitioning)message); + } else if (message instanceof SwitchBehavior switchBehavior) { + switchBehavior(switchBehavior); + } else if (message instanceof LeaderTransitioning leaderTransitioning) { + onLeaderTransitioning(leaderTransitioning); } else if (message instanceof Shutdown) { onShutDown(); - } else if (message instanceof Runnable) { - ((Runnable)message).run(); - } else if (message instanceof NoopPayload) { - persistData(null, null, (NoopPayload) message, false); - } else if (message instanceof RequestLeadership) { - onRequestLeadership((RequestLeadership) message); + } else if (message instanceof Runnable runnable) { + runnable.run(); + } else if (message instanceof NoopPayload noopPayload) { + persistData(null, null, noopPayload, false); + } else if (message instanceof RequestLeadership requestLeadership) { + onRequestLeadership(requestLeadership); } else if (!possiblyHandleBehaviorMessage(message)) { - handleNonRaftCommand(message); + if (message instanceof JournalProtocol.Response response + && delegatingPersistenceProvider.handleJournalResponse(response)) { + LOG.debug("{}: handled a journal response", persistenceId()); + } else if (message instanceof SnapshotProtocol.Response response + && delegatingPersistenceProvider.handleSnapshotResponse(response)) { + LOG.debug("{}: handled a snapshot response", persistenceId()); + } else { + handleNonRaftCommand(message); + } } } @@ -477,11 +480,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { builder.lastLogTerm(lastLogEntry.getTerm()); } - if (getCurrentBehavior() instanceof AbstractLeader) { - AbstractLeader leader = (AbstractLeader)getCurrentBehavior(); + if (getCurrentBehavior() instanceof AbstractLeader leader) { Collection followerIds = leader.getFollowerIds(); List followerInfoList = new ArrayList<>(followerIds.size()); - for (String id: followerIds) { + for (String id : followerIds) { final FollowerLogInformation info = leader.getFollower(id); followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), info.isFollowerActive(), DurationFormatUtils.formatDurationHMS( @@ -902,7 +904,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}", replicatedLog().last(), idx); - snapshotManager.capture(replicatedLog().last(), idx); + snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx); } } @@ -963,7 +965,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { this.lastValidLeaderId = lastValidLeaderId; this.lastLeaderId = lastLeaderId; this.behavior = requireNonNull(behavior); - this.leaderPayloadVersion = behavior.getLeaderPayloadVersion(); + leaderPayloadVersion = behavior.getLeaderPayloadVersion(); } @Override