2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.messages;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.ImmutableList;
14 import java.io.Externalizable;
15 import java.io.IOException;
16 import java.io.ObjectInput;
17 import java.io.ObjectOutput;
18 import java.util.List;
19 import org.eclipse.jdt.annotation.NonNull;
20 import org.eclipse.jdt.annotation.Nullable;
21 import org.opendaylight.controller.cluster.raft.RaftVersions;
22 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
23 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
26 * Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
28 public final class AppendEntries extends AbstractRaftRPC {
// Version identifier used by Java serialization for this class.
30 private static final long serialVersionUID = 1L;
32 // So that follower can redirect clients
33 private final @NonNull String leaderId;
35 // Index of log entry immediately preceding new ones
36 private final long prevLogIndex;
38 // term of prevLogIndex entry
39 private final long prevLogTerm;
41 // log entries to store (empty for heart beat - may send more than one for efficiency)
42 private final @NonNull List<ReplicatedLogEntry> entries;
44 // leader's commitIndex
45 private final long leaderCommit;
47 // index which has been replicated successfully to all followers, -1 if none
48 private final long replicatedToAllIndex;
// Payload version supplied by the caller; ProxyV2 writes it to the stream unchanged.
50 private final short payloadVersion;
// Raft version of the recipient; writeReplace() compares it with RaftVersions.FLUORINE_VERSION to choose
// which serialization proxy is emitted.
52 private final short recipientRaftVersion;
// Raft version of the leader; the public constructors set it to RaftVersions.CURRENT_VERSION, while
// ProxyV2.readExternal restores it from the stream.
54 private final short leaderRaftVersion;
// Address of the leader; may be null (the constructors accept a @Nullable value).
56 private final String leaderAddress;
/**
 * Full constructor: copies each supplied value into the field of the same name.
 *
 * <p>{@code leaderId} and {@code entries} are passed through {@code requireNonNull}, so a null value for
 * either raises {@link NullPointerException}. {@code entries} is stored by reference, not copied.
 * {@code leaderAddress} may be null.
 *
 * <p>NOTE(review): {@code term} is not referenced by any statement visible in this block - presumably it is
 * forwarded to the superclass constructor; confirm against the complete file.
 */
58 AppendEntries(final long term, @NonNull final String leaderId, final long prevLogIndex,
59 final long prevLogTerm, @NonNull final List<ReplicatedLogEntry> entries, final long leaderCommit,
60 final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
61 final short leaderRaftVersion, @Nullable final String leaderAddress) {
63 this.leaderId = requireNonNull(leaderId);
64 this.prevLogIndex = prevLogIndex;
65 this.prevLogTerm = prevLogTerm;
66 this.entries = requireNonNull(entries);
67 this.leaderCommit = leaderCommit;
68 this.replicatedToAllIndex = replicatedToAllIndex;
69 this.payloadVersion = payloadVersion;
70 this.recipientRaftVersion = recipientRaftVersion;
71 this.leaderRaftVersion = leaderRaftVersion;
72 this.leaderAddress = leaderAddress;
/**
 * Creates a message whose leader Raft version is {@link RaftVersions#CURRENT_VERSION}.
 *
 * <p>Every argument is forwarded unchanged to the full constructor; only the leader Raft version is filled
 * in here.
 *
 * @param leaderAddress address of the leader, may be null
 */
public AppendEntries(
    final long term,
    final @NonNull String leaderId,
    final long prevLogIndex,
    final long prevLogTerm,
    final @NonNull List<ReplicatedLogEntry> entries,
    final long leaderCommit,
    final long replicatedToAllIndex,
    final short payloadVersion,
    final short recipientRaftVersion,
    final @Nullable String leaderAddress) {
    this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex,
        payloadVersion, recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
}
/**
 * Convenience constructor that omits the recipient Raft version and the leader address.
 *
 * <p>Delegates with {@link RaftVersions#CURRENT_VERSION} as the recipient Raft version and {@code null} as
 * the leader address.
 */
public AppendEntries(
    final long term,
    final @NonNull String leaderId,
    final long prevLogIndex,
    final long prevLogTerm,
    final @NonNull List<ReplicatedLogEntry> entries,
    final long leaderCommit,
    final long replicatedToAllIndex,
    final short payloadVersion) {
    this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex,
        payloadVersion, RaftVersions.CURRENT_VERSION, null);
}
// NOTE(review): only the headers of the next five accessors are visible in this view; their bodies are
// not shown. Each presumably returns the field matching its name - confirm against the complete file.

// Accessor for the leader's identifier (declared non-null).
91 public @NonNull String getLeaderId() {
// Accessor for the index of the log entry immediately preceding the new ones.
95 public long getPrevLogIndex() {
// Accessor for the term of the prevLogIndex entry.
99 public long getPrevLogTerm() {
// Accessor for the log entries carried by this message (declared non-null).
103 public @NonNull List<ReplicatedLogEntry> getEntries() {
// Accessor for the leader's commit index.
107 public long getLeaderCommit() {
111 public long getReplicatedToAllIndex() {
112 return replicatedToAllIndex;
115 public short getPayloadVersion() {
116 return payloadVersion;
119 public @Nullable String leaderAddress() {
120 return leaderAddress;
123 public short getLeaderRaftVersion() {
124 return leaderRaftVersion;
128 public String toString() {
129 return "AppendEntries [leaderId=" + leaderId
130 + ", prevLogIndex=" + prevLogIndex
131 + ", prevLogTerm=" + prevLogTerm
132 + ", leaderCommit=" + leaderCommit
133 + ", replicatedToAllIndex=" + replicatedToAllIndex
134 + ", payloadVersion=" + payloadVersion
135 + ", recipientRaftVersion=" + recipientRaftVersion
136 + ", leaderRaftVersion=" + leaderRaftVersion
137 + ", leaderAddress=" + leaderAddress
138 + ", entries=" + entries + "]";
142 Object writeReplace() {
143 return recipientRaftVersion <= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new AE(this);
147 * Fluorine version that adds the leader address.
// Externalizable stand-in for AppendEntries; the enclosing class's writeReplace() returns it when the
// recipient Raft version is at most RaftVersions.FLUORINE_VERSION.
149 private static class ProxyV2 implements Externalizable {
// Version identifier used by Java serialization for this proxy class.
151 private static final long serialVersionUID = 1L;
// The message to write out, or - after readExternal - the message rebuilt from the stream and handed
// back by readResolve. Not final because readExternal assigns it.
153 private AppendEntries appendEntries;
155 // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
156 // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
157 @SuppressWarnings("checkstyle:RedundantModifier")
// NOTE(review): the comment and annotation above describe a public constructor that is not visible in this
// view (presumably the no-argument one required by Externalizable) - confirm against the complete file.

// Wraps the message that writeExternal will serialize.
161 ProxyV2(final AppendEntries appendEntries) {
162 this.appendEntries = appendEntries;
166 public void writeExternal(final ObjectOutput out) throws IOException {
167 out.writeShort(appendEntries.leaderRaftVersion);
168 out.writeLong(appendEntries.getTerm());
169 out.writeObject(appendEntries.leaderId);
170 out.writeLong(appendEntries.prevLogTerm);
171 out.writeLong(appendEntries.prevLogIndex);
172 out.writeLong(appendEntries.leaderCommit);
173 out.writeLong(appendEntries.replicatedToAllIndex);
174 out.writeShort(appendEntries.payloadVersion);
176 out.writeInt(appendEntries.entries.size());
177 for (ReplicatedLogEntry e: appendEntries.entries) {
178 out.writeLong(e.index());
179 out.writeLong(e.term());
180 out.writeObject(e.getData());
183 out.writeObject(appendEntries.leaderAddress);
// Rebuilds the AppendEntries from the stream and stores it in the appendEntries field.
// Values are read in this order: leader Raft version, term, leader id, prevLogTerm, prevLogIndex,
// leaderCommit, replicatedToAllIndex, payload version, entry count, the entries, leader address.
187 public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
188 short leaderRaftVersion = in.readShort();
189 long term = in.readLong();
190 String leaderId = (String) in.readObject();
// prevLogTerm is read before prevLogIndex; the constructor call below takes them in the opposite order.
191 long prevLogTerm = in.readLong();
192 long prevLogIndex = in.readLong();
193 long leaderCommit = in.readLong();
194 long replicatedToAllIndex = in.readLong();
195 short payloadVersion = in.readShort();
// Entry count, followed by that many (long, long, Payload) triples passed to SimpleReplicatedLogEntry
// in the order they are read.
197 int size = in.readInt();
198 var entries = ImmutableList.<ReplicatedLogEntry>builderWithExpectedSize(size);
199 for (int i = 0; i < size; i++) {
200 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
203 String leaderAddress = (String)in.readObject();
// The recipient Raft version is set to RaftVersions.CURRENT_VERSION; the leader Raft version is the one
// read from the stream.
// NOTE(review): the argument list below ends on a trailing comma in this view - the remaining
// argument(s) are not visible (presumably leaderAddress); confirm against the complete file.
205 appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries.build(), leaderCommit,
206 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
211 private Object readResolve() {
212 return appendEntries;