X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fmessages%2FAppendEntries.java;h=45ac9a9ebfbecbb2fb13881dd3f365f8ac9f7e8e;hb=refs%2Fchanges%2F54%2F57354%2F8;hp=fd4e0a274c6f9122e40809e7b120f6a7a8b936bd;hpb=583f6075e842a6a37b83bd01e478aebc70c6af73;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index fd4e0a274c..4d4b7e4fdd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -8,13 +8,25 @@ package org.opendaylight.controller.cluster.raft.messages; +import com.google.common.base.Preconditions; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; import java.util.List; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; /** * Invoked by leader to replicate log entries (§5.3); also used as * heartbeat (§5.2). */ public class AppendEntries extends AbstractRaftRPC { + private static final long serialVersionUID = 1L; + // So that follower can redirect clients private final String leaderId; @@ -24,23 +36,31 @@ public class AppendEntries extends AbstractRaftRPC { // term of prevLogIndex entry private final long prevLogTerm; - // log entries to store (empty for heartbeat; - // may send more than one for efficiency) - private final List entries; + // log entries to store (empty for heart beat - may send more than one for efficiency) + private final List entries; // leader's commitIndex private final long leaderCommit; - public AppendEntries(long term, String leaderId, long prevLogIndex, - long prevLogTerm, List entries, long leaderCommit) { + // index which has been replicated successfully to all followers, -1 if none + private final long replicatedToAllIndex; + + private final short payloadVersion; + + public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm, + @Nonnull List entries, long leaderCommit, long replicatedToAllIndex, + short payloadVersion) { super(term); - this.leaderId = leaderId; + this.leaderId = Preconditions.checkNotNull(leaderId); this.prevLogIndex = prevLogIndex; this.prevLogTerm = prevLogTerm; - this.entries = entries; + this.entries = Preconditions.checkNotNull(entries); this.leaderCommit = leaderCommit; + this.replicatedToAllIndex = replicatedToAllIndex; + this.payloadVersion = payloadVersion; } + @Nonnull public String getLeaderId() { return leaderId; } @@ -53,11 +73,93 @@ public class AppendEntries extends AbstractRaftRPC { return prevLogTerm; } - public List getEntries() { + @Nonnull + public List getEntries() { return entries; } public long getLeaderCommit() { return leaderCommit; } + + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; + } + + public short getPayloadVersion() { + return payloadVersion; + } + + @Override + public String toString() { + return "AppendEntries [leaderId=" + leaderId + + ", prevLogIndex=" + prevLogIndex + + ", prevLogTerm=" + prevLogTerm + + ", leaderCommit=" + leaderCommit + + ", replicatedToAllIndex=" + replicatedToAllIndex + + ", payloadVersion=" + payloadVersion + + ", entries=" + entries + "]"; + } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private AppendEntries appendEntries; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + } + + Proxy(AppendEntries appendEntries) { + this.appendEntries = appendEntries; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(appendEntries.getTerm()); + out.writeObject(appendEntries.leaderId); + out.writeLong(appendEntries.prevLogTerm); + out.writeLong(appendEntries.prevLogIndex); + out.writeLong(appendEntries.leaderCommit); + out.writeLong(appendEntries.replicatedToAllIndex); + out.writeShort(appendEntries.payloadVersion); + + out.writeInt(appendEntries.entries.size()); + for (ReplicatedLogEntry e: appendEntries.entries) { + out.writeLong(e.getIndex()); + out.writeLong(e.getTerm()); + out.writeObject(e.getData()); + } + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + long term = in.readLong(); + String leaderId = (String) in.readObject(); + long prevLogTerm = in.readLong(); + long prevLogIndex = in.readLong(); + long leaderCommit = in.readLong(); + long replicatedToAllIndex = in.readLong(); + short payloadVersion = in.readShort(); + + int size = in.readInt(); + List entries = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); + } + + appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, + replicatedToAllIndex, payloadVersion); + } + + private Object readResolve() { + return appendEntries; + } + } }