/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.raft.messages;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.annotations.VisibleForTesting;
14 import java.io.Externalizable;
15 import java.io.IOException;
16 import java.io.ObjectInput;
17 import java.io.ObjectOutput;
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.Optional;
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23 import org.opendaylight.controller.cluster.raft.RaftVersions;
24 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
25 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
26 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29 * Invoked by leader to replicate log entries (§5.3); also used as
32 public class AppendEntries extends AbstractRaftRPC {
33 private static final long serialVersionUID = 1L;
35 // So that follower can redirect clients
36 private final String leaderId;
38 // Index of log entry immediately preceding new ones
39 private final long prevLogIndex;
41 // term of prevLogIndex entry
42 private final long prevLogTerm;
44 // log entries to store (empty for heart beat - may send more than one for efficiency)
45 private final List<ReplicatedLogEntry> entries;
47 // leader's commitIndex
48 private final long leaderCommit;
50 // index which has been replicated successfully to all followers, -1 if none
51 private final long replicatedToAllIndex;
53 private final short payloadVersion;
55 private final short recipientRaftVersion;
57 private final short leaderRaftVersion;
59 private final String leaderAddress;
61 private AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
62 @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
63 short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) {
65 this.leaderId = requireNonNull(leaderId);
66 this.prevLogIndex = prevLogIndex;
67 this.prevLogTerm = prevLogTerm;
68 this.entries = requireNonNull(entries);
69 this.leaderCommit = leaderCommit;
70 this.replicatedToAllIndex = replicatedToAllIndex;
71 this.payloadVersion = payloadVersion;
72 this.recipientRaftVersion = recipientRaftVersion;
73 this.leaderRaftVersion = leaderRaftVersion;
74 this.leaderAddress = leaderAddress;
77 public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
78 @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
79 short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) {
80 this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
81 recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
85 public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
86 @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
87 short payloadVersion) {
88 this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
89 RaftVersions.CURRENT_VERSION, null);
93 public String getLeaderId() {
97 public long getPrevLogIndex() {
101 public long getPrevLogTerm() {
106 public List<ReplicatedLogEntry> getEntries() {
110 public long getLeaderCommit() {
114 public long getReplicatedToAllIndex() {
115 return replicatedToAllIndex;
118 public short getPayloadVersion() {
119 return payloadVersion;
122 public Optional<String> getLeaderAddress() {
123 return Optional.ofNullable(leaderAddress);
126 public short getLeaderRaftVersion() {
127 return leaderRaftVersion;
131 public String toString() {
132 return "AppendEntries [leaderId=" + leaderId
133 + ", prevLogIndex=" + prevLogIndex
134 + ", prevLogTerm=" + prevLogTerm
135 + ", leaderCommit=" + leaderCommit
136 + ", replicatedToAllIndex=" + replicatedToAllIndex
137 + ", payloadVersion=" + payloadVersion
138 + ", recipientRaftVersion=" + recipientRaftVersion
139 + ", leaderRaftVersion=" + leaderRaftVersion
140 + ", leaderAddress=" + leaderAddress
141 + ", entries=" + entries + "]";
144 private Object writeReplace() {
145 return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this);
149 * Fluorine version that adds the leader address.
151 private static class ProxyV2 implements Externalizable {
152 private static final long serialVersionUID = 1L;
154 private AppendEntries appendEntries;
156 // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
157 // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
158 @SuppressWarnings("checkstyle:RedundantModifier")
162 ProxyV2(AppendEntries appendEntries) {
163 this.appendEntries = appendEntries;
167 public void writeExternal(ObjectOutput out) throws IOException {
168 out.writeShort(appendEntries.leaderRaftVersion);
169 out.writeLong(appendEntries.getTerm());
170 out.writeObject(appendEntries.leaderId);
171 out.writeLong(appendEntries.prevLogTerm);
172 out.writeLong(appendEntries.prevLogIndex);
173 out.writeLong(appendEntries.leaderCommit);
174 out.writeLong(appendEntries.replicatedToAllIndex);
175 out.writeShort(appendEntries.payloadVersion);
177 out.writeInt(appendEntries.entries.size());
178 for (ReplicatedLogEntry e: appendEntries.entries) {
179 out.writeLong(e.getIndex());
180 out.writeLong(e.getTerm());
181 out.writeObject(e.getData());
184 out.writeObject(appendEntries.leaderAddress);
188 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
189 short leaderRaftVersion = in.readShort();
190 long term = in.readLong();
191 String leaderId = (String) in.readObject();
192 long prevLogTerm = in.readLong();
193 long prevLogIndex = in.readLong();
194 long leaderCommit = in.readLong();
195 long replicatedToAllIndex = in.readLong();
196 short payloadVersion = in.readShort();
198 int size = in.readInt();
199 List<ReplicatedLogEntry> entries = new ArrayList<>(size);
200 for (int i = 0; i < size; i++) {
201 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
204 String leaderAddress = (String)in.readObject();
206 appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
207 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
211 private Object readResolve() {
212 return appendEntries;
217 * Pre-Fluorine version.
220 private static class Proxy implements Externalizable {
221 private static final long serialVersionUID = 1L;
223 private AppendEntries appendEntries;
225 // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
226 // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
227 @SuppressWarnings("checkstyle:RedundantModifier")
231 Proxy(AppendEntries appendEntries) {
232 this.appendEntries = appendEntries;
236 public void writeExternal(ObjectOutput out) throws IOException {
237 out.writeLong(appendEntries.getTerm());
238 out.writeObject(appendEntries.leaderId);
239 out.writeLong(appendEntries.prevLogTerm);
240 out.writeLong(appendEntries.prevLogIndex);
241 out.writeLong(appendEntries.leaderCommit);
242 out.writeLong(appendEntries.replicatedToAllIndex);
243 out.writeShort(appendEntries.payloadVersion);
245 out.writeInt(appendEntries.entries.size());
246 for (ReplicatedLogEntry e: appendEntries.entries) {
247 out.writeLong(e.getIndex());
248 out.writeLong(e.getTerm());
249 out.writeObject(e.getData());
254 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
255 long term = in.readLong();
256 String leaderId = (String) in.readObject();
257 long prevLogTerm = in.readLong();
258 long prevLogIndex = in.readLong();
259 long leaderCommit = in.readLong();
260 long replicatedToAllIndex = in.readLong();
261 short payloadVersion = in.readShort();
263 int size = in.readInt();
264 List<ReplicatedLogEntry> entries = new ArrayList<>(size);
265 for (int i = 0; i < size; i++) {
266 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
269 appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
270 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
273 private Object readResolve() {
274 return appendEntries;