import akka.persistence.JournalProtocol;
import akka.persistence.SnapshotProtocol;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
context = new RaftActorContextImpl(getSelf(), getContext(), id,
new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
- configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
+ configParams.isPresent() ? configParams.orElseThrow() : new DefaultConfigParamsImpl(),
delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
context.setPayloadVersion(payloadVersion);
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
&& leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
- roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
+ roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), null,
getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
}
}
}
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- OnDemandRaftState.AbstractBuilder<?, ?> builder = newOnDemandRaftStateBuilder()
+ final var builder = newOnDemandRaftStateBuilder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog().dataSize())
}
if (getCurrentBehavior() instanceof AbstractLeader leader) {
- Collection<String> followerIds = leader.getFollowerIds();
- List<FollowerInfo> followerInfoList = new ArrayList<>(followerIds.size());
- for (String id : followerIds) {
- final FollowerLogInformation info = leader.getFollower(id);
- followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
- info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(
- TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())),
- context.getPeerInfo(info.getId()).isVoting()));
- }
-
- builder.followerInfoList(followerInfoList);
+ builder.followerInfoList(leader.getFollowerIds().stream()
+ .map(leader::getFollower)
+ .map(info -> new FollowerInfo(info.getId(), info.getNextIndex(), info.getMatchIndex(),
+ info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(
+ TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())),
+ context.getPeerInfo(info.getId()).isVoting()))
+ .collect(ImmutableList.toImmutableList()));
}
sender().tell(builder.build(), self());
if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId())
|| oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
if (roleChangeNotifier.isPresent()) {
- roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
+ roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
currentBehavior.getLeaderPayloadVersion()), getSelf());
}
if (roleChangeNotifier.isPresent()
&& (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
- roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
+ roleChangeNotifier.orElseThrow().tell(new RoleChanged(getId(), oldBehaviorStateName ,
currentBehavior.state().name()), getSelf());
}
}
if (wasAppended && hasFollowers()) {
// Send log entry for replication.
- getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
- !batchHint));
+ getCurrentBehavior().handleMessage(getSelf(),
+ new Replicate(replicatedLogEntry.getIndex(), !batchHint, clientActor, identifier));
}
}