Merge "Make Raft messages serializable"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.messages;
10
11 import com.google.protobuf.GeneratedMessage;
12 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
13 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
14 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
15 import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
16
17 import java.io.Serializable;
18 import java.util.ArrayList;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map;
22
23 /**
24  * Invoked by leader to replicate log entries (§5.3); also used as
25  * heartbeat (§5.2).
26  */
27 public class AppendEntries extends AbstractRaftRPC implements Serializable {
28
29     public static final Class SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class;
30
31     private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
32
33     // So that follower can redirect clients
34     private final String leaderId;
35
36     // Index of log entry immediately preceding new ones
37     private final long prevLogIndex;
38
39     // term of prevLogIndex entry
40     private final long prevLogTerm;
41
42     // log entries to store (empty for heartbeat;
43     // may send more than one for efficiency)
44     private final List<ReplicatedLogEntry> entries;
45
46     // leader's commitIndex
47     private final long leaderCommit;
48
49     public AppendEntries(long term, String leaderId, long prevLogIndex,
50         long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
51         super(term);
52         this.leaderId = leaderId;
53         this.prevLogIndex = prevLogIndex;
54         this.prevLogTerm = prevLogTerm;
55         this.entries = entries;
56         this.leaderCommit = leaderCommit;
57     }
58
59     public String getLeaderId() {
60         return leaderId;
61     }
62
63     public long getPrevLogIndex() {
64         return prevLogIndex;
65     }
66
67     public long getPrevLogTerm() {
68         return prevLogTerm;
69     }
70
71     public List<ReplicatedLogEntry> getEntries() {
72         return entries;
73     }
74
75     public long getLeaderCommit() {
76         return leaderCommit;
77     }
78
79     @Override public String toString() {
80         return "AppendEntries{" +
81             "leaderId='" + leaderId + '\'' +
82             ", prevLogIndex=" + prevLogIndex +
83             ", prevLogTerm=" + prevLogTerm +
84             ", entries=" + entries +
85             ", leaderCommit=" + leaderCommit +
86             '}';
87     }
88
89     public <T extends Object> Object toSerializable(){
90         AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
91         to.setTerm(this.getTerm())
92             .setLeaderId(this.getLeaderId())
93             .setPrevLogTerm(this.getPrevLogTerm())
94             .setPrevLogIndex(this.getPrevLogIndex())
95             .setLeaderCommit(this.getLeaderCommit());
96
97         for (ReplicatedLogEntry logEntry : this.getEntries()) {
98
99             AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder =
100                 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder();
101
102             AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder =
103                 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder();
104
105             //get the client specific payload extensions and add them to the payload builder
106             Map<GeneratedMessage.GeneratedExtension, T> map = logEntry.getData().encode();
107             Iterator<Map.Entry<GeneratedMessage.GeneratedExtension, T>> iter = map.entrySet().iterator();
108
109             while (iter.hasNext()) {
110                 Map.Entry<GeneratedMessage.GeneratedExtension, T> entry = iter.next();
111                 arpBuilder.setExtension(entry.getKey(), entry.getValue());
112             }
113
114             arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName());
115
116             arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm());
117             to.addLogEntries(arBuilder);
118         }
119
120         return to.build();
121     }
122
123     public static AppendEntries fromSerializable(Object o){
124         AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o;
125
126         List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
127         for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
128
129             Payload payload = null ;
130             try {
131                 if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) {
132                     String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
133                     payload = (Payload)Class.forName(clientPayloadClassName).newInstance();
134                     payload = payload.decode(leProtoBuff.getData());
135                     payload.setClientPayloadClassName(clientPayloadClassName);
136                 } else {
137                     LOG.error("Payload is null or payload does not have client payload class name");
138                 }
139
140             } catch (InstantiationException e) {
141                 LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e);
142             } catch (IllegalAccessException e) {
143                 LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e);
144             } catch (ClassNotFoundException e) {
145                 LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e);
146             }
147             ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry(
148                 leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload);
149             logEntryList.add(logEntry);
150         }
151
152         AppendEntries to = new AppendEntries(from.getTerm(),
153             from.getLeaderId(),
154             from.getPrevLogIndex(),
155             from.getPrevLogTerm(),
156             logEntryList,
157             from.getLeaderCommit());
158
159         return to;
160     }
161 }