Convert AppendEntries and reply to Externalizable proxy 79/42479/3
authorTom Pantelis <tpanteli@brocade.com>
Sun, 24 Jul 2016 20:59:35 +0000 (16:59 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 27 Jul 2016 03:20:30 +0000 (03:20 +0000)
Converted the AppendEntries and AppendEntriesReply messages to use the
Externalizable proxy pattern. The classes remain Serializable but use an
internal Externalizable Proxy class with writeReplace and readResolve.
This reduces the serialized size to less than half.

Change-Id: Ica1a8ce09458b49b2993d3304ee2d80e38d4fc59
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java

index 896f3048bc83909512ef7d1493404c7352c00ab1..dc1ebf0730fd49bd7b3b73ba3c54e4c5127bc02c 100644 (file)
@@ -8,13 +8,15 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import java.io.Externalizable;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.List;
-import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 /**
  * Invoked by leader to replicate log entries (ยง5.3); also used as
@@ -23,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 public class AppendEntries extends AbstractRaftRPC {
     private static final long serialVersionUID = 1L;
 
-    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
-
     // So that follower can redirect clients
     private final String leaderId;
 
@@ -58,28 +58,6 @@ public class AppendEntries extends AbstractRaftRPC {
         this.payloadVersion = payloadVersion;
     }
 
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        out.writeShort(RaftVersions.CURRENT_VERSION);
-        out.defaultWriteObject();
-
-        out.writeInt(entries.size());
-        for(ReplicatedLogEntry e: entries) {
-            out.writeObject(e);
-        }
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-        in.readShort(); // raft version
-
-        in.defaultReadObject();
-
-        int size = in.readInt();
-        entries = new ArrayList<>(size);
-        for(int i = 0; i < size; i++) {
-            entries.add((ReplicatedLogEntry) in.readObject());
-        }
-    }
-
     public String getLeaderId() {
         return leaderId;
     }
@@ -117,4 +95,63 @@ public class AppendEntries extends AbstractRaftRPC {
                 .append(payloadVersion).append(", entries=").append(entries).append("]");
         return builder.toString();
     }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AppendEntries appendEntries;
+
+        public Proxy() {
+        }
+
+        Proxy(AppendEntries appendEntries) {
+            this.appendEntries = appendEntries;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(appendEntries.getTerm());
+            out.writeObject(appendEntries.leaderId);
+            out.writeLong(appendEntries.prevLogTerm);
+            out.writeLong(appendEntries.prevLogIndex);
+            out.writeLong(appendEntries.leaderCommit);
+            out.writeLong(appendEntries.replicatedToAllIndex);
+            out.writeShort(appendEntries.payloadVersion);
+
+            out.writeInt(appendEntries.entries.size());
+            for(ReplicatedLogEntry e: appendEntries.entries) {
+                out.writeLong(e.getIndex());
+                out.writeLong(e.getTerm());
+                out.writeObject(e.getData());
+            }
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            long term = in.readLong();
+            String leaderId = (String) in.readObject();
+            long prevLogTerm = in.readLong();
+            long prevLogIndex = in.readLong();
+            long leaderCommit = in.readLong();
+            long replicatedToAllIndex = in.readLong();
+            short payloadVersion = in.readShort();
+
+            int size = in.readInt();
+            List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+            for(int i = 0; i < size; i++) {
+                entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+            }
+
+            appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+                    replicatedToAllIndex, payloadVersion);
+        }
+
+        private Object readResolve() {
+            return appendEntries;
+        }
+    }
 }
index ffdfaa6a0ed112ce50eb22d881d2731dd7a69978..7784c7eec9c91413180cce47fff68467bec198f6 100644 (file)
@@ -8,6 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
 
 /**
@@ -33,7 +37,7 @@ public class AppendEntriesReply extends AbstractRaftRPC {
 
     private final short payloadVersion;
 
-    private final short raftVersion = RaftVersions.CURRENT_VERSION;
+    private final short raftVersion;
 
     private final boolean forceInstallSnapshot;
 
@@ -43,7 +47,14 @@ public class AppendEntriesReply extends AbstractRaftRPC {
     }
 
     public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
-                              short payloadVersion, boolean forceInstallSnapshot) {
+            short payloadVersion, boolean forceInstallSnapshot) {
+        this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, forceInstallSnapshot,
+                RaftVersions.CURRENT_VERSION);
+
+    }
+
+    private AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
+                              short payloadVersion, boolean forceInstallSnapshot, short raftVersion) {
         super(term);
 
         this.followerId = followerId;
@@ -52,6 +63,7 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         this.logLastTerm = logLastTerm;
         this.payloadVersion = payloadVersion;
         this.forceInstallSnapshot = forceInstallSnapshot;
+        this.raftVersion = raftVersion;
     }
 
     public boolean isSuccess() {
@@ -88,4 +100,52 @@ public class AppendEntriesReply extends AbstractRaftRPC {
                 + ", logLastIndex=" + logLastIndex + ", logLastTerm=" + logLastTerm + ", forceInstallSnapshot="
                 + forceInstallSnapshot + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion + "]";
     }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AppendEntriesReply appendEntriesReply;
+
+        public Proxy() {
+        }
+
+        Proxy(AppendEntriesReply appendEntriesReply) {
+            this.appendEntriesReply = appendEntriesReply;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeShort(appendEntriesReply.raftVersion);
+            out.writeLong(appendEntriesReply.getTerm());
+            out.writeObject(appendEntriesReply.followerId);
+            out.writeBoolean(appendEntriesReply.success);
+            out.writeLong(appendEntriesReply.logLastIndex);
+            out.writeLong(appendEntriesReply.logLastTerm);
+            out.writeShort(appendEntriesReply.payloadVersion);
+            out.writeBoolean(appendEntriesReply.forceInstallSnapshot);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            short raftVersion = in.readShort();
+            long term = in.readLong();
+            String followerId = (String) in.readObject();
+            boolean success = in.readBoolean();
+            long logLastIndex = in.readLong();
+            long logLastTerm = in.readLong();
+            short payloadVersion = in.readShort();
+            boolean forceInstallSnapshot = in.readBoolean();
+
+            appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm,
+                    payloadVersion, forceInstallSnapshot, raftVersion);
+        }
+
+        private Object readResolve() {
+            return appendEntriesReply;
+        }
+    }
 }
index 0f1bee5f31e1eb22c02c888b8202b82a2f898584..4469c8ed16720a3b513bb681ee3d465b518e8a63 100644 (file)
@@ -28,5 +28,6 @@ public class AppendEntriesReplyTest {
         assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm());
         assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
         assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
+        assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion());
     }
 }