Send leader's full address via AppendEntries 07/78507/5
authorTom Pantelis <tompantelis@gmail.com>
Thu, 6 Dec 2018 16:43:26 +0000 (11:43 -0500)
committerRobert Varga <nite@hq.sk>
Mon, 7 Jan 2019 12:50:43 +0000 (12:50 +0000)
Added the leader's full actor address to AppendEntries. The
address can be rather long and we don't need to normally send
it so it's an Optional. The follower indicates via AppendEntriesReply
whether it needs the leader to send its address (new
needsLeaderAddress field). On receipt of the leader's address,
the follower sets it in its local RaftActorContext and notifies
the PeerAddressResolver. The ShardPeerAddressResolver impl
updates its local cache thus enabling transactions to resolve
the remote leader actor.

Since we're changing the serialized footprint of AppendEntries
and AppendEntriesReply, I preserved backwards compatibility by
versioning a new externalizable Proxy for each.

JIRA: CONTROLLER-1861
Change-Id: I1c0870a596b1782015eb973153b74dfcd48694e7
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
14 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerAddressResolver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolver.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolverTest.java

index 3952b38..fe83636 100644 (file)
@@ -51,6 +51,8 @@ public final class FollowerLogInformation {
 
     private long slicedLogEntryIndex = NO_INDEX;
 
+    private boolean needsLeaderAddress;
+
     /**
      * Constructs an instance.
      *
@@ -336,6 +338,15 @@ public final class FollowerLogInformation {
         return slicedLogEntryIndex != NO_INDEX;
     }
 
+    public void setNeedsLeaderAddress(boolean value) {
+        needsLeaderAddress = value;
+    }
+
+    @Nullable
+    public String needsLeaderAddress(String leaderId) {
+        return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
+    }
+
     @Override
     public String toString() {
         return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
index 302f2fa..c7d8157 100644 (file)
@@ -17,10 +17,19 @@ import javax.annotation.Nullable;
 @FunctionalInterface
 public interface PeerAddressResolver {
     /**
-     * Resolves a raft actor peer id to it's remote actor address.
+     * Resolves a raft actor peer id to its remote actor address.
      *
      * @param peerId the id of the peer to resolve
      * @return the peer's actor path string or null if not found
      */
     @Nullable String resolve(String peerId);
+
+    /**
+     * Sets the actor address for a raft peer.
+     *
+     * @param peerId the id of the peer
+     * @param address the peer's actor's address
+     */
+    default void setResolved(String peerId, String address) {
+    }
 }
index 364ca5a..5ec3764 100644 (file)
@@ -16,5 +16,6 @@ public interface RaftVersions {
     short HELIUM_VERSION = 0;
     short LITHIUM_VERSION = 1;
     short BORON_VERSION = 3;
-    short CURRENT_VERSION = BORON_VERSION;
+    short FLUORINE_VERSION = 4;
+    short CURRENT_VERSION = FLUORINE_VERSION;
 }
index e585066..2175eb7 100644 (file)
@@ -227,6 +227,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         followerLogInformation.markFollowerActive();
         followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
         followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
+        followerLogInformation.setNeedsLeaderAddress(appendEntriesReply.isNeedsLeaderAddress());
 
         long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
         boolean updated = false;
@@ -816,7 +817,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             getLogEntryIndex(followerNextIndex - 1),
             getLogEntryTerm(followerNextIndex - 1), entries,
-            leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
+            leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion(),
+            followerLogInformation.getRaftVersion(), followerLogInformation.needsLeaderAddress(getId()));
 
         if (!entries.isEmpty() || log.isTraceEnabled()) {
             log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
index b63a7cc..087c656 100644 (file)
@@ -137,7 +137,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                     appendEntries.getTerm(), currentTerm());
 
             sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(),
-                    context.getPayloadVersion()), actor());
+                    context.getPayloadVersion(), false, false, appendEntries.getLeaderRaftVersion()), actor());
             return this;
         }
 
index 2377fbf..f6276a6 100644 (file)
@@ -148,7 +148,8 @@ public class Follower extends AbstractRaftActorBehavior {
         if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
             // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-                    lastIndex(), lastTerm(), context.getPayloadVersion());
+                    lastIndex(), lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+                    appendEntries.getLeaderRaftVersion());
 
             log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
             sender.tell(reply, actor());
@@ -160,6 +161,14 @@ public class Follower extends AbstractRaftActorBehavior {
         leaderId = appendEntries.getLeaderId();
         leaderPayloadVersion = appendEntries.getPayloadVersion();
 
+        if (appendEntries.getLeaderAddress().isPresent()) {
+            final String address = appendEntries.getLeaderAddress().get();
+            log.debug("New leader address: {}", address);
+
+            context.setPeerAddress(leaderId, address);
+            context.getConfigParams().getPeerAddressResolver().setResolved(leaderId, address);
+        }
+
         // First check if the logs are in sync or not
         if (isOutOfSync(appendEntries, sender)) {
             updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
@@ -184,7 +193,8 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
-                lastIndex, lastTerm(), context.getPayloadVersion());
+                lastIndex, lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+                appendEntries.getLeaderRaftVersion());
 
         if (log.isTraceEnabled()) {
             log.trace("{}: handleAppendEntries returning : {}", logName(), reply);
@@ -267,14 +277,16 @@ public class Follower extends AbstractRaftActorBehavior {
 
                         log.info("{}: Could not remove entries - sending reply to force snapshot", logName());
                         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
-                                lastTerm(), context.getPayloadVersion(), true), actor());
+                                lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+                                appendEntries.getLeaderRaftVersion()), actor());
                         return false;
                     }
 
                     break;
                 } else {
                     sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
-                            lastTerm(), context.getPayloadVersion(), true), actor());
+                            lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+                            appendEntries.getLeaderRaftVersion()), actor());
                     return false;
                 }
             }
@@ -332,7 +344,7 @@ public class Follower extends AbstractRaftActorBehavior {
             log.info("{}: The followers log is empty and the senders prevLogIndex is {}", logName(),
                 appendEntries.getPrevLogIndex());
 
-            sendOutOfSyncAppendEntriesReply(sender, false);
+            sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
             return true;
         }
 
@@ -351,7 +363,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm());
 
-                    sendOutOfSyncAppendEntriesReply(sender, false);
+                    sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                     return true;
                 }
             } else if (appendEntries.getPrevLogIndex() != -1) {
@@ -362,7 +374,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         + "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
                         context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
 
-                sendOutOfSyncAppendEntriesReply(sender, false);
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
         }
@@ -378,7 +390,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         appendEntries.getReplicatedToAllIndex(), lastIndex,
                         context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
 
-                sendOutOfSyncAppendEntriesReply(sender, false);
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
 
@@ -389,7 +401,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         entries.get(0).getIndex() - 1, lastIndex, context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm());
 
-                sendOutOfSyncAppendEntriesReply(sender, false);
+                sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
                 return true;
             }
         }
@@ -397,15 +409,21 @@ public class Follower extends AbstractRaftActorBehavior {
         return false;
     }
 
-    private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot) {
+    private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot,
+            short leaderRaftVersion) {
         // We found that the log was out of sync so just send a negative reply.
         final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(),
-                lastTerm(), context.getPayloadVersion(), forceInstallSnapshot);
+                lastTerm(), context.getPayloadVersion(), forceInstallSnapshot, needsLeaderAddress(),
+                leaderRaftVersion);
 
         log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
         sender.tell(reply, actor());
     }
 
+    private boolean needsLeaderAddress() {
+        return context.getPeerAddress(leaderId) == null;
+    }
+
     @Override
     protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
         final AppendEntriesReply appendEntriesReply) {
index 4d4b7e4..d77a084 100644 (file)
@@ -8,14 +8,19 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -47,17 +52,41 @@ public class AppendEntries extends AbstractRaftRPC {
 
     private final short payloadVersion;
 
-    public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+    private final short recipientRaftVersion;
+
+    private final short leaderRaftVersion;
+
+    private final String leaderAddress;
+
+    private AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
             @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
-            short payloadVersion) {
+            short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) {
         super(term);
-        this.leaderId = Preconditions.checkNotNull(leaderId);
+        this.leaderId = requireNonNull(leaderId);
         this.prevLogIndex = prevLogIndex;
         this.prevLogTerm = prevLogTerm;
-        this.entries = Preconditions.checkNotNull(entries);
+        this.entries = requireNonNull(entries);
         this.leaderCommit = leaderCommit;
         this.replicatedToAllIndex = replicatedToAllIndex;
         this.payloadVersion = payloadVersion;
+        this.recipientRaftVersion = recipientRaftVersion;
+        this.leaderRaftVersion = leaderRaftVersion;
+        this.leaderAddress = leaderAddress;
+    }
+
+    public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+            @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
+            short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) {
+        this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+                recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
+    }
+
+    @VisibleForTesting
+    public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+            @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
+            short payloadVersion) {
+        this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+                RaftVersions.CURRENT_VERSION, null);
     }
 
     @Nonnull
@@ -90,6 +119,14 @@ public class AppendEntries extends AbstractRaftRPC {
         return payloadVersion;
     }
 
+    public Optional<String> getLeaderAddress() {
+        return Optional.ofNullable(leaderAddress);
+    }
+
+    public short getLeaderRaftVersion() {
+        return leaderRaftVersion;
+    }
+
     @Override
     public String toString() {
         return "AppendEntries [leaderId=" + leaderId
@@ -98,13 +135,88 @@ public class AppendEntries extends AbstractRaftRPC {
                 + ", leaderCommit=" + leaderCommit
                 + ", replicatedToAllIndex=" + replicatedToAllIndex
                 + ", payloadVersion=" + payloadVersion
+                + ", recipientRaftVersion=" + recipientRaftVersion
+                + ", leaderRaftVersion=" + leaderRaftVersion
+                + ", leaderAddress=" + leaderAddress
                 + ", entries=" + entries + "]";
     }
 
     private Object writeReplace() {
-        return new Proxy(this);
+        return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this);
+    }
+
+    /**
+     * Fluorine version that adds the leader address.
+     */
+    private static class ProxyV2 implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AppendEntries appendEntries;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public ProxyV2() {
+        }
+
+        ProxyV2(AppendEntries appendEntries) {
+            this.appendEntries = appendEntries;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeShort(appendEntries.leaderRaftVersion);
+            out.writeLong(appendEntries.getTerm());
+            out.writeObject(appendEntries.leaderId);
+            out.writeLong(appendEntries.prevLogTerm);
+            out.writeLong(appendEntries.prevLogIndex);
+            out.writeLong(appendEntries.leaderCommit);
+            out.writeLong(appendEntries.replicatedToAllIndex);
+            out.writeShort(appendEntries.payloadVersion);
+
+            out.writeInt(appendEntries.entries.size());
+            for (ReplicatedLogEntry e: appendEntries.entries) {
+                out.writeLong(e.getIndex());
+                out.writeLong(e.getTerm());
+                out.writeObject(e.getData());
+            }
+
+            out.writeObject(appendEntries.leaderAddress);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            short leaderRaftVersion = in.readShort();
+            long term = in.readLong();
+            String leaderId = (String) in.readObject();
+            long prevLogTerm = in.readLong();
+            long prevLogIndex = in.readLong();
+            long leaderCommit = in.readLong();
+            long replicatedToAllIndex = in.readLong();
+            short payloadVersion = in.readShort();
+
+            int size = in.readInt();
+            List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+            }
+
+            String leaderAddress = (String)in.readObject();
+
+            appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+                    replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
+                    leaderAddress);
+        }
+
+        private Object readResolve() {
+            return appendEntries;
+        }
     }
 
+    /**
+     * Pre-Fluorine version.
+     */
+    @Deprecated
     private static class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
@@ -155,7 +267,7 @@ public class AppendEntries extends AbstractRaftRPC {
             }
 
             appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
-                    replicatedToAllIndex, payloadVersion);
+                replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
         }
 
         private Object readResolve() {
index e2f0ba9..902b9a0 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -41,22 +42,29 @@ public class AppendEntriesReply extends AbstractRaftRPC {
 
     private final boolean forceInstallSnapshot;
 
+    private final boolean needsLeaderAddress;
+
+    private final short recipientRaftVersion;
+
+    @VisibleForTesting
     public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
             short payloadVersion) {
-        this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, false);
+        this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, false, false,
+                RaftVersions.CURRENT_VERSION);
     }
 
     public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
-            short payloadVersion, boolean forceInstallSnapshot) {
+            short payloadVersion, boolean forceInstallSnapshot, boolean needsLeaderAddress,
+            short recipientRaftVersion) {
         this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, forceInstallSnapshot,
-                RaftVersions.CURRENT_VERSION);
+                needsLeaderAddress, RaftVersions.CURRENT_VERSION, recipientRaftVersion);
 
     }
 
     private AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
-                              short payloadVersion, boolean forceInstallSnapshot, short raftVersion) {
+            short payloadVersion, boolean forceInstallSnapshot, boolean needsLeaderAddress, short raftVersion,
+            short recipientRaftVersion) {
         super(term);
-
         this.followerId = followerId;
         this.success = success;
         this.logLastIndex = logLastIndex;
@@ -64,6 +72,8 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         this.payloadVersion = payloadVersion;
         this.forceInstallSnapshot = forceInstallSnapshot;
         this.raftVersion = raftVersion;
+        this.needsLeaderAddress = needsLeaderAddress;
+        this.recipientRaftVersion = recipientRaftVersion;
     }
 
     public boolean isSuccess() {
@@ -94,17 +104,80 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         return forceInstallSnapshot;
     }
 
+    public boolean isNeedsLeaderAddress() {
+        return needsLeaderAddress;
+    }
+
     @Override
     public String toString() {
         return "AppendEntriesReply [term=" + getTerm() + ", success=" + success + ", followerId=" + followerId
                 + ", logLastIndex=" + logLastIndex + ", logLastTerm=" + logLastTerm + ", forceInstallSnapshot="
-                + forceInstallSnapshot + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion + "]";
+                + forceInstallSnapshot + ", needsLeaderAddress=" + needsLeaderAddress
+                + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion
+                + ", recipientRaftVersion=" + recipientRaftVersion + "]";
     }
 
     private Object writeReplace() {
-        return new Proxy(this);
+        return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new Proxy2(this) : new Proxy(this);
+    }
+
+    /**
+     * Fluorine version that adds the needsLeaderAddress flag.
+     */
+    private static class Proxy2 implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AppendEntriesReply appendEntriesReply;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy2() {
+        }
+
+        Proxy2(AppendEntriesReply appendEntriesReply) {
+            this.appendEntriesReply = appendEntriesReply;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeShort(appendEntriesReply.raftVersion);
+            out.writeLong(appendEntriesReply.getTerm());
+            out.writeObject(appendEntriesReply.followerId);
+            out.writeBoolean(appendEntriesReply.success);
+            out.writeLong(appendEntriesReply.logLastIndex);
+            out.writeLong(appendEntriesReply.logLastTerm);
+            out.writeShort(appendEntriesReply.payloadVersion);
+            out.writeBoolean(appendEntriesReply.forceInstallSnapshot);
+            out.writeBoolean(appendEntriesReply.needsLeaderAddress);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            short raftVersion = in.readShort();
+            long term = in.readLong();
+            String followerId = (String) in.readObject();
+            boolean success = in.readBoolean();
+            long logLastIndex = in.readLong();
+            long logLastTerm = in.readLong();
+            short payloadVersion = in.readShort();
+            boolean forceInstallSnapshot = in.readBoolean();
+            boolean needsLeaderAddress = in.readBoolean();
+
+            appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm,
+                    payloadVersion, forceInstallSnapshot, needsLeaderAddress, raftVersion,
+                    RaftVersions.CURRENT_VERSION);
+        }
+
+        private Object readResolve() {
+            return appendEntriesReply;
+        }
     }
 
+    /**
+     * Pre-Fluorine version.
+     */
+    @Deprecated
     private static class Proxy implements Externalizable {
         private static final long serialVersionUID = 1L;
 
@@ -144,7 +217,7 @@ public class AppendEntriesReply extends AbstractRaftRPC {
             boolean forceInstallSnapshot = in.readBoolean();
 
             appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm,
-                    payloadVersion, forceInstallSnapshot, raftVersion);
+                    payloadVersion, forceInstallSnapshot, false, raftVersion, RaftVersions.CURRENT_VERSION);
         }
 
         private Object readResolve() {
index 4dd2b9b..2ba06d6 100644 (file)
@@ -15,6 +15,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -46,9 +47,13 @@ import org.opendaylight.controller.cluster.raft.MockRaftActor;
 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
+import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -111,6 +116,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     protected  MockRaftActorContext createActorContext(final ActorRef actorRef) {
         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
         context.setPayloadVersion(payloadVersion);
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
+            peerId -> leaderActor.path().toString());
         return context;
     }
 
@@ -804,7 +811,6 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
     }
 
-
     /**
      * This test verifies that when InstallSnapshot is received by
      * the follower its applied correctly.
@@ -1301,6 +1307,36 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
                 MockRaftActor.fromState(snapshot.getState()));
     }
 
+    @Test
+    public void testNeedsLeaderAddress() {
+        logStart("testNeedsLeaderAddress");
+
+        MockRaftActorContext context = createActorContext();
+        context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
+        context.addToPeers("leader", null, VotingState.VOTING);
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
+
+        follower = createBehavior(context);
+
+        follower.handleMessage(leaderActor,
+                new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1, (short)0));
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertTrue(reply.isNeedsLeaderAddress());
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
+
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1,
+                (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
+
+        reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertFalse(reply.isNeedsLeaderAddress());
+
+        verify(mockResolver).setResolved("leader", leaderActor.path().toString());
+    }
+
     @SuppressWarnings("checkstyle:IllegalCatch")
     private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
             final AtomicReference<MockRaftActor> followerRaftActor) {
@@ -1366,6 +1402,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
         assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
+        assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
     }
 
 
index 74f02c0..78ec33c 100644 (file)
@@ -775,7 +775,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
         // installed with a SendInstallSnapshot
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -795,7 +796,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
 
         // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
 
         // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
@@ -807,7 +809,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
         MessageCollectorActor.clearMessages(followerActor);
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+                RaftVersions.CURRENT_VERSION));
         MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
     }
 
@@ -2376,6 +2379,59 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
     }
 
+    @Test
+    public void testLeaderAddressInAppendEntries() {
+        logStart("testLeaderAddressInAppendEntries");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                FiniteDuration.create(50, TimeUnit.MILLISECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
+
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
+            peerId -> leaderActor.path().toString());
+
+        leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
+
+        // Initial heartbeat shouldn't have the leader address
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertFalse(appendEntries.getLeaderAddress().isPresent());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send AppendEntriesReply indicating the follower needs the leader address
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
+                RaftVersions.CURRENT_VERSION));
+
+        // Sleep for the heartbeat interval so AppendEntries is sent.
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertTrue(appendEntries.getLeaderAddress().isPresent());
+        assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Send AppendEntriesReply indicating the follower does not need the leader address
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
+                RaftVersions.CURRENT_VERSION));
+
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+                .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertFalse(appendEntries.getLeaderAddress().isPresent());
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
             final ActorRef actorRef, final RaftRPC rpc) {
index 6293121..8452a71 100644 (file)
@@ -11,6 +11,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 
 /**
  * Unit tests for AppendEntriesReply.
@@ -21,7 +22,8 @@ public class AppendEntriesReplyTest {
 
     @Test
     public void testSerialization() {
-        AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6);
+        AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6, true, true,
+                RaftVersions.CURRENT_VERSION);
         AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected);
 
         assertEquals("getTerm", expected.getTerm(), cloned.getTerm());
@@ -30,5 +32,24 @@ public class AppendEntriesReplyTest {
         assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
         assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
         assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion());
+        assertEquals("isForceInstallSnapshot", expected.isForceInstallSnapshot(), cloned.isForceInstallSnapshot());
+        assertEquals("isNeedsLeaderAddress", expected.isNeedsLeaderAddress(), cloned.isNeedsLeaderAddress());
+    }
+
+    @Test
+    @Deprecated
+    public void testPreFluorineSerialization() {
+        AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6, true, true,
+                RaftVersions.BORON_VERSION);
+        AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected);
+
+        assertEquals("getTerm", expected.getTerm(), cloned.getTerm());
+        assertEquals("getFollowerId", expected.getFollowerId(), cloned.getFollowerId());
+        assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm());
+        assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
+        assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
+        assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion());
+        assertEquals("isForceInstallSnapshot", expected.isForceInstallSnapshot(), cloned.isForceInstallSnapshot());
+        assertEquals("isNeedsLeaderAddress", false, cloned.isNeedsLeaderAddress());
     }
 }
index d6ee614..a7c3c8b 100644 (file)
@@ -8,12 +8,14 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 import java.util.Arrays;
 import java.util.Iterator;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 
@@ -31,15 +33,44 @@ public class AppendEntriesTest {
         ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(3, 4, new MockPayload("payload2"));
 
         short payloadVersion = 5;
+
+        // Without leader address
+
         AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
-                -1, payloadVersion);
+                -1, payloadVersion, RaftVersions.CURRENT_VERSION, null);
 
         AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
 
-        verifyAppendEntries(expected, cloned);
+        verifyAppendEntries(expected, cloned, RaftVersions.CURRENT_VERSION);
+
+        // With leader address
+
+        expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
+                -1, payloadVersion, RaftVersions.CURRENT_VERSION, "leader address");
+
+        cloned = (AppendEntries) SerializationUtils.clone(expected);
+
+        verifyAppendEntries(expected, cloned, RaftVersions.CURRENT_VERSION);
     }
 
-    private static void verifyAppendEntries(AppendEntries expected, AppendEntries actual) {
+    @Test
+    @Deprecated
+    public void testPreFluorineSerialization() {
+        ReplicatedLogEntry entry1 = new SimpleReplicatedLogEntry(1, 2, new MockPayload("payload1"));
+
+        ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(3, 4, new MockPayload("payload2"));
+
+        short payloadVersion = 5;
+
+        AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
+                -1, payloadVersion, RaftVersions.BORON_VERSION, "leader address");
+
+        AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
+
+        verifyAppendEntries(expected, cloned, RaftVersions.BORON_VERSION);
+    }
+
+    private static void verifyAppendEntries(AppendEntries expected, AppendEntries actual, short recipientRaftVersion) {
         assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId());
         assertEquals("getTerm", expected.getTerm(), actual.getTerm());
         assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit());
@@ -53,6 +84,14 @@ public class AppendEntriesTest {
         for (ReplicatedLogEntry e: actual.getEntries()) {
             verifyReplicatedLogEntry(iter.next(), e);
         }
+
+        if (recipientRaftVersion >= RaftVersions.FLUORINE_VERSION) {
+            assertEquals("getLeaderAddress", expected.getLeaderAddress(), actual.getLeaderAddress());
+            assertEquals("getLeaderRaftVersion", RaftVersions.CURRENT_VERSION, actual.getLeaderRaftVersion());
+        } else {
+            assertFalse(actual.getLeaderAddress().isPresent());
+            assertEquals("getLeaderRaftVersion", RaftVersions.BORON_VERSION, actual.getLeaderRaftVersion());
+        }
     }
 
     private static void verifyReplicatedLogEntry(ReplicatedLogEntry expected, ReplicatedLogEntry actual) {
index 468d650..b1e9079 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore.shardmanager;
 
 import akka.actor.Address;
+import akka.actor.AddressFromURIString;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -94,4 +95,10 @@ class ShardPeerAddressResolver implements PeerAddressResolver {
         ShardIdentifier shardId = ShardIdentifier.fromShardIdString(peerId);
         return getShardActorAddress(shardId.getShardName(), shardId.getMemberName());
     }
+
+    @Override
+    public void setResolved(String peerId, String address) {
+        memberNameToAddress.put(ShardIdentifier.fromShardIdString(peerId).getMemberName(),
+                AddressFromURIString.parse(address));
+    }
 }
index c735d9d..0c18e79 100644 (file)
@@ -80,6 +80,21 @@ public class ShardPeerAddressResolverTest {
         assertEquals("resolve", shardAddress, resolver.resolve(peerId));
     }
 
+    @Test
+    public void testSetResolved() {
+        String type = "config";
+        ShardPeerAddressResolver resolver = new ShardPeerAddressResolver(type, MEMBER_1);
+
+        String peerId = ShardIdentifier.create("default", MEMBER_2, type).toString();
+
+        String address = "akka.tcp://opendaylight-cluster-data@127.0.0.1:2550/user/shardmanager-" + type
+                + "/" + MEMBER_2.getName() + "-shard-default-" + type;
+
+        resolver.setResolved(peerId, address);
+
+        assertEquals("resolve", address, resolver.resolve(peerId));
+    }
+
     @Test
     public void testGetShardManagerPeerActorAddresses() {
         ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", MEMBER_1);