/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft.messages;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import java.io.Externalizable;
14 import java.io.IOException;
15 import java.io.ObjectInput;
16 import java.io.ObjectOutput;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Optional;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.opendaylight.controller.cluster.raft.RaftVersions;
23 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
24 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
27 * Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
29 public final class AppendEntries extends AbstractRaftRPC {
30 private static final long serialVersionUID = 1L;
32 // So that follower can redirect clients
33 private final @NonNull String leaderId;
35 // Index of log entry immediately preceding new ones
36 private final long prevLogIndex;
38 // term of prevLogIndex entry
39 private final long prevLogTerm;
41 // log entries to store (empty for heart beat - may send more than one for efficiency)
42 private final @NonNull List<ReplicatedLogEntry> entries;
44 // leader's commitIndex
45 private final long leaderCommit;
47 // index which has been replicated successfully to all followers, -1 if none
48 private final long replicatedToAllIndex;
50 private final short payloadVersion;
52 private final short recipientRaftVersion;
54 private final short leaderRaftVersion;
56 private final String leaderAddress;
58 AppendEntries(final long term, @NonNull final String leaderId, final long prevLogIndex,
59 final long prevLogTerm, @NonNull final List<ReplicatedLogEntry> entries, final long leaderCommit,
60 final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
61 final short leaderRaftVersion, @Nullable final String leaderAddress) {
63 this.leaderId = requireNonNull(leaderId);
64 this.prevLogIndex = prevLogIndex;
65 this.prevLogTerm = prevLogTerm;
66 this.entries = requireNonNull(entries);
67 this.leaderCommit = leaderCommit;
68 this.replicatedToAllIndex = replicatedToAllIndex;
69 this.payloadVersion = payloadVersion;
70 this.recipientRaftVersion = recipientRaftVersion;
71 this.leaderRaftVersion = leaderRaftVersion;
72 this.leaderAddress = leaderAddress;
75 public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
76 final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
77 final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
78 final @Nullable String leaderAddress) {
79 this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
80 recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
84 public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
85 final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
86 final long replicatedToAllIndex, final short payloadVersion) {
87 this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
88 RaftVersions.CURRENT_VERSION, null);
91 public @NonNull String getLeaderId() {
95 public long getPrevLogIndex() {
99 public long getPrevLogTerm() {
103 public @NonNull List<ReplicatedLogEntry> getEntries() {
107 public long getLeaderCommit() {
111 public long getReplicatedToAllIndex() {
112 return replicatedToAllIndex;
115 public short getPayloadVersion() {
116 return payloadVersion;
119 public Optional<String> getLeaderAddress() {
120 return Optional.ofNullable(leaderAddress);
123 public short getLeaderRaftVersion() {
124 return leaderRaftVersion;
128 public String toString() {
129 return "AppendEntries [leaderId=" + leaderId
130 + ", prevLogIndex=" + prevLogIndex
131 + ", prevLogTerm=" + prevLogTerm
132 + ", leaderCommit=" + leaderCommit
133 + ", replicatedToAllIndex=" + replicatedToAllIndex
134 + ", payloadVersion=" + payloadVersion
135 + ", recipientRaftVersion=" + recipientRaftVersion
136 + ", leaderRaftVersion=" + leaderRaftVersion
137 + ", leaderAddress=" + leaderAddress
138 + ", entries=" + entries + "]";
142 Object writeReplace() {
143 if (recipientRaftVersion <= RaftVersions.BORON_VERSION) {
144 return new Proxy(this);
146 return recipientRaftVersion == RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new AE(this);
150 * Fluorine version that adds the leader address.
152 private static class ProxyV2 implements Externalizable {
153 private static final long serialVersionUID = 1L;
155 private AppendEntries appendEntries;
157 // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
158 // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
159 @SuppressWarnings("checkstyle:RedundantModifier")
163 ProxyV2(final AppendEntries appendEntries) {
164 this.appendEntries = appendEntries;
168 public void writeExternal(final ObjectOutput out) throws IOException {
169 out.writeShort(appendEntries.leaderRaftVersion);
170 out.writeLong(appendEntries.getTerm());
171 out.writeObject(appendEntries.leaderId);
172 out.writeLong(appendEntries.prevLogTerm);
173 out.writeLong(appendEntries.prevLogIndex);
174 out.writeLong(appendEntries.leaderCommit);
175 out.writeLong(appendEntries.replicatedToAllIndex);
176 out.writeShort(appendEntries.payloadVersion);
178 out.writeInt(appendEntries.entries.size());
179 for (ReplicatedLogEntry e: appendEntries.entries) {
180 out.writeLong(e.getIndex());
181 out.writeLong(e.getTerm());
182 out.writeObject(e.getData());
185 out.writeObject(appendEntries.leaderAddress);
189 public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
190 short leaderRaftVersion = in.readShort();
191 long term = in.readLong();
192 String leaderId = (String) in.readObject();
193 long prevLogTerm = in.readLong();
194 long prevLogIndex = in.readLong();
195 long leaderCommit = in.readLong();
196 long replicatedToAllIndex = in.readLong();
197 short payloadVersion = in.readShort();
199 int size = in.readInt();
200 List<ReplicatedLogEntry> entries = new ArrayList<>(size);
201 for (int i = 0; i < size; i++) {
202 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
205 String leaderAddress = (String)in.readObject();
207 appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
208 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
212 private Object readResolve() {
213 return appendEntries;
218 * Pre-Fluorine version.
221 private static class Proxy implements Externalizable {
222 private static final long serialVersionUID = 1L;
224 private AppendEntries appendEntries;
226 // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
227 // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
228 @SuppressWarnings("checkstyle:RedundantModifier")
232 Proxy(final AppendEntries appendEntries) {
233 this.appendEntries = appendEntries;
237 public void writeExternal(final ObjectOutput out) throws IOException {
238 out.writeLong(appendEntries.getTerm());
239 out.writeObject(appendEntries.leaderId);
240 out.writeLong(appendEntries.prevLogTerm);
241 out.writeLong(appendEntries.prevLogIndex);
242 out.writeLong(appendEntries.leaderCommit);
243 out.writeLong(appendEntries.replicatedToAllIndex);
244 out.writeShort(appendEntries.payloadVersion);
246 out.writeInt(appendEntries.entries.size());
247 for (ReplicatedLogEntry e: appendEntries.entries) {
248 out.writeLong(e.getIndex());
249 out.writeLong(e.getTerm());
250 out.writeObject(e.getData());
255 public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
256 long term = in.readLong();
257 String leaderId = (String) in.readObject();
258 long prevLogTerm = in.readLong();
259 long prevLogIndex = in.readLong();
260 long leaderCommit = in.readLong();
261 long replicatedToAllIndex = in.readLong();
262 short payloadVersion = in.readShort();
264 int size = in.readInt();
265 List<ReplicatedLogEntry> entries = new ArrayList<>(size);
266 for (int i = 0; i < size; i++) {
267 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
270 appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
271 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
274 private Object readResolve() {
275 return appendEntries;