X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fmessages%2FAppendEntries.java;h=67c6899231912df7a412e7e9f162736e8b689d99;hb=refs%2Fchanges%2F11%2F80211%2F6;hp=7ebc857444b1d2fae58cd2ce1b36337646b4068a;hpb=97542f208267cb5392fc8c8d9baf6c1d3ee4ae32;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index 7ebc857444..67c6899231 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -5,17 +5,23 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.messages; +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.VisibleForTesting; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; /** @@ -45,19 +51,44 @@ public class AppendEntries extends AbstractRaftRPC { private final short payloadVersion; - public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm, - List entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) { + private final short recipientRaftVersion; + + private final short leaderRaftVersion; + + private final String leaderAddress; + + private AppendEntries(long term, @NonNull String leaderId, long prevLogIndex, long prevLogTerm, + @NonNull List entries, long leaderCommit, long replicatedToAllIndex, + short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) { super(term); - this.leaderId = leaderId; + this.leaderId = requireNonNull(leaderId); this.prevLogIndex = prevLogIndex; this.prevLogTerm = prevLogTerm; - this.entries = entries; + this.entries = requireNonNull(entries); this.leaderCommit = leaderCommit; this.replicatedToAllIndex = replicatedToAllIndex; this.payloadVersion = payloadVersion; + this.recipientRaftVersion = recipientRaftVersion; + this.leaderRaftVersion = leaderRaftVersion; + this.leaderAddress = leaderAddress; + } + + public AppendEntries(long term, @NonNull String leaderId, long prevLogIndex, long prevLogTerm, + @NonNull List entries, long leaderCommit, long replicatedToAllIndex, + short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) { + this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion, + recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress); + } + + @VisibleForTesting + public AppendEntries(long term, @NonNull String leaderId, long prevLogIndex, long prevLogTerm, + @NonNull List entries, long leaderCommit, long replicatedToAllIndex, + short payloadVersion) { + this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion, + RaftVersions.CURRENT_VERSION, null); } - public String getLeaderId() { + public @NonNull String getLeaderId() { return leaderId; } @@ -69,7 +100,7 @@ public class AppendEntries extends AbstractRaftRPC { return prevLogTerm; } - public List getEntries() { + public @NonNull List getEntries() { return entries; } @@ -85,20 +116,104 @@ public class AppendEntries extends AbstractRaftRPC { return payloadVersion; } + public Optional getLeaderAddress() { + return Optional.ofNullable(leaderAddress); + } + + public short getLeaderRaftVersion() { + return leaderRaftVersion; + } + @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex) - .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit) - .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=") - .append(payloadVersion).append(", entries=").append(entries).append("]"); - return builder.toString(); + return "AppendEntries [leaderId=" + leaderId + + ", prevLogIndex=" + prevLogIndex + + ", prevLogTerm=" + prevLogTerm + + ", leaderCommit=" + leaderCommit + + ", replicatedToAllIndex=" + replicatedToAllIndex + + ", payloadVersion=" + payloadVersion + + ", recipientRaftVersion=" + recipientRaftVersion + + ", leaderRaftVersion=" + leaderRaftVersion + + ", leaderAddress=" + leaderAddress + + ", entries=" + entries + "]"; } private Object writeReplace() { - return new Proxy(this); + return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this); + } + + /** + * Fluorine version that adds the leader address. + */ + private static class ProxyV2 implements Externalizable { + private static final long serialVersionUID = 1L; + + private AppendEntries appendEntries; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public ProxyV2() { + } + + ProxyV2(AppendEntries appendEntries) { + this.appendEntries = appendEntries; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeShort(appendEntries.leaderRaftVersion); + out.writeLong(appendEntries.getTerm()); + out.writeObject(appendEntries.leaderId); + out.writeLong(appendEntries.prevLogTerm); + out.writeLong(appendEntries.prevLogIndex); + out.writeLong(appendEntries.leaderCommit); + out.writeLong(appendEntries.replicatedToAllIndex); + out.writeShort(appendEntries.payloadVersion); + + out.writeInt(appendEntries.entries.size()); + for (ReplicatedLogEntry e: appendEntries.entries) { + out.writeLong(e.getIndex()); + out.writeLong(e.getTerm()); + out.writeObject(e.getData()); + } + + out.writeObject(appendEntries.leaderAddress); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + short leaderRaftVersion = in.readShort(); + long term = in.readLong(); + String leaderId = (String) in.readObject(); + long prevLogTerm = in.readLong(); + long prevLogIndex = in.readLong(); + long leaderCommit = in.readLong(); + long replicatedToAllIndex = in.readLong(); + short payloadVersion = in.readShort(); + + int size = in.readInt(); + List entries = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); + } + + String leaderAddress = (String)in.readObject(); + + appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, + replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion, + leaderAddress); + } + + private Object readResolve() { + return appendEntries; + } } + /** + * Pre-Fluorine version. + */ + @Deprecated private static class Proxy implements Externalizable { private static final long serialVersionUID = 1L; @@ -145,11 +260,11 @@ public class AppendEntries extends AbstractRaftRPC { int size = in.readInt(); List entries = new ArrayList<>(size); for (int i = 0; i < size; i++) { - entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); + entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); } appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, - replicatedToAllIndex, payloadVersion); + replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null); } private Object readResolve() {