X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=67fc8b5b45c34ca6a032d1f15b3ea7727450316b;hb=HEAD;hp=775ab0cdef376061df6677cdd2b7b760fa393139;hpb=b4bf55727093657662d8c16a50fa85f87978a586;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java old mode 100755 new mode 100644 index 775ab0cdef..db35a15c0d --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -15,14 +15,15 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Status; +import akka.persistence.JournalProtocol; +import akka.persistence.SnapshotProtocol; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import java.util.Collection; +import com.google.common.collect.ImmutableList; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.time.DurationFormatUtils; import org.eclipse.jdt.annotation.NonNull; @@ -32,6 +33,7 @@ import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -46,16 +48,15 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.Payload; import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; @@ -100,8 +101,7 @@ import org.opendaylight.yangtools.concepts.Immutable; * */ public abstract class RaftActor extends AbstractUntypedPersistentActor { - - private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis + private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50); /** * This context should NOT be passed directly to any other actor it is @@ -123,17 +123,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private boolean shuttingDown; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") protected RaftActor(final String id, final Map peerAddresses, final Optional configParams, final short payloadVersion) { persistentProvider = new PersistentDataProvider(this); delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider); - context = new RaftActorContextImpl(this.getSelf(), - this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), - -1, -1, peerAddresses, - configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), - delegatingPersistenceProvider, this::handleApplyState, LOG); + context = new RaftActorContextImpl(getSelf(), getContext(), id, + new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, + configParams.isPresent() ? configParams.orElseThrow() : new DefaultConfigParamsImpl(), + delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf); context.setPayloadVersion(payloadVersion); context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); @@ -151,7 +151,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @Override - public void postStop() { + public void postStop() throws Exception { context.close(); super.postStop(); } @@ -213,7 +213,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * Handles a message. * * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override - * {@link #handleNonRaftCommand(Object)} instead. + * {@link #handleNonRaftCommand(Object)} instead. */ @Deprecated @Override @@ -225,9 +225,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (snapshotSupport.handleSnapshotMessage(message, getSender())) { return; } - if (message instanceof ApplyState) { - ApplyState applyState = (ApplyState) message; - + if (message instanceof ApplyState applyState) { if (!hasFollowers()) { // for single node, the capture should happen after the apply state // as we delete messages from the persistent journal which have made it to the snapshot @@ -239,35 +237,38 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } possiblyHandleBehaviorMessage(message); - } else if (message instanceof ApplyJournalEntries) { - ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; + } else if (message instanceof ApplyJournalEntries applyEntries) { LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); persistence().persistAsync(applyEntries, NoopProcedure.instance()); - } else if (message instanceof FindLeader) { - getSender().tell( - new FindLeaderReply(getLeaderAddress()), - getSelf() - ); + getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf()); } else if (message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); } else if (message instanceof InitiateCaptureSnapshot) { captureSnapshot(); - } else if (message instanceof SwitchBehavior) { - switchBehavior((SwitchBehavior) message); - } else if (message instanceof LeaderTransitioning) { - onLeaderTransitioning((LeaderTransitioning)message); + } else if (message instanceof SwitchBehavior switchBehavior) { + switchBehavior(switchBehavior); + } else if (message instanceof LeaderTransitioning leaderTransitioning) { + onLeaderTransitioning(leaderTransitioning); } else if (message instanceof Shutdown) { onShutDown(); - } else if (message instanceof Runnable) { - ((Runnable)message).run(); - } else if (message instanceof NoopPayload) { - persistData(null, null, (NoopPayload) message, false); - } else if (message instanceof RequestLeadership) { - onRequestLeadership((RequestLeadership) message); + } else if (message instanceof Runnable runnable) { + runnable.run(); + } else if (message instanceof NoopPayload noopPayload) { + persistData(null, null, noopPayload, false); + } else if (message instanceof RequestLeadership requestLeadership) { + onRequestLeadership(requestLeadership); } else if (!possiblyHandleBehaviorMessage(message)) { - handleNonRaftCommand(message); + if (message instanceof JournalProtocol.Response response + && delegatingPersistenceProvider.handleJournalResponse(response)) { + LOG.debug("{}: handled a journal response", persistenceId()); + } else if (message instanceof SnapshotProtocol.Response response + && delegatingPersistenceProvider.handleSnapshotResponse(response)) { + LOG.debug("{}: handled a snapshot response", persistenceId()); + } else { + handleNonRaftCommand(message); + } } } @@ -412,7 +413,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Optional roleChangeNotifier = getRoleChangeNotifier(); if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent() && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) { - roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null, + roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), null, getCurrentBehavior().getLeaderPayloadVersion()), getSelf()); } } @@ -443,15 +444,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onGetOnDemandRaftStats() { // Debugging message to retrieve raft stats. - Map peerAddresses = new HashMap<>(); - Map peerVotingStates = new HashMap<>(); - for (PeerInfo info: context.getPeers()) { + final var peerAddresses = new HashMap(); + final var peerVotingStates = new HashMap(); + for (var info : context.getPeers()) { peerVotingStates.put(info.getId(), info.isVoting()); peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : ""); } - final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() + final var currentBehavior = context.getCurrentBehavior(); + final var builder = newOnDemandRaftStateBuilder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog().dataSize()) @@ -471,29 +472,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { .peerVotingStates(peerVotingStates) .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass()); - ReplicatedLogEntry lastLogEntry = replicatedLog().last(); + final var lastLogEntry = replicatedLog().lastMeta(); if (lastLogEntry != null) { - builder.lastLogIndex(lastLogEntry.getIndex()); - builder.lastLogTerm(lastLogEntry.getTerm()); + builder.lastLogIndex(lastLogEntry.index()).lastLogTerm(lastLogEntry.term()); } - if (getCurrentBehavior() instanceof AbstractLeader) { - AbstractLeader leader = (AbstractLeader)getCurrentBehavior(); - Collection followerIds = leader.getFollowerIds(); - List followerInfoList = Lists.newArrayListWithCapacity(followerIds.size()); - for (String id: followerIds) { - final FollowerLogInformation info = leader.getFollower(id); - followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), - info.isFollowerActive(), DurationFormatUtils.formatDurationHMS( - TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())), - context.getPeerInfo(info.getId()).isVoting())); - } - - builder.followerInfoList(followerInfoList); + if (currentBehavior instanceof AbstractLeader leader) { + builder.followerInfoList(leader.getFollowerIds().stream() + .map(leader::getFollower) + .map(info -> new FollowerInfo(info.getId(), info.getNextIndex(), info.getMatchIndex(), + info.isFollowerActive(), DurationFormatUtils.formatDurationHMS( + TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())), + context.getPeerInfo(info.getId()).isVoting())) + .collect(ImmutableList.toImmutableList())); } sender().tell(builder.build(), self()); - } protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { @@ -516,7 +510,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { if (roleChangeNotifier.isPresent()) { - roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), + roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), currentBehavior.getLeaderPayloadVersion()), getSelf()); } @@ -533,7 +527,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (roleChangeNotifier.isPresent() && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) { - roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName , + roleChangeNotifier.orElseThrow().tell(new RoleChanged(getId(), oldBehaviorStateName , currentBehavior.state().name()), getSelf()); } } @@ -541,10 +535,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void handleApplyState(final ApplyState applyState) { long startTime = System.nanoTime(); - Payload payload = applyState.getReplicatedLogEntry().getData(); + final var entry = applyState.getReplicatedLogEntry(); + final var payload = entry.getData(); if (LOG.isDebugEnabled()) { LOG.debug("{}: Applying state for log index {} data {}", - persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload); + persistenceId(), entry.index(), payload); } if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) { @@ -603,15 +598,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (!hasFollowers()) { // Increment the Commit Index and the Last Applied values - raftContext.setCommitIndex(persistedLogEntry.getIndex()); - raftContext.setLastApplied(persistedLogEntry.getIndex()); + raftContext.setCommitIndex(persistedLogEntry.index()); + raftContext.setLastApplied(persistedLogEntry.index()); // Apply the state immediately. handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry)); // Send a ApplyJournalEntries message so that we write the fact that we applied // the state to durable storage - self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self()); + self().tell(new ApplyJournalEntries(persistedLogEntry.index()), self()); } else { context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry); @@ -625,8 +620,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (wasAppended && hasFollowers()) { // Send log entry for replication. - getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry, - !batchHint)); + getCurrentBehavior().handleMessage(getSelf(), + new Replicate(replicatedLogEntry.index(), !batchHint, clientActor, identifier)); } } @@ -899,10 +894,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (!snapshotManager.isCapturing()) { final long idx = getCurrentBehavior().getReplicatedToAllIndex(); + final var last = replicatedLog().lastMeta(); LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}", - replicatedLog().last(), idx); + last, idx); - snapshotManager.capture(replicatedLog().last(), idx); + snapshotManager.captureWithForcedTrim(last, idx); } } @@ -963,7 +959,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { this.lastValidLeaderId = lastValidLeaderId; this.lastLeaderId = lastLeaderId; this.behavior = requireNonNull(behavior); - this.leaderPayloadVersion = behavior.getLeaderPayloadVersion(); + leaderPayloadVersion = behavior.getLeaderPayloadVersion(); } @Override