Merge "BUG-2634 Config binding for netconf server"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.messages;
10
11 import com.google.protobuf.GeneratedMessage;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.io.ObjectOutputStream;
15 import java.util.ArrayList;
16 import java.util.Iterator;
17 import java.util.List;
18 import java.util.Map;
19 import org.opendaylight.controller.cluster.raft.RaftVersions;
20 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
22 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
23 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
24
25 /**
26  * Invoked by leader to replicate log entries (§5.3); also used as
27  * heartbeat (§5.2).
28  */
29 public class AppendEntries extends AbstractRaftRPC {
30     private static final long serialVersionUID = 1L;
31
32     public static final Class<AppendEntriesMessages.AppendEntries> LEGACY_SERIALIZABLE_CLASS =
33             AppendEntriesMessages.AppendEntries.class;
34
35     private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
36
37     // So that follower can redirect clients
38     private final String leaderId;
39
40     // Index of log entry immediately preceding new ones
41     private final long prevLogIndex;
42
43     // term of prevLogIndex entry
44     private final long prevLogTerm;
45
46     // log entries to store (empty for heartbeat;
47     // may send more than one for efficiency)
48     private transient List<ReplicatedLogEntry> entries;
49
50     // leader's commitIndex
51     private final long leaderCommit;
52
53     // index which has been replicated successfully to all followers, -1 if none
54     private final long replicatedToAllIndex;
55
56     public AppendEntries(long term, String leaderId, long prevLogIndex,
57         long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
58         super(term);
59         this.leaderId = leaderId;
60         this.prevLogIndex = prevLogIndex;
61         this.prevLogTerm = prevLogTerm;
62         this.entries = entries;
63         this.leaderCommit = leaderCommit;
64         this.replicatedToAllIndex = replicatedToAllIndex;
65     }
66
67     private void writeObject(ObjectOutputStream out) throws IOException {
68         out.writeShort(RaftVersions.CURRENT_VERSION);
69         out.defaultWriteObject();
70
71         out.writeInt(entries.size());
72         for(ReplicatedLogEntry e: entries) {
73             out.writeObject(e);
74         }
75     }
76
77     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
78         in.readShort(); // version
79
80         in.defaultReadObject();
81
82         int size = in.readInt();
83         entries = new ArrayList<>(size);
84         for(int i = 0; i < size; i++) {
85             entries.add((ReplicatedLogEntry) in.readObject());
86         }
87     }
88
89     public String getLeaderId() {
90         return leaderId;
91     }
92
93     public long getPrevLogIndex() {
94         return prevLogIndex;
95     }
96
97     public long getPrevLogTerm() {
98         return prevLogTerm;
99     }
100
101     public List<ReplicatedLogEntry> getEntries() {
102         return entries;
103     }
104
105     public long getLeaderCommit() {
106         return leaderCommit;
107     }
108
109     public long getReplicatedToAllIndex() {
110         return replicatedToAllIndex;
111     }
112
113
114     @Override
115     public String toString() {
116         StringBuilder builder = new StringBuilder();
117         builder.append("AppendEntries [term=").append(term).append(", leaderId=").append(leaderId)
118                 .append(", prevLogIndex=").append(prevLogIndex).append(", prevLogTerm=").append(prevLogTerm)
119                 .append(", entries=").append(entries).append(", leaderCommit=").append(leaderCommit)
120                 .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append("]");
121         return builder.toString();
122     }
123
124     public <T extends Object> Object toSerializable() {
125         return toSerializable(RaftVersions.CURRENT_VERSION);
126     }
127
128     public <T extends Object> Object toSerializable(short version) {
129         if(version < RaftVersions.LITHIUM_VERSION) {
130             return toLegacySerializable();
131         } else {
132             return this;
133         }
134     }
135
136     @SuppressWarnings({ "rawtypes", "unchecked" })
137     private <T> Object toLegacySerializable() {
138         AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
139         to.setTerm(this.getTerm())
140             .setLeaderId(this.getLeaderId())
141             .setPrevLogTerm(this.getPrevLogTerm())
142             .setPrevLogIndex(this.getPrevLogIndex())
143             .setLeaderCommit(this.getLeaderCommit());
144
145         for (ReplicatedLogEntry logEntry : this.getEntries()) {
146
147             AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder =
148                 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder();
149
150             AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder =
151                 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder();
152
153             //get the client specific payload extensions and add them to the payload builder
154             Map<GeneratedMessage.GeneratedExtension, T> map = logEntry.getData().encode();
155             Iterator<Map.Entry<GeneratedMessage.GeneratedExtension, T>> iter = map.entrySet().iterator();
156
157             while (iter.hasNext()) {
158                 Map.Entry<GeneratedMessage.GeneratedExtension, T> entry = iter.next();
159                 arpBuilder.setExtension(entry.getKey(), entry.getValue());
160             }
161
162             arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName());
163
164             arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm());
165             to.addLogEntries(arBuilder);
166         }
167
168         return to.build();
169     }
170
171     public static AppendEntries fromSerializable(Object serialized) {
172         if(serialized instanceof AppendEntries) {
173             return (AppendEntries)serialized;
174         }
175         else {
176             return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized);
177         }
178     }
179
180     private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) {
181         List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
182         for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
183
184             Payload payload = null ;
185             try {
186                 if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) {
187                     String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
188                     payload = (Payload) Class.forName(clientPayloadClassName).newInstance();
189                     payload = payload.decode(leProtoBuff.getData());
190                 } else {
191                     LOG.error("Payload is null or payload does not have client payload class name");
192                 }
193
194             } catch (InstantiationException e) {
195                 LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e);
196             } catch (IllegalAccessException e) {
197                 LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e);
198             } catch (ClassNotFoundException e) {
199                 LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e);
200             }
201             ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry(
202                 leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload);
203             logEntryList.add(logEntry);
204         }
205
206         AppendEntries to = new AppendEntries(from.getTerm(),
207             from.getLeaderId(),
208             from.getPrevLogIndex(),
209             from.getPrevLogTerm(),
210             logEntryList,
211             from.getLeaderCommit(), -1);
212
213         return to;
214     }
215
216     public static boolean isSerializedType(Object message) {
217         return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message);
218     }
219 }