Send leader's full address via AppendEntries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
index 4d4b7e4fdd8204bc29dd8a66667a47913e2aca18..d77a0841489347ea69177e80764b7fa6a45dd049 100644 (file)
@@ -8,14 +8,19 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -47,17 +52,41 @@ public class AppendEntries extends AbstractRaftRPC {
 
     private final short payloadVersion;
 
-    public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+    private final short recipientRaftVersion;
+
+    private final short leaderRaftVersion;
+
+    private final String leaderAddress;
+
+    private AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
             @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
-            short payloadVersion) {
+            short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) {
         super(term);
-        this.leaderId = Preconditions.checkNotNull(leaderId);
+        this.leaderId = requireNonNull(leaderId);
         this.prevLogIndex = prevLogIndex;
         this.prevLogTerm = prevLogTerm;
-        this.entries = Preconditions.checkNotNull(entries);
+        this.entries = requireNonNull(entries);
         this.leaderCommit = leaderCommit;
         this.replicatedToAllIndex = replicatedToAllIndex;
         this.payloadVersion = payloadVersion;
+        this.recipientRaftVersion = recipientRaftVersion;
+        this.leaderRaftVersion = leaderRaftVersion;
+        this.leaderAddress = leaderAddress;
+    }
+
+    public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+            @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
+            short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) {
+        this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+                recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
+    }
+
+    @VisibleForTesting
+    public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+            @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
+            short payloadVersion) {
+        this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+                RaftVersions.CURRENT_VERSION, null);
     }
 
     @Nonnull
@@ -90,6 +119,14 @@ public class AppendEntries extends AbstractRaftRPC {
         return payloadVersion;
     }
 
+    public Optional<String> getLeaderAddress() {
+        return Optional.ofNullable(leaderAddress);
+    }
+
+    public short getLeaderRaftVersion() {
+        return leaderRaftVersion;
+    }
+
     @Override
     public String toString() {
         return "AppendEntries [leaderId=" + leaderId
@@ -98,13 +135,88 @@ public class AppendEntries extends AbstractRaftRPC {
                 + ", leaderCommit=" + leaderCommit
                 + ", replicatedToAllIndex=" + replicatedToAllIndex
                 + ", payloadVersion=" + payloadVersion
+                + ", recipientRaftVersion=" + recipientRaftVersion
+                + ", leaderRaftVersion=" + leaderRaftVersion
+                + ", leaderAddress=" + leaderAddress
                 + ", entries=" + entries + "]";
     }
 
     private Object writeReplace() {
-        return new Proxy(this);
+        return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this);
+    }
+
+    /**
+     * Fluorine version that adds the leader address.
+     */
+    private static class ProxyV2 implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AppendEntries appendEntries;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public ProxyV2() {
+        }
+
+        ProxyV2(AppendEntries appendEntries) {
+            this.appendEntries = appendEntries;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeShort(appendEntries.leaderRaftVersion);
+            out.writeLong(appendEntries.getTerm());
+            out.writeObject(appendEntries.leaderId);
+            out.writeLong(appendEntries.prevLogTerm);
+            out.writeLong(appendEntries.prevLogIndex);
+            out.writeLong(appendEntries.leaderCommit);
+            out.writeLong(appendEntries.replicatedToAllIndex);
+            out.writeShort(appendEntries.payloadVersion);
+
+            out.writeInt(appendEntries.entries.size());
+            for (ReplicatedLogEntry e: appendEntries.entries) {
+                out.writeLong(e.getIndex());
+                out.writeLong(e.getTerm());
+                out.writeObject(e.getData());
+            }
+
+            out.writeObject(appendEntries.leaderAddress);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            short leaderRaftVersion = in.readShort();
+            long term = in.readLong();
+            String leaderId = (String) in.readObject();
+            long prevLogTerm = in.readLong();
+            long prevLogIndex = in.readLong();
+            long leaderCommit = in.readLong();
+            long replicatedToAllIndex = in.readLong();
+            short payloadVersion = in.readShort();
+
+            int size = in.readInt();
+            List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+            }
+
+            String leaderAddress = (String)in.readObject();
+
+            appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+                    replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
+                    leaderAddress);
+        }
+
+        private Object readResolve() {
+            return appendEntries;
+        }
     }
 
+    /**
+     * Pre-Fluorine version.
+     */
+    @Deprecated
     private static class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
@@ -155,7 +267,7 @@ public class AppendEntries extends AbstractRaftRPC {
             }
 
             appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
-                    replicatedToAllIndex, payloadVersion);
+                replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
         }
 
         private Object readResolve() {