package org.opendaylight.controller.cluster.raft.messages;
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
/**
* Invoked by leader to replicate log entries (§5.3); also used as
* heartbeat (§5.2).
*/
public class AppendEntries extends AbstractRaftRPC {
+ private static final long serialVersionUID = 1L;
- public static final Class SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class;
+ public static final Class<AppendEntriesMessages.AppendEntries> LEGACY_SERIALIZABLE_CLASS =
+ AppendEntriesMessages.AppendEntries.class;
private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
// log entries to store (empty for heartbeat;
// may send more than one for efficiency)
- private final List<ReplicatedLogEntry> entries;
+ private transient List<ReplicatedLogEntry> entries;
// leader's commitIndex
private final long leaderCommit;
+ // index which has been replicated successfully to all followers, -1 if none
+ private final long replicatedToAllIndex;
+
public AppendEntries(long term, String leaderId, long prevLogIndex,
- long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit) {
+ long prevLogTerm, List<ReplicatedLogEntry> entries, long leaderCommit, long replicatedToAllIndex) {
super(term);
this.leaderId = leaderId;
this.prevLogIndex = prevLogIndex;
this.prevLogTerm = prevLogTerm;
this.entries = entries;
this.leaderCommit = leaderCommit;
+ this.replicatedToAllIndex = replicatedToAllIndex;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeShort(RaftVersions.CURRENT_VERSION);
+ out.defaultWriteObject();
+
+ out.writeInt(entries.size());
+ for(ReplicatedLogEntry e: entries) {
+ out.writeObject(e);
+ }
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.readShort(); // version
+
+ in.defaultReadObject();
+
+ int size = in.readInt();
+ entries = new ArrayList<>(size);
+ for(int i = 0; i < size; i++) {
+ entries.add((ReplicatedLogEntry) in.readObject());
+ }
}
public String getLeaderId() {
return leaderCommit;
}
- @Override public String toString() {
+ public long getReplicatedToAllIndex() {
+ return replicatedToAllIndex;
+ }
+
+ @Override
+ public String toString() {
final StringBuilder sb =
new StringBuilder("AppendEntries{");
sb.append("term=").append(getTerm());
sb.append(", prevLogTerm=").append(prevLogTerm);
sb.append(", entries=").append(entries);
sb.append(", leaderCommit=").append(leaderCommit);
+ sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex);
sb.append('}');
return sb.toString();
}
- public <T extends Object> Object toSerializable(){
+ public <T extends Object> Object toSerializable() {
+ return toSerializable(RaftVersions.CURRENT_VERSION);
+ }
+
+ public <T extends Object> Object toSerializable(short version) {
+ if(version < RaftVersions.LITHIUM_VERSION) {
+ return toLegacySerializable();
+ } else {
+ return this;
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private <T> Object toLegacySerializable() {
AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
to.setTerm(this.getTerm())
.setLeaderId(this.getLeaderId())
return to.build();
}
- public static AppendEntries fromSerializable(Object o){
- AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o;
+ public static AppendEntries fromSerializable(Object serialized) {
+ if(serialized instanceof AppendEntries) {
+ return (AppendEntries)serialized;
+ }
+ else {
+ return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized);
+ }
+ }
+ private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) {
List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
payload = (Payload) Class.forName(clientPayloadClassName).newInstance();
payload = payload.decode(leProtoBuff.getData());
- payload.setClientPayloadClassName(clientPayloadClassName);
} else {
LOG.error("Payload is null or payload does not have client payload class name");
}
from.getPrevLogIndex(),
from.getPrevLogTerm(),
logEntryList,
- from.getLeaderCommit());
+ from.getLeaderCommit(), -1);
return to;
}
+
+ public static boolean isSerializedType(Object message) {
+ return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message);
+ }
}