Merge "Fix potential issue with transaction timeouts"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.messages;
10
11 import com.google.protobuf.GeneratedMessage;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.io.ObjectOutputStream;
15 import java.util.ArrayList;
16 import java.util.Iterator;
17 import java.util.List;
18 import java.util.Map;
19 import org.opendaylight.controller.cluster.raft.RaftVersions;
20 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
22 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
23 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
24
25 /**
26  * Invoked by leader to replicate log entries (§5.3); also used as
27  * heartbeat (§5.2).
28  */
29 public class AppendEntries extends AbstractRaftRPC {
30     private static final long serialVersionUID = 1L;
31
32     public static final Class<AppendEntriesMessages.AppendEntries> LEGACY_SERIALIZABLE_CLASS =
33             AppendEntriesMessages.AppendEntries.class;
34
35     private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
36
37     // So that follower can redirect clients
38     private final String leaderId;
39
40     // Index of log entry immediately preceding new ones
41     private final long prevLogIndex;
42
43     // term of prevLogIndex entry
44     private final long prevLogTerm;
45
46     // log entries to store (empty for heartbeat;
47     // may send more than one for efficiency)
48     private transient List<ReplicatedLogEntry> entries;
49
50     // leader's commitIndex
51     private final long leaderCommit;
52
53     public AppendEntries(long term, String leaderId, long prevLogIndex,
54         long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
55         super(term);
56         this.leaderId = leaderId;
57         this.prevLogIndex = prevLogIndex;
58         this.prevLogTerm = prevLogTerm;
59         this.entries = entries;
60         this.leaderCommit = leaderCommit;
61     }
62
63     private void writeObject(ObjectOutputStream out) throws IOException {
64         out.writeShort(RaftVersions.CURRENT_VERSION);
65         out.defaultWriteObject();
66
67         out.writeInt(entries.size());
68         for(ReplicatedLogEntry e: entries) {
69             out.writeObject(e);
70         }
71     }
72
73     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
74         in.readShort(); // version
75
76         in.defaultReadObject();
77
78         int size = in.readInt();
79         entries = new ArrayList<>(size);
80         for(int i = 0; i < size; i++) {
81             entries.add((ReplicatedLogEntry) in.readObject());
82         }
83     }
84
85     public String getLeaderId() {
86         return leaderId;
87     }
88
89     public long getPrevLogIndex() {
90         return prevLogIndex;
91     }
92
93     public long getPrevLogTerm() {
94         return prevLogTerm;
95     }
96
97     public List<ReplicatedLogEntry> getEntries() {
98         return entries;
99     }
100
101     public long getLeaderCommit() {
102         return leaderCommit;
103     }
104
105     @Override
106     public String toString() {
107         final StringBuilder sb =
108             new StringBuilder("AppendEntries{");
109         sb.append("term=").append(getTerm());
110         sb.append("leaderId='").append(leaderId).append('\'');
111         sb.append(", prevLogIndex=").append(prevLogIndex);
112         sb.append(", prevLogTerm=").append(prevLogTerm);
113         sb.append(", entries=").append(entries);
114         sb.append(", leaderCommit=").append(leaderCommit);
115         sb.append('}');
116         return sb.toString();
117     }
118
119     public <T extends Object> Object toSerializable() {
120         return toSerializable(RaftVersions.CURRENT_VERSION);
121     }
122
123     public <T extends Object> Object toSerializable(short version) {
124         if(version < RaftVersions.LITHIUM_VERSION) {
125             return toLegacySerializable();
126         } else {
127             return this;
128         }
129     }
130
131     @SuppressWarnings({ "rawtypes", "unchecked" })
132     private <T> Object toLegacySerializable() {
133         AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
134         to.setTerm(this.getTerm())
135             .setLeaderId(this.getLeaderId())
136             .setPrevLogTerm(this.getPrevLogTerm())
137             .setPrevLogIndex(this.getPrevLogIndex())
138             .setLeaderCommit(this.getLeaderCommit());
139
140         for (ReplicatedLogEntry logEntry : this.getEntries()) {
141
142             AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder =
143                 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder();
144
145             AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder =
146                 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder();
147
148             //get the client specific payload extensions and add them to the payload builder
149             Map<GeneratedMessage.GeneratedExtension, T> map = logEntry.getData().encode();
150             Iterator<Map.Entry<GeneratedMessage.GeneratedExtension, T>> iter = map.entrySet().iterator();
151
152             while (iter.hasNext()) {
153                 Map.Entry<GeneratedMessage.GeneratedExtension, T> entry = iter.next();
154                 arpBuilder.setExtension(entry.getKey(), entry.getValue());
155             }
156
157             arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName());
158
159             arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm());
160             to.addLogEntries(arBuilder);
161         }
162
163         return to.build();
164     }
165
166     public static AppendEntries fromSerializable(Object serialized) {
167         if(serialized instanceof AppendEntries) {
168             return (AppendEntries)serialized;
169         }
170         else {
171             return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized);
172         }
173     }
174
175     private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) {
176         List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
177         for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
178
179             Payload payload = null ;
180             try {
181                 if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) {
182                     String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
183                     payload = (Payload) Class.forName(clientPayloadClassName).newInstance();
184                     payload = payload.decode(leProtoBuff.getData());
185                 } else {
186                     LOG.error("Payload is null or payload does not have client payload class name");
187                 }
188
189             } catch (InstantiationException e) {
190                 LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e);
191             } catch (IllegalAccessException e) {
192                 LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e);
193             } catch (ClassNotFoundException e) {
194                 LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e);
195             }
196             ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry(
197                 leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload);
198             logEntryList.add(logEntry);
199         }
200
201         AppendEntries to = new AppendEntries(from.getTerm(),
202             from.getLeaderId(),
203             from.getPrevLogIndex(),
204             from.getPrevLogTerm(),
205             logEntryList,
206             from.getLeaderCommit());
207
208         return to;
209     }
210
211     public static boolean isSerializedType(Object message) {
212         return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message);
213     }
214 }