From a58c23b491f665e6d5449e97d430a7e15d1ecda6 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 26 Oct 2016 14:59:03 -0400 Subject: [PATCH] Use NormalizedNode streaming serialization in sal-remoterpc-connector Converted the code in sal-remoterpc-connector to use the NormalizedNode streaming classes for serialization instead of the legacy protobuff-based serialization. Change-Id: Ia08ccf9c9b569a8e70c7fb345946b7f02c46be5e Signed-off-by: Tom Pantelis --- .../remote/rpc/RemoteDOMRpcFuture.java | 19 +++--- .../controller/remote/rpc/RemoteRpcInput.java | 11 ++- .../controller/remote/rpc/RpcBroker.java | 13 +--- .../remote/rpc/messages/ExecuteRpc.java | 68 ++++++++++++++----- .../remote/rpc/messages/RpcResponse.java | 49 +++++++++++-- .../remote/rpc/AbstractRpcTest.java | 6 +- .../rpc/RemoteRpcImplementationTest.java | 3 +- .../controller/remote/rpc/RpcBrokerTest.java | 4 +- .../remote/rpc/messages/ExecuteRpcTest.java | 33 +++++++++ .../remote/rpc/messages/RpcResponseTest.java | 31 +++++++++ 10 files changed, 180 insertions(+), 57 deletions(-) create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpcTest.java create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java index c6b796d26d..5df6e2bb07 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java @@ -15,7 +15,6 @@ import com.google.common.util.concurrent.CheckedFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult; @@ -92,19 +91,17 @@ class RemoteDOMRpcFuture extends AbstractFuture implements Checked RemoteDOMRpcFuture.this.failNow(error); } else if (reply instanceof RpcResponse) { final RpcResponse rpcReply = (RpcResponse) reply; - final NormalizedNode result; - if (rpcReply.getResultNormalizedNode() == null) { - result = null; - LOG.debug("Received response for rpc {}: result is null", rpcName); - } else { - result = NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode()); - LOG.debug("Received response for rpc {}: result is {}", rpcName, result); - } + final NormalizedNode result = rpcReply.getResultNormalizedNode(); + + LOG.debug("Received response for rpc {}: result is {}", rpcName, result); + RemoteDOMRpcFuture.this.set(new DefaultDOMRpcResult(result)); + LOG.debug("Future {} for rpc {} successfully completed", RemoteDOMRpcFuture.this, rpcName); + } else { + RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply + + "from Akka")); } - RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply - + "from Akka")); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java index 3c77220972..a7f43f4e27 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java @@ -10,8 +10,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import java.util.Collection; import java.util.Map; -import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node; +import javax.annotation.Nullable; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; @@ -28,13 +27,13 @@ class RemoteRpcInput implements ContainerNode { this.delegate = delegate; } - protected static RemoteRpcInput from(final Node node) { + protected static RemoteRpcInput from(@Nullable final NormalizedNode node) { if(node == null) { return null; } - final NormalizedNode deserialized = NormalizedNodeSerializer.deSerialize(node); - Preconditions.checkArgument(deserialized instanceof ContainerNode); - return new RemoteRpcInput((ContainerNode) deserialized); + + Preconditions.checkArgument(node instanceof ContainerNode); + return new RemoteRpcInput((ContainerNode) node); } ContainerNode delegate() { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java index 55de22e321..9f3fed4fd5 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -19,11 +19,9 @@ import com.google.common.util.concurrent.Futures; import java.util.Arrays; import java.util.Collection; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; -import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node; import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; import org.opendaylight.controller.remote.rpc.messages.RpcResponse; import org.opendaylight.yangtools.yang.common.RpcError; @@ -72,7 +70,7 @@ public class RpcBroker extends AbstractUntypedActor { Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(final DOMRpcResult result) { - if (result.getErrors() != null && (!result.getErrors().isEmpty())) { + if (result.getErrors() != null && !result.getErrors().isEmpty()) { final String message = String.format("Execution of RPC %s failed", msg.getRpc()); Collection errors = result.getErrors(); if (errors == null || errors.size() == 0) { @@ -81,16 +79,9 @@ public class RpcBroker extends AbstractUntypedActor { sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message, errors)), self); } else { - final Node serializedResultNode; - if (result.getResult() == null) { - serializedResultNode = null; - } else { - serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult()); - } - LOG.debug("Sending response for execute rpc : {}", msg.getRpc()); - sender.tell(new RpcResponse(serializedResultNode), self); + sender.tell(new RpcResponse(result.getResult()), self); } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java index 55f87851cc..7579a674c1 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java @@ -10,11 +10,15 @@ package org.opendaylight.controller.remote.rpc.messages; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.io.Serializable; -import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils; import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -25,32 +29,30 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class ExecuteRpc implements Serializable { private static final long serialVersionUID = 1128904894827335676L; - private final NormalizedNodeMessages.Node inputNormalizedNode; + private final NormalizedNode inputNormalizedNode; private final QName rpc; - private ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) { - Preconditions.checkNotNull(rpc, "rpc Qname should not be null"); - + private ExecuteRpc(@Nullable final NormalizedNode inputNormalizedNode, @Nonnull final QName rpc) { + this.rpc = Preconditions.checkNotNull(rpc, "rpc Qname should not be null"); this.inputNormalizedNode = inputNormalizedNode; - this.rpc = rpc; } - public NormalizedNodeMessages.Node getInputNormalizedNode() { + public static ExecuteRpc from(@Nonnull final DOMRpcIdentifier rpc, @Nullable final NormalizedNode input) { + return new ExecuteRpc(input, rpc.getType().getLastComponent()); + } + + @Nullable + public NormalizedNode getInputNormalizedNode() { return inputNormalizedNode; } + @Nonnull public QName getRpc() { return rpc; } - public static ExecuteRpc from(final DOMRpcIdentifier rpc, final NormalizedNode input) { - final Node serializedInput; - if(input != null) { - serializedInput = NormalizedNodeSerializer.serialize(input); - } else { - serializedInput = null; - } - return new ExecuteRpc(serializedInput, rpc.getType().getLastComponent()); + private Object writeReplace() { + return new Proxy(this); } @Override @@ -60,4 +62,36 @@ public class ExecuteRpc implements Serializable { .add("normalizedNode", inputNormalizedNode) .toString(); } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private ExecuteRpc executeRpc; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + } + + Proxy(ExecuteRpc executeRpc) { + this.executeRpc = executeRpc; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(executeRpc.getRpc()); + SerializationUtils.serializeNormalizedNode(executeRpc.getInputNormalizedNode(), out); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + QName qname = (QName) in.readObject(); + executeRpc = new ExecuteRpc(SerializationUtils.deserializeNormalizedNode(in), qname); + } + + private Object readResolve() { + return executeRpc; + } + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java index e5cb488d97..d46bf6ab32 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java @@ -7,19 +7,60 @@ */ package org.opendaylight.controller.remote.rpc.messages; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.io.Serializable; -import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class RpcResponse implements Serializable { private static final long serialVersionUID = -4211279498688989245L; - private final NormalizedNodeMessages.Node resultNormalizedNode; + private final NormalizedNode resultNormalizedNode; - public RpcResponse(final NormalizedNodeMessages.Node inputNormalizedNode) { + public RpcResponse(@Nullable final NormalizedNode inputNormalizedNode) { resultNormalizedNode = inputNormalizedNode; } - public NormalizedNodeMessages.Node getResultNormalizedNode() { + @Nullable + public NormalizedNode getResultNormalizedNode() { return resultNormalizedNode; } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private RpcResponse rpcResponse; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + } + + Proxy(RpcResponse rpcResponse) { + this.rpcResponse = rpcResponse; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + SerializationUtils.serializeNormalizedNode(rpcResponse.getResultNormalizedNode(), out); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + rpcResponse = new RpcResponse(SerializationUtils.deserializeNormalizedNode(in)); + } + + private Object readResolve() { + return rpcResponse; + } + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java index 3c8cb65675..431c4a86ed 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java @@ -65,7 +65,7 @@ public class AbstractRpcTest { static final SchemaPath TEST_RPC_TYPE = SchemaPath.create(true, TEST_RPC); static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(TEST_RPC)); - static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH); + public static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH); static ActorSystem node1; static ActorSystem node2; @@ -149,13 +149,13 @@ public class AbstractRpcTest { assertEquals(exp, actual); } - static ContainerNode makeRPCInput(final String data) { + public static ContainerNode makeRPCInput(final String data) { return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_INPUT)) .withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build(); } - static ContainerNode makeRPCOutput(final String data) { + public static ContainerNode makeRPCOutput(final String data) { return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_OUTPUT)) .withChild(ImmutableNodes.leafNode(TEST_RPC_OUTPUT, data)).build(); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java index a70b90b07e..8d6e8e598d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; @@ -232,7 +231,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest { */ @Test(expected = DOMRpcImplementationNotAvailableException.class) public void testInvokeRpcWithLoopException() throws Exception { - final NormalizedNode invokeRpcInput = RemoteRpcInput.from(NormalizedNodeSerializer.serialize(makeRPCInput("foo"))); + final NormalizedNode invokeRpcInput = RemoteRpcInput.from(makeRPCInput("foo")); final CheckedFuture frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput); frontEndFuture.checkedGet(5, TimeUnit.SECONDS); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java index 6ecd7dac3a..88b65f2708 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java @@ -19,7 +19,6 @@ import com.google.common.util.concurrent.Futures; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; @@ -47,8 +46,7 @@ public class RpcBrokerTest extends AbstractRpcTest { final RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class); - assertEquals(rpcResult.getResult(), - NormalizedNodeSerializer.deSerialize(rpcResponse.getResultNormalizedNode())); + assertEquals(rpcResult.getResult(), rpcResponse.getResultNormalizedNode()); } }; } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpcTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpcTest.java new file mode 100644 index 0000000000..2aeed83744 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpcTest.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + +import static org.junit.Assert.assertEquals; + +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.remote.rpc.AbstractRpcTest; + +/** + * Unit tests for ExecuteRpc. + * + * @author Thomas Pantelis + */ +public class ExecuteRpcTest { + + @Test + public void testSerialization() { + ExecuteRpc expected = ExecuteRpc.from(AbstractRpcTest.TEST_RPC_ID, + AbstractRpcTest.makeRPCInput("serialization-test")); + + ExecuteRpc actual = (ExecuteRpc) SerializationUtils.clone(expected); + + assertEquals("getRpc", expected.getRpc(), actual.getRpc()); + assertEquals("getInputNormalizedNode", expected.getInputNormalizedNode(), actual.getInputNormalizedNode()); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java new file mode 100644 index 0000000000..f4ec377bb7 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + +import static org.junit.Assert.assertEquals; + +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.remote.rpc.AbstractRpcTest; + +/** + * Unit tests for RpcResponse. + * + * @author Thomas Pantelis + */ +public class RpcResponseTest { + + @Test + public void testSerialization() { + RpcResponse expected = new RpcResponse(AbstractRpcTest.makeRPCOutput("serialization-test")); + + RpcResponse actual = (RpcResponse) SerializationUtils.clone(expected); + + assertEquals("getResultNormalizedNode", expected.getResultNormalizedNode(), actual.getResultNormalizedNode()); + } +} -- 2.36.6