X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=67fc8b5b45c34ca6a032d1f15b3ea7727450316b;hb=HEAD;hp=bc4c77af566c44a799a9c4ce8f0489d5912d3c37;hpb=d6d516aa953924121c3cf2a5bf9fd992b9c2b326;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index bc4c77af56..d71d879a5c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -18,11 +18,9 @@ import akka.actor.Status; import akka.persistence.JournalProtocol; import akka.persistence.SnapshotProtocol; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -53,12 +51,12 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.Payload; import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; @@ -103,8 +101,7 @@ import org.opendaylight.yangtools.concepts.Immutable; * */ public abstract class RaftActor extends AbstractUntypedPersistentActor { - - private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis + private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50); /** * This context should NOT be passed directly to any other actor it is @@ -135,7 +132,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context = new RaftActorContextImpl(getSelf(), getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, - configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), + configParams.isPresent() ? configParams.orElseThrow() : new DefaultConfigParamsImpl(), delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf); context.setPayloadVersion(payloadVersion); @@ -228,9 +225,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (snapshotSupport.handleSnapshotMessage(message, getSender())) { return; } - if (message instanceof ApplyState) { - ApplyState applyState = (ApplyState) message; - + if (message instanceof ApplyState applyState) { if (!hasFollowers()) { // for single node, the capture should happen after the apply state // as we delete messages from the persistent journal which have made it to the snapshot @@ -242,8 +237,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } possiblyHandleBehaviorMessage(message); - } else if (message instanceof ApplyJournalEntries) { - ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; + } else if (message instanceof ApplyJournalEntries applyEntries) { LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); persistence().persistAsync(applyEntries, NoopProcedure.instance()); @@ -253,24 +247,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { onGetOnDemandRaftStats(); } else if (message instanceof InitiateCaptureSnapshot) { captureSnapshot(); - } else if (message instanceof SwitchBehavior) { - switchBehavior((SwitchBehavior) message); - } else if (message instanceof LeaderTransitioning) { - onLeaderTransitioning((LeaderTransitioning)message); + } else if (message instanceof SwitchBehavior switchBehavior) { + switchBehavior(switchBehavior); + } else if (message instanceof LeaderTransitioning leaderTransitioning) { + onLeaderTransitioning(leaderTransitioning); } else if (message instanceof Shutdown) { onShutDown(); - } else if (message instanceof Runnable) { - ((Runnable)message).run(); - } else if (message instanceof NoopPayload) { - persistData(null, null, (NoopPayload) message, false); - } else if (message instanceof RequestLeadership) { - onRequestLeadership((RequestLeadership) message); + } else if (message instanceof Runnable runnable) { + runnable.run(); + } else if (message instanceof NoopPayload noopPayload) { + persistData(null, null, noopPayload, false); + } else if (message instanceof RequestLeadership requestLeadership) { + onRequestLeadership(requestLeadership); } else if (!possiblyHandleBehaviorMessage(message)) { - if (message instanceof JournalProtocol.Response - && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) { + if (message instanceof JournalProtocol.Response response + && delegatingPersistenceProvider.handleJournalResponse(response)) { LOG.debug("{}: handled a journal response", persistenceId()); - } else if (message instanceof SnapshotProtocol.Response - && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) { + } else if (message instanceof SnapshotProtocol.Response response + && delegatingPersistenceProvider.handleSnapshotResponse(response)) { LOG.debug("{}: handled a snapshot response", persistenceId()); } else { handleNonRaftCommand(message); @@ -419,7 +413,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Optional roleChangeNotifier = getRoleChangeNotifier(); if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent() && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) { - roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null, + roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), null, getCurrentBehavior().getLeaderPayloadVersion()), getSelf()); } } @@ -458,7 +452,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() + final var builder = newOnDemandRaftStateBuilder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog().dataSize()) @@ -484,19 +478,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { builder.lastLogTerm(lastLogEntry.getTerm()); } - if (getCurrentBehavior() instanceof AbstractLeader) { - AbstractLeader leader = (AbstractLeader)getCurrentBehavior(); - Collection followerIds = leader.getFollowerIds(); - List followerInfoList = new ArrayList<>(followerIds.size()); - for (String id: followerIds) { - final FollowerLogInformation info = leader.getFollower(id); - followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), - info.isFollowerActive(), DurationFormatUtils.formatDurationHMS( - TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())), - context.getPeerInfo(info.getId()).isVoting())); - } - - builder.followerInfoList(followerInfoList); + if (getCurrentBehavior() instanceof AbstractLeader leader) { + builder.followerInfoList(leader.getFollowerIds().stream() + .map(leader::getFollower) + .map(info -> new FollowerInfo(info.getId(), info.getNextIndex(), info.getMatchIndex(), + info.isFollowerActive(), DurationFormatUtils.formatDurationHMS( + TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())), + context.getPeerInfo(info.getId()).isVoting())) + .collect(ImmutableList.toImmutableList())); } sender().tell(builder.build(), self()); @@ -523,7 +512,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { if (roleChangeNotifier.isPresent()) { - roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), + roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), currentBehavior.getLeaderPayloadVersion()), getSelf()); } @@ -540,7 +529,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (roleChangeNotifier.isPresent() && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) { - roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName , + roleChangeNotifier.orElseThrow().tell(new RoleChanged(getId(), oldBehaviorStateName , currentBehavior.state().name()), getSelf()); } } @@ -632,8 +621,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (wasAppended && hasFollowers()) { // Send log entry for replication. - getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry, - !batchHint)); + getCurrentBehavior().handleMessage(getSelf(), + new Replicate(replicatedLogEntry.getIndex(), !batchHint, clientActor, identifier)); } }