Free disk buffers
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
index d77a0841489347ea69177e80764b7fa6a45dd049..892ea3356a58259b5a91b6ab3ea7bcfda8ef70b9 100644 (file)
@@ -5,35 +5,32 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft.messages;
 
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 /**
- * Invoked by leader to replicate log entries (§5.3); also used as
- * heartbeat (§5.2).
+ * Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
  */
-public class AppendEntries extends AbstractRaftRPC {
+public final class AppendEntries extends AbstractRaftRPC {
+    @java.io.Serial
     private static final long serialVersionUID = 1L;
 
     // So that follower can redirect clients
-    private final String leaderId;
+    private final @NonNull String leaderId;
 
     // Index of log entry immediately preceding new ones
     private final long prevLogIndex;
@@ -42,7 +39,7 @@ public class AppendEntries extends AbstractRaftRPC {
     private final long prevLogTerm;
 
     // log entries to store (empty for heart beat - may send more than one for efficiency)
-    private final List<ReplicatedLogEntry> entries;
+    private final @NonNull List<ReplicatedLogEntry> entries;
 
     // leader's commitIndex
     private final long leaderCommit;
@@ -58,9 +55,10 @@ public class AppendEntries extends AbstractRaftRPC {
 
     private final String leaderAddress;
 
-    private AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
-            @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
-            short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) {
+    AppendEntries(final long term, @NonNull final String leaderId, final long prevLogIndex,
+            final long prevLogTerm, @NonNull final List<ReplicatedLogEntry> entries, final long leaderCommit,
+            final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
+            final short leaderRaftVersion, @Nullable final String leaderAddress) {
         super(term);
         this.leaderId = requireNonNull(leaderId);
         this.prevLogIndex = prevLogIndex;
@@ -74,23 +72,23 @@ public class AppendEntries extends AbstractRaftRPC {
         this.leaderAddress = leaderAddress;
     }
 
-    public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
-            @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
-            short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) {
+    public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
+            final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
+            final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
+            final @Nullable String leaderAddress) {
         this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
                 recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
     }
 
     @VisibleForTesting
-    public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
-            @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
-            short payloadVersion) {
+    public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
+            final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
+            final long replicatedToAllIndex, final short payloadVersion) {
         this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
                 RaftVersions.CURRENT_VERSION, null);
     }
 
-    @Nonnull
-    public String getLeaderId() {
+    public @NonNull String getLeaderId() {
         return leaderId;
     }
 
@@ -102,8 +100,7 @@ public class AppendEntries extends AbstractRaftRPC {
         return prevLogTerm;
     }
 
-    @Nonnull
-    public List<ReplicatedLogEntry> getEntries() {
+    public @NonNull List<ReplicatedLogEntry> getEntries() {
         return entries;
     }
 
@@ -119,8 +116,8 @@ public class AppendEntries extends AbstractRaftRPC {
         return payloadVersion;
     }
 
-    public Optional<String> getLeaderAddress() {
-        return Optional.ofNullable(leaderAddress);
+    public @Nullable String leaderAddress() {
+        return leaderAddress;
     }
 
     public short getLeaderRaftVersion() {
@@ -141,14 +138,16 @@ public class AppendEntries extends AbstractRaftRPC {
                 + ", entries=" + entries + "]";
     }
 
-    private Object writeReplace() {
-        return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this);
+    @Override
+    Object writeReplace() {
+        return recipientRaftVersion <= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new AE(this);
     }
 
     /**
      * Fluorine version that adds the leader address.
      */
     private static class ProxyV2 implements Externalizable {
+        @java.io.Serial
         private static final long serialVersionUID = 1L;
 
         private AppendEntries appendEntries;
@@ -159,12 +158,12 @@ public class AppendEntries extends AbstractRaftRPC {
         public ProxyV2() {
         }
 
-        ProxyV2(AppendEntries appendEntries) {
+        ProxyV2(final AppendEntries appendEntries) {
             this.appendEntries = appendEntries;
         }
 
         @Override
-        public void writeExternal(ObjectOutput out) throws IOException {
+        public void writeExternal(final ObjectOutput out) throws IOException {
             out.writeShort(appendEntries.leaderRaftVersion);
             out.writeLong(appendEntries.getTerm());
             out.writeObject(appendEntries.leaderId);
@@ -185,7 +184,7 @@ public class AppendEntries extends AbstractRaftRPC {
         }
 
         @Override
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
             short leaderRaftVersion = in.readShort();
             long term = in.readLong();
             String leaderId = (String) in.readObject();
@@ -196,80 +195,19 @@ public class AppendEntries extends AbstractRaftRPC {
             short payloadVersion = in.readShort();
 
             int size = in.readInt();
-            List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+            var entries = ImmutableList.<ReplicatedLogEntry>builderWithExpectedSize(size);
             for (int i = 0; i < size; i++) {
                 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
             }
 
             String leaderAddress = (String)in.readObject();
 
-            appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+            appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries.build(), leaderCommit,
                     replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
                     leaderAddress);
         }
 
-        private Object readResolve() {
-            return appendEntries;
-        }
-    }
-
-    /**
-     * Pre-Fluorine version.
-     */
-    @Deprecated
-    private static class Proxy implements Externalizable {
-        private static final long serialVersionUID = 1L;
-
-        private AppendEntries appendEntries;
-
-        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
-        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
-        @SuppressWarnings("checkstyle:RedundantModifier")
-        public Proxy() {
-        }
-
-        Proxy(AppendEntries appendEntries) {
-            this.appendEntries = appendEntries;
-        }
-
-        @Override
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeLong(appendEntries.getTerm());
-            out.writeObject(appendEntries.leaderId);
-            out.writeLong(appendEntries.prevLogTerm);
-            out.writeLong(appendEntries.prevLogIndex);
-            out.writeLong(appendEntries.leaderCommit);
-            out.writeLong(appendEntries.replicatedToAllIndex);
-            out.writeShort(appendEntries.payloadVersion);
-
-            out.writeInt(appendEntries.entries.size());
-            for (ReplicatedLogEntry e: appendEntries.entries) {
-                out.writeLong(e.getIndex());
-                out.writeLong(e.getTerm());
-                out.writeObject(e.getData());
-            }
-        }
-
-        @Override
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            long term = in.readLong();
-            String leaderId = (String) in.readObject();
-            long prevLogTerm = in.readLong();
-            long prevLogIndex = in.readLong();
-            long leaderCommit = in.readLong();
-            long replicatedToAllIndex = in.readLong();
-            short payloadVersion = in.readShort();
-
-            int size = in.readInt();
-            List<ReplicatedLogEntry> entries = new ArrayList<>(size);
-            for (int i = 0; i < size; i++) {
-                entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
-            }
-
-            appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
-                replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
-        }
-
+        @java.io.Serial
         private Object readResolve() {
             return appendEntries;
         }