X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=67fc8b5b45c34ca6a032d1f15b3ea7727450316b;hb=HEAD;hp=d10cfae21960750fcdf6af5cdbae14656f14ada6;hpb=d610d46f30872ebdea65686d0ef8535ac251f582;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index d10cfae219..db35a15c0d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -18,11 +18,9 @@ import akka.actor.Status; import akka.persistence.JournalProtocol; import akka.persistence.SnapshotProtocol; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -134,7 +132,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context = new RaftActorContextImpl(getSelf(), getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, - configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), + configParams.isPresent() ? configParams.orElseThrow() : new DefaultConfigParamsImpl(), delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf); context.setPayloadVersion(payloadVersion); @@ -415,7 +413,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Optional roleChangeNotifier = getRoleChangeNotifier(); if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent() && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) { - roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null, + roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), null, getCurrentBehavior().getLeaderPayloadVersion()), getSelf()); } } @@ -446,15 +444,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onGetOnDemandRaftStats() { // Debugging message to retrieve raft stats. - Map peerAddresses = new HashMap<>(); - Map peerVotingStates = new HashMap<>(); - for (PeerInfo info: context.getPeers()) { + final var peerAddresses = new HashMap(); + final var peerVotingStates = new HashMap(); + for (var info : context.getPeers()) { peerVotingStates.put(info.getId(), info.isVoting()); peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : ""); } - final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() + final var currentBehavior = context.getCurrentBehavior(); + final var builder = newOnDemandRaftStateBuilder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog().dataSize()) @@ -474,28 +472,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { .peerVotingStates(peerVotingStates) .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass()); - ReplicatedLogEntry lastLogEntry = replicatedLog().last(); + final var lastLogEntry = replicatedLog().lastMeta(); if (lastLogEntry != null) { - builder.lastLogIndex(lastLogEntry.getIndex()); - builder.lastLogTerm(lastLogEntry.getTerm()); + builder.lastLogIndex(lastLogEntry.index()).lastLogTerm(lastLogEntry.term()); } - if (getCurrentBehavior() instanceof AbstractLeader leader) { - Collection followerIds = leader.getFollowerIds(); - List followerInfoList = new ArrayList<>(followerIds.size()); - for (String id : followerIds) { - final FollowerLogInformation info = leader.getFollower(id); - followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), - info.isFollowerActive(), DurationFormatUtils.formatDurationHMS( - TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())), - context.getPeerInfo(info.getId()).isVoting())); - } - - builder.followerInfoList(followerInfoList); + if (currentBehavior instanceof AbstractLeader leader) { + builder.followerInfoList(leader.getFollowerIds().stream() + .map(leader::getFollower) + .map(info -> new FollowerInfo(info.getId(), info.getNextIndex(), info.getMatchIndex(), + info.isFollowerActive(), DurationFormatUtils.formatDurationHMS( + TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())), + context.getPeerInfo(info.getId()).isVoting())) + .collect(ImmutableList.toImmutableList())); } sender().tell(builder.build(), self()); - } protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { @@ -518,7 +510,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { if (roleChangeNotifier.isPresent()) { - roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), + roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), currentBehavior.getLeaderPayloadVersion()), getSelf()); } @@ -535,7 +527,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (roleChangeNotifier.isPresent() && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) { - roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName , + roleChangeNotifier.orElseThrow().tell(new RoleChanged(getId(), oldBehaviorStateName , currentBehavior.state().name()), getSelf()); } } @@ -543,10 +535,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void handleApplyState(final ApplyState applyState) { long startTime = System.nanoTime(); - Payload payload = applyState.getReplicatedLogEntry().getData(); + final var entry = applyState.getReplicatedLogEntry(); + final var payload = entry.getData(); if (LOG.isDebugEnabled()) { LOG.debug("{}: Applying state for log index {} data {}", - persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload); + persistenceId(), entry.index(), payload); } if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) { @@ -605,15 +598,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (!hasFollowers()) { // Increment the Commit Index and the Last Applied values - raftContext.setCommitIndex(persistedLogEntry.getIndex()); - raftContext.setLastApplied(persistedLogEntry.getIndex()); + raftContext.setCommitIndex(persistedLogEntry.index()); + raftContext.setLastApplied(persistedLogEntry.index()); // Apply the state immediately. handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry)); // Send a ApplyJournalEntries message so that we write the fact that we applied // the state to durable storage - self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self()); + self().tell(new ApplyJournalEntries(persistedLogEntry.index()), self()); } else { context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry); @@ -627,8 +620,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (wasAppended && hasFollowers()) { // Send log entry for replication. - getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry, - !batchHint)); + getCurrentBehavior().handleMessage(getSelf(), + new Replicate(replicatedLogEntry.index(), !batchHint, clientActor, identifier)); } } @@ -901,10 +894,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (!snapshotManager.isCapturing()) { final long idx = getCurrentBehavior().getReplicatedToAllIndex(); + final var last = replicatedLog().lastMeta(); LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}", - replicatedLog().last(), idx); + last, idx); - snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx); + snapshotManager.captureWithForcedTrim(last, idx); } }