From: tpantelis Date: Wed, 21 Jan 2015 20:12:24 +0000 (-0500) Subject: Bug 2268: Serialize ApppendEntries X-Git-Tag: release/lithium~643^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=cf1c3a92ee533791afd0883504978de116dcdd0c Bug 2268: Serialize ApppendEntries Changed AppendEntries to use java serialization instead of protobuff to take advantage of payload streaming. Kept backwards compatibility support for deserializing from the pre-Lithium protobuff message. Change-Id: Ia1edf186cbe8eba3f46207bcf3ba17598c5bca37 Signed-off-by: tpantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java new file mode 100644 index 0000000000..4330a4caa0 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftVersions.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +/** + * @author Thomas Pantelis + */ +public interface RaftVersions { + short HELIUM_VERSION = 0; + short LITHIUM_VERSION = 1; + short CURRENT_VERSION = LITHIUM_VERSION; +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java index 986c7f382c..799e75ebfa 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java @@ -11,9 +11,8 @@ package org.opendaylight.controller.cluster.raft; import java.io.Serializable; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -public class ReplicatedLogImplEntry implements ReplicatedLogEntry, - Serializable { - private static final long serialVersionUID = 1L; +public class ReplicatedLogImplEntry implements ReplicatedLogEntry, Serializable { + private static final long serialVersionUID = -9085798014576489130L; private final long index; private final long term; @@ -26,15 +25,18 @@ public class ReplicatedLogImplEntry implements ReplicatedLogEntry, this.payload = payload; } - @Override public Payload getData() { + @Override + public Payload getData() { return payload; } - @Override public long getTerm() { + @Override + public long getTerm() { return term; } - @Override public long getIndex() { + @Override + public long getIndex() { return index; } @@ -43,7 +45,8 @@ public class ReplicatedLogImplEntry implements ReplicatedLogEntry, return getData().size(); } - @Override public String toString() { + @Override + public String toString() { return "Entry{" + "index=" + index + ", term=" + term + diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java index 2f5ba48f92..7ec32440e7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java @@ -14,7 +14,7 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; public class SerializationUtils { public static Object fromSerializable(Object serializable){ - if(serializable.getClass().equals(AppendEntries.SERIALIZABLE_CLASS)){ + if(AppendEntries.isSerializedType(serializable)){ return AppendEntries.fromSerializable(serializable); } else if (serializable.getClass().equals(InstallSnapshot.SERIALIZABLE_CLASS)) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java index 0122d45c30..3c9ebf47fd 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java @@ -9,7 +9,8 @@ package org.opendaylight.controller.cluster.raft.messages; public class AbstractRaftRPC implements RaftRPC { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -6061342433962854822L; + // term protected long term; @@ -21,6 +22,7 @@ public class AbstractRaftRPC implements RaftRPC { public AbstractRaftRPC() { } + @Override public long getTerm() { return term; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java index e5aebaa6be..8198106217 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java @@ -9,25 +9,30 @@ package org.opendaylight.controller.cluster.raft.messages; import com.google.protobuf.GeneratedMessage; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; - +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.opendaylight.controller.cluster.raft.RaftVersions; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; /** * Invoked by leader to replicate log entries (§5.3); also used as * heartbeat (§5.2). */ public class AppendEntries extends AbstractRaftRPC { - public static final Class SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class; + private static final long serialVersionUID = 1L; + + public static final Class LEGACY_SERIALIZABLE_CLASS = + AppendEntriesMessages.AppendEntries.class; private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class); - private static final long serialVersionUID = 1L; // So that follower can redirect clients private final String leaderId; @@ -40,7 +45,7 @@ public class AppendEntries extends AbstractRaftRPC { // log entries to store (empty for heartbeat; // may send more than one for efficiency) - private final List entries; + private transient List entries; // leader's commitIndex private final long leaderCommit; @@ -55,6 +60,28 @@ public class AppendEntries extends AbstractRaftRPC { this.leaderCommit = leaderCommit; } + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeShort(RaftVersions.CURRENT_VERSION); + out.defaultWriteObject(); + + out.writeInt(entries.size()); + for(ReplicatedLogEntry e: entries) { + out.writeObject(e); + } + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.readShort(); // version + + in.defaultReadObject(); + + int size = in.readInt(); + entries = new ArrayList<>(size); + for(int i = 0; i < size; i++) { + entries.add((ReplicatedLogEntry) in.readObject()); + } + } + public String getLeaderId() { return leaderId; } @@ -75,7 +102,8 @@ public class AppendEntries extends AbstractRaftRPC { return leaderCommit; } - @Override public String toString() { + @Override + public String toString() { final StringBuilder sb = new StringBuilder("AppendEntries{"); sb.append("term=").append(getTerm()); @@ -88,7 +116,20 @@ public class AppendEntries extends AbstractRaftRPC { return sb.toString(); } - public Object toSerializable(){ + public Object toSerializable() { + return toSerializable(RaftVersions.CURRENT_VERSION); + } + + public Object toSerializable(short version) { + if(version < RaftVersions.LITHIUM_VERSION) { + return toLegacySerializable(); + } else { + return this; + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private Object toLegacySerializable() { AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder(); to.setTerm(this.getTerm()) .setLeaderId(this.getLeaderId()) @@ -122,9 +163,16 @@ public class AppendEntries extends AbstractRaftRPC { return to.build(); } - public static AppendEntries fromSerializable(Object o){ - AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o; + public static AppendEntries fromSerializable(Object serialized) { + if(serialized instanceof AppendEntries) { + return (AppendEntries)serialized; + } + else { + return fromLegacySerializable((AppendEntriesMessages.AppendEntries) serialized); + } + } + private static AppendEntries fromLegacySerializable(AppendEntriesMessages.AppendEntries from) { List logEntryList = new ArrayList<>(); for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) { @@ -134,7 +182,6 @@ public class AppendEntries extends AbstractRaftRPC { String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName(); payload = (Payload) Class.forName(clientPayloadClassName).newInstance(); payload = payload.decode(leProtoBuff.getData()); - payload.setClientPayloadClassName(clientPayloadClassName); } else { LOG.error("Payload is null or payload does not have client payload class name"); } @@ -160,4 +207,8 @@ public class AppendEntries extends AbstractRaftRPC { return to; } + + public static boolean isSerializedType(Object message) { + return message instanceof AppendEntries || LEGACY_SERIALIZABLE_CLASS.isInstance(message); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index 2424d4d557..cd852eaae2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -55,14 +55,17 @@ public class MockRaftActorContext implements RaftActorContext { private long currentTerm = 0; private String votedFor = ""; + @Override public long getCurrentTerm() { return currentTerm; } + @Override public String getVotedFor() { return votedFor; } + @Override public void update(long currentTerm, String votedFor){ this.currentTerm = currentTerm; this.votedFor = votedFor; @@ -127,6 +130,7 @@ public class MockRaftActorContext implements RaftActorContext { return lastApplied; } + @Override public void setReplicatedLog(ReplicatedLog replicatedLog) { this.replicatedLog = replicatedLog; } @@ -202,7 +206,7 @@ public class MockRaftActorContext implements RaftActorContext { } public static class MockPayload extends Payload implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 3121380393130864247L; private String value = ""; public MockPayload(){ @@ -235,6 +239,7 @@ public class MockRaftActorContext implements RaftActorContext { return MockPayload.class.getName(); } + @Override public String toString() { return value; } @@ -273,7 +278,7 @@ public class MockRaftActorContext implements RaftActorContext { } public static class MockReplicatedLogBuilder { - private ReplicatedLog mockLog = new SimpleReplicatedLog(); + private final ReplicatedLog mockLog = new SimpleReplicatedLog(); public MockReplicatedLogBuilder createEntries(int start, int end, int term) { for (int i=start; iemptyList(), 10L); + + assertSame("toSerializable", entries, entries.toSerializable()); + assertSame("fromSerializable", entries, + org.opendaylight.controller.cluster.raft.SerializationUtils.fromSerializable(entries)); + } + + @Test + public void testToAndFromLegacySerializable() { + ReplicatedLogEntry entry = new ReplicatedLogImplEntry(3, 4, new MockPayload("payload")); + AppendEntries entries = new AppendEntries(5L, "node1", 7L, 8L, Arrays.asList(entry), 10L); + + Object serializable = entries.toSerializable(RaftVersions.HELIUM_VERSION); + Assert.assertTrue(serializable instanceof AppendEntriesMessages.AppendEntries); + + AppendEntries entries2 = (AppendEntries) + org.opendaylight.controller.cluster.raft.SerializationUtils.fromSerializable(serializable); + + verifyAppendEntries(entries, entries2); + } + + private void verifyAppendEntries(AppendEntries expected, AppendEntries actual) { + assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId()); + assertEquals("getTerm", expected.getTerm(), actual.getTerm()); + assertEquals("getLeaderCommit", expected.getLeaderCommit(), actual.getLeaderCommit()); + assertEquals("getPrevLogIndex", expected.getPrevLogIndex(), actual.getPrevLogIndex()); + assertEquals("getPrevLogTerm", expected.getPrevLogTerm(), actual.getPrevLogTerm()); + + assertEquals("getEntries size", expected.getEntries().size(), actual.getEntries().size()); + Iterator iter = expected.getEntries().iterator(); + for(ReplicatedLogEntry e: actual.getEntries()) { + verifyReplicatedLogEntry(iter.next(), e); + } + } + + private void verifyReplicatedLogEntry(ReplicatedLogEntry expected, ReplicatedLogEntry actual) { + assertEquals("getIndex", expected.getIndex(), actual.getIndex()); + assertEquals("getTerm", expected.getTerm(), actual.getTerm()); + assertEquals("getData", expected.getData().toString(), actual.getData().toString()); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/resources/helium-serialized-ReplicatedLogImplEntry b/opendaylight/md-sal/sal-akka-raft/src/test/resources/helium-serialized-ReplicatedLogImplEntry new file mode 100644 index 0000000000..3e87f66f0c Binary files /dev/null and b/opendaylight/md-sal/sal-akka-raft/src/test/resources/helium-serialized-ReplicatedLogImplEntry differ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java index 9e02223f54..cd74167259 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java @@ -11,6 +11,7 @@ import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; @@ -69,7 +70,8 @@ public class CompositeModificationPayloadTest { AppendEntries appendEntries = new AppendEntries(1, "member-1", 0, 100, entries, 1); - AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries) appendEntries.toSerializable(); + AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries) + appendEntries.toSerializable(RaftVersions.HELIUM_VERSION); o.writeDelimitedTo(new FileOutputStream(SERIALIZE_OUT)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java index e6bdf5aac3..6b6cf326be 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Server.java @@ -24,7 +24,7 @@ public class Server { public static class ServerActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { - if(AppendEntries.SERIALIZABLE_CLASS.equals(message.getClass())){ + if(AppendEntries.LEGACY_SERIALIZABLE_CLASS.equals(message.getClass())){ AppendEntries appendEntries = AppendEntries.fromSerializable(message);