5ca5d3a5e235bee0890187b68dedceeb5de52436
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.messages;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.ImmutableList;
14 import java.io.Externalizable;
15 import java.io.IOException;
16 import java.io.ObjectInput;
17 import java.io.ObjectOutput;
18 import java.util.List;
19 import java.util.Optional;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.opendaylight.controller.cluster.raft.RaftVersions;
23 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
24 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
25
26 /**
27  * Invoked by leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
28  */
29 public final class AppendEntries extends AbstractRaftRPC {
30     private static final long serialVersionUID = 1L;
31
32     // So that follower can redirect clients
33     private final @NonNull String leaderId;
34
35     // Index of log entry immediately preceding new ones
36     private final long prevLogIndex;
37
38     // term of prevLogIndex entry
39     private final long prevLogTerm;
40
41     // log entries to store (empty for heart beat - may send more than one for efficiency)
42     private final @NonNull List<ReplicatedLogEntry> entries;
43
44     // leader's commitIndex
45     private final long leaderCommit;
46
47     // index which has been replicated successfully to all followers, -1 if none
48     private final long replicatedToAllIndex;
49
50     private final short payloadVersion;
51
52     private final short recipientRaftVersion;
53
54     private final short leaderRaftVersion;
55
56     private final String leaderAddress;
57
58     AppendEntries(final long term, @NonNull final String leaderId, final long prevLogIndex,
59             final long prevLogTerm, @NonNull final List<ReplicatedLogEntry> entries, final long leaderCommit,
60             final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
61             final short leaderRaftVersion, @Nullable final String leaderAddress) {
62         super(term);
63         this.leaderId = requireNonNull(leaderId);
64         this.prevLogIndex = prevLogIndex;
65         this.prevLogTerm = prevLogTerm;
66         this.entries = requireNonNull(entries);
67         this.leaderCommit = leaderCommit;
68         this.replicatedToAllIndex = replicatedToAllIndex;
69         this.payloadVersion = payloadVersion;
70         this.recipientRaftVersion = recipientRaftVersion;
71         this.leaderRaftVersion = leaderRaftVersion;
72         this.leaderAddress = leaderAddress;
73     }
74
75     public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
76             final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
77             final long replicatedToAllIndex, final short payloadVersion, final short recipientRaftVersion,
78             final @Nullable String leaderAddress) {
79         this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
80                 recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
81     }
82
83     @VisibleForTesting
84     public AppendEntries(final long term, final @NonNull String leaderId, final long prevLogIndex,
85             final long prevLogTerm, final @NonNull List<ReplicatedLogEntry> entries, final long leaderCommit,
86             final long replicatedToAllIndex, final short payloadVersion) {
87         this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
88                 RaftVersions.CURRENT_VERSION, null);
89     }
90
91     public @NonNull String getLeaderId() {
92         return leaderId;
93     }
94
95     public long getPrevLogIndex() {
96         return prevLogIndex;
97     }
98
99     public long getPrevLogTerm() {
100         return prevLogTerm;
101     }
102
103     public @NonNull List<ReplicatedLogEntry> getEntries() {
104         return entries;
105     }
106
107     public long getLeaderCommit() {
108         return leaderCommit;
109     }
110
111     public long getReplicatedToAllIndex() {
112         return replicatedToAllIndex;
113     }
114
115     public short getPayloadVersion() {
116         return payloadVersion;
117     }
118
119     public Optional<String> getLeaderAddress() {
120         return Optional.ofNullable(leaderAddress);
121     }
122
123     public short getLeaderRaftVersion() {
124         return leaderRaftVersion;
125     }
126
127     @Override
128     public String toString() {
129         return "AppendEntries [leaderId=" + leaderId
130                 + ", prevLogIndex=" + prevLogIndex
131                 + ", prevLogTerm=" + prevLogTerm
132                 + ", leaderCommit=" + leaderCommit
133                 + ", replicatedToAllIndex=" + replicatedToAllIndex
134                 + ", payloadVersion=" + payloadVersion
135                 + ", recipientRaftVersion=" + recipientRaftVersion
136                 + ", leaderRaftVersion=" + leaderRaftVersion
137                 + ", leaderAddress=" + leaderAddress
138                 + ", entries=" + entries + "]";
139     }
140
141     @Override
142     Object writeReplace() {
143         if (recipientRaftVersion <= RaftVersions.BORON_VERSION) {
144             return new Proxy(this);
145         }
146         return recipientRaftVersion == RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new AE(this);
147     }
148
149     /**
150      * Fluorine version that adds the leader address.
151      */
152     private static class ProxyV2 implements Externalizable {
153         private static final long serialVersionUID = 1L;
154
155         private AppendEntries appendEntries;
156
157         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
158         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
159         @SuppressWarnings("checkstyle:RedundantModifier")
160         public ProxyV2() {
161         }
162
163         ProxyV2(final AppendEntries appendEntries) {
164             this.appendEntries = appendEntries;
165         }
166
167         @Override
168         public void writeExternal(final ObjectOutput out) throws IOException {
169             out.writeShort(appendEntries.leaderRaftVersion);
170             out.writeLong(appendEntries.getTerm());
171             out.writeObject(appendEntries.leaderId);
172             out.writeLong(appendEntries.prevLogTerm);
173             out.writeLong(appendEntries.prevLogIndex);
174             out.writeLong(appendEntries.leaderCommit);
175             out.writeLong(appendEntries.replicatedToAllIndex);
176             out.writeShort(appendEntries.payloadVersion);
177
178             out.writeInt(appendEntries.entries.size());
179             for (ReplicatedLogEntry e: appendEntries.entries) {
180                 out.writeLong(e.getIndex());
181                 out.writeLong(e.getTerm());
182                 out.writeObject(e.getData());
183             }
184
185             out.writeObject(appendEntries.leaderAddress);
186         }
187
188         @Override
189         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
190             short leaderRaftVersion = in.readShort();
191             long term = in.readLong();
192             String leaderId = (String) in.readObject();
193             long prevLogTerm = in.readLong();
194             long prevLogIndex = in.readLong();
195             long leaderCommit = in.readLong();
196             long replicatedToAllIndex = in.readLong();
197             short payloadVersion = in.readShort();
198
199             int size = in.readInt();
200             var entries = ImmutableList.<ReplicatedLogEntry>builderWithExpectedSize(size);
201             for (int i = 0; i < size; i++) {
202                 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
203             }
204
205             String leaderAddress = (String)in.readObject();
206
207             appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries.build(), leaderCommit,
208                     replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
209                     leaderAddress);
210         }
211
212         private Object readResolve() {
213             return appendEntries;
214         }
215     }
216
217     /**
218      * Pre-Fluorine version.
219      */
220     @Deprecated
221     private static class Proxy implements Externalizable {
222         private static final long serialVersionUID = 1L;
223
224         private AppendEntries appendEntries;
225
226         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
227         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
228         @SuppressWarnings("checkstyle:RedundantModifier")
229         public Proxy() {
230         }
231
232         Proxy(final AppendEntries appendEntries) {
233             this.appendEntries = appendEntries;
234         }
235
236         @Override
237         public void writeExternal(final ObjectOutput out) throws IOException {
238             out.writeLong(appendEntries.getTerm());
239             out.writeObject(appendEntries.leaderId);
240             out.writeLong(appendEntries.prevLogTerm);
241             out.writeLong(appendEntries.prevLogIndex);
242             out.writeLong(appendEntries.leaderCommit);
243             out.writeLong(appendEntries.replicatedToAllIndex);
244             out.writeShort(appendEntries.payloadVersion);
245
246             out.writeInt(appendEntries.entries.size());
247             for (ReplicatedLogEntry e: appendEntries.entries) {
248                 out.writeLong(e.getIndex());
249                 out.writeLong(e.getTerm());
250                 out.writeObject(e.getData());
251             }
252         }
253
254         @Override
255         public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
256             long term = in.readLong();
257             String leaderId = (String) in.readObject();
258             long prevLogTerm = in.readLong();
259             long prevLogIndex = in.readLong();
260             long leaderCommit = in.readLong();
261             long replicatedToAllIndex = in.readLong();
262             short payloadVersion = in.readShort();
263
264             int size = in.readInt();
265             var entries = ImmutableList.<ReplicatedLogEntry>builderWithExpectedSize(size);
266             for (int i = 0; i < size; i++) {
267                 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
268             }
269
270             appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries.build(), leaderCommit,
271                 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
272         }
273
274         private Object readResolve() {
275             return appendEntries;
276         }
277     }
278 }