/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft.messages;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.ImmutableList;
14 import java.io.Externalizable;
15 import java.io.IOException;
16 import java.io.ObjectInput;
17 import java.io.ObjectOutput;
18 import java.util.List;
19 import java.util.Optional;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.opendaylight.controller.cluster.raft.RaftVersions;
23 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
24 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
27 * Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
29 public final class AppendEntries extends AbstractRaftRPC {
31 private static final long serialVersionUID = 1L;
33 // So that follower can redirect clients
34 private final @NonNull String leaderId;
36 // Index of log entry immediately preceding new ones
37 private final long prevLogIndex;
39 // term of prevLogIndex entry
40 private final long prevLogTerm;
42 // log entries to store (empty for heart beat - may send more than one for efficiency)
43 private final @NonNull List<ReplicatedLogEntry> entries;
45 // leader's commitIndex
46 private final long leaderCommit;
48 // index which has been replicated successfully to all followers, -1 if none
49 private final long replicatedToAllIndex;
51 private final short payloadVersion;
53 private final short recipientRaftVersion;
55 private final short leaderRaftVersion;
57 private final String leaderAddress;
59 AppendEntries(final long term, @NonNull final String leaderId, final long prevLogIndex,
60 final long prevLogTerm, @NonNull final List<ReplicatedLogEntry> entries, final long leaderCommit,
61 final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
62 final short leaderRaftVersion, @Nullable final String leaderAddress) {
64 this.leaderId = requireNonNull(leaderId);
65 this.prevLogIndex = prevLogIndex;
66 this.prevLogTerm = prevLogTerm;
67 this.entries = requireNonNull(entries);
68 this.leaderCommit = leaderCommit;
69 this.replicatedToAllIndex = replicatedToAllIndex;
70 this.payloadVersion = payloadVersion;
71 this.recipientRaftVersion = recipientRaftVersion;
72 this.leaderRaftVersion = leaderRaftVersion;
73 this.leaderAddress = leaderAddress;
76 public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
77 final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
78 final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
79 final @Nullable String leaderAddress) {
80 this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
81 recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
85 public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
86 final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
87 final long replicatedToAllIndex, final short payloadVersion) {
88 this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
89 RaftVersions.CURRENT_VERSION, null);
92 public @NonNull String getLeaderId() {
96 public long getPrevLogIndex() {
100 public long getPrevLogTerm() {
104 public @NonNull List<ReplicatedLogEntry> getEntries() {
108 public long getLeaderCommit() {
112 public long getReplicatedToAllIndex() {
113 return replicatedToAllIndex;
116 public short getPayloadVersion() {
117 return payloadVersion;
120 public Optional<String> getLeaderAddress() {
121 return Optional.ofNullable(leaderAddress);
124 public short getLeaderRaftVersion() {
125 return leaderRaftVersion;
129 public String toString() {
130 return "AppendEntries [leaderId=" + leaderId
131 + ", prevLogIndex=" + prevLogIndex
132 + ", prevLogTerm=" + prevLogTerm
133 + ", leaderCommit=" + leaderCommit
134 + ", replicatedToAllIndex=" + replicatedToAllIndex
135 + ", payloadVersion=" + payloadVersion
136 + ", recipientRaftVersion=" + recipientRaftVersion
137 + ", leaderRaftVersion=" + leaderRaftVersion
138 + ", leaderAddress=" + leaderAddress
139 + ", entries=" + entries + "]";
143 Object writeReplace() {
144 if (recipientRaftVersion <= RaftVersions.BORON_VERSION) {
145 return new Proxy(this);
147 return recipientRaftVersion == RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new AE(this);
151 * Fluorine version that adds the leader address.
153 private static class ProxyV2 implements Externalizable {
154 private static final long serialVersionUID = 1L;
156 private AppendEntries appendEntries;
158 // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
159 // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
160 @SuppressWarnings("checkstyle:RedundantModifier")
164 ProxyV2(final AppendEntries appendEntries) {
165 this.appendEntries = appendEntries;
169 public void writeExternal(final ObjectOutput out) throws IOException {
170 out.writeShort(appendEntries.leaderRaftVersion);
171 out.writeLong(appendEntries.getTerm());
172 out.writeObject(appendEntries.leaderId);
173 out.writeLong(appendEntries.prevLogTerm);
174 out.writeLong(appendEntries.prevLogIndex);
175 out.writeLong(appendEntries.leaderCommit);
176 out.writeLong(appendEntries.replicatedToAllIndex);
177 out.writeShort(appendEntries.payloadVersion);
179 out.writeInt(appendEntries.entries.size());
180 for (ReplicatedLogEntry e: appendEntries.entries) {
181 out.writeLong(e.getIndex());
182 out.writeLong(e.getTerm());
183 out.writeObject(e.getData());
186 out.writeObject(appendEntries.leaderAddress);
190 public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
191 short leaderRaftVersion = in.readShort();
192 long term = in.readLong();
193 String leaderId = (String) in.readObject();
194 long prevLogTerm = in.readLong();
195 long prevLogIndex = in.readLong();
196 long leaderCommit = in.readLong();
197 long replicatedToAllIndex = in.readLong();
198 short payloadVersion = in.readShort();
200 int size = in.readInt();
201 var entries = ImmutableList.<ReplicatedLogEntry>builderWithExpectedSize(size);
202 for (int i = 0; i < size; i++) {
203 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
206 String leaderAddress = (String)in.readObject();
208 appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries.build(), leaderCommit,
209 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
213 private Object readResolve() {
214 return appendEntries;
219 * Pre-Fluorine version.
222 private static class Proxy implements Externalizable {
223 private static final long serialVersionUID = 1L;
225 private AppendEntries appendEntries;
227 // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
228 // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
229 @SuppressWarnings("checkstyle:RedundantModifier")
233 Proxy(final AppendEntries appendEntries) {
234 this.appendEntries = appendEntries;
238 public void writeExternal(final ObjectOutput out) throws IOException {
239 out.writeLong(appendEntries.getTerm());
240 out.writeObject(appendEntries.leaderId);
241 out.writeLong(appendEntries.prevLogTerm);
242 out.writeLong(appendEntries.prevLogIndex);
243 out.writeLong(appendEntries.leaderCommit);
244 out.writeLong(appendEntries.replicatedToAllIndex);
245 out.writeShort(appendEntries.payloadVersion);
247 out.writeInt(appendEntries.entries.size());
248 for (ReplicatedLogEntry e: appendEntries.entries) {
249 out.writeLong(e.getIndex());
250 out.writeLong(e.getTerm());
251 out.writeObject(e.getData());
256 public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
257 long term = in.readLong();
258 String leaderId = (String) in.readObject();
259 long prevLogTerm = in.readLong();
260 long prevLogIndex = in.readLong();
261 long leaderCommit = in.readLong();
262 long replicatedToAllIndex = in.readLong();
263 short payloadVersion = in.readShort();
265 int size = in.readInt();
266 var entries = ImmutableList.<ReplicatedLogEntry>builderWithExpectedSize(size);
267 for (int i = 0; i < size; i++) {
268 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
271 appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries.build(), leaderCommit,
272 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
275 private Object readResolve() {
276 return appendEntries;