Send leader's full address via AppendEntries 07/78507/5
authorTom Pantelis <tompantelis@gmail.com>
Thu, 6 Dec 2018 16:43:26 +0000 (11:43 -0500)
committerRobert Varga <nite@hq.sk>
Mon, 7 Jan 2019 12:50:43 +0000 (12:50 +0000)
Added the leader's full actor address to AppendEntries. The
address can be rather long and we don't need to normally send
it so it's an Optional. The follower indicates via AppendEntriesReply
whether it needs the leader to send its address (new
needsLeaderAddress field). On receipt of the leader's address,
the follower sets it in its local RaftActorContext and notifies
the PeerAddressResolver. The ShardPeerAddressResolver impl
updates its local cache thus enabling transactions to resolve
the remote leader actor.

Since we're changing the serialized footprint of AppendEntries
and AppendEntriesReply, I preserved backwards compatibility by
versioning a new externalizable Proxy for each.

JIRA: CONTROLLER-1861
Change-Id: I1c0870a596b1782015eb973153b74dfcd48694e7
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
14 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerAddressResolver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolverTest.java

index 3952b386b2498a6e2fecf56d6f5a1cb3f4dda8fc..fe836362c819211dc0720daadbbdaa89c8c8dd5d 100644 (file)
@@ -51,6 +51,8 @@ public final class FollowerLogInformation {
 
     private long slicedLogEntryIndex = NO_INDEX;
 
+    private boolean needsLeaderAddress;
+
     /**
      * Constructs an instance.
      *
@@ -336,6 +338,15 @@ public final class FollowerLogInformation {
         return slicedLogEntryIndex != NO_INDEX;
     }
 
+    public void setNeedsLeaderAddress(boolean value) {
+        needsLeaderAddress = value;
+    }
+
+    @Nullable
+    public String needsLeaderAddress(String leaderId) {
+        return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
+    }
+
     @Override
     public String toString() {
         return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
index 302f2fafa43cb961f15ab912793c40d916b8d8c0..c7d81570c41b3fd8eb3c7d6e39b77ae263d46d7d 100644 (file)
@@ -17,10 +17,19 @@ import javax.annotation.Nullable;
 @FunctionalInterface
 public interface PeerAddressResolver {
     /**
-     * Resolves a raft actor peer id to it's remote actor address.
+     * Resolves a raft actor peer id to its remote actor address.
      *
      * @param peerId the id of the peer to resolve
      * @return the peer's actor path string or null if not found
      */
     @Nullable String resolve(String peerId);
+
+    /**
+     * Sets the actor address for a raft peer.
+     *
+     * @param peerId the id of the peer
+     * @param address the peer's actor's address
+     */
+    default void setResolved(String peerId, String address) {
+    }
 }
index 364ca5af7b4704c67c7da1359a14d70c672f25ce..5ec376412be1f4e31daffdea6f53760db7a7b8be 100644 (file)
@@ -16,5 +16,6 @@ public interface RaftVersions {
     short HELIUM_VERSION = 0;
     short LITHIUM_VERSION = 1;
     short BORON_VERSION = 3;
-    short CURRENT_VERSION = BORON_VERSION;
+    short FLUORINE_VERSION = 4;
+    short CURRENT_VERSION = FLUORINE_VERSION;
 }
index e585066d736225565a13700a513bb6a607ffd61a..2175eb75557d743cbe09020f9408035f69e31c75 100644 (file)
@@ -227,6 +227,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         followerLogInformation.markFollowerActive();
         followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
         followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
+        followerLogInformation.setNeedsLeaderAddress(appendEntriesReply.isNeedsLeaderAddress());
 
         long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
         boolean updated = false;
@@ -816,7 +817,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             getLogEntryIndex(followerNextIndex - 1),
             getLogEntryTerm(followerNextIndex - 1), entries,
-            leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
+            leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion(),
+            followerLogInformation.getRaftVersion(), followerLogInformation.needsLeaderAddress(getId()));
 
         if (!entries.isEmpty() || log.isTraceEnabled()) {
             log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
index b63a7cc5c99482b7890ccda0ad52dd19406daa68..087c656b1836daaf9c05ac7b91e32eba1057c02c 100644 (file)
@@ -137,7 +137,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                     appendEntries.getTerm(), currentTerm());
 
             sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(),
-                    context.getPayloadVersion()), actor());
+                    context.getPayloadVersion(), false, false, appendEntries.getLeaderRaftVersion()), actor());
             return this;
         }
 
index 2377fbf442d45099856d9ebd61f402b01588e4b7..f6276a65e0863982cdd574122b9f1a4b5b44b3f8 100644 (file)
@@ -148,7 +148,8 @@ public class Follower extends AbstractRaftActorBehavior {
         if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
             // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-                    lastIndex(), lastTerm(), context.getPayloadVersion());
+                    lastIndex(), lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+                    appendEntries.getLeaderRaftVersion());
 
             log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
             sender.tell(reply, actor());
@@ -160,6 +161,14 @@ public class Follower extends AbstractRaftActorBehavior {
         leaderId = appendEntries.getLeaderId();
         leaderPayloadVersion = appendEntries.getPayloadVersion();
 
+        if (appendEntries.getLeaderAddress().isPresent()) {
+            final String address = appendEntries.getLeaderAddress().get();
+            log.debug("New leader address: {}", address);
+
+            context.setPeerAddress(leaderId, address);
+            context.getConfigParams().getPeerAddressResolver().setResolved(leaderId, address);
+        }
+
         // First check if the logs are in sync or not
         if (isOutOfSync(appendEntries, sender)) {
             updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
@@ -184,7 +193,8 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-                lastIndex, lastTerm(), context.getPayloadVersion());
+                lastIndex, lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+                appendEntries.getLeaderRaftVersion());
 
         if (log.isTraceEnabled()) {
             log.trace("{}: handleAppendEntries returning : {}", logName(), reply);
@@ -267,14 +277,16 @@ public class Follower extends AbstractRaftActorBehavior {
 
                         log.info("{}: Could not remove entries - sending reply to force snapshot", logName());
                         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
-                                lastTerm(), context.getPayloadVersion(), true), actor());
+                                lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+                                appendEntries.getLeaderRaftVersion()), actor());
                         return false;
                     }
 
                     break;
                 } else {
                     sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
-                            lastTerm(), context.getPayloadVersion(), true), actor());
+                            lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+                            appendEntries.getLeaderRaftVersion()), actor());
                     return false;
                 }
             }
@@ -332,7 +344,7 @@ public class Follower extends AbstractRaftActorBehavior {
             log.info("{}: The followers log is empty and the senders prevLogIndex is {}", logName(),
                 appendEntries.getPrevLogIndex());
 
-            sendOutOfSyncAppendEntriesReply(sender, false);
+            sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
             return true;
         }
 
@@ -351,7 +363,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm());
 
-                    sendOutOfSyncAppendEntriesReply(sender, false);
+                    sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                     return true;
                 }
             } else if (appendEntries.getPrevLogIndex() != -1) {
@@ -362,7 +374,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         + "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
                         context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
 
-                sendOutOfSyncAppendEntriesReply(sender, false);
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
         }
@@ -378,7 +390,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         appendEntries.getReplicatedToAllIndex(), lastIndex,
                         context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
 
-                sendOutOfSyncAppendEntriesReply(sender, false);
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
 
@@ -389,7 +401,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         entries.get(0).getIndex() - 1, lastIndex, context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm());
 
-                sendOutOfSyncAppendEntriesReply(sender, false);
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
         }
@@ -397,15 +409,21 @@ public class Follower extends AbstractRaftActorBehavior {
         return false;
     }
 
-    private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot) {
+    private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot,
+            short leaderRaftVersion) {
         // We found that the log was out of sync so just send a negative reply.
         final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(),
-                lastTerm(), context.getPayloadVersion(), forceInstallSnapshot);
+                lastTerm(), context.getPayloadVersion(), forceInstallSnapshot, needsLeaderAddress(),
+                leaderRaftVersion);
 
         log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
         sender.tell(reply, actor());
     }
 
+    private boolean needsLeaderAddress() {
+        return context.getPeerAddress(leaderId) == null;
+    }
+
     @Override
     protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
         final AppendEntriesReply appendEntriesReply) {
index 4d4b7e4fdd8204bc29dd8a66667a47913e2aca18..d77a0841489347ea69177e80764b7fa6a45dd049 100644 (file)
@@ -8,14 +8,19 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -47,17 +52,41 @@ public class AppendEntries extends AbstractRaftRPC {
 
     private final short payloadVersion;
 
-    public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+    private final short recipientRaftVersion;
+
+    private final short leaderRaftVersion;
+
+    private final String leaderAddress;
+
+    private AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
             @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
-            short payloadVersion) {
+            short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) {
         super(term);
-        this.leaderId = Preconditions.checkNotNull(leaderId);
+        this.leaderId = requireNonNull(leaderId);
         this.prevLogIndex = prevLogIndex;
         this.prevLogTerm = prevLogTerm;
-        this.entries = Preconditions.checkNotNull(entries);
+        this.entries = requireNonNull(entries);
         this.leaderCommit = leaderCommit;
         this.replicatedToAllIndex = replicatedToAllIndex;
         this.payloadVersion = payloadVersion;
+        this.recipientRaftVersion = recipientRaftVersion;
+        this.leaderRaftVersion = leaderRaftVersion;
+        this.leaderAddress = leaderAddress;
+    }
+
+    public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+            @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
+            short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) {
+        this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+                recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
+    }
+
+    @VisibleForTesting
+    public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+            @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
+            short payloadVersion) {
+        this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+                RaftVersions.CURRENT_VERSION, null);
     }
 
     @Nonnull
@@ -90,6 +119,14 @@ public class AppendEntries extends AbstractRaftRPC {
         return payloadVersion;
     }
 
+    public Optional<String> getLeaderAddress() {
+        return Optional.ofNullable(leaderAddress);
+    }
+
+    public short getLeaderRaftVersion() {
+        return leaderRaftVersion;
+    }
+
     @Override
     public String toString() {
         return "AppendEntries [leaderId=" + leaderId
@@ -98,13 +135,88 @@ public class AppendEntries extends AbstractRaftRPC {
                 + ", leaderCommit=" + leaderCommit
                 + ", replicatedToAllIndex=" + replicatedToAllIndex
                 + ", payloadVersion=" + payloadVersion
+                + ", recipientRaftVersion=" + recipientRaftVersion
+                + ", leaderRaftVersion=" + leaderRaftVersion
+                + ", leaderAddress=" + leaderAddress
                 + ", entries=" + entries + "]";
     }
 
     private Object writeReplace() {
-        return new Proxy(this);
+        return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this);
+    }
+
+    /**
+     * Fluorine version that adds the leader address.
+     */
+    private static class ProxyV2 implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AppendEntries appendEntries;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public ProxyV2() {
+        }
+
+        ProxyV2(AppendEntries appendEntries) {
+            this.appendEntries = appendEntries;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeShort(appendEntries.leaderRaftVersion);
+            out.writeLong(appendEntries.getTerm());
+            out.writeObject(appendEntries.leaderId);
+            out.writeLong(appendEntries.prevLogTerm);
+            out.writeLong(appendEntries.prevLogIndex);
+            out.writeLong(appendEntries.leaderCommit);
+            out.writeLong(appendEntries.replicatedToAllIndex);
+            out.writeShort(appendEntries.payloadVersion);
+
+            out.writeInt(appendEntries.entries.size());
+            for (ReplicatedLogEntry e: appendEntries.entries) {
+                out.writeLong(e.getIndex());
+                out.writeLong(e.getTerm());
+                out.writeObject(e.getData());
+            }
+
+            out.writeObject(appendEntries.leaderAddress);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            short leaderRaftVersion = in.readShort();
+            long term = in.readLong();
+            String leaderId = (String) in.readObject();
+            long prevLogTerm = in.readLong();
+            long prevLogIndex = in.readLong();
+            long leaderCommit = in.readLong();
+            long replicatedToAllIndex = in.readLong();
+            short payloadVersion = in.readShort();
+
+            int size = in.readInt();
+            List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+            }
+
+            String leaderAddress = (String)in.readObject();
+
+            appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+                    replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
+                    leaderAddress);
+        }
+
+        private Object readResolve() {
+            return appendEntries;
+        }
     }
 
+    /**
+     * Pre-Fluorine version.
+     */
+    @Deprecated
     private static class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
@@ -155,7 +267,7 @@ public class AppendEntries extends AbstractRaftRPC {
             }
 
             appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
-                    replicatedToAllIndex, payloadVersion);
+                replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
         }
 
         private Object readResolve() {
index e2f0ba9014c9e469b21599359db3007e7cb4e2be..902b9a03b757930ccceeb07c4eb621bab2dbeb78 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -41,22 +42,29 @@ public class AppendEntriesReply extends AbstractRaftRPC {
 
     private final boolean forceInstallSnapshot;
 
+    private final boolean needsLeaderAddress;
+
+    private final short recipientRaftVersion;
+
+    @VisibleForTesting
     public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
             short payloadVersion) {
-        this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, false);
+        this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, false, false,
+                RaftVersions.CURRENT_VERSION);
     }
 
     public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
-            short payloadVersion, boolean forceInstallSnapshot) {
+            short payloadVersion, boolean forceInstallSnapshot, boolean needsLeaderAddress,
+            short recipientRaftVersion) {
         this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, forceInstallSnapshot,
-                RaftVersions.CURRENT_VERSION);
+                needsLeaderAddress, RaftVersions.CURRENT_VERSION, recipientRaftVersion);
 
     }
 
     private AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
-                              short payloadVersion, boolean forceInstallSnapshot, short raftVersion) {
+            short payloadVersion, boolean forceInstallSnapshot, boolean needsLeaderAddress, short raftVersion,
+            short recipientRaftVersion) {
         super(term);
-
         this.followerId = followerId;
         this.success = success;
         this.logLastIndex = logLastIndex;
@@ -64,6 +72,8 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         this.payloadVersion = payloadVersion;
         this.forceInstallSnapshot = forceInstallSnapshot;
         this.raftVersion = raftVersion;
+        this.needsLeaderAddress = needsLeaderAddress;
+        this.recipientRaftVersion = recipientRaftVersion;
     }
 
     public boolean isSuccess() {
@@ -94,17 +104,80 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         return forceInstallSnapshot;
     }
 
+    public boolean isNeedsLeaderAddress() {
+        return needsLeaderAddress;
+    }
+
     @Override
     public String toString() {
         return "AppendEntriesReply [term=" + getTerm() + ", success=" + success + ", followerId=" + followerId
                 + ", logLastIndex=" + logLastIndex + ", logLastTerm=" + logLastTerm + ", forceInstallSnapshot="
-                + forceInstallSnapshot + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion + "]";
+                + forceInstallSnapshot + ", needsLeaderAddress=" + needsLeaderAddress
+                + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion
+                + ", recipientRaftVersion=" + recipientRaftVersion + "]";
     }
 
     private Object writeReplace() {
-        return new Proxy(this);
+        return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new Proxy2(this) : new Proxy(this);
+    }
+
+    /**
+     * Fluorine version that adds the needsLeaderAddress flag.
+     */
+    private static class Proxy2 implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AppendEntriesReply appendEntriesReply;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy2() {
+        }
+
+        Proxy2(AppendEntriesReply appendEntriesReply) {
+            this.appendEntriesReply = appendEntriesReply;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeShort(appendEntriesReply.raftVersion);
+            out.writeLong(appendEntriesReply.getTerm());
+            out.writeObject(appendEntriesReply.followerId);
+            out.writeBoolean(appendEntriesReply.success);
+            out.writeLong(appendEntriesReply.logLastIndex);
+            out.writeLong(appendEntriesReply.logLastTerm);
+            out.writeShort(appendEntriesReply.payloadVersion);
+            out.writeBoolean(appendEntriesReply.forceInstallSnapshot);
+            out.writeBoolean(appendEntriesReply.needsLeaderAddress);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            short raftVersion = in.readShort();
+            long term = in.readLong();
+            String followerId = (String) in.readObject();
+            boolean success = in.readBoolean();
+            long logLastIndex = in.readLong();
+            long logLastTerm = in.readLong();
+            short payloadVersion = in.readShort();
+            boolean forceInstallSnapshot = in.readBoolean();
+            boolean needsLeaderAddress = in.readBoolean();
+
+            appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm,
+                    payloadVersion, forceInstallSnapshot, needsLeaderAddress, raftVersion,
+                    RaftVersions.CURRENT_VERSION);
+        }
+
+        private Object readResolve() {
+            return appendEntriesReply;
+        }
     }
 
+    /**
+     * Pre-Fluorine version.
+     */
+    @Deprecated
     private static class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
@@ -144,7 +217,7 @@ public class AppendEntriesReply extends AbstractRaftRPC {
             boolean forceInstallSnapshot = in.readBoolean();
 
             appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm,
-                    payloadVersion, forceInstallSnapshot, raftVersion);
+                    payloadVersion, forceInstallSnapshot, false, raftVersion, RaftVersions.CURRENT_VERSION);
         }
 
         private Object readResolve() {
index 4dd2b9b5363b41b7d56bcc7adb912a753526bf76..2ba06d6925efbde5933bc042505d1f39c0cff2ab 100644 (file)
@@ -15,6 +15,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -46,9 +47,13 @@ import org.opendaylight.controller.cluster.raft.MockRaftActor;
 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
+import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -111,6 +116,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     protected  MockRaftActorContext createActorContext(final ActorRef actorRef) {
         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
         context.setPayloadVersion(payloadVersion);
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
+            peerId -> leaderActor.path().toString());
         return context;
     }
 
@@ -804,7 +811,6 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
     }
 
-
     /**
      * This test verifies that when InstallSnapshot is received by
      * the follower its applied correctly.
@@ -1301,6 +1307,36 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
                 MockRaftActor.fromState(snapshot.getState()));
     }
 
+    @Test
+    public void testNeedsLeaderAddress() {
+        logStart("testNeedsLeaderAddress");
+
+        MockRaftActorContext context = createActorContext();
+        context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
+        context.addToPeers("leader", null, VotingState.VOTING);
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
+
+        follower = createBehavior(context);
+
+        follower.handleMessage(leaderActor,
+                new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1, (short)0));
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertTrue(reply.isNeedsLeaderAddress());
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
+
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1,
+                (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
+
+        reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertFalse(reply.isNeedsLeaderAddress());
+
+        verify(mockResolver).setResolved("leader", leaderActor.path().toString());
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
             final AtomicReference<MockRaftActor> followerRaftActor) {
@@ -1366,6 +1402,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
+        assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
     }
 
 
index 74f02c09a2417c0acfbdd70e8e684403b35d46de..78ec33cba12deca93e9be5746eb7ce682301f2ac 100644 (file)
@@ -775,7 +775,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
         // installed with a SendInstallSnapshot
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -795,7 +796,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
 
         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
 
         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
@@ -807,7 +809,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
         MessageCollectorActor.clearMessages(followerActor);
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
     }
 
@@ -2376,6 +2379,59 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
     }
 
+    @Test
+    public void testLeaderAddressInAppendEntries() {
+        logStart("testLeaderAddressInAppendEntries");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                FiniteDuration.create(50, TimeUnit.MILLISECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
+
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
+            peerId -> leaderActor.path().toString());
+
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Initial heartbeat shouldn't have the leader address
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertFalse(appendEntries.getLeaderAddress().isPresent());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send AppendEntriesReply indicating the follower needs the leader address
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
+                RaftVersions.CURRENT_VERSION));
+
+        // Sleep for the heartbeat interval so AppendEntries is sent.
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertTrue(appendEntries.getLeaderAddress().isPresent());
+        assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send AppendEntriesReply indicating the follower does not need the leader address
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
+                RaftVersions.CURRENT_VERSION));
+
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertFalse(appendEntries.getLeaderAddress().isPresent());
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
             final ActorRef actorRef, final RaftRPC rpc) {
index 629312110dd2bd5114b9c0209ef4bdf811de2d0e..8452a71c24a1b14faf5525b9513eebf80bda1722 100644 (file)
@@ -11,6 +11,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 
 /**
  * Unit tests for AppendEntriesReply.
@@ -21,7 +22,8 @@ public class AppendEntriesReplyTest {
 
     @Test
     public void testSerialization() {
-        AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6);
+        AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6, true, true,
+                RaftVersions.CURRENT_VERSION);
         AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected);
 
         assertEquals("getTerm", expected.getTerm(), cloned.getTerm());
@@ -30,5 +32,24 @@ public class AppendEntriesReplyTest {
         assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
         assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
         assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion());
+        assertEquals("isForceInstallSnapshot", expected.isForceInstallSnapshot(), cloned.isForceInstallSnapshot());
+        assertEquals("isNeedsLeaderAddress", expected.isNeedsLeaderAddress(), cloned.isNeedsLeaderAddress());
+    }
+
+    @Test
+    @Deprecated
+    public void testPreFluorineSerialization() {
+        AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6, true, true,
+                RaftVersions.BORON_VERSION);
+        AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected);
+
+        assertEquals("getTerm", expected.getTerm(), cloned.getTerm());
+        assertEquals("getFollowerId", expected.getFollowerId(), cloned.getFollowerId());
+        assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm());
+        assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
+        assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
+        assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion());
+        assertEquals("isForceInstallSnapshot", expected.isForceInstallSnapshot(), cloned.isForceInstallSnapshot());
+        assertEquals("isNeedsLeaderAddress", false, cloned.isNeedsLeaderAddress());
     }
 }
index d6ee6141bc022007773aeb220b1b894146d5baee..a7c3c8b9d5e97bde0e4b6fb431e105545eea754b 100644 (file)
@@ -8,12 +8,14 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.util.Arrays;
 import java.util.Iterator;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 
@@ -31,15 +33,44 @@ public class AppendEntriesTest {
         ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(3, 4, new MockPayload("payload2"));
 
         short payloadVersion = 5;
+
+        // Without leader address
+
         AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
-                -1, payloadVersion);
+                -1, payloadVersion, RaftVersions.CURRENT_VERSION, null);
 
         AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
 
-        verifyAppendEntries(expected, cloned);
+        verifyAppendEntries(expected, cloned, RaftVersions.CURRENT_VERSION);
+
+        // With leader address
+
+        expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
+                -1, payloadVersion, RaftVersions.CURRENT_VERSION, "leader address");
+
+        cloned = (AppendEntries) SerializationUtils.clone(expected);
+
+        verifyAppendEntries(expected, cloned, RaftVersions.CURRENT_VERSION);
     }
 
-    private static void verifyAppendEntries(AppendEntries expected, AppendEntries actual) {
+    @Test
+    @Deprecated
+    public void testPreFluorineSerialization() {
+        ReplicatedLogEntry entry1 = new SimpleReplicatedLogEntry(1, 2, new MockPayload("payload1"));
+
+        ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(3, 4, new MockPayload("payload2"));
+
+        short payloadVersion = 5;
+
+        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
+                -1, payloadVersion, RaftVersions.BORON_VERSION, "leader address");
+
+        AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
+
+        verifyAppendEntries(expected, cloned, RaftVersions.BORON_VERSION);
+    }
+
+    private static void verifyAppendEntries(AppendEntries expected, AppendEntries actual, short recipientRaftVersion) {
         assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId());
         assertEquals("getTerm", expected.getTerm(), actual.getTerm());
         assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit());
@@ -53,6 +84,14 @@ public class AppendEntriesTest {
         for (ReplicatedLogEntry e: actual.getEntries()) {
             verifyReplicatedLogEntry(iter.next(), e);
         }
+
+        if (recipientRaftVersion >= RaftVersions.FLUORINE_VERSION) {
+            assertEquals("getLeaderAddress", expected.getLeaderAddress(), actual.getLeaderAddress());
+            assertEquals("getLeaderRaftVersion", RaftVersions.CURRENT_VERSION, actual.getLeaderRaftVersion());
+        } else {
+            assertFalse(actual.getLeaderAddress().isPresent());
+            assertEquals("getLeaderRaftVersion", RaftVersions.BORON_VERSION, actual.getLeaderRaftVersion());
+        }
     }
 
     private static void verifyReplicatedLogEntry(ReplicatedLogEntry expected, ReplicatedLogEntry actual) {
index 468d650d4da2225412e12be588e0a8003e5f7b51..b1e9079f508669163b619fe67ebf7fd3c6b017bb 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
 import akka.actor.Address;
+import akka.actor.AddressFromURIString;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -94,4 +95,10 @@ class ShardPeerAddressResolver implements PeerAddressResolver {
         ShardIdentifier shardId = ShardIdentifier.fromShardIdString(peerId);
         return getShardActorAddress(shardId.getShardName(), shardId.getMemberName());
     }
+
+    @Override
+    public void setResolved(String peerId, String address) {
+        memberNameToAddress.put(ShardIdentifier.fromShardIdString(peerId).getMemberName(),
+                AddressFromURIString.parse(address));
+    }
 }
index c735d9dfd4e0e416c09d1ae24988f2116e6ae5e8..0c18e799a5092a73bb266c26f6065eff021ce92b 100644 (file)
@@ -80,6 +80,21 @@ public class ShardPeerAddressResolverTest {
         assertEquals("resolve", shardAddress, resolver.resolve(peerId));
     }
 
+    @Test
+    public void testSetResolved() {
+        String type = "config";
+        ShardPeerAddressResolver resolver = new ShardPeerAddressResolver(type, MEMBER_1);
+
+        String peerId = ShardIdentifier.create("default", MEMBER_2, type).toString();
+
+        String address = "akka.tcp://opendaylight-cluster-data@127.0.0.1:2550/user/shardmanager-" + type
+                + "/" + MEMBER_2.getName() + "-shard-default-" + type;
+
+        resolver.setResolved(peerId, address);
+
+        assertEquals("resolve", address, resolver.resolve(peerId));
+    }
+
     @Test
     public void testGetShardManagerPeerActorAddresses() {
         ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", MEMBER_1);