d8075f4381957a66e1958d36f7b9886c28efb5c5
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.messages;
10
11 import com.google.protobuf.GeneratedMessage;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.io.ObjectOutputStream;
15 import java.util.ArrayList;
16 import java.util.Iterator;
17 import java.util.List;
18 import java.util.Map;
19 import org.opendaylight.controller.cluster.raft.RaftVersions;
20 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
22 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
23 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
24
25 /**
26  * Invoked by leader to replicate log entries (§5.3); also used as
27  * heartbeat (§5.2).
28  */
29 public class AppendEntries extends AbstractRaftRPC {
30     private static final long serialVersionUID = 1L;
31
32     public static final Class<AppendEntriesMessages.AppendEntries> LEGACY_SERIALIZABLE_CLASS =
33             AppendEntriesMessages.AppendEntries.class;
34
35     private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
36
37     // So that follower can redirect clients
38     private final String leaderId;
39
40     // Index of log entry immediately preceding new ones
41     private final long prevLogIndex;
42
43     // term of prevLogIndex entry
44     private final long prevLogTerm;
45
46     // log entries to store (empty for heartbeat;
47     // may send more than one for efficiency)
48     private transient List<ReplicatedLogEntry> entries;
49
50     // leader's commitIndex
51     private final long leaderCommit;
52
53     // index which has been replicated successfully to all followers, -1 if none
54     private final long replicatedToAllIndex;
55
56     private final short payloadVersion;
57
58     public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm,
59             List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) {
60         super(term);
61         this.leaderId = leaderId;
62         this.prevLogIndex = prevLogIndex;
63         this.prevLogTerm = prevLogTerm;
64         this.entries = entries;
65         this.leaderCommit = leaderCommit;
66         this.replicatedToAllIndex = replicatedToAllIndex;
67         this.payloadVersion = payloadVersion;
68     }
69
70     private void writeObject(ObjectOutputStream out) throws IOException {
71         out.writeShort(RaftVersions.CURRENT_VERSION);
72         out.defaultWriteObject();
73
74         out.writeInt(entries.size());
75         for(ReplicatedLogEntry e: entries) {
76             out.writeObject(e);
77         }
78     }
79
80     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
81         in.readShort(); // raft version
82
83         in.defaultReadObject();
84
85         int size = in.readInt();
86         entries = new ArrayList<>(size);
87         for(int i = 0; i < size; i++) {
88             entries.add((ReplicatedLogEntry) in.readObject());
89         }
90     }
91
92     public String getLeaderId() {
93         return leaderId;
94     }
95
96     public long getPrevLogIndex() {
97         return prevLogIndex;
98     }
99
100     public long getPrevLogTerm() {
101         return prevLogTerm;
102     }
103
104     public List<ReplicatedLogEntry> getEntries() {
105         return entries;
106     }
107
108     public long getLeaderCommit() {
109         return leaderCommit;
110     }
111
112     public long getReplicatedToAllIndex() {
113         return replicatedToAllIndex;
114     }
115
116     public short getPayloadVersion() {
117         return payloadVersion;
118     }
119
120     @Override
121     public String toString() {
122         StringBuilder builder = new StringBuilder();
123         builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex)
124                 .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit)
125                 .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=")
126                 .append(payloadVersion).append(", entries=").append(entries).append("]");
127         return builder.toString();
128     }
129
130     public <T extends Object> Object toSerializable() {
131         return toSerializable(RaftVersions.CURRENT_VERSION);
132     }
133
134     public <T extends Object> Object toSerializable(short version) {
135         if(version < RaftVersions.LITHIUM_VERSION) {
136             return toLegacySerializable();
137         } else {
138             return this;
139         }
140     }
141
142     @SuppressWarnings({ "rawtypes", "unchecked" })
143     private <T> Object toLegacySerializable() {
144         AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
145         to.setTerm(this.getTerm())
146             .setLeaderId(this.getLeaderId())
147             .setPrevLogTerm(this.getPrevLogTerm())
148             .setPrevLogIndex(this.getPrevLogIndex())
149             .setLeaderCommit(this.getLeaderCommit());
150
151         for (ReplicatedLogEntry logEntry : this.getEntries()) {
152
153             AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder =
154                 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder();
155
156             AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder =
157                 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder();
158
159             //get the client specific payload extensions and add them to the payload builder
160             Map<GeneratedMessage.GeneratedExtension, T> map = logEntry.getData().encode();
161             Iterator<Map.Entry<GeneratedMessage.GeneratedExtension, T>> iter = map.entrySet().iterator();
162
163             while (iter.hasNext()) {
164                 Map.Entry<GeneratedMessage.GeneratedExtension, T> entry = iter.next();
165                 arpBuilder.setExtension(entry.getKey(), entry.getValue());
166             }
167
168             arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName());
169
170             arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm());
171             to.addLogEntries(arBuilder);
172         }
173
174         return to.build();
175     }
176
177     public static AppendEntries fromSerializable(Object serialized) {
178         if(serialized instanceof AppendEntries) {
179             return (AppendEntries)serialized;
180         }
181         else {
182             return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized);
183         }
184     }
185
186     private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) {
187         List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
188         for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
189
190             Payload payload = null ;
191             try {
192                 if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) {
193                     String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
194                     payload = (Payload) Class.forName(clientPayloadClassName).newInstance();
195                     payload = payload.decode(leProtoBuff.getData());
196                 } else {
197                     LOG.error("Payload is null or payload does not have client payload class name");
198                 }
199
200             } catch (InstantiationException e) {
201                 LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e);
202             } catch (IllegalAccessException e) {
203                 LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e);
204             } catch (ClassNotFoundException e) {
205                 LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e);
206             }
207             ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry(
208                 leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload);
209             logEntryList.add(logEntry);
210         }
211
212         AppendEntries to = new AppendEntries(from.getTerm(),
213             from.getLeaderId(),
214             from.getPrevLogIndex(),
215             from.getPrevLogTerm(),
216             logEntryList,
217             from.getLeaderCommit(), -1, (short)0);
218
219         return to;
220     }
221
222     public static boolean isSerializedType(Object message) {
223         return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message);
224     }
225 }