From baa92613648863a7d839be36f89cc30431d5a66a Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Sun, 24 Jul 2016 16:59:35 -0400 Subject: [PATCH] Convert AppendEntries and reply to Externalizable proxy Converted the AppendEntries and AppendEntriesReply messages to use the Externalizable proxy pattern. The classes remain Serializable but use an internal Externalizable Proxy class with writeReplace and readResolve. This reduces the serialized size to less than half. Change-Id: Ica1a8ce09458b49b2993d3304ee2d80e38d4fc59 Signed-off-by: Tom Pantelis --- .../cluster/raft/messages/AppendEntries.java | 91 +++++++++++++------ .../raft/messages/AppendEntriesReply.java | 64 ++++++++++++- .../raft/messages/AppendEntriesReplyTest.java | 1 + 3 files changed, 127 insertions(+), 29 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index 896f3048bc..dc1ebf0730 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -8,13 +8,15 @@ package org.opendaylight.controller.cluster.raft.messages; +import java.io.Externalizable; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.util.ArrayList; import java.util.List; -import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; /** * Invoked by leader to replicate log entries (§5.3); also used as @@ -23,8 +25,6 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; public class AppendEntries extends AbstractRaftRPC { private static final long serialVersionUID = 1L; - private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class); - // So that follower can redirect clients private final String leaderId; @@ -58,28 +58,6 @@ public class AppendEntries extends AbstractRaftRPC { this.payloadVersion = payloadVersion; } - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeShort(RaftVersions.CURRENT_VERSION); - out.defaultWriteObject(); - - out.writeInt(entries.size()); - for(ReplicatedLogEntry e: entries) { - out.writeObject(e); - } - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.readShort(); // raft version - - in.defaultReadObject(); - - int size = in.readInt(); - entries = new ArrayList<>(size); - for(int i = 0; i < size; i++) { - entries.add((ReplicatedLogEntry) in.readObject()); - } - } - public String getLeaderId() { return leaderId; } @@ -117,4 +95,63 @@ public class AppendEntries extends AbstractRaftRPC { .append(payloadVersion).append(", entries=").append(entries).append("]"); return builder.toString(); } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private AppendEntries appendEntries; + + public Proxy() { + } + + Proxy(AppendEntries appendEntries) { + this.appendEntries = appendEntries; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(appendEntries.getTerm()); + out.writeObject(appendEntries.leaderId); + out.writeLong(appendEntries.prevLogTerm); + out.writeLong(appendEntries.prevLogIndex); + out.writeLong(appendEntries.leaderCommit); + out.writeLong(appendEntries.replicatedToAllIndex); + out.writeShort(appendEntries.payloadVersion); + + out.writeInt(appendEntries.entries.size()); + for(ReplicatedLogEntry e: appendEntries.entries) { + out.writeLong(e.getIndex()); + out.writeLong(e.getTerm()); + out.writeObject(e.getData()); + } + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + long term = in.readLong(); + String leaderId = (String) in.readObject(); + long prevLogTerm = in.readLong(); + long prevLogIndex = in.readLong(); + long leaderCommit = in.readLong(); + long replicatedToAllIndex = in.readLong(); + short payloadVersion = in.readShort(); + + int size = in.readInt(); + List entries = new ArrayList<>(size); + for(int i = 0; i < size; i++) { + entries.add(new ReplicatedLogImplEntry(in.readLong(), in.readLong(), (Payload) in.readObject())); + } + + appendEntries = new AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries, leaderCommit, + replicatedToAllIndex, payloadVersion); + } + + private Object readResolve() { + return appendEntries; + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java index ffdfaa6a0e..7784c7eec9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java @@ -8,6 +8,10 @@ package org.opendaylight.controller.cluster.raft.messages; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import org.opendaylight.controller.cluster.raft.RaftVersions; /** @@ -33,7 +37,7 @@ public class AppendEntriesReply extends AbstractRaftRPC { private final short payloadVersion; - private final short raftVersion = RaftVersions.CURRENT_VERSION; + private final short raftVersion; private final boolean forceInstallSnapshot; @@ -43,7 +47,14 @@ public class AppendEntriesReply extends AbstractRaftRPC { } public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm, - short payloadVersion, boolean forceInstallSnapshot) { + short payloadVersion, boolean forceInstallSnapshot) { + this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, forceInstallSnapshot, + RaftVersions.CURRENT_VERSION); + + } + + private AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm, + short payloadVersion, boolean forceInstallSnapshot, short raftVersion) { super(term); this.followerId = followerId; @@ -52,6 +63,7 @@ public class AppendEntriesReply extends AbstractRaftRPC { this.logLastTerm = logLastTerm; this.payloadVersion = payloadVersion; this.forceInstallSnapshot = forceInstallSnapshot; + this.raftVersion = raftVersion; } public boolean isSuccess() { @@ -88,4 +100,52 @@ public class AppendEntriesReply extends AbstractRaftRPC { + ", logLastIndex=" + logLastIndex + ", logLastTerm=" + logLastTerm + ", forceInstallSnapshot=" + forceInstallSnapshot + ", payloadVersion=" + payloadVersion + ", raftVersion=" + raftVersion + "]"; } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private AppendEntriesReply appendEntriesReply; + + public Proxy() { + } + + Proxy(AppendEntriesReply appendEntriesReply) { + this.appendEntriesReply = appendEntriesReply; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeShort(appendEntriesReply.raftVersion); + out.writeLong(appendEntriesReply.getTerm()); + out.writeObject(appendEntriesReply.followerId); + out.writeBoolean(appendEntriesReply.success); + out.writeLong(appendEntriesReply.logLastIndex); + out.writeLong(appendEntriesReply.logLastTerm); + out.writeShort(appendEntriesReply.payloadVersion); + out.writeBoolean(appendEntriesReply.forceInstallSnapshot); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + short raftVersion = in.readShort(); + long term = in.readLong(); + String followerId = (String) in.readObject(); + boolean success = in.readBoolean(); + long logLastIndex = in.readLong(); + long logLastTerm = in.readLong(); + short payloadVersion = in.readShort(); + boolean forceInstallSnapshot = in.readBoolean(); + + appendEntriesReply = new AppendEntriesReply(followerId, term, success, logLastIndex, logLastTerm, + payloadVersion, forceInstallSnapshot, raftVersion); + } + + private Object readResolve() { + return appendEntriesReply; + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java index 0f1bee5f31..4469c8ed16 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReplyTest.java @@ -28,5 +28,6 @@ public class AppendEntriesReplyTest { assertEquals("getLogLastTerm", expected.getLogLastTerm(), cloned.getLogLastTerm()); assertEquals("getLogLastIndex", expected.getLogLastIndex(), cloned.getLogLastIndex()); assertEquals("getPayloadVersion", expected.getPayloadVersion(), cloned.getPayloadVersion()); + assertEquals("getRaftVersion", expected.getRaftVersion(), cloned.getRaftVersion()); } } -- 2.36.6