/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.raft.messages;
11 import com.google.protobuf.GeneratedMessage;
12 import java.io.IOException;
13 import java.io.ObjectInputStream;
14 import java.io.ObjectOutputStream;
15 import java.util.ArrayList;
16 import java.util.Iterator;
17 import java.util.List;
19 import org.opendaylight.controller.cluster.raft.RaftVersions;
20 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
21 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
22 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
23 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
/**
 * Invoked by leader to replicate log entries (§5.3); also used as
 * heartbeat (§5.2).
 */
29 public class AppendEntries extends AbstractRaftRPC {
30 private static final long serialVersionUID = 1L;
32 public static final Class<AppendEntriesMessages.AppendEntries> LEGACY_SERIALIZABLE_CLASS =
33 AppendEntriesMessages.AppendEntries.class;
35 private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
37 // So that follower can redirect clients
38 private final String leaderId;
40 // Index of log entry immediately preceding new ones
41 private final long prevLogIndex;
43 // term of prevLogIndex entry
44 private final long prevLogTerm;
46 // log entries to store (empty for heartbeat;
47 // may send more than one for efficiency)
48 private transient List<ReplicatedLogEntry> entries;
50 // leader's commitIndex
51 private final long leaderCommit;
53 // index which has been replicated successfully to all followers, -1 if none
54 private final long replicatedToAllIndex;
56 private final short payloadVersion;
/**
 * Creates an AppendEntries message.
 *
 * @param term term of the sender, handed to the {@code AbstractRaftRPC} superclass
 * @param leaderId id of the leader, so that a follower can redirect clients
 * @param prevLogIndex index of the log entry immediately preceding the new ones
 * @param prevLogTerm term of the {@code prevLogIndex} entry
 * @param entries log entries to store (empty for a heartbeat); the list is stored as given, not copied
 * @param leaderCommit the leader's commit index
 * @param replicatedToAllIndex index replicated successfully to all followers, -1 if none
 * @param payloadVersion payload version to carry with the message
 */
public AppendEntries(long term, String leaderId, long prevLogIndex, long prevLogTerm,
        List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex, short payloadVersion) {
    super(term);

    // Who is sending and what is being sent.
    this.leaderId = leaderId;
    this.entries = entries;
    this.payloadVersion = payloadVersion;

    // Log position bookkeeping.
    this.prevLogTerm = prevLogTerm;
    this.prevLogIndex = prevLogIndex;
    this.replicatedToAllIndex = replicatedToAllIndex;
    this.leaderCommit = leaderCommit;
}
70 private void writeObject(ObjectOutputStream out) throws IOException {
71 out.writeShort(RaftVersions.CURRENT_VERSION);
72 out.defaultWriteObject();
74 out.writeInt(entries.size());
75 for(ReplicatedLogEntry e: entries) {
80 private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
81 in.readShort(); // raft version
83 in.defaultReadObject();
85 int size = in.readInt();
86 entries = new ArrayList<>(size);
87 for(int i = 0; i < size; i++) {
88 entries.add((ReplicatedLogEntry) in.readObject());
92 public String getLeaderId() {
96 public long getPrevLogIndex() {
100 public long getPrevLogTerm() {
104 public List<ReplicatedLogEntry> getEntries() {
108 public long getLeaderCommit() {
112 public long getReplicatedToAllIndex() {
113 return replicatedToAllIndex;
116 public short getPayloadVersion() {
117 return payloadVersion;
121 public String toString() {
122 StringBuilder builder = new StringBuilder();
123 builder.append("AppendEntries [leaderId=").append(leaderId).append(", prevLogIndex=").append(prevLogIndex)
124 .append(", prevLogTerm=").append(prevLogTerm).append(", leaderCommit=").append(leaderCommit)
125 .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append(", payloadVersion=")
126 .append(payloadVersion).append(", entries=").append(entries).append("]");
127 return builder.toString();
130 public <T extends Object> Object toSerializable() {
131 return toSerializable(RaftVersions.CURRENT_VERSION);
134 public <T extends Object> Object toSerializable(short version) {
135 if(version < RaftVersions.LITHIUM_VERSION) {
136 return toLegacySerializable();
142 @SuppressWarnings({ "rawtypes", "unchecked" })
143 private <T> Object toLegacySerializable() {
144 AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
145 to.setTerm(this.getTerm())
146 .setLeaderId(this.getLeaderId())
147 .setPrevLogTerm(this.getPrevLogTerm())
148 .setPrevLogIndex(this.getPrevLogIndex())
149 .setLeaderCommit(this.getLeaderCommit());
151 for (ReplicatedLogEntry logEntry : this.getEntries()) {
153 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder =
154 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder();
156 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder =
157 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder();
159 //get the client specific payload extensions and add them to the payload builder
160 Map<GeneratedMessage.GeneratedExtension, T> map = logEntry.getData().encode();
161 Iterator<Map.Entry<GeneratedMessage.GeneratedExtension, T>> iter = map.entrySet().iterator();
163 while (iter.hasNext()) {
164 Map.Entry<GeneratedMessage.GeneratedExtension, T> entry = iter.next();
165 arpBuilder.setExtension(entry.getKey(), entry.getValue());
168 arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName());
170 arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm());
171 to.addLogEntries(arBuilder);
177 public static AppendEntries fromSerializable(Object serialized) {
178 if(serialized instanceof AppendEntries) {
179 return (AppendEntries)serialized;
182 return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized);
186 private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) {
187 List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
188 for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
190 Payload payload = null ;
192 if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) {
193 String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
194 payload = (Payload) Class.forName(clientPayloadClassName).newInstance();
195 payload = payload.decode(leProtoBuff.getData());
197 LOG.error("Payload is null or payload does not have client payload class name");
200 } catch (InstantiationException e) {
201 LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e);
202 } catch (IllegalAccessException e) {
203 LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e);
204 } catch (ClassNotFoundException e) {
205 LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e);
207 ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry(
208 leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload);
209 logEntryList.add(logEntry);
212 AppendEntries to = new AppendEntries(from.getTerm(),
214 from.getPrevLogIndex(),
215 from.getPrevLogTerm(),
217 from.getLeaderCommit(), -1, (short)0);
222 public static boolean isSerializedType(Object message) {
223 return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message);