Convert AppendEntries and reply to Externalizable proxy 79/42479/3
authorTom Pantelis <tpanteli@brocade.com>
Sun, 24 Jul 2016 20:59:35 +0000 (16:59 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 27 Jul 2016 03:20:30 +0000 (03:20 +0000)
Converted the AppendEntries and AppendEntriesReply messages to use the
Externalizable proxy pattern. The classes remain Serializable but use an
internal Externalizable Proxy class with writeReplace and readResolve.
This reduces the serialized size to less than half.

Change-Id: Ica1a8ce09458b49b2993d3304ee2d80e38d4fc59
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java

index 896f304..dc1ebf0 100644 (file)
@@ -8,13 +8,15 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import java.io.Externalizable;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.List;
-import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 /**
  * Invoked by leader to replicate log entries (ยง5.3); also used as
@@ -23,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 public class AppendEntries extends AbstractRaftRPC {
     private static final long serialVersionUID = 1L;
 
-    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
-
     // So that follower can redirect clients
     private final String leaderId;
 
@@ -58,28 +58,6 @@ public class AppendEntries extends AbstractRaftRPC {
         this.payloadVersion = payloadVersion;
     }
 
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        out.writeShort(RaftVersions.CURRENT_VERSION);
-        out.defaultWriteObject();
-
-        out.writeInt(entries.size());
-        for(ReplicatedLogEntry e: entries) {
-            out.writeObject(e);
-        }
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-        in.readShort(); // raft version
-
-        in.defaultReadObject();
-
-        int size = in.readInt();
-        entries = new ArrayList<>(size);
-        for(int i = 0; i < size; i++) {
-            entries.add((ReplicatedLogEntry) in.readObject());
-        }
-    }
-
     public String getLeaderId() {
         return leaderId;
     }
@@ -117,4 +95,63 @@ public class AppendEntries extends AbstractRaftRPC {
                 .append(payloadVersion).append(", entries=").append(entries).append("]");
         return builder.toString();
     }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AppendEntries appendEntries;
+
+        public Proxy() {
+        }
+
+        Proxy(AppendEntries appendEntries) {
+            this.appendEntries = appendEntries;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(appendEntries.getTerm());
+            out.writeObject(appendEntries.leaderId);
+            out.writeLong(appendEntries.prevLogTerm);
+            out.writeLong(appendEntries.prevLogIndex);
+            out.writeLong(appendEntries.leaderCommit);
+            out.writeLong(appendEntries.replicatedToAllIndex);
+            out.writeShort(appendEntries.payloadVersion);
+
+            out.writeInt(appendEntries.entries.size());
+            for(ReplicatedLogEntry e: appendEntries.entries) {
+                out.writeLong(e.getIndex());
+                out.writeLong(e.getTerm());
+                out.writeObject(e.getData());
+            }
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            long term = in.readLong();
+            String leaderId = (String) in.readObject();
+            long prevLogTerm = in.readLong();
+            long prevLogIndex = in.readLong();
+            long leaderCommit = in.readLong();
+            long replicatedToAllIndex = in.readLong();
+            short payloadVersion = in.readShort();
+
+            int size = in.readInt();
+            List<ReplicatedLogEntry> entries = new ArrayList<>(size);
+            for(int i = 0; i < size; i++) {
+                entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject()));
+            }
+
+            appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit,
+                    replicatedToAllIndex, payloadVersion);
+        }
+
+        private Object readResolve() {
+            return appendEntries;
+        }
+    }
 }
index ffdfaa6..7784c7e 100644 (file)
@@ -8,6 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
 
 /**
@@ -33,7 +37,7 @@ public class AppendEntriesReply extends AbstractRaftRPC {
 
     private final short payloadVersion;
 
-    private final short raftVersion = RaftVersions.CURRENT_VERSION;
+    private final short raftVersion;
 
     private final boolean forceInstallSnapshot;
 
@@ -43,7 +47,14 @@ public class AppendEntriesReply extends AbstractRaftRPC {
     }
 
     public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
-                              short payloadVersion, boolean forceInstallSnapshot) {
+            short payloadVersion, boolean forceInstallSnapshot) {
+        this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, forceInstallSnapshot,
+                RaftVersions.CURRENT_VERSION);
+
+    }
+
+    private AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
+                              short payloadVersion, boolean forceInstallSnapshot, short raftVersion) {
         super(term);
 
         this.followerId = followerId;
@@ -52,6 +63,7 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         this.logLastTerm = logLastTerm;
         this.payloadVersion = payloadVersion;
         this.forceInstallSnapshot = forceInstallSnapshot;
+        this.raftVersion = raftVersion;
     }
 
     public boolean isSuccess() {
@@ -88,4 +100,52 @@ public class AppendEntriesReply extends AbstractRaftRPC {
                 + ", logLastIndex=" + logLastIndex + ", logLastTerm=" + logLastTerm + ", forceInstallSnapshot="
                 + forceInstallSnapshot + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion + "]";
     }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AppendEntriesReply appendEntriesReply;
+
+        public Proxy() {
+        }
+
+        Proxy(AppendEntriesReply appendEntriesReply) {
+            this.appendEntriesReply = appendEntriesReply;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeShort(appendEntriesReply.raftVersion);
+            out.writeLong(appendEntriesReply.getTerm());
+            out.writeObject(appendEntriesReply.followerId);
+            out.writeBoolean(appendEntriesReply.success);
+            out.writeLong(appendEntriesReply.logLastIndex);
+            out.writeLong(appendEntriesReply.logLastTerm);
+            out.writeShort(appendEntriesReply.payloadVersion);
+            out.writeBoolean(appendEntriesReply.forceInstallSnapshot);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            short raftVersion = in.readShort();
+            long term = in.readLong();
+            String followerId = (String) in.readObject();
+            boolean success = in.readBoolean();
+            long logLastIndex = in.readLong();
+            long logLastTerm = in.readLong();
+            short payloadVersion = in.readShort();
+            boolean forceInstallSnapshot = in.readBoolean();
+
+            appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm,
+                    payloadVersion, forceInstallSnapshot, raftVersion);
+        }
+
+        private Object readResolve() {
+            return appendEntriesReply;
+        }
+    }
 }
index 0f1bee5..4469c8e 100644 (file)
@@ -28,5 +28,6 @@ public class AppendEntriesReplyTest {
         assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm());
         assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex());
         assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion());
+        assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion());
     }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.