2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.messages;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import java.io.Externalizable;
14 import java.io.IOException;
15 import java.io.ObjectInput;
16 import java.io.ObjectOutput;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Optional;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.opendaylight.controller.cluster.raft.RaftVersions;
23 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
24 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
25 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
28 * Invoked by leader to replicate log entries (§5.3); also used as
31 public class AppendEntries extends AbstractRaftRPC {
32 private static final long serialVersionUID = 1L;
34 // So that follower can redirect clients
35 private final String leaderId;
37 // Index of log entry immediately preceding new ones
38 private final long prevLogIndex;
40 // term of prevLogIndex entry
41 private final long prevLogTerm;
43 // log entries to store (empty for heart beat - may send more than one for efficiency)
44 private final List<ReplicatedLogEntry> entries;
46 // leader's commitIndex
47 private final long leaderCommit;
49 // index which has been replicated successfully to all followers, -1 if none
50 private final long replicatedToAllIndex;
52 private final short payloadVersion;
54 private final short recipientRaftVersion;
56 private final short leaderRaftVersion;
58 private final String leaderAddress;
60 private AppendEntries(long term, @NonNull String leaderId, long prevLogIndex, long prevLogTerm,
61 @NonNull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
62 short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) {
64 this.leaderId = requireNonNull(leaderId);
65 this.prevLogIndex = prevLogIndex;
66 this.prevLogTerm = prevLogTerm;
67 this.entries = requireNonNull(entries);
68 this.leaderCommit = leaderCommit;
69 this.replicatedToAllIndex = replicatedToAllIndex;
70 this.payloadVersion = payloadVersion;
71 this.recipientRaftVersion = recipientRaftVersion;
72 this.leaderRaftVersion = leaderRaftVersion;
73 this.leaderAddress = leaderAddress;
76 public AppendEntries(long term, @NonNull String leaderId, long prevLogIndex, long prevLogTerm,
77 @NonNull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
78 short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) {
79 this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
80 recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
84 public AppendEntries(long term, @NonNull String leaderId, long prevLogIndex, long prevLogTerm,
85 @NonNull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
86 short payloadVersion) {
87 this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
88 RaftVersions.CURRENT_VERSION, null);
91 public @NonNull String getLeaderId() {
95 public long getPrevLogIndex() {
99 public long getPrevLogTerm() {
103 public @NonNull List<ReplicatedLogEntry> getEntries() {
107 public long getLeaderCommit() {
111 public long getReplicatedToAllIndex() {
112 return replicatedToAllIndex;
115 public short getPayloadVersion() {
116 return payloadVersion;
119 public Optional<String> getLeaderAddress() {
120 return Optional.ofNullable(leaderAddress);
123 public short getLeaderRaftVersion() {
124 return leaderRaftVersion;
128 public String toString() {
129 return "AppendEntries [leaderId=" + leaderId
130 + ", prevLogIndex=" + prevLogIndex
131 + ", prevLogTerm=" + prevLogTerm
132 + ", leaderCommit=" + leaderCommit
133 + ", replicatedToAllIndex=" + replicatedToAllIndex
134 + ", payloadVersion=" + payloadVersion
135 + ", recipientRaftVersion=" + recipientRaftVersion
136 + ", leaderRaftVersion=" + leaderRaftVersion
137 + ", leaderAddress=" + leaderAddress
138 + ", entries=" + entries + "]";
141 private Object writeReplace() {
142 return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this);
146 * Fluorine version that adds the leader address.
148 private static class ProxyV2 implements Externalizable {
149 private static final long serialVersionUID = 1L;
151 private AppendEntries appendEntries;
153 // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
154 // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
155 @SuppressWarnings("checkstyle:RedundantModifier")
159 ProxyV2(AppendEntries appendEntries) {
160 this.appendEntries = appendEntries;
164 public void writeExternal(ObjectOutput out) throws IOException {
165 out.writeShort(appendEntries.leaderRaftVersion);
166 out.writeLong(appendEntries.getTerm());
167 out.writeObject(appendEntries.leaderId);
168 out.writeLong(appendEntries.prevLogTerm);
169 out.writeLong(appendEntries.prevLogIndex);
170 out.writeLong(appendEntries.leaderCommit);
171 out.writeLong(appendEntries.replicatedToAllIndex);
172 out.writeShort(appendEntries.payloadVersion);
174 out.writeInt(appendEntries.entries.size());
175 for (ReplicatedLogEntry e: appendEntries.entries) {
176 out.writeLong(e.getIndex());
177 out.writeLong(e.getTerm());
178 out.writeObject(e.getData());
181 out.writeObject(appendEntries.leaderAddress);
185 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
186 short leaderRaftVersion = in.readShort();
187 long term = in.readLong();
188 String leaderId = (String) in.readObject();
189 long prevLogTerm = in.readLong();
190 long prevLogIndex = in.readLong();
191 long leaderCommit = in.readLong();
192 long replicatedToAllIndex = in.readLong();
193 short payloadVersion = in.readShort();
195 int size = in.readInt();
196 List<ReplicatedLogEntry> entries = new ArrayList<>(size);
197 for (int i = 0; i < size; i++) {
198 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
201 String leaderAddress = (String)in.readObject();
203 appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
204 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
208 private Object readResolve() {
209 return appendEntries;
214 * Pre-Fluorine version.
217 private static class Proxy implements Externalizable {
218 private static final long serialVersionUID = 1L;
220 private AppendEntries appendEntries;
222 // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
223 // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
224 @SuppressWarnings("checkstyle:RedundantModifier")
228 Proxy(AppendEntries appendEntries) {
229 this.appendEntries = appendEntries;
233 public void writeExternal(ObjectOutput out) throws IOException {
234 out.writeLong(appendEntries.getTerm());
235 out.writeObject(appendEntries.leaderId);
236 out.writeLong(appendEntries.prevLogTerm);
237 out.writeLong(appendEntries.prevLogIndex);
238 out.writeLong(appendEntries.leaderCommit);
239 out.writeLong(appendEntries.replicatedToAllIndex);
240 out.writeShort(appendEntries.payloadVersion);
242 out.writeInt(appendEntries.entries.size());
243 for (ReplicatedLogEntry e: appendEntries.entries) {
244 out.writeLong(e.getIndex());
245 out.writeLong(e.getTerm());
246 out.writeObject(e.getData());
251 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
252 long term = in.readLong();
253 String leaderId = (String) in.readObject();
254 long prevLogTerm = in.readLong();
255 long prevLogIndex = in.readLong();
256 long leaderCommit = in.readLong();
257 long replicatedToAllIndex = in.readLong();
258 short payloadVersion = in.readShort();
260 int size = in.readInt();
261 List<ReplicatedLogEntry> entries = new ArrayList<>(size);
262 for (int i = 0; i < size; i++) {
263 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
266 appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
267 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
270 private Object readResolve() {
271 return appendEntries;