Convert AppendEntries and reply to Externalizable proxy
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / messages / AppendEntries.java
index 896f3048bc83909512ef7d1493404c7352c00ab1..dc1ebf0730fd49bd7b3b73ba3c54e4c5127bc02c 100644 (file)
@@ -8,13 +8,15 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import java.io.Externalizable;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.List;
-import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 /**
  * Invoked by leader to replicate log entries (ยง5.3); also used as
@@ -23,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 public class AppendEntries extends AbstractRaftRPC {
     private static final long serialVersionUID = 1L;
 
-    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
-
     // So that follower can redirect clients
     private final String leaderId;
 
@@ -58,28 +58,6 @@ public class AppendEntries extends AbstractRaftRPC {
         this.payloadVersion = payloadVersion;
     }
 
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        out.writeShort(RaftVersions.CURRENT_VERSION);
-        out.defaultWriteObject();
-
-        out.writeInt(entries.size());
-        for(ReplicatedLogEntry e: entries) {
-            out.writeObject(e);
-        }
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-        in.readShort(); // raft version
-
-        in.defaultReadObject();
-
-        int size = in.readInt();
-        entries = new ArrayList<>(size);
-        for(int i = 0; i < size; i++) {
-            entries.add((ReplicatedLogEntry) in.readObject());
-        }
-    }
-
     public String getLeaderId() {
         return leaderId;
     }
@@ -117,4 +95,63 @@ public class AppendEntries extends AbstractRaftRPC {
                 .append(payloadVersion).append(", entries=").append(entries).append("]");
         return builder.toString();
     }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AppendEntries appendEntries;
+
+        public Proxy() {
+        }
+
+        Proxy(AppendEntries appendEntries) {
+            this.appendEntries = appendEntries;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(appendEntries.getTerm());
+            out.writeObject(appendEntries.leaderId);
+            out.writeLong(appendEntries.prevLogTerm);
+            out.writeLong(appendEntries.prevLogIndex);
+            out.writeLong(appendEntries.leaderCommit);
+            out.writeLong(appendEntries.replicatedToAllIndex);
+            out.writeShort(appendEntries.payloadVersion);
+
+            out.writeInt(appendEntries.entries.size());
+            for(ReplicatedLogEntry e: appendEntries.entries) {
+                out.writeLong(e.getIndex());
+                out.writeLong(e.getTerm());
+                out.writeObject(e.getData());
+            }
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            long term = in.readLong();
+            String leaderId = (String) in.readObject();
+            long prevLogTerm = in.readLong();
+            long prevLogIndex = in.readLong();
+            long leaderCommit = in.readLong();
+            long replicatedToAllIndex = in.readLong();
+            short payloadVersion = in.readShort();
+
+            int size = in.readInt();
+            List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+            for(int i = 0; i < size; i++) {
+                entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+            }
+
+            appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+                    replicatedToAllIndex, payloadVersion);
+        }
+
+        private Object readResolve() {
+            return appendEntries;
+        }
+    }
 }