X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fmessages%2FAppendEntries.java;h=81981062177510641c08edb243016075f388ae0f;hb=cf1c3a92ee533791afd0883504978de116dcdd0c;hp=14eb8a4b77ce98bff75d01afe2103f417897d4cb;hpb=b3bf00226d83387a060d97a4f573377f07e93b5a;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index 14eb8a4b77..8198106217 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -9,23 +9,28 @@ package org.opendaylight.controller.cluster.raft.messages; import com.google.protobuf.GeneratedMessage; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; - +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.opendaylight.controller.cluster.raft.RaftVersions; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; /** * Invoked by leader to replicate log entries (§5.3); also used as * heartbeat (§5.2). */ public class AppendEntries extends AbstractRaftRPC { + private static final long serialVersionUID = 1L; - public static final Class SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class; + public static final Class LEGACY_SERIALIZABLE_CLASS = + AppendEntriesMessages.AppendEntries.class; private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class); @@ -40,7 +45,7 @@ public class AppendEntries extends AbstractRaftRPC { // log entries to store (empty for heartbeat; // may send more than one for efficiency) - private final List entries; + private transient List entries; // leader's commitIndex private final long leaderCommit; @@ -55,6 +60,28 @@ public class AppendEntries extends AbstractRaftRPC { this.leaderCommit = leaderCommit; } + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeShort(RaftVersions.CURRENT_VERSION); + out.defaultWriteObject(); + + out.writeInt(entries.size()); + for(ReplicatedLogEntry e: entries) { + out.writeObject(e); + } + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.readShort(); // version + + in.defaultReadObject(); + + int size = in.readInt(); + entries = new ArrayList<>(size); + for(int i = 0; i < size; i++) { + entries.add((ReplicatedLogEntry) in.readObject()); + } + } + public String getLeaderId() { return leaderId; } @@ -75,7 +102,8 @@ public class AppendEntries extends AbstractRaftRPC { return leaderCommit; } - @Override public String toString() { + @Override + public String toString() { final StringBuilder sb = new StringBuilder("AppendEntries{"); sb.append("term=").append(getTerm()); @@ -88,7 +116,20 @@ public class AppendEntries extends AbstractRaftRPC { return sb.toString(); } - public Object toSerializable(){ + public Object toSerializable() { + return toSerializable(RaftVersions.CURRENT_VERSION); + } + + public Object toSerializable(short version) { + if(version < RaftVersions.LITHIUM_VERSION) { + return toLegacySerializable(); + } else { + return this; + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private Object toLegacySerializable() { AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder(); to.setTerm(this.getTerm()) .setLeaderId(this.getLeaderId()) @@ -122,9 +163,16 @@ public class AppendEntries extends AbstractRaftRPC { return to.build(); } - public static AppendEntries fromSerializable(Object o){ - AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o; + public static AppendEntries fromSerializable(Object serialized) { + if(serialized instanceof AppendEntries) { + return (AppendEntries)serialized; + } + else { + return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized); + } + } + private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) { List logEntryList = new ArrayList<>(); for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) { @@ -134,7 +182,6 @@ public class AppendEntries extends AbstractRaftRPC { String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName(); payload = (Payload) Class.forName(clientPayloadClassName).newInstance(); payload = payload.decode(leProtoBuff.getData()); - payload.setClientPayloadClassName(clientPayloadClassName); } else { LOG.error("Payload is null or payload does not have client payload class name"); } @@ -160,4 +207,8 @@ public class AppendEntries extends AbstractRaftRPC { return to; } + + public static boolean isSerializedType(Object message) { + return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message); + } }