d77a0841489347ea69177e80764b7fa6a45dd049
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.messages;
10
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import java.io.Externalizable;
15 import java.io.IOException;
16 import java.io.ObjectInput;
17 import java.io.ObjectOutput;
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.Optional;
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23 import org.opendaylight.controller.cluster.raft.RaftVersions;
24 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
25 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
26 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
27
28 /**
29  * Invoked by leader to replicate log entries (§5.3); also used as
30  * heartbeat (§5.2).
31  */
32 public class AppendEntries extends AbstractRaftRPC {
33     private static final long serialVersionUID = 1L;
34
35     // So that follower can redirect clients
36     private final String leaderId;
37
38     // Index of log entry immediately preceding new ones
39     private final long prevLogIndex;
40
41     // term of prevLogIndex entry
42     private final long prevLogTerm;
43
44     // log entries to store (empty for heart beat - may send more than one for efficiency)
45     private final List<ReplicatedLogEntry> entries;
46
47     // leader's commitIndex
48     private final long leaderCommit;
49
50     // index which has been replicated successfully to all followers, -1 if none
51     private final long replicatedToAllIndex;
52
53     private final short payloadVersion;
54
55     private final short recipientRaftVersion;
56
57     private final short leaderRaftVersion;
58
59     private final String leaderAddress;
60
61     private AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
62             @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
63             short payloadVersion, short recipientRaftVersion, short leaderRaftVersion, @Nullable String leaderAddress) {
64         super(term);
65         this.leaderId = requireNonNull(leaderId);
66         this.prevLogIndex = prevLogIndex;
67         this.prevLogTerm = prevLogTerm;
68         this.entries = requireNonNull(entries);
69         this.leaderCommit = leaderCommit;
70         this.replicatedToAllIndex = replicatedToAllIndex;
71         this.payloadVersion = payloadVersion;
72         this.recipientRaftVersion = recipientRaftVersion;
73         this.leaderRaftVersion = leaderRaftVersion;
74         this.leaderAddress = leaderAddress;
75     }
76
77     public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
78             @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
79             short payloadVersion, short recipientRaftVersion, @Nullable String leaderAddress) {
80         this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
81                 recipientRaftVersion, RaftVersions.CURRENT_VERSION, leaderAddress);
82     }
83
84     @VisibleForTesting
85     public AppendEntries(long term, @Nonnull String leaderId, long prevLogIndex, long prevLogTerm,
86             @Nonnull List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex,
87             short payloadVersion) {
88         this(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, replicatedToAllIndex, payloadVersion,
89                 RaftVersions.CURRENT_VERSION, null);
90     }
91
92     @Nonnull
93     public String getLeaderId() {
94         return leaderId;
95     }
96
97     public long getPrevLogIndex() {
98         return prevLogIndex;
99     }
100
101     public long getPrevLogTerm() {
102         return prevLogTerm;
103     }
104
105     @Nonnull
106     public List<ReplicatedLogEntry> getEntries() {
107         return entries;
108     }
109
110     public long getLeaderCommit() {
111         return leaderCommit;
112     }
113
114     public long getReplicatedToAllIndex() {
115         return replicatedToAllIndex;
116     }
117
118     public short getPayloadVersion() {
119         return payloadVersion;
120     }
121
122     public Optional<String> getLeaderAddress() {
123         return Optional.ofNullable(leaderAddress);
124     }
125
126     public short getLeaderRaftVersion() {
127         return leaderRaftVersion;
128     }
129
130     @Override
131     public String toString() {
132         return "AppendEntries [leaderId=" + leaderId
133                 + ", prevLogIndex=" + prevLogIndex
134                 + ", prevLogTerm=" + prevLogTerm
135                 + ", leaderCommit=" + leaderCommit
136                 + ", replicatedToAllIndex=" + replicatedToAllIndex
137                 + ", payloadVersion=" + payloadVersion
138                 + ", recipientRaftVersion=" + recipientRaftVersion
139                 + ", leaderRaftVersion=" + leaderRaftVersion
140                 + ", leaderAddress=" + leaderAddress
141                 + ", entries=" + entries + "]";
142     }
143
144     private Object writeReplace() {
145         return recipientRaftVersion >= RaftVersions.FLUORINE_VERSION ? new ProxyV2(this) : new Proxy(this);
146     }
147
148     /**
149      * Fluorine version that adds the leader address.
150      */
151     private static class ProxyV2 implements Externalizable {
152         private static final long serialVersionUID = 1L;
153
154         private AppendEntries appendEntries;
155
156         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
157         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
158         @SuppressWarnings("checkstyle:RedundantModifier")
159         public ProxyV2() {
160         }
161
162         ProxyV2(AppendEntries appendEntries) {
163             this.appendEntries = appendEntries;
164         }
165
166         @Override
167         public void writeExternal(ObjectOutput out) throws IOException {
168             out.writeShort(appendEntries.leaderRaftVersion);
169             out.writeLong(appendEntries.getTerm());
170             out.writeObject(appendEntries.leaderId);
171             out.writeLong(appendEntries.prevLogTerm);
172             out.writeLong(appendEntries.prevLogIndex);
173             out.writeLong(appendEntries.leaderCommit);
174             out.writeLong(appendEntries.replicatedToAllIndex);
175             out.writeShort(appendEntries.payloadVersion);
176
177             out.writeInt(appendEntries.entries.size());
178             for (ReplicatedLogEntry e: appendEntries.entries) {
179                 out.writeLong(e.getIndex());
180                 out.writeLong(e.getTerm());
181                 out.writeObject(e.getData());
182             }
183
184             out.writeObject(appendEntries.leaderAddress);
185         }
186
187         @Override
188         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
189             short leaderRaftVersion = in.readShort();
190             long term = in.readLong();
191             String leaderId = (String) in.readObject();
192             long prevLogTerm = in.readLong();
193             long prevLogIndex = in.readLong();
194             long leaderCommit = in.readLong();
195             long replicatedToAllIndex = in.readLong();
196             short payloadVersion = in.readShort();
197
198             int size = in.readInt();
199             List<ReplicatedLogEntry> entries = new ArrayList<>(size);
200             for (int i = 0; i < size; i++) {
201                 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
202             }
203
204             String leaderAddress = (String)in.readObject();
205
206             appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
207                     replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, leaderRaftVersion,
208                     leaderAddress);
209         }
210
211         private Object readResolve() {
212             return appendEntries;
213         }
214     }
215
216     /**
217      * Pre-Fluorine version.
218      */
219     @Deprecated
220     private static class Proxy implements Externalizable {
221         private static final long serialVersionUID = 1L;
222
223         private AppendEntries appendEntries;
224
225         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
226         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
227         @SuppressWarnings("checkstyle:RedundantModifier")
228         public Proxy() {
229         }
230
231         Proxy(AppendEntries appendEntries) {
232             this.appendEntries = appendEntries;
233         }
234
235         @Override
236         public void writeExternal(ObjectOutput out) throws IOException {
237             out.writeLong(appendEntries.getTerm());
238             out.writeObject(appendEntries.leaderId);
239             out.writeLong(appendEntries.prevLogTerm);
240             out.writeLong(appendEntries.prevLogIndex);
241             out.writeLong(appendEntries.leaderCommit);
242             out.writeLong(appendEntries.replicatedToAllIndex);
243             out.writeShort(appendEntries.payloadVersion);
244
245             out.writeInt(appendEntries.entries.size());
246             for (ReplicatedLogEntry e: appendEntries.entries) {
247                 out.writeLong(e.getIndex());
248                 out.writeLong(e.getTerm());
249                 out.writeObject(e.getData());
250             }
251         }
252
253         @Override
254         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
255             long term = in.readLong();
256             String leaderId = (String) in.readObject();
257             long prevLogTerm = in.readLong();
258             long prevLogIndex = in.readLong();
259             long leaderCommit = in.readLong();
260             long replicatedToAllIndex = in.readLong();
261             short payloadVersion = in.readShort();
262
263             int size = in.readInt();
264             List<ReplicatedLogEntry> entries = new ArrayList<>(size);
265             for (int i = 0; i < size; i++) {
266                 entries.add(new SimpleReplicatedLogEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
267             }
268
269             appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
270                 replicatedToAllIndex, payloadVersion, RaftVersions.CURRENT_VERSION, RaftVersions.BORON_VERSION, null);
271         }
272
273         private Object readResolve() {
274             return appendEntries;
275         }
276     }
277 }