private long slicedLogEntryIndex = NO_INDEX;
+ private boolean needsLeaderAddress;
+
/**
* Constructs an instance.
*
return slicedLogEntryIndex != NO_INDEX;
}
+ public void setNeedsLeaderAddress(boolean value) {
+ needsLeaderAddress = value;
+ }
+
+ @Nullable
+ public String needsLeaderAddress(String leaderId) {
+ return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
+ }
+
@Override
public String toString() {
return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
@FunctionalInterface
public interface PeerAddressResolver {
/**
- * Resolves a raft actor peer id to it's remote actor address.
+ * Resolves a raft actor peer id to its remote actor address.
*
* @param peerId the id of the peer to resolve
* @return the peer's actor path string or null if not found
*/
@Nullable String resolve(String peerId);
+
+ /**
+ * Sets the actor address for a raft peer.
+ *
+ * @param peerId the id of the peer
+ * @param address the peer's actor's address
+ */
+ default void setResolved(String peerId, String address) {
+ }
}
short HELIUM_VERSION = 0;
short LITHIUM_VERSION = 1;
short BORON_VERSION = 3;
- short CURRENT_VERSION = BORON_VERSION;
+ short FLUORINE_VERSION = 4;
+ short CURRENT_VERSION = FLUORINE_VERSION;
}
followerLogInformation.markFollowerActive();
followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
+ followerLogInformation.setNeedsLeaderAddress(appendEntriesReply.isNeedsLeaderAddress());
long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
boolean updated = false;
AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
getLogEntryIndex(followerNextIndex - 1),
getLogEntryTerm(followerNextIndex - 1), entries,
- leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
+ leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion(),
+ followerLogInformation.getRaftVersion(), followerLogInformation.needsLeaderAddress(getId()));
if (!entries.isEmpty() || log.isTraceEnabled()) {
log.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
appendEntries.getTerm(), currentTerm());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(),
- context.getPayloadVersion()), actor());
+ context.getPayloadVersion(), false, false, appendEntries.getLeaderRaftVersion()), actor());
return this;
}
if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
// if snapshot install is in progress, follower should just acknowledge append entries with a reply.
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
- lastIndex(), lastTerm(), context.getPayloadVersion());
+ lastIndex(), lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+ appendEntries.getLeaderRaftVersion());
log.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
sender.tell(reply, actor());
leaderId = appendEntries.getLeaderId();
leaderPayloadVersion = appendEntries.getPayloadVersion();
+ if (appendEntries.getLeaderAddress().isPresent()) {
+ final String address = appendEntries.getLeaderAddress().get();
+ log.debug("New leader address: {}", address);
+
+ context.setPeerAddress(leaderId, address);
+ context.getConfigParams().getPeerAddressResolver().setResolved(leaderId, address);
+ }
+
// First check if the logs are in sync or not
if (isOutOfSync(appendEntries, sender)) {
updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
}
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
- lastIndex, lastTerm(), context.getPayloadVersion());
+ lastIndex, lastTerm(), context.getPayloadVersion(), false, needsLeaderAddress(),
+ appendEntries.getLeaderRaftVersion());
if (log.isTraceEnabled()) {
log.trace("{}: handleAppendEntries returning : {}", logName(), reply);
log.info("{}: Could not remove entries - sending reply to force snapshot", logName());
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
- lastTerm(), context.getPayloadVersion(), true), actor());
+ lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+ appendEntries.getLeaderRaftVersion()), actor());
return false;
}
break;
} else {
sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
- lastTerm(), context.getPayloadVersion(), true), actor());
+ lastTerm(), context.getPayloadVersion(), true, needsLeaderAddress(),
+ appendEntries.getLeaderRaftVersion()), actor());
return false;
}
}
log.info("{}: The followers log is empty and the senders prevLogIndex is {}", logName(),
appendEntries.getPrevLogIndex());
- sendOutOfSyncAppendEntriesReply(sender, false);
+ sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
return true;
}
appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm());
- sendOutOfSyncAppendEntriesReply(sender, false);
+ sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
return true;
}
} else if (appendEntries.getPrevLogIndex() != -1) {
+ "snapshotIndex: {}, snapshotTerm: {}", logName(), appendEntries.getPrevLogIndex(), lastIndex,
context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
- sendOutOfSyncAppendEntriesReply(sender, false);
+ sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
return true;
}
}
appendEntries.getReplicatedToAllIndex(), lastIndex,
context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm());
- sendOutOfSyncAppendEntriesReply(sender, false);
+ sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
return true;
}
entries.get(0).getIndex() - 1, lastIndex, context.getReplicatedLog().getSnapshotIndex(),
context.getReplicatedLog().getSnapshotTerm());
- sendOutOfSyncAppendEntriesReply(sender, false);
+ sendOutOfSyncAppendEntriesReply(sender, false, appendEntries.getLeaderRaftVersion());
return true;
}
}
return false;
}
- private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot) {
+ private void sendOutOfSyncAppendEntriesReply(final ActorRef sender, boolean forceInstallSnapshot,
+ short leaderRaftVersion) {
// We found that the log was out of sync so just send a negative reply.
final AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(),
- lastTerm(), context.getPayloadVersion(), forceInstallSnapshot);
+ lastTerm(), context.getPayloadVersion(), forceInstallSnapshot, needsLeaderAddress(),
+ leaderRaftVersion);
log.info("{}: Follower is out-of-sync so sending negative reply: {}", logName(), reply);
sender.tell(reply, actor());
}
+ private boolean needsLeaderAddress() {
+ return context.getPeerAddress(leaderId) == null;
+ }
+
@Override
protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
final AppendEntriesReply appendEntriesReply) {
package org.opendaylight.controller.cluster.raft.messages;
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
private final short payloadVersion;
- public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+ private final short recipientRaftVersion;
+
+ private final short leaderRaftVersion;
+
+ private final String leaderAddress;
+
+ private AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
@Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
- short payloadVersion) {
+ short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) {
super(term);
- this.leaderId = Preconditions.checkNotNull(leaderId);
+ this.leaderId = requireNonNull(leaderId);
this.prevLogIndex = prevLogIndex;
this.prevLogTerm = prevLogTerm;
- this.entries = Preconditions.checkNotNull(entries);
+ this.entries = requireNonNull(entries);
this.leaderCommit = leaderCommit;
this.replicatedToAllIndex = replicatedToAllIndex;
this.payloadVersion = payloadVersion;
+ this.recipientRaftVersion = recipientRaftVersion;
+ this.leaderRaftVersion = leaderRaftVersion;
+ this.leaderAddress = leaderAddress;
+ }
+
+ public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+ @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
+ short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) {
+ this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+ recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
+ }
+
+ @VisibleForTesting
+ public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
+ @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
+ short payloadVersion) {
+ this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+ RaftVersions.CURRENT_VERSION, null);
}
@Nonnull
return payloadVersion;
}
+ public Optional<String> getLeaderAddress() {
+ return Optional.ofNullable(leaderAddress);
+ }
+
+ public short getLeaderRaftVersion() {
+ return leaderRaftVersion;
+ }
+
@Override
public String toString() {
return "AppendEntries [leaderId=" + leaderId
+ ", leaderCommit=" + leaderCommit
+ ", replicatedToAllIndex=" + replicatedToAllIndex
+ ", payloadVersion=" + payloadVersion
+ + ", recipientRaftVersion=" + recipientRaftVersion
+ + ", leaderRaftVersion=" + leaderRaftVersion
+ + ", leaderAddress=" + leaderAddress
+ ", entries=" + entries + "]";
}
private Object writeReplace() {
- return new Proxy(this);
+ return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this);
+ }
+
+ /**
+ * Fluorine version that adds the leader address.
+ */
+ private static class ProxyV2 implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private AppendEntries appendEntries;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public ProxyV2() {
+ }
+
+ ProxyV2(AppendEntries appendEntries) {
+ this.appendEntries = appendEntries;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(appendEntries.leaderRaftVersion);
+ out.writeLong(appendEntries.getTerm());
+ out.writeObject(appendEntries.leaderId);
+ out.writeLong(appendEntries.prevLogTerm);
+ out.writeLong(appendEntries.prevLogIndex);
+ out.writeLong(appendEntries.leaderCommit);
+ out.writeLong(appendEntries.replicatedToAllIndex);
+ out.writeShort(appendEntries.payloadVersion);
+
+ out.writeInt(appendEntries.entries.size());
+ for (ReplicatedLogEntry e: appendEntries.entries) {
+ out.writeLong(e.getIndex());
+ out.writeLong(e.getTerm());
+ out.writeObject(e.getData());
+ }
+
+ out.writeObject(appendEntries.leaderAddress);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ short leaderRaftVersion = in.readShort();
+ long term = in.readLong();
+ String leaderId = (String) in.readObject();
+ long prevLogTerm = in.readLong();
+ long prevLogIndex = in.readLong();
+ long leaderCommit = in.readLong();
+ long replicatedToAllIndex = in.readLong();
+ short payloadVersion = in.readShort();
+
+ int size = in.readInt();
+ List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+ }
+
+ String leaderAddress = (String)in.readObject();
+
+ appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+ replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
+ leaderAddress);
+ }
+
+ private Object readResolve() {
+ return appendEntries;
+ }
}
+ /**
+ * Pre-Fluorine version.
+ */
+ @Deprecated
private static class Proxy implements Externalizable {
private static final long serialVersionUID = 1L;
}
appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
- replicatedToAllIndex, payloadVersion);
+ replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
}
private Object readResolve() {
package org.opendaylight.controller.cluster.raft.messages;
+import com.google.common.annotations.VisibleForTesting;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
private final boolean forceInstallSnapshot;
+ private final boolean needsLeaderAddress;
+
+ private final short recipientRaftVersion;
+
+ @VisibleForTesting
public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
short payloadVersion) {
- this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, false);
+ this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, false, false,
+ RaftVersions.CURRENT_VERSION);
}
public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
- short payloadVersion, boolean forceInstallSnapshot) {
+ short payloadVersion, boolean forceInstallSnapshot, boolean needsLeaderAddress,
+ short recipientRaftVersion) {
this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, forceInstallSnapshot,
- RaftVersions.CURRENT_VERSION);
+ needsLeaderAddress, RaftVersions.CURRENT_VERSION, recipientRaftVersion);
}
private AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
- short payloadVersion, boolean forceInstallSnapshot, short raftVersion) {
+ short payloadVersion, boolean forceInstallSnapshot, boolean needsLeaderAddress, short raftVersion,
+ short recipientRaftVersion) {
super(term);
-
this.followerId = followerId;
this.success = success;
this.logLastIndex = logLastIndex;
this.payloadVersion = payloadVersion;
this.forceInstallSnapshot = forceInstallSnapshot;
this.raftVersion = raftVersion;
+ this.needsLeaderAddress = needsLeaderAddress;
+ this.recipientRaftVersion = recipientRaftVersion;
}
public boolean isSuccess() {
return forceInstallSnapshot;
}
+ public boolean isNeedsLeaderAddress() {
+ return needsLeaderAddress;
+ }
+
@Override
public String toString() {
return "AppendEntriesReply [term=" + getTerm() + ", success=" + success + ", followerId=" + followerId
+ ", logLastIndex=" + logLastIndex + ", logLastTerm=" + logLastTerm + ", forceInstallSnapshot="
- + forceInstallSnapshot + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion + "]";
+ + forceInstallSnapshot + ", needsLeaderAddress=" + needsLeaderAddress
+ + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion
+ + ", recipientRaftVersion=" + recipientRaftVersion + "]";
}
private Object writeReplace() {
- return new Proxy(this);
+ return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new Proxy2(this) : new Proxy(this);
+ }
+
+ /**
+ * Fluorine version that adds the needsLeaderAddress flag.
+ */
+ private static class Proxy2 implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private AppendEntriesReply appendEntriesReply;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy2() {
+ }
+
+ Proxy2(AppendEntriesReply appendEntriesReply) {
+ this.appendEntriesReply = appendEntriesReply;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeShort(appendEntriesReply.raftVersion);
+ out.writeLong(appendEntriesReply.getTerm());
+ out.writeObject(appendEntriesReply.followerId);
+ out.writeBoolean(appendEntriesReply.success);
+ out.writeLong(appendEntriesReply.logLastIndex);
+ out.writeLong(appendEntriesReply.logLastTerm);
+ out.writeShort(appendEntriesReply.payloadVersion);
+ out.writeBoolean(appendEntriesReply.forceInstallSnapshot);
+ out.writeBoolean(appendEntriesReply.needsLeaderAddress);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ short raftVersion = in.readShort();
+ long term = in.readLong();
+ String followerId = (String) in.readObject();
+ boolean success = in.readBoolean();
+ long logLastIndex = in.readLong();
+ long logLastTerm = in.readLong();
+ short payloadVersion = in.readShort();
+ boolean forceInstallSnapshot = in.readBoolean();
+ boolean needsLeaderAddress = in.readBoolean();
+
+ appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm,
+ payloadVersion, forceInstallSnapshot, needsLeaderAddress, raftVersion,
+ RaftVersions.CURRENT_VERSION);
+ }
+
+ private Object readResolve() {
+ return appendEntriesReply;
+ }
}
+ /**
+ * Pre-Fluorine version.
+ */
+ @Deprecated
private static class Proxy implements Externalizable {
private static final long serialVersionUID = 1L;
boolean forceInstallSnapshot = in.readBoolean();
appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm,
- payloadVersion, forceInstallSnapshot, raftVersion);
+ payloadVersion, forceInstallSnapshot, false, raftVersion, RaftVersions.CURRENT_VERSION);
}
private Object readResolve() {
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.NoopPeerAddressResolver;
+import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
context.setPayloadVersion(payloadVersion);
+ ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(
+ peerId -> leaderActor.path().toString());
return context;
}
expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
}
-
/**
* This test verifies that when InstallSnapshot is received by
* the follower its applied correctly.
MockRaftActor.fromState(snapshot.getState()));
}
+ @Test
+ public void testNeedsLeaderAddress() {
+ logStart("testNeedsLeaderAddress");
+
+ MockRaftActorContext context = createActorContext();
+ context.setReplicatedLog(new MockRaftActorContext.SimpleReplicatedLog());
+ context.addToPeers("leader", null, VotingState.VOTING);
+ ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(NoopPeerAddressResolver.INSTANCE);
+
+ follower = createBehavior(context);
+
+ follower.handleMessage(leaderActor,
+ new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1, (short)0));
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertTrue(reply.isNeedsLeaderAddress());
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
+ ((DefaultConfigParamsImpl)context.getConfigParams()).setPeerAddressResolver(mockResolver);
+
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), -1, -1,
+ (short)0, RaftVersions.CURRENT_VERSION, leaderActor.path().toString()));
+
+ reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertFalse(reply.isNeedsLeaderAddress());
+
+ verify(mockResolver).setResolved("leader", leaderActor.path().toString());
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
final AtomicReference<MockRaftActor> followerRaftActor) {
assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
+ assertEquals("isNeedsLeaderAddress", false, reply.isNeedsLeaderAddress());
}
// Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
// installed with a SendInstallSnapshot
- leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+ RaftVersions.CURRENT_VERSION));
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
- leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+ RaftVersions.CURRENT_VERSION));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
// Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
MessageCollectorActor.clearMessages(followerActor);
- leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false,
+ RaftVersions.CURRENT_VERSION));
MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
}
MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
}
+ @Test
+ public void testLeaderAddressInAppendEntries() {
+ logStart("testLeaderAddressInAppendEntries");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ FiniteDuration.create(50, TimeUnit.MILLISECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
+ peerId -> leaderActor.path().toString());
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Initial heartbeat shouldn't have the leader address
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertFalse(appendEntries.getLeaderAddress().isPresent());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send AppendEntriesReply indicating the follower needs the leader address
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
+ RaftVersions.CURRENT_VERSION));
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertTrue(appendEntries.getLeaderAddress().isPresent());
+ assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send AppendEntriesReply indicating the follower does not need the leader address
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
+ RaftVersions.CURRENT_VERSION));
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertFalse(appendEntries.getLeaderAddress().isPresent());
+ }
+
@Override
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
final ActorRef actorRef, final RaftRPC rpc) {
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
/**
* Unit tests for AppendEntriesReply.
@Test
public void testSerialization() {
- AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6);
+ AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6, true, true,
+ RaftVersions.CURRENT_VERSION);
AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected);
assertEquals("getTerm", expected.getTerm(), cloned.getTerm());
assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion());
+ assertEquals("isForceInstallSnapshot", expected.isForceInstallSnapshot(), cloned.isForceInstallSnapshot());
+ assertEquals("isNeedsLeaderAddress", expected.isNeedsLeaderAddress(), cloned.isNeedsLeaderAddress());
+ }
+
+ @Test
+ @Deprecated
+ public void testPreFluorineSerialization() {
+ AppendEntriesReply expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6, true, true,
+ RaftVersions.BORON_VERSION);
+ AppendEntriesReply cloned = (AppendEntriesReply) SerializationUtils.clone(expected);
+
+ assertEquals("getTerm", expected.getTerm(), cloned.getTerm());
+ assertEquals("getFollowerId", expected.getFollowerId(), cloned.getFollowerId());
+ assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm());
+ assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
+ assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
+ assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion());
+ assertEquals("isForceInstallSnapshot", expected.isForceInstallSnapshot(), cloned.isForceInstallSnapshot());
+ assertEquals("isNeedsLeaderAddress", false, cloned.isNeedsLeaderAddress());
}
}
package org.opendaylight.controller.cluster.raft.messages;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(3, 4, new MockPayload("payload2"));
short payloadVersion = 5;
+
+ // Without leader address
+
AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
- -1, payloadVersion);
+ -1, payloadVersion, RaftVersions.CURRENT_VERSION, null);
AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
- verifyAppendEntries(expected, cloned);
+ verifyAppendEntries(expected, cloned, RaftVersions.CURRENT_VERSION);
+
+ // With leader address
+
+ expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
+ -1, payloadVersion, RaftVersions.CURRENT_VERSION, "leader address");
+
+ cloned = (AppendEntries) SerializationUtils.clone(expected);
+
+ verifyAppendEntries(expected, cloned, RaftVersions.CURRENT_VERSION);
}
- private static void verifyAppendEntries(AppendEntries expected, AppendEntries actual) {
+ @Test
+ @Deprecated
+ public void testPreFluorineSerialization() {
+ ReplicatedLogEntry entry1 = new SimpleReplicatedLogEntry(1, 2, new MockPayload("payload1"));
+
+ ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(3, 4, new MockPayload("payload2"));
+
+ short payloadVersion = 5;
+
+ AppendEntries expected = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry1, entry2), 10L,
+ -1, payloadVersion, RaftVersions.BORON_VERSION, "leader address");
+
+ AppendEntries cloned = (AppendEntries) SerializationUtils.clone(expected);
+
+ verifyAppendEntries(expected, cloned, RaftVersions.BORON_VERSION);
+ }
+
+ private static void verifyAppendEntries(AppendEntries expected, AppendEntries actual, short recipientRaftVersion) {
assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId());
assertEquals("getTerm", expected.getTerm(), actual.getTerm());
assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit());
for (ReplicatedLogEntry e: actual.getEntries()) {
verifyReplicatedLogEntry(iter.next(), e);
}
+
+ if (recipientRaftVersion >= RaftVersions.FLUORINE_VERSION) {
+ assertEquals("getLeaderAddress", expected.getLeaderAddress(), actual.getLeaderAddress());
+ assertEquals("getLeaderRaftVersion", RaftVersions.CURRENT_VERSION, actual.getLeaderRaftVersion());
+ } else {
+ assertFalse(actual.getLeaderAddress().isPresent());
+ assertEquals("getLeaderRaftVersion", RaftVersions.BORON_VERSION, actual.getLeaderRaftVersion());
+ }
}
private static void verifyReplicatedLogEntry(ReplicatedLogEntry expected, ReplicatedLogEntry actual) {
package org.opendaylight.controller.cluster.datastore.shardmanager;
import akka.actor.Address;
+import akka.actor.AddressFromURIString;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
ShardIdentifier shardId = ShardIdentifier.fromShardIdString(peerId);
return getShardActorAddress(shardId.getShardName(), shardId.getMemberName());
}
+
+ @Override
+ public void setResolved(String peerId, String address) {
+ memberNameToAddress.put(ShardIdentifier.fromShardIdString(peerId).getMemberName(),
+ AddressFromURIString.parse(address));
+ }
}
assertEquals("resolve", shardAddress, resolver.resolve(peerId));
}
+ @Test
+ public void testSetResolved() {
+ String type = "config";
+ ShardPeerAddressResolver resolver = new ShardPeerAddressResolver(type, MEMBER_1);
+
+ String peerId = ShardIdentifier.create("default", MEMBER_2, type).toString();
+
+ String address = "akka.tcp://opendaylight-cluster-data@127.0.0.1:2550/user/shardmanager-" + type
+ + "/" + MEMBER_2.getName() + "-shard-default-" + type;
+
+ resolver.setResolved(peerId, address);
+
+ assertEquals("resolve", address, resolver.resolve(peerId));
+ }
+
@Test
public void testGetShardManagerPeerActorAddresses() {
ShardPeerAddressResolver resolver = new ShardPeerAddressResolver("config", MEMBER_1);