*/
package org.opendaylight.controller.cluster.raft;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
private short payloadVersion = -1;
- // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's
- // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron
- // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is
- // HELIUM_VERSION.
- private short raftVersion = RaftVersions.HELIUM_VERSION;
+ // Assume the FLUORINE_VERSION version initially, as we no longer support pre-Fluorine versions.
+ private short raftVersion = RaftVersions.FLUORINE_VERSION;
private final PeerInfo peerInfo;
*/
@VisibleForTesting
FollowerLogInformation(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) {
- this.nextIndex = context.getCommitIndex();
+ nextIndex = context.getCommitIndex();
this.matchIndex = matchIndex;
this.context = context;
this.peerInfo = requireNonNull(peerInfo);
* @param raftVersion the raft version.
*/
public void setRaftVersion(final short raftVersion) {
+ checkArgument(raftVersion >= RaftVersions.FLUORINE_VERSION, "Unexpected version %s", raftVersion);
this.raftVersion = raftVersion;
}
* @param state the LeaderInstallSnapshotState
*/
public void setLeaderInstallSnapshotState(final @NonNull LeaderInstallSnapshotState state) {
- if (this.installSnapshotState == null) {
- this.installSnapshotState = requireNonNull(state);
+ if (installSnapshotState == null) {
+ installSnapshotState = requireNonNull(state);
}
}
* @author Thomas Pantelis
*/
public final class RaftVersions {
- @Deprecated(since = "7.0.0", forRemoval = true)
- public static final short HELIUM_VERSION = 0;
- @Deprecated(since = "7.0.0", forRemoval = true)
- public static final short LITHIUM_VERSION = 1;
- @Deprecated(since = "7.0.0", forRemoval = true)
- public static final short BORON_VERSION = 3;
+ // HELIUM_VERSION = 0
+ // LITHIUM_VERSION = 1
+ // BORON_VERSION = 3
public static final short FLUORINE_VERSION = 4;
public static final short ARGON_VERSION = 5;
public static final short CURRENT_VERSION = ARGON_VERSION;
private RaftVersions() {
-
+ // Hidden on purpose
}
}
import org.opendaylight.controller.cluster.raft.PeerInfo;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.VotingState;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
return this;
}
+ final var followerRaftVersion = appendEntriesReply.getRaftVersion();
+ if (followerRaftVersion < RaftVersions.FLUORINE_VERSION) {
+ log.warn("{}: handleAppendEntriesReply - ignoring reply from follower {} raft version {}", logName(),
+ followerId, followerRaftVersion);
+ return this;
+ }
+
final long lastActivityNanos = followerLogInformation.nanosSinceLastActivity();
if (lastActivityNanos > context.getConfigParams().getElectionTimeOutInterval().toNanos()) {
log.warn("{} : handleAppendEntriesReply delayed beyond election timeout, "
followerLogInformation.markFollowerActive();
followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
- followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
+ followerLogInformation.setRaftVersion(followerRaftVersion);
followerLogInformation.setNeedsLeaderAddress(appendEntriesReply.isNeedsLeaderAddress());
long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
@Override
Object writeReplace() {
- if (recipientRaftVersion <= RaftVersions.BORON_VERSION) {
- return new Proxy(this);
- }
- return recipientRaftVersion == RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new AE(this);
+ return recipientRaftVersion <= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new AE(this);
}
/**
* Fluorine version that adds the leader address.
*/
private static class ProxyV2 implements Externalizable {
+ @java.io.Serial
private static final long serialVersionUID = 1L;
private AppendEntries appendEntries;
leaderAddress);
}
- private Object readResolve() {
- return appendEntries;
- }
- }
-
- /**
- * Pre-Fluorine version.
- */
- @Deprecated
- private static class Proxy implements Externalizable {
- private static final long serialVersionUID = 1L;
-
- private AppendEntries appendEntries;
-
- // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
- // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
- @SuppressWarnings("checkstyle:RedundantModifier")
- public Proxy() {
- }
-
- Proxy(final AppendEntries appendEntries) {
- this.appendEntries = appendEntries;
- }
-
- @Override
- public void writeExternal(final ObjectOutput out) throws IOException {
- out.writeLong(appendEntries.getTerm());
- out.writeObject(appendEntries.leaderId);
- out.writeLong(appendEntries.prevLogTerm);
- out.writeLong(appendEntries.prevLogIndex);
- out.writeLong(appendEntries.leaderCommit);
- out.writeLong(appendEntries.replicatedToAllIndex);
- out.writeShort(appendEntries.payloadVersion);
-
- out.writeInt(appendEntries.entries.size());
- for (ReplicatedLogEntry e: appendEntries.entries) {
- out.writeLong(e.getIndex());
- out.writeLong(e.getTerm());
- out.writeObject(e.getData());
- }
- }
-
- @Override
- public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
- long term = in.readLong();
- String leaderId = (String) in.readObject();
- long prevLogTerm = in.readLong();
- long prevLogIndex = in.readLong();
- long leaderCommit = in.readLong();
- long replicatedToAllIndex = in.readLong();
- short payloadVersion = in.readShort();
-
- int size = in.readInt();
- var entries = ImmutableList.<ReplicatedLogEntry>builderWithExpectedSize(size);
- for (int i = 0; i < size; i++) {
- entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
- }
-
- appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries.build(), leaderCommit,
- replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
- }
-
+ @java.io.Serial
private Object readResolve() {
return appendEntries;
}
@Override
Object writeReplace() {
- if (recipientRaftVersion <= RaftVersions.BORON_VERSION) {
- return new Proxy(this);
- }
- return recipientRaftVersion == RaftVersions.FLUORINE_VERSION ? new Proxy2(this) : new AR(this);
+ return recipientRaftVersion <= RaftVersions.FLUORINE_VERSION ? new Proxy2(this) : new AR(this);
}
/**
* Fluorine version that adds the needsLeaderAddress flag.
*/
private static class Proxy2 implements Externalizable {
+ @java.io.Serial
private static final long serialVersionUID = 1L;
private AppendEntriesReply appendEntriesReply;
RaftVersions.CURRENT_VERSION);
}
- private Object readResolve() {
- return appendEntriesReply;
- }
- }
-
- /**
- * Pre-Fluorine version.
- */
- @Deprecated
- private static class Proxy implements Externalizable {
- private static final long serialVersionUID = 1L;
-
- private AppendEntriesReply appendEntriesReply;
-
- // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
- // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
- @SuppressWarnings("checkstyle:RedundantModifier")
- public Proxy() {
- }
-
- Proxy(final AppendEntriesReply appendEntriesReply) {
- this.appendEntriesReply = appendEntriesReply;
- }
-
- @Override
- public void writeExternal(final ObjectOutput out) throws IOException {
- out.writeShort(appendEntriesReply.raftVersion);
- out.writeLong(appendEntriesReply.getTerm());
- out.writeObject(appendEntriesReply.followerId);
- out.writeBoolean(appendEntriesReply.success);
- out.writeLong(appendEntriesReply.logLastIndex);
- out.writeLong(appendEntriesReply.logLastTerm);
- out.writeShort(appendEntriesReply.payloadVersion);
- out.writeBoolean(appendEntriesReply.forceInstallSnapshot);
- }
-
- @Override
- public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
- short raftVersion = in.readShort();
- long term = in.readLong();
- String followerId = (String) in.readObject();
- boolean success = in.readBoolean();
- long logLastIndex = in.readLong();
- long logLastTerm = in.readLong();
- short payloadVersion = in.readShort();
- boolean forceInstallSnapshot = in.readBoolean();
-
- appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm,
- payloadVersion, forceInstallSnapshot, false, raftVersion, RaftVersions.CURRENT_VERSION);
- }
-
+ @java.io.Serial
private Object readResolve() {
return appendEntriesReply;
}
FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
- assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
+ assertEquals(RaftVersions.FLUORINE_VERSION, followerInfo.getRaftVersion());
AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
assertEquals("isForceInstallSnapshot", expected.isForceInstallSnapshot(), cloned.isForceInstallSnapshot());
assertEquals("isNeedsLeaderAddress", expected.isNeedsLeaderAddress(), cloned.isNeedsLeaderAddress());
}
-
- @Test
- @Deprecated
- public void testPreFluorineSerialization() {
- final var expected = new AppendEntriesReply("follower", 5, true, 100, 4, (short)6, true, true,
- RaftVersions.BORON_VERSION);
-
- final var bytes = SerializationUtils.serialize(expected);
- assertEquals(141, bytes.length);
- final var cloned = (AppendEntriesReply) SerializationUtils.deserialize(bytes);
-
- assertEquals("getTerm", expected.getTerm(), cloned.getTerm());
- assertEquals("getFollowerId", expected.getFollowerId(), cloned.getFollowerId());
- assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm());
- assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
- assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
- assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion());
- assertEquals("isForceInstallSnapshot", expected.isForceInstallSnapshot(), cloned.isForceInstallSnapshot());
- assertEquals("isNeedsLeaderAddress", false, cloned.isNeedsLeaderAddress());
- }
}
package org.opendaylight.controller.cluster.raft.messages;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import java.util.Iterator;
import java.util.List;
verifyAppendEntries(expected, cloned, RaftVersions.CURRENT_VERSION);
}
- @Test
- @Deprecated
- public void testPreFluorineSerialization() {
- ReplicatedLogEntry entry1 = new SimpleReplicatedLogEntry(1, 2, new MockPayload("payload1"));
-
- ReplicatedLogEntry entry2 = new SimpleReplicatedLogEntry(3, 4, new MockPayload("payload2"));
-
- short payloadVersion = 5;
-
- final var expected = new AppendEntries(5L, "node1", 7L, 8L, List.of(entry1, entry2), 10L, -1,
- payloadVersion, RaftVersions.BORON_VERSION, "leader address");
-
- final var bytes = SerializationUtils.serialize(expected);
- assertEquals(350, bytes.length);
- final var cloned = (AppendEntries) SerializationUtils.deserialize(bytes);
-
- verifyAppendEntries(expected, cloned, RaftVersions.BORON_VERSION);
- }
-
private static void verifyAppendEntries(final AppendEntries expected, final AppendEntries actual,
final short recipientRaftVersion) {
assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId());
verifyReplicatedLogEntry(iter.next(), e);
}
- if (recipientRaftVersion > RaftVersions.BORON_VERSION) {
- assertEquals("getLeaderAddress", expected.getLeaderAddress(), actual.getLeaderAddress());
- assertEquals("getLeaderRaftVersion", RaftVersions.CURRENT_VERSION, actual.getLeaderRaftVersion());
- } else {
- assertFalse(actual.getLeaderAddress().isPresent());
- assertEquals("getLeaderRaftVersion", RaftVersions.BORON_VERSION, actual.getLeaderRaftVersion());
- }
+ assertEquals("getLeaderAddress", expected.getLeaderAddress(), actual.getLeaderAddress());
+ assertEquals("getLeaderRaftVersion", RaftVersions.CURRENT_VERSION, actual.getLeaderRaftVersion());
}
private static void verifyReplicatedLogEntry(final ReplicatedLogEntry expected, final ReplicatedLogEntry actual) {