Separate out RaftEntryMeta
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index d10cfae21960750fcdf6af5cdbae14656f14ada6..db35a15c0de7c1b41ff76aae232fc45bcda945dc 100644 (file)
@@ -18,11 +18,9 @@ import akka.actor.Status;
 import akka.persistence.JournalProtocol;
 import akka.persistence.SnapshotProtocol;
 import com.google.common.annotations.VisibleForTesting;
 import akka.persistence.JournalProtocol;
 import akka.persistence.SnapshotProtocol;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -134,7 +132,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         context = new RaftActorContextImpl(getSelf(), getContext(), id,
             new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
 
         context = new RaftActorContextImpl(getSelf(), getContext(), id,
             new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses,
-            configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
+            configParams.isPresent() ? configParams.orElseThrow() : new DefaultConfigParamsImpl(),
             delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
 
         context.setPayloadVersion(payloadVersion);
             delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
 
         context.setPayloadVersion(payloadVersion);
@@ -415,7 +413,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
         if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
                 && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
         if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
                 && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
-            roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
+            roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), null,
                 getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
         }
     }
                 getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
         }
     }
@@ -446,15 +444,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void onGetOnDemandRaftStats() {
         // Debugging message to retrieve raft stats.
 
     private void onGetOnDemandRaftStats() {
         // Debugging message to retrieve raft stats.
 
-        Map<String, String> peerAddresses = new HashMap<>();
-        Map<String, Boolean> peerVotingStates = new HashMap<>();
-        for (PeerInfo info: context.getPeers()) {
+        final var peerAddresses = new HashMap<String, String>();
+        final var peerVotingStates = new HashMap<String, Boolean>();
+        for (var info : context.getPeers()) {
             peerVotingStates.put(info.getId(), info.isVoting());
             peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
         }
 
             peerVotingStates.put(info.getId(), info.isVoting());
             peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
         }
 
-        final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-        OnDemandRaftState.AbstractBuilder<?, ?> builder = newOnDemandRaftStateBuilder()
+        final var currentBehavior = context.getCurrentBehavior();
+        final var builder = newOnDemandRaftStateBuilder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog().dataSize())
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog().dataSize())
@@ -474,28 +472,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .peerVotingStates(peerVotingStates)
                 .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
 
                 .peerVotingStates(peerVotingStates)
                 .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
 
-        ReplicatedLogEntry lastLogEntry = replicatedLog().last();
+        final var lastLogEntry = replicatedLog().lastMeta();
         if (lastLogEntry != null) {
         if (lastLogEntry != null) {
-            builder.lastLogIndex(lastLogEntry.getIndex());
-            builder.lastLogTerm(lastLogEntry.getTerm());
+            builder.lastLogIndex(lastLogEntry.index()).lastLogTerm(lastLogEntry.term());
         }
 
         }
 
-        if (getCurrentBehavior() instanceof AbstractLeader leader) {
-            Collection<String> followerIds = leader.getFollowerIds();
-            List<FollowerInfo> followerInfoList = new ArrayList<>(followerIds.size());
-            for (String id : followerIds) {
-                final FollowerLogInformation info = leader.getFollower(id);
-                followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
-                        info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(
-                            TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())),
-                        context.getPeerInfo(info.getId()).isVoting()));
-            }
-
-            builder.followerInfoList(followerInfoList);
+        if (currentBehavior instanceof AbstractLeader leader) {
+            builder.followerInfoList(leader.getFollowerIds().stream()
+                .map(leader::getFollower)
+                .map(info -> new FollowerInfo(info.getId(), info.getNextIndex(), info.getMatchIndex(),
+                    info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(
+                        TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())),
+                    context.getPeerInfo(info.getId()).isVoting()))
+                .collect(ImmutableList.toImmutableList()));
         }
 
         sender().tell(builder.build(), self());
         }
 
         sender().tell(builder.build(), self());
-
     }
 
     protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
     }
 
     protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
@@ -518,7 +510,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId())
                 || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
             if (roleChangeNotifier.isPresent()) {
         if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId())
                 || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
             if (roleChangeNotifier.isPresent()) {
-                roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
+                roleChangeNotifier.orElseThrow().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
                         currentBehavior.getLeaderPayloadVersion()), getSelf());
             }
 
                         currentBehavior.getLeaderPayloadVersion()), getSelf());
             }
 
@@ -535,7 +527,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         if (roleChangeNotifier.isPresent()
                 && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
 
         if (roleChangeNotifier.isPresent()
                 && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
-            roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
+            roleChangeNotifier.orElseThrow().tell(new RoleChanged(getId(), oldBehaviorStateName ,
                     currentBehavior.state().name()), getSelf());
         }
     }
                     currentBehavior.state().name()), getSelf());
         }
     }
@@ -543,10 +535,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void handleApplyState(final ApplyState applyState) {
         long startTime = System.nanoTime();
 
     private void handleApplyState(final ApplyState applyState) {
         long startTime = System.nanoTime();
 
-        Payload payload = applyState.getReplicatedLogEntry().getData();
+        final var entry = applyState.getReplicatedLogEntry();
+        final var payload = entry.getData();
         if (LOG.isDebugEnabled()) {
             LOG.debug("{}: Applying state for log index {} data {}",
         if (LOG.isDebugEnabled()) {
             LOG.debug("{}: Applying state for log index {} data {}",
-                persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload);
+                persistenceId(), entry.index(), payload);
         }
 
         if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
         }
 
         if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
@@ -605,15 +598,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             if (!hasFollowers()) {
                 // Increment the Commit Index and the Last Applied values
 
             if (!hasFollowers()) {
                 // Increment the Commit Index and the Last Applied values
-                raftContext.setCommitIndex(persistedLogEntry.getIndex());
-                raftContext.setLastApplied(persistedLogEntry.getIndex());
+                raftContext.setCommitIndex(persistedLogEntry.index());
+                raftContext.setLastApplied(persistedLogEntry.index());
 
                 // Apply the state immediately.
                 handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
 
                 // Send a ApplyJournalEntries message so that we write the fact that we applied
                 // the state to durable storage
 
                 // Apply the state immediately.
                 handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
 
                 // Send a ApplyJournalEntries message so that we write the fact that we applied
                 // the state to durable storage
-                self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
+                self().tell(new ApplyJournalEntries(persistedLogEntry.index()), self());
 
             } else {
                 context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
 
             } else {
                 context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
@@ -627,8 +620,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         if (wasAppended && hasFollowers()) {
             // Send log entry for replication.
 
         if (wasAppended && hasFollowers()) {
             // Send log entry for replication.
-            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
-                    !batchHint));
+            getCurrentBehavior().handleMessage(getSelf(),
+                new Replicate(replicatedLogEntry.index(), !batchHint, clientActor, identifier));
         }
     }
 
         }
     }
 
@@ -901,10 +894,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         if (!snapshotManager.isCapturing()) {
             final long idx = getCurrentBehavior().getReplicatedToAllIndex();
 
         if (!snapshotManager.isCapturing()) {
             final long idx = getCurrentBehavior().getReplicatedToAllIndex();
+            final var last = replicatedLog().lastMeta();
             LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
             LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
-                replicatedLog().last(), idx);
+                last, idx);
 
 
-            snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx);
+            snapshotManager.captureWithForcedTrim(last, idx);
         }
     }
 
         }
     }