Bug 2268: Serialize ApppendEntries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
index e5aebaa6be9f01e4dd46bd7a38db2e210fd39773..81981062177510641c08edb243016075f388ae0f 100644 (file)
@@ -9,25 +9,30 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 /**
  * Invoked by leader to replicate log entries (§5.3); also used as
  * heartbeat (§5.2).
  */
 public class AppendEntries extends AbstractRaftRPC {
-    public static final Class<AppendEntriesMessages.AppendEntries> SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class;
+    private static final long serialVersionUID = 1L;
+
+    public static final Class<AppendEntriesMessages.AppendEntries> LEGACY_SERIALIZABLE_CLASS =
+            AppendEntriesMessages.AppendEntries.class;
 
     private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
-    private static final long serialVersionUID = 1L;
 
     // So that follower can redirect clients
     private final String leaderId;
@@ -40,7 +45,7 @@ public class AppendEntries extends AbstractRaftRPC {
 
     // log entries to store (empty for heartbeat;
     // may send more than one for efficiency)
-    private final List<ReplicatedLogEntry> entries;
+    private transient List<ReplicatedLogEntry> entries;
 
     // leader's commitIndex
     private final long leaderCommit;
@@ -55,6 +60,28 @@ public class AppendEntries extends AbstractRaftRPC {
         this.leaderCommit = leaderCommit;
     }
 
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.writeShort(RaftVersions.CURRENT_VERSION);
+        out.defaultWriteObject();
+
+        out.writeInt(entries.size());
+        for(ReplicatedLogEntry e: entries) {
+            out.writeObject(e);
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.readShort(); // version
+
+        in.defaultReadObject();
+
+        int size = in.readInt();
+        entries = new ArrayList<>(size);
+        for(int i = 0; i < size; i++) {
+            entries.add((ReplicatedLogEntry) in.readObject());
+        }
+    }
+
     public String getLeaderId() {
         return leaderId;
     }
@@ -75,7 +102,8 @@ public class AppendEntries extends AbstractRaftRPC {
         return leaderCommit;
     }
 
-    @Override public String toString() {
+    @Override
+    public String toString() {
         final StringBuilder sb =
             new StringBuilder("AppendEntries{");
         sb.append("term=").append(getTerm());
@@ -88,7 +116,20 @@ public class AppendEntries extends AbstractRaftRPC {
         return sb.toString();
     }
 
-    public <T extends Object> Object toSerializable(){
+    public <T extends Object> Object toSerializable() {
+        return toSerializable(RaftVersions.CURRENT_VERSION);
+    }
+
+    public <T extends Object> Object toSerializable(short version) {
+        if(version < RaftVersions.LITHIUM_VERSION) {
+            return toLegacySerializable();
+        } else {
+            return this;
+        }
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private <T> Object toLegacySerializable() {
         AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
         to.setTerm(this.getTerm())
             .setLeaderId(this.getLeaderId())
@@ -122,9 +163,16 @@ public class AppendEntries extends AbstractRaftRPC {
         return to.build();
     }
 
-    public static AppendEntries fromSerializable(Object o){
-        AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o;
+    public static AppendEntries fromSerializable(Object serialized) {
+        if(serialized instanceof AppendEntries) {
+            return (AppendEntries)serialized;
+        }
+        else {
+            return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized);
+        }
+    }
 
+    private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) {
         List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
         for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
 
@@ -134,7 +182,6 @@ public class AppendEntries extends AbstractRaftRPC {
                     String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
                     payload = (Payload) Class.forName(clientPayloadClassName).newInstance();
                     payload = payload.decode(leProtoBuff.getData());
-                    payload.setClientPayloadClassName(clientPayloadClassName);
                 } else {
                     LOG.error("Payload is null or payload does not have client payload class name");
                 }
@@ -160,4 +207,8 @@ public class AppendEntries extends AbstractRaftRPC {
 
         return to;
     }
+
+    public static boolean isSerializedType(Object message) {
+        return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message);
+    }
 }