67c6899231912df7a412e7e9f162736e8b689d99
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.messages;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import java.io.Externalizable;
14 import java.io.IOException;
15 import java.io.ObjectInput;
16 import java.io.ObjectOutput;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Optional;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.opendaylight.controller.cluster.raft.RaftVersions;
23 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
24 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
25 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
26
27 /**
28  * Invoked by leader to replicate log entries (§5.3); also used as
29  * heartbeat (§5.2).
30  */
31 public class AppendEntries extends AbstractRaftRPC {
32     private static final long serialVersionUID = 1L;
33
34     // So that follower can redirect clients
35     private final String leaderId;
36
37     // Index of log entry immediately preceding new ones
38     private final long prevLogIndex;
39
40     // term of prevLogIndex entry
41     private final long prevLogTerm;
42
43     // log entries to store (empty for heart beat - may send more than one for efficiency)
44     private final List<ReplicatedLogEntry> entries;
45
46     // leader's commitIndex
47     private final long leaderCommit;
48
49     // index which has been replicated successfully to all followers, -1 if none
50     private final long replicatedToAllIndex;
51
52     private final short payloadVersion;
53
54     private final short recipientRaftVersion;
55
56     private final short leaderRaftVersion;
57
58     private final String leaderAddress;
59
60     private AppendEntries(long term, @NonNull String leaderId, long prevLogIndex, long prevLogTerm,
61             @NonNull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
62             short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) {
63         super(term);
64         this.leaderId = requireNonNull(leaderId);
65         this.prevLogIndex = prevLogIndex;
66         this.prevLogTerm = prevLogTerm;
67         this.entries = requireNonNull(entries);
68         this.leaderCommit = leaderCommit;
69         this.replicatedToAllIndex = replicatedToAllIndex;
70         this.payloadVersion = payloadVersion;
71         this.recipientRaftVersion = recipientRaftVersion;
72         this.leaderRaftVersion = leaderRaftVersion;
73         this.leaderAddress = leaderAddress;
74     }
75
76     public AppendEntries(long term, @NonNull String leaderId, long prevLogIndex, long prevLogTerm,
77             @NonNull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
78             short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) {
79         this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
80                 recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
81     }
82
83     @VisibleForTesting
84     public AppendEntries(long term, @NonNull String leaderId, long prevLogIndex, long prevLogTerm,
85             @NonNull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
86             short payloadVersion) {
87         this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
88                 RaftVersions.CURRENT_VERSION, null);
89     }
90
91     public @NonNull String getLeaderId() {
92         return leaderId;
93     }
94
95     public long getPrevLogIndex() {
96         return prevLogIndex;
97     }
98
99     public long getPrevLogTerm() {
100         return prevLogTerm;
101     }
102
103     public @NonNull List<ReplicatedLogEntry> getEntries() {
104         return entries;
105     }
106
107     public long getLeaderCommit() {
108         return leaderCommit;
109     }
110
111     public long getReplicatedToAllIndex() {
112         return replicatedToAllIndex;
113     }
114
115     public short getPayloadVersion() {
116         return payloadVersion;
117     }
118
119     public Optional<String> getLeaderAddress() {
120         return Optional.ofNullable(leaderAddress);
121     }
122
123     public short getLeaderRaftVersion() {
124         return leaderRaftVersion;
125     }
126
127     @Override
128     public String toString() {
129         return "AppendEntries [leaderId=" + leaderId
130                 + ", prevLogIndex=" + prevLogIndex
131                 + ", prevLogTerm=" + prevLogTerm
132                 + ", leaderCommit=" + leaderCommit
133                 + ", replicatedToAllIndex=" + replicatedToAllIndex
134                 + ", payloadVersion=" + payloadVersion
135                 + ", recipientRaftVersion=" + recipientRaftVersion
136                 + ", leaderRaftVersion=" + leaderRaftVersion
137                 + ", leaderAddress=" + leaderAddress
138                 + ", entries=" + entries + "]";
139     }
140
141     private Object writeReplace() {
142         return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this);
143     }
144
145     /**
146      * Fluorine version that adds the leader address.
147      */
148     private static class ProxyV2 implements Externalizable {
149         private static final long serialVersionUID = 1L;
150
151         private AppendEntries appendEntries;
152
153         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
154         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
155         @SuppressWarnings("checkstyle:RedundantModifier")
156         public ProxyV2() {
157         }
158
159         ProxyV2(AppendEntries appendEntries) {
160             this.appendEntries = appendEntries;
161         }
162
163         @Override
164         public void writeExternal(ObjectOutput out) throws IOException {
165             out.writeShort(appendEntries.leaderRaftVersion);
166             out.writeLong(appendEntries.getTerm());
167             out.writeObject(appendEntries.leaderId);
168             out.writeLong(appendEntries.prevLogTerm);
169             out.writeLong(appendEntries.prevLogIndex);
170             out.writeLong(appendEntries.leaderCommit);
171             out.writeLong(appendEntries.replicatedToAllIndex);
172             out.writeShort(appendEntries.payloadVersion);
173
174             out.writeInt(appendEntries.entries.size());
175             for (ReplicatedLogEntry e: appendEntries.entries) {
176                 out.writeLong(e.getIndex());
177                 out.writeLong(e.getTerm());
178                 out.writeObject(e.getData());
179             }
180
181             out.writeObject(appendEntries.leaderAddress);
182         }
183
184         @Override
185         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
186             short leaderRaftVersion = in.readShort();
187             long term = in.readLong();
188             String leaderId = (String) in.readObject();
189             long prevLogTerm = in.readLong();
190             long prevLogIndex = in.readLong();
191             long leaderCommit = in.readLong();
192             long replicatedToAllIndex = in.readLong();
193             short payloadVersion = in.readShort();
194
195             int size = in.readInt();
196             List<ReplicatedLogEntry> entries = new ArrayList<>(size);
197             for (int i = 0; i < size; i++) {
198                 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
199             }
200
201             String leaderAddress = (String)in.readObject();
202
203             appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
204                     replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
205                     leaderAddress);
206         }
207
208         private Object readResolve() {
209             return appendEntries;
210         }
211     }
212
213     /**
214      * Pre-Fluorine version.
215      */
216     @Deprecated
217     private static class Proxy implements Externalizable {
218         private static final long serialVersionUID = 1L;
219
220         private AppendEntries appendEntries;
221
222         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
223         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
224         @SuppressWarnings("checkstyle:RedundantModifier")
225         public Proxy() {
226         }
227
228         Proxy(AppendEntries appendEntries) {
229             this.appendEntries = appendEntries;
230         }
231
232         @Override
233         public void writeExternal(ObjectOutput out) throws IOException {
234             out.writeLong(appendEntries.getTerm());
235             out.writeObject(appendEntries.leaderId);
236             out.writeLong(appendEntries.prevLogTerm);
237             out.writeLong(appendEntries.prevLogIndex);
238             out.writeLong(appendEntries.leaderCommit);
239             out.writeLong(appendEntries.replicatedToAllIndex);
240             out.writeShort(appendEntries.payloadVersion);
241
242             out.writeInt(appendEntries.entries.size());
243             for (ReplicatedLogEntry e: appendEntries.entries) {
244                 out.writeLong(e.getIndex());
245                 out.writeLong(e.getTerm());
246                 out.writeObject(e.getData());
247             }
248         }
249
250         @Override
251         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
252             long term = in.readLong();
253             String leaderId = (String) in.readObject();
254             long prevLogTerm = in.readLong();
255             long prevLogIndex = in.readLong();
256             long leaderCommit = in.readLong();
257             long replicatedToAllIndex = in.readLong();
258             short payloadVersion = in.readShort();
259
260             int size = in.readInt();
261             List<ReplicatedLogEntry> entries = new ArrayList<>(size);
262             for (int i = 0; i < size; i++) {
263                 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
264             }
265
266             appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
267                 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
268         }
269
270         private Object readResolve() {
271             return appendEntries;
272         }
273     }
274 }