* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.messages;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
/**
- * Invoked by leader to replicate log entries (§5.3); also used as
- * heartbeat (§5.2).
+ * Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
*/
-public class AppendEntries extends AbstractRaftRPC {
+public final class AppendEntries extends AbstractRaftRPC {
private static final long serialVersionUID = 1L;
// So that follower can redirect clients
- private final String leaderId;
+ private final @NonNull String leaderId;
// Index of log entry immediately preceding new ones
private final long prevLogIndex;
// term of prevLogIndex entry
private final long prevLogTerm;
- // log entries to store (empty for heartbeat;
- // may send more than one for efficiency)
- private transient List<ReplicatedLogEntry> entries;
+ // log entries to store (empty for heart beat - may send more than one for efficiency)
+ private final @NonNull List<ReplicatedLogEntry> entries;
// leader's commitIndex
private final long leaderCommit;
private final short payloadVersion;
- public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm,
- List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) {
+ private final short recipientRaftVersion;
+
+ private final short leaderRaftVersion;
+
+ private final String leaderAddress;
+
+ AppendEntries(final long term, @NonNull final String leaderId, final long prevLogIndex,
+ final long prevLogTerm, @NonNull final List<ReplicatedLogEntry> entries, final long leaderCommit,
+ final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
+ final short leaderRaftVersion, @Nullable final String leaderAddress) {
super(term);
- this.leaderId = leaderId;
+ this.leaderId = requireNonNull(leaderId);
this.prevLogIndex = prevLogIndex;
this.prevLogTerm = prevLogTerm;
- this.entries = entries;
+ this.entries = requireNonNull(entries);
this.leaderCommit = leaderCommit;
this.replicatedToAllIndex = replicatedToAllIndex;
this.payloadVersion = payloadVersion;
+ this.recipientRaftVersion = recipientRaftVersion;
+ this.leaderRaftVersion = leaderRaftVersion;
+ this.leaderAddress = leaderAddress;
+ }
+
+ public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
+ final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
+ final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
+ final @Nullable String leaderAddress) {
+ this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+ recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
+ }
+
+ @VisibleForTesting
+ public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
+ final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
+ final long replicatedToAllIndex, final short payloadVersion) {
+ this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
+ RaftVersions.CURRENT_VERSION, null);
}
- public String getLeaderId() {
+ public @NonNull String getLeaderId() {
return leaderId;
}
return prevLogTerm;
}
- public List<ReplicatedLogEntry> getEntries() {
+ public @NonNull List<ReplicatedLogEntry> getEntries() {
return entries;
}
return payloadVersion;
}
+ public Optional<String> getLeaderAddress() {
+ return Optional.ofNullable(leaderAddress);
+ }
+
+ public short getLeaderRaftVersion() {
+ return leaderRaftVersion;
+ }
+
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex)
- .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit)
- .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=")
- .append(payloadVersion).append(", entries=").append(entries).append("]");
- return builder.toString();
+ return "AppendEntries [leaderId=" + leaderId
+ + ", prevLogIndex=" + prevLogIndex
+ + ", prevLogTerm=" + prevLogTerm
+ + ", leaderCommit=" + leaderCommit
+ + ", replicatedToAllIndex=" + replicatedToAllIndex
+ + ", payloadVersion=" + payloadVersion
+ + ", recipientRaftVersion=" + recipientRaftVersion
+ + ", leaderRaftVersion=" + leaderRaftVersion
+ + ", leaderAddress=" + leaderAddress
+ + ", entries=" + entries + "]";
+ }
+
+ @Override
+ Object writeReplace() {
+ if (recipientRaftVersion <= RaftVersions.BORON_VERSION) {
+ return new Proxy(this);
+ }
+ return recipientRaftVersion == RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new AE(this);
}
- private Object writeReplace() {
- return new Proxy(this);
+ /**
+ * Fluorine version that adds the leader address.
+ */
+ private static class ProxyV2 implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private AppendEntries appendEntries;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public ProxyV2() {
+ }
+
+ ProxyV2(final AppendEntries appendEntries) {
+ this.appendEntries = appendEntries;
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeShort(appendEntries.leaderRaftVersion);
+ out.writeLong(appendEntries.getTerm());
+ out.writeObject(appendEntries.leaderId);
+ out.writeLong(appendEntries.prevLogTerm);
+ out.writeLong(appendEntries.prevLogIndex);
+ out.writeLong(appendEntries.leaderCommit);
+ out.writeLong(appendEntries.replicatedToAllIndex);
+ out.writeShort(appendEntries.payloadVersion);
+
+ out.writeInt(appendEntries.entries.size());
+ for (ReplicatedLogEntry e: appendEntries.entries) {
+ out.writeLong(e.getIndex());
+ out.writeLong(e.getTerm());
+ out.writeObject(e.getData());
+ }
+
+ out.writeObject(appendEntries.leaderAddress);
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ short leaderRaftVersion = in.readShort();
+ long term = in.readLong();
+ String leaderId = (String) in.readObject();
+ long prevLogTerm = in.readLong();
+ long prevLogIndex = in.readLong();
+ long leaderCommit = in.readLong();
+ long replicatedToAllIndex = in.readLong();
+ short payloadVersion = in.readShort();
+
+ int size = in.readInt();
+ List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+ }
+
+ String leaderAddress = (String)in.readObject();
+
+ appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+ replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
+ leaderAddress);
+ }
+
+ private Object readResolve() {
+ return appendEntries;
+ }
}
+ /**
+ * Pre-Fluorine version.
+ */
+ @Deprecated
private static class Proxy implements Externalizable {
private static final long serialVersionUID = 1L;
private AppendEntries appendEntries;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
}
- Proxy(AppendEntries appendEntries) {
+ Proxy(final AppendEntries appendEntries) {
this.appendEntries = appendEntries;
}
@Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ public void writeExternal(final ObjectOutput out) throws IOException {
out.writeLong(appendEntries.getTerm());
out.writeObject(appendEntries.leaderId);
out.writeLong(appendEntries.prevLogTerm);
out.writeShort(appendEntries.payloadVersion);
out.writeInt(appendEntries.entries.size());
- for(ReplicatedLogEntry e: appendEntries.entries) {
+ for (ReplicatedLogEntry e: appendEntries.entries) {
out.writeLong(e.getIndex());
out.writeLong(e.getTerm());
out.writeObject(e.getData());
}
@Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
long term = in.readLong();
String leaderId = (String) in.readObject();
long prevLogTerm = in.readLong();
int size = in.readInt();
List<ReplicatedLogEntry> entries = new ArrayList<>(size);
- for(int i = 0; i < size; i++) {
- entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+ for (int i = 0; i < size; i++) {
+ entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
}
appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
- replicatedToAllIndex, payloadVersion);
+ replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
}
private Object readResolve() {