Bug-2590: Clustering : Minimize usage of in-memory journal
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.messages;
10
11 import com.google.protobuf.GeneratedMessage;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.io.ObjectOutputStream;
15 import java.util.ArrayList;
16 import java.util.Iterator;
17 import java.util.List;
18 import java.util.Map;
19 import org.opendaylight.controller.cluster.raft.RaftVersions;
20 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
22 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
23 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
24
25 /**
26  * Invoked by leader to replicate log entries (§5.3); also used as
27  * heartbeat (§5.2).
28  */
29 public class AppendEntries extends AbstractRaftRPC {
30     private static final long serialVersionUID = 1L;
31
32     public static final Class<AppendEntriesMessages.AppendEntries> LEGACY_SERIALIZABLE_CLASS =
33             AppendEntriesMessages.AppendEntries.class;
34
35     private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
36
37     // So that follower can redirect clients
38     private final String leaderId;
39
40     // Index of log entry immediately preceding new ones
41     private final long prevLogIndex;
42
43     // term of prevLogIndex entry
44     private final long prevLogTerm;
45
46     // log entries to store (empty for heartbeat;
47     // may send more than one for efficiency)
48     private transient List<ReplicatedLogEntry> entries;
49
50     // leader's commitIndex
51     private final long leaderCommit;
52
53     // index which has been replicated successfully to all followers, -1 if none
54     private final long replicatedToAllIndex;
55
56     public AppendEntries(long term, String leaderId, long prevLogIndex,
57         long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
58         super(term);
59         this.leaderId = leaderId;
60         this.prevLogIndex = prevLogIndex;
61         this.prevLogTerm = prevLogTerm;
62         this.entries = entries;
63         this.leaderCommit = leaderCommit;
64         this.replicatedToAllIndex = replicatedToAllIndex;
65     }
66
67     private void writeObject(ObjectOutputStream out) throws IOException {
68         out.writeShort(RaftVersions.CURRENT_VERSION);
69         out.defaultWriteObject();
70
71         out.writeInt(entries.size());
72         for(ReplicatedLogEntry e: entries) {
73             out.writeObject(e);
74         }
75     }
76
77     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
78         in.readShort(); // version
79
80         in.defaultReadObject();
81
82         int size = in.readInt();
83         entries = new ArrayList<>(size);
84         for(int i = 0; i < size; i++) {
85             entries.add((ReplicatedLogEntry) in.readObject());
86         }
87     }
88
89     public String getLeaderId() {
90         return leaderId;
91     }
92
93     public long getPrevLogIndex() {
94         return prevLogIndex;
95     }
96
97     public long getPrevLogTerm() {
98         return prevLogTerm;
99     }
100
101     public List<ReplicatedLogEntry> getEntries() {
102         return entries;
103     }
104
105     public long getLeaderCommit() {
106         return leaderCommit;
107     }
108
109     public long getReplicatedToAllIndex() {
110         return replicatedToAllIndex;
111     }
112
113     @Override
114     public String toString() {
115         final StringBuilder sb =
116             new StringBuilder("AppendEntries{");
117         sb.append("term=").append(getTerm());
118         sb.append("leaderId='").append(leaderId).append('\'');
119         sb.append(", prevLogIndex=").append(prevLogIndex);
120         sb.append(", prevLogTerm=").append(prevLogTerm);
121         sb.append(", entries=").append(entries);
122         sb.append(", leaderCommit=").append(leaderCommit);
123         sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex);
124         sb.append('}');
125         return sb.toString();
126     }
127
128     public <T extends Object> Object toSerializable() {
129         return toSerializable(RaftVersions.CURRENT_VERSION);
130     }
131
132     public <T extends Object> Object toSerializable(short version) {
133         if(version < RaftVersions.LITHIUM_VERSION) {
134             return toLegacySerializable();
135         } else {
136             return this;
137         }
138     }
139
140     @SuppressWarnings({ "rawtypes", "unchecked" })
141     private <T> Object toLegacySerializable() {
142         AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
143         to.setTerm(this.getTerm())
144             .setLeaderId(this.getLeaderId())
145             .setPrevLogTerm(this.getPrevLogTerm())
146             .setPrevLogIndex(this.getPrevLogIndex())
147             .setLeaderCommit(this.getLeaderCommit());
148
149         for (ReplicatedLogEntry logEntry : this.getEntries()) {
150
151             AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder =
152                 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder();
153
154             AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder =
155                 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder();
156
157             //get the client specific payload extensions and add them to the payload builder
158             Map<GeneratedMessage.GeneratedExtension, T> map = logEntry.getData().encode();
159             Iterator<Map.Entry<GeneratedMessage.GeneratedExtension, T>> iter = map.entrySet().iterator();
160
161             while (iter.hasNext()) {
162                 Map.Entry<GeneratedMessage.GeneratedExtension, T> entry = iter.next();
163                 arpBuilder.setExtension(entry.getKey(), entry.getValue());
164             }
165
166             arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName());
167
168             arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm());
169             to.addLogEntries(arBuilder);
170         }
171
172         return to.build();
173     }
174
175     public static AppendEntries fromSerializable(Object serialized) {
176         if(serialized instanceof AppendEntries) {
177             return (AppendEntries)serialized;
178         }
179         else {
180             return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized);
181         }
182     }
183
184     private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) {
185         List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
186         for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
187
188             Payload payload = null ;
189             try {
190                 if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) {
191                     String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
192                     payload = (Payload) Class.forName(clientPayloadClassName).newInstance();
193                     payload = payload.decode(leProtoBuff.getData());
194                 } else {
195                     LOG.error("Payload is null or payload does not have client payload class name");
196                 }
197
198             } catch (InstantiationException e) {
199                 LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e);
200             } catch (IllegalAccessException e) {
201                 LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e);
202             } catch (ClassNotFoundException e) {
203                 LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e);
204             }
205             ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry(
206                 leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload);
207             logEntryList.add(logEntry);
208         }
209
210         AppendEntries to = new AppendEntries(from.getTerm(),
211             from.getLeaderId(),
212             from.getPrevLogIndex(),
213             from.getPrevLogTerm(),
214             logEntryList,
215             from.getLeaderCommit(), -1);
216
217         return to;
218     }
219
220     public static boolean isSerializedType(Object message) {
221         return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message);
222     }
223 }