From: Tom Pantelis Date: Thu, 6 Dec 2018 16:43:26 +0000 (-0500) Subject: Send leader's full address via AppendEntries X-Git-Tag: release/neon~21 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1ffd1f44c4beacdb28683c028bc0eaa209731098 Send leader's full address via AppendEntries Added the leader's full actor address to AppendEntries. The address can be rather long and we don't need to normally send it so it's an Optional. The follower indicates via AppendEntriesReply whether it needs the leader to send its address (new needsLeaderAddress field). On receipt of the leader's address, the follower sets it in its local RaftActorContext and notifies the PeerAddressResolver. The ShardPeerAddressResolver impl updates its local cache thus enabling transactions to resolve the remote leader actor. Since we're changing the serialized footprint of AppendEntries and AppendEntriesReply, I preserved backwards compatibility by versioning a new externalizable Proxy for each. JIRA: CONTROLLER-1861 Change-Id: I1c0870a596b1782015eb973153b74dfcd48694e7 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 3952b386b2..fe836362c8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -51,6 +51,8 @@ public final class FollowerLogInformation { private long slicedLogEntryIndex = NO_INDEX; + private boolean needsLeaderAddress; + /** * Constructs an instance. * @@ -336,6 +338,15 @@ public final class FollowerLogInformation { return slicedLogEntryIndex != NO_INDEX; } + public void setNeedsLeaderAddress(boolean value) { + needsLeaderAddress = value; + } + + @Nullable + public String needsLeaderAddress(String leaderId) { + return needsLeaderAddress ? context.getPeerAddress(leaderId) : null; + } + @Override public String toString() { return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerAddressResolver.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerAddressResolver.java index 302f2fafa4..c7d81570c4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerAddressResolver.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PeerAddressResolver.java @@ -17,10 +17,19 @@ import javax.annotation.Nullable; @FunctionalInterface public interface PeerAddressResolver { /** - * Resolves a raft actor peer id to it's remote actor address. + * Resolves a raft actor peer id to its remote actor address. * * @param peerId the id of the peer to resolve * @return the peer's actor path string or null if not found */ @Nullable String resolve(String peerId); + + /** + * Sets the actor address for a raft peer. + * + * @param peerId the id of the peer + * @param address the peer's actor's address + */ + default void setResolved(String peerId, String address) { + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java index 364ca5af7b..5ec376412b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java @@ -16,5 +16,6 @@ public interface RaftVersions { short HELIUM_VERSION = 0; short LITHIUM_VERSION = 1; short BORON_VERSION = 3; - short CURRENT_VERSION = BORON_VERSION; + short FLUORINE_VERSION = 4; + short CURRENT_VERSION = FLUORINE_VERSION; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index e585066d73..2175eb7555 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -227,6 +227,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.markFollowerActive(); followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion()); followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion()); + followerLogInformation.setNeedsLeaderAddress(appendEntriesReply.isNeedsLeaderAddress()); long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); boolean updated = false; @@ -816,7 +817,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), getLogEntryIndex(followerNextIndex - 1), getLogEntryTerm(followerNextIndex - 1), entries, - leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion()); + leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion(), + followerLogInformation.getRaftVersion(), followerLogInformation.needsLeaderAddress(getId())); if (!entries.isEmpty() || log.isTraceEnabled()) { log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(), diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index b63a7cc5c9..087c656b18 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -137,7 +137,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { appendEntries.getTerm(), currentTerm()); sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(), - context.getPayloadVersion()), actor()); + context.getPayloadVersion(), false, false, appendEntries.getLeaderRaftVersion()), actor()); return this; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 2377fbf442..f6276a65e0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -148,7 +148,8 @@ public class Follower extends AbstractRaftActorBehavior { if (snapshotTracker != null || context.getSnapshotManager().isApplying()) { // if snapshot install is in progress, follower should just acknowledge append entries with a reply. AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, - lastIndex(), lastTerm(), context.getPayloadVersion()); + lastIndex(), lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(), + appendEntries.getLeaderRaftVersion()); log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply); sender.tell(reply, actor()); @@ -160,6 +161,14 @@ public class Follower extends AbstractRaftActorBehavior { leaderId = appendEntries.getLeaderId(); leaderPayloadVersion = appendEntries.getPayloadVersion(); + if (appendEntries.getLeaderAddress().isPresent()) { + final String address = appendEntries.getLeaderAddress().get(); + log.debug("New leader address: {}", address); + + context.setPeerAddress(leaderId, address); + context.getConfigParams().getPeerAddressResolver().setResolved(leaderId, address); + } + // First check if the logs are in sync or not if (isOutOfSync(appendEntries, sender)) { updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId()); @@ -184,7 +193,8 @@ public class Follower extends AbstractRaftActorBehavior { } AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, - lastIndex, lastTerm(), context.getPayloadVersion()); + lastIndex, lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(), + appendEntries.getLeaderRaftVersion()); if (log.isTraceEnabled()) { log.trace("{}: handleAppendEntries returning : {}", logName(), reply); @@ -267,14 +277,16 @@ public class Follower extends AbstractRaftActorBehavior { log.info("{}: Could not remove entries - sending reply to force snapshot", logName()); sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, - lastTerm(), context.getPayloadVersion(), true), actor()); + lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(), + appendEntries.getLeaderRaftVersion()), actor()); return false; } break; } else { sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex, - lastTerm(), context.getPayloadVersion(), true), actor()); + lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(), + appendEntries.getLeaderRaftVersion()), actor()); return false; } } @@ -332,7 +344,7 @@ public class Follower extends AbstractRaftActorBehavior { log.info("{}: The followers log is empty and the senders prevLogIndex is {}", logName(), appendEntries.getPrevLogIndex()); - sendOutOfSyncAppendEntriesReply(sender, false); + sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion()); return true; } @@ -351,7 +363,7 @@ public class Follower extends AbstractRaftActorBehavior { appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm()); - sendOutOfSyncAppendEntriesReply(sender, false); + sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion()); return true; } } else if (appendEntries.getPrevLogIndex() != -1) { @@ -362,7 +374,7 @@ public class Follower extends AbstractRaftActorBehavior { + "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm()); - sendOutOfSyncAppendEntriesReply(sender, false); + sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion()); return true; } } @@ -378,7 +390,7 @@ public class Follower extends AbstractRaftActorBehavior { appendEntries.getReplicatedToAllIndex(), lastIndex, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm()); - sendOutOfSyncAppendEntriesReply(sender, false); + sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion()); return true; } @@ -389,7 +401,7 @@ public class Follower extends AbstractRaftActorBehavior { entries.get(0).getIndex() - 1, lastIndex, context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm()); - sendOutOfSyncAppendEntriesReply(sender, false); + sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion()); return true; } } @@ -397,15 +409,21 @@ public class Follower extends AbstractRaftActorBehavior { return false; } - private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot) { + private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot, + short leaderRaftVersion) { // We found that the log was out of sync so just send a negative reply. final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), - lastTerm(), context.getPayloadVersion(), forceInstallSnapshot); + lastTerm(), context.getPayloadVersion(), forceInstallSnapshot, needsLeaderAddress(), + leaderRaftVersion); log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply); sender.tell(reply, actor()); } + private boolean needsLeaderAddress() { + return context.getPeerAddress(leaderId) == null; + } + @Override protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, final AppendEntriesReply appendEntriesReply) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index 4d4b7e4fdd..d77a084148 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -8,14 +8,19 @@ package org.opendaylight.controller.cluster.raft.messages; -import com.google.common.base.Preconditions; +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.VisibleForTesting; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -47,17 +52,41 @@ public class AppendEntries extends AbstractRaftRPC { private final short payloadVersion; - public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm, + private final short recipientRaftVersion; + + private final short leaderRaftVersion; + + private final String leaderAddress; + + private AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm, @Nonnull List entries, long leaderCommit, long replicatedToAllIndex, - short payloadVersion) { + short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) { super(term); - this.leaderId = Preconditions.checkNotNull(leaderId); + this.leaderId = requireNonNull(leaderId); this.prevLogIndex = prevLogIndex; this.prevLogTerm = prevLogTerm; - this.entries = Preconditions.checkNotNull(entries); + this.entries = requireNonNull(entries); this.leaderCommit = leaderCommit; this.replicatedToAllIndex = replicatedToAllIndex; this.payloadVersion = payloadVersion; + this.recipientRaftVersion = recipientRaftVersion; + this.leaderRaftVersion = leaderRaftVersion; + this.leaderAddress = leaderAddress; + } + + public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm, + @Nonnull List entries, long leaderCommit, long replicatedToAllIndex, + short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) { + this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion, + recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress); + } + + @VisibleForTesting + public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm, + @Nonnull List entries, long leaderCommit, long replicatedToAllIndex, + short payloadVersion) { + this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion, + RaftVersions.CURRENT_VERSION, null); } @Nonnull @@ -90,6 +119,14 @@ public class AppendEntries extends AbstractRaftRPC { return payloadVersion; } + public Optional getLeaderAddress() { + return Optional.ofNullable(leaderAddress); + } + + public short getLeaderRaftVersion() { + return leaderRaftVersion; + } + @Override public String toString() { return "AppendEntries [leaderId=" + leaderId @@ -98,13 +135,88 @@ public class AppendEntries extends AbstractRaftRPC { + ", leaderCommit=" + leaderCommit + ", replicatedToAllIndex=" + replicatedToAllIndex + ", payloadVersion=" + payloadVersion + + ", recipientRaftVersion=" + recipientRaftVersion + + ", leaderRaftVersion=" + leaderRaftVersion + + ", leaderAddress=" + leaderAddress + ", entries=" + entries + "]"; } private Object writeReplace() { - return new Proxy(this); + return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this); + } + + /** + * Fluorine version that adds the leader address. + */ + private static class ProxyV2 implements Externalizable { + private static final long serialVersionUID = 1L; + + private AppendEntries appendEntries; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public ProxyV2() { + } + + ProxyV2(AppendEntries appendEntries) { + this.appendEntries = appendEntries; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeShort(appendEntries.leaderRaftVersion); + out.writeLong(appendEntries.getTerm()); + out.writeObject(appendEntries.leaderId); + out.writeLong(appendEntries.prevLogTerm); + out.writeLong(appendEntries.prevLogIndex); + out.writeLong(appendEntries.leaderCommit); + out.writeLong(appendEntries.replicatedToAllIndex); + out.writeShort(appendEntries.payloadVersion); + + out.writeInt(appendEntries.entries.size()); + for (ReplicatedLogEntry e: appendEntries.entries) { + out.writeLong(e.getIndex()); + out.writeLong(e.getTerm()); + out.writeObject(e.getData()); + } + + out.writeObject(appendEntries.leaderAddress); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + short leaderRaftVersion = in.readShort(); + long term = in.readLong(); + String leaderId = (String) in.readObject(); + long prevLogTerm = in.readLong(); + long prevLogIndex = in.readLong(); + long leaderCommit = in.readLong(); + long replicatedToAllIndex = in.readLong(); + short payloadVersion = in.readShort(); + + int size = in.readInt(); + List entries = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); + } + + String leaderAddress = (String)in.readObject(); + + appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, + replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion, + leaderAddress); + } + + private Object readResolve() { + return appendEntries; + } } + /** + * Pre-Fluorine version. + */ + @Deprecated private static class Proxy implements Externalizable { private static final long serialVersionUID = 1L; @@ -155,7 +267,7 @@ public class AppendEntries extends AbstractRaftRPC { } appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, - replicatedToAllIndex, payloadVersion); + replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null); } private Object readResolve() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java index e2f0ba9014..902b9a03b7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.raft.messages; +import com.google.common.annotations.VisibleForTesting; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -41,22 +42,29 @@ public class AppendEntriesReply extends AbstractRaftRPC { private final boolean forceInstallSnapshot; + private final boolean needsLeaderAddress; + + private final short recipientRaftVersion; + + @VisibleForTesting public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm, short payloadVersion) { - this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, false); + this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, false, false, + RaftVersions.CURRENT_VERSION); } public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm, - short payloadVersion, boolean forceInstallSnapshot) { + short payloadVersion, boolean forceInstallSnapshot, boolean needsLeaderAddress, + short recipientRaftVersion) { this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, forceInstallSnapshot, - RaftVersions.CURRENT_VERSION); + needsLeaderAddress, RaftVersions.CURRENT_VERSION, recipientRaftVersion); } private AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm, - short payloadVersion, boolean forceInstallSnapshot, short raftVersion) { + short payloadVersion, boolean forceInstallSnapshot, boolean needsLeaderAddress, short raftVersion, + short recipientRaftVersion) { super(term); - this.followerId = followerId; this.success = success; this.logLastIndex = logLastIndex; @@ -64,6 +72,8 @@ public class AppendEntriesReply extends AbstractRaftRPC { this.payloadVersion = payloadVersion; this.forceInstallSnapshot = forceInstallSnapshot; this.raftVersion = raftVersion; + this.needsLeaderAddress = needsLeaderAddress; + this.recipientRaftVersion = recipientRaftVersion; } public boolean isSuccess() { @@ -94,17 +104,80 @@ public class AppendEntriesReply extends AbstractRaftRPC { return forceInstallSnapshot; } + public boolean isNeedsLeaderAddress() { + return needsLeaderAddress; + } + @Override public String toString() { return "AppendEntriesReply [term=" + getTerm() + ", success=" + success + ", followerId=" + followerId + ", logLastIndex=" + logLastIndex + ", logLastTerm=" + logLastTerm + ", forceInstallSnapshot=" - + forceInstallSnapshot + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion + "]"; + + forceInstallSnapshot + ", needsLeaderAddress=" + needsLeaderAddress + + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion + + ", recipientRaftVersion=" + recipientRaftVersion + "]"; } private Object writeReplace() { - return new Proxy(this); + return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new Proxy2(this) : new Proxy(this); + } + + /** + * Fluorine version that adds the needsLeaderAddress flag. + */ + private static class Proxy2 implements Externalizable { + private static final long serialVersionUID = 1L; + + private AppendEntriesReply appendEntriesReply; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy2() { + } + + Proxy2(AppendEntriesReply appendEntriesReply) { + this.appendEntriesReply = appendEntriesReply; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeShort(appendEntriesReply.raftVersion); + out.writeLong(appendEntriesReply.getTerm()); + out.writeObject(appendEntriesReply.followerId); + out.writeBoolean(appendEntriesReply.success); + out.writeLong(appendEntriesReply.logLastIndex); + out.writeLong(appendEntriesReply.logLastTerm); + out.writeShort(appendEntriesReply.payloadVersion); + out.writeBoolean(appendEntriesReply.forceInstallSnapshot); + out.writeBoolean(appendEntriesReply.needsLeaderAddress); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + short raftVersion = in.readShort(); + long term = in.readLong(); + String followerId = (String) in.readObject(); + boolean success = in.readBoolean(); + long logLastIndex = in.readLong(); + long logLastTerm = in.readLong(); + short payloadVersion = in.readShort(); + boolean forceInstallSnapshot = in.readBoolean(); + boolean needsLeaderAddress = in.readBoolean(); + + appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm, + payloadVersion, forceInstallSnapshot, needsLeaderAddress, raftVersion, + RaftVersions.CURRENT_VERSION); + } + + private Object readResolve() { + return appendEntriesReply; + } } + /** + * Pre-Fluorine version. + */ + @Deprecated private static class Proxy implements Externalizable { private static final long serialVersionUID = 1L; @@ -144,7 +217,7 @@ public class AppendEntriesReply extends AbstractRaftRPC { boolean forceInstallSnapshot = in.readBoolean(); appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm, - payloadVersion, forceInstallSnapshot, raftVersion); + payloadVersion, forceInstallSnapshot, false, raftVersion, RaftVersions.CURRENT_VERSION); } private Object readResolve() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 4dd2b9b536..2ba06d6925 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -15,6 +15,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -46,9 +47,13 @@ import org.opendaylight.controller.cluster.raft.MockRaftActor; import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder; import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; +import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver; +import org.opendaylight.controller.cluster.raft.PeerAddressResolver; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; +import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; @@ -111,6 +116,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { protected MockRaftActorContext createActorContext(final ActorRef actorRef) { MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef); context.setPayloadVersion(payloadVersion); + ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver( + peerId -> leaderActor.path().toString()); return context; } @@ -804,7 +811,6 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4); } - /** * This test verifies that when InstallSnapshot is received by * the follower its applied correctly. @@ -1301,6 +1307,36 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { MockRaftActor.fromState(snapshot.getState())); } + @Test + public void testNeedsLeaderAddress() { + logStart("testNeedsLeaderAddress"); + + MockRaftActorContext context = createActorContext(); + context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog()); + context.addToPeers("leader", null, VotingState.VOTING); + ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE); + + follower = createBehavior(context); + + follower.handleMessage(leaderActor, + new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1, (short)0)); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertTrue(reply.isNeedsLeaderAddress()); + MessageCollectorActor.clearMessages(leaderActor); + + PeerAddressResolver mockResolver = mock(PeerAddressResolver.class); + ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver); + + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1, + (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString())); + + reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertFalse(reply.isNeedsLeaderAddress()); + + verify(mockResolver).setResolved("leader", leaderActor.path().toString()); + } + @SuppressWarnings("checkstyle:IllegalCatch") private static RaftActorSnapshotCohort newRaftActorSnapshotCohort( final AtomicReference followerRaftActor) { @@ -1366,6 +1402,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex()); assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion()); assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot()); + assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 74f02c09a2..78ec33cba1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -775,7 +775,8 @@ public class LeaderTest extends AbstractLeaderTest { // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets // installed with a SendInstallSnapshot - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); @@ -795,7 +796,8 @@ public class LeaderTest extends AbstractLeaderTest { assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture. - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent. @@ -807,7 +809,8 @@ public class LeaderTest extends AbstractLeaderTest { // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk. MessageCollectorActor.clearMessages(followerActor); - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200); } @@ -2376,6 +2379,59 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); } + @Test + public void testLeaderAddressInAppendEntries() { + logStart("testLeaderAddressInAppendEntries"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + FiniteDuration.create(50, TimeUnit.MILLISECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver( + peerId -> leaderActor.path().toString()); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Initial heartbeat shouldn't have the leader address + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertFalse(appendEntries.getLeaderAddress().isPresent()); + MessageCollectorActor.clearMessages(followerActor); + + // Send AppendEntriesReply indicating the follower needs the leader address + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true, + RaftVersions.CURRENT_VERSION)); + + // Sleep for the heartbeat interval so AppendEntries is sent. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertTrue(appendEntries.getLeaderAddress().isPresent()); + assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get()); + MessageCollectorActor.clearMessages(followerActor); + + // Send AppendEntriesReply indicating the follower does not need the leader address + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false, + RaftVersions.CURRENT_VERSION)); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertFalse(appendEntries.getLeaderAddress().isPresent()); + } + @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext, final ActorRef actorRef, final RaftRPC rpc) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java index 629312110d..8452a71c24 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java @@ -11,6 +11,7 @@ import static org.junit.Assert.assertEquals; import org.apache.commons.lang.SerializationUtils; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.RaftVersions; /** * Unit tests for AppendEntriesReply. @@ -21,7 +22,8 @@ public class AppendEntriesReplyTest { @Test public void testSerialization() { - AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6); + AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6, true, true, + RaftVersions.CURRENT_VERSION); AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected); assertEquals("getTerm", expected.getTerm(), cloned.getTerm()); @@ -30,5 +32,24 @@ public class AppendEntriesReplyTest { assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex()); assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion()); assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion()); + assertEquals("isForceInstallSnapshot", expected.isForceInstallSnapshot(), cloned.isForceInstallSnapshot()); + assertEquals("isNeedsLeaderAddress", expected.isNeedsLeaderAddress(), cloned.isNeedsLeaderAddress()); + } + + @Test + @Deprecated + public void testPreFluorineSerialization() { + AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6, true, true, + RaftVersions.BORON_VERSION); + AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected); + + assertEquals("getTerm", expected.getTerm(), cloned.getTerm()); + assertEquals("getFollowerId", expected.getFollowerId(), cloned.getFollowerId()); + assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm()); + assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex()); + assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion()); + assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion()); + assertEquals("isForceInstallSnapshot", expected.isForceInstallSnapshot(), cloned.isForceInstallSnapshot()); + assertEquals("isNeedsLeaderAddress", false, cloned.isNeedsLeaderAddress()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java index d6ee6141bc..a7c3c8b9d5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesTest.java @@ -8,12 +8,14 @@ package org.opendaylight.controller.cluster.raft.messages; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import java.util.Arrays; import java.util.Iterator; import org.apache.commons.lang.SerializationUtils; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; +import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; @@ -31,15 +33,44 @@ public class AppendEntriesTest { ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(3, 4, new MockPayload("payload2")); short payloadVersion = 5; + + // Without leader address + AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, - -1, payloadVersion); + -1, payloadVersion, RaftVersions.CURRENT_VERSION, null); AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected); - verifyAppendEntries(expected, cloned); + verifyAppendEntries(expected, cloned, RaftVersions.CURRENT_VERSION); + + // With leader address + + expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, + -1, payloadVersion, RaftVersions.CURRENT_VERSION, "leader address"); + + cloned = (AppendEntries) SerializationUtils.clone(expected); + + verifyAppendEntries(expected, cloned, RaftVersions.CURRENT_VERSION); } - private static void verifyAppendEntries(AppendEntries expected, AppendEntries actual) { + @Test + @Deprecated + public void testPreFluorineSerialization() { + ReplicatedLogEntry entry1 = new SimpleReplicatedLogEntry(1, 2, new MockPayload("payload1")); + + ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(3, 4, new MockPayload("payload2")); + + short payloadVersion = 5; + + AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L, + -1, payloadVersion, RaftVersions.BORON_VERSION, "leader address"); + + AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected); + + verifyAppendEntries(expected, cloned, RaftVersions.BORON_VERSION); + } + + private static void verifyAppendEntries(AppendEntries expected, AppendEntries actual, short recipientRaftVersion) { assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId()); assertEquals("getTerm", expected.getTerm(), actual.getTerm()); assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit()); @@ -53,6 +84,14 @@ public class AppendEntriesTest { for (ReplicatedLogEntry e: actual.getEntries()) { verifyReplicatedLogEntry(iter.next(), e); } + + if (recipientRaftVersion >= RaftVersions.FLUORINE_VERSION) { + assertEquals("getLeaderAddress", expected.getLeaderAddress(), actual.getLeaderAddress()); + assertEquals("getLeaderRaftVersion", RaftVersions.CURRENT_VERSION, actual.getLeaderRaftVersion()); + } else { + assertFalse(actual.getLeaderAddress().isPresent()); + assertEquals("getLeaderRaftVersion", RaftVersions.BORON_VERSION, actual.getLeaderRaftVersion()); + } } private static void verifyReplicatedLogEntry(ReplicatedLogEntry expected, ReplicatedLogEntry actual) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolver.java index 468d650d4d..b1e9079f50 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolver.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore.shardmanager; import akka.actor.Address; +import akka.actor.AddressFromURIString; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; @@ -94,4 +95,10 @@ class ShardPeerAddressResolver implements PeerAddressResolver { ShardIdentifier shardId = ShardIdentifier.fromShardIdString(peerId); return getShardActorAddress(shardId.getShardName(), shardId.getMemberName()); } + + @Override + public void setResolved(String peerId, String address) { + memberNameToAddress.put(ShardIdentifier.fromShardIdString(peerId).getMemberName(), + AddressFromURIString.parse(address)); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolverTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolverTest.java index c735d9dfd4..0c18e799a5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolverTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardPeerAddressResolverTest.java @@ -80,6 +80,21 @@ public class ShardPeerAddressResolverTest { assertEquals("resolve", shardAddress, resolver.resolve(peerId)); } + @Test + public void testSetResolved() { + String type = "config"; + ShardPeerAddressResolver resolver = new ShardPeerAddressResolver(type, MEMBER_1); + + String peerId = ShardIdentifier.create("default", MEMBER_2, type).toString(); + + String address = "akka.tcp://opendaylight-cluster-data@127.0.0.1:2550/user/shardmanager-" + type + + "/" + MEMBER_2.getName() + "-shard-default-" + type; + + resolver.setResolved(peerId, address); + + assertEquals("resolve", address, resolver.resolve(peerId)); + } + @Test public void testGetShardManagerPeerActorAddresses() { ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", MEMBER_1);