Use NormalizedNode streaming serialization in sal-remoterpc-connector 38/47638/3
authorTom Pantelis <tpanteli@brocade.com>
Wed, 26 Oct 2016 18:59:03 +0000 (14:59 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 27 Oct 2016 01:28:12 +0000 (21:28 -0400)
Converted the code in sal-remoterpc-connector to use the NormalizedNode
streaming classes for serialization instead of the legacy protobuff-based
serialization.

Change-Id: Ia08ccf9c9b569a8e70c7fb345946b7f02c46be5e
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteDOMRpcFuture.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcInput.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/AbstractRpcTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementationTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpcTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java [new file with mode: 0644]

index c6b796d26d8d585ce4d67b448fc12264f8966e5a..5df6e2bb0750f1f93d0286aad0cbe09004089168 100644 (file)
@@ -15,7 +15,6 @@ import com.google.common.util.concurrent.CheckedFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
@@ -92,19 +91,17 @@ class RemoteDOMRpcFuture extends AbstractFuture<DOMRpcResult> implements Checked
                 RemoteDOMRpcFuture.this.failNow(error);
             } else if (reply instanceof RpcResponse) {
                 final RpcResponse rpcReply = (RpcResponse) reply;
-                final NormalizedNode<?, ?> result;
-                if (rpcReply.getResultNormalizedNode() == null) {
-                    result = null;
-                    LOG.debug("Received response for rpc {}: result is null", rpcName);
-                } else {
-                    result = NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode());
-                    LOG.debug("Received response for rpc {}: result is {}", rpcName, result);
-                }
+                final NormalizedNode<?, ?> result = rpcReply.getResultNormalizedNode();
+
+                LOG.debug("Received response for rpc {}: result is {}", rpcName, result);
+
                 RemoteDOMRpcFuture.this.set(new DefaultDOMRpcResult(result));
+
                 LOG.debug("Future {} for rpc {} successfully completed", RemoteDOMRpcFuture.this, rpcName);
+            } else {
+                RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply
+                        + "from Akka"));
             }
-            RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply
-                    + "from Akka"));
         }
     }
 
index 3c77220972625fa7258ee8e1780734c9aa5681a3..a7f43f4e2766c97487f94ff61d9ca17054dc803d 100644 (file)
@@ -10,8 +10,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import java.util.Collection;
 import java.util.Map;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import javax.annotation.Nullable;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
@@ -28,13 +27,13 @@ class RemoteRpcInput implements ContainerNode {
         this.delegate = delegate;
     }
 
-    protected static RemoteRpcInput from(final Node node) {
+    protected static RemoteRpcInput from(@Nullable final NormalizedNode<?, ?> node) {
         if(node == null) {
             return null;
         }
-        final NormalizedNode<?, ?> deserialized = NormalizedNodeSerializer.deSerialize(node);
-        Preconditions.checkArgument(deserialized instanceof ContainerNode);
-        return new RemoteRpcInput((ContainerNode) deserialized);
+
+        Preconditions.checkArgument(node instanceof ContainerNode);
+        return new RemoteRpcInput((ContainerNode) node);
     }
 
     ContainerNode delegate() {
index 55de22e321bcd936ce064c2bc8d13bc0f13790a8..9f3fed4fd5139a9e792b0855aec234233d38720c 100644 (file)
@@ -19,11 +19,9 @@ import com.google.common.util.concurrent.Futures;
 import java.util.Arrays;
 import java.util.Collection;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
 import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
 import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -72,7 +70,7 @@ public class RpcBroker extends AbstractUntypedActor {
             Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
                 @Override
                 public void onSuccess(final DOMRpcResult result) {
-                    if (result.getErrors() != null && (!result.getErrors().isEmpty())) {
+                    if (result.getErrors() != null && !result.getErrors().isEmpty()) {
                         final String message = String.format("Execution of RPC %s failed", msg.getRpc());
                         Collection<RpcError> errors = result.getErrors();
                         if (errors == null || errors.size() == 0) {
@@ -81,16 +79,9 @@ public class RpcBroker extends AbstractUntypedActor {
 
                         sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message, errors)), self);
                     } else {
-                        final Node serializedResultNode;
-                        if (result.getResult() == null) {
-                            serializedResultNode = null;
-                        } else {
-                            serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
-                        }
-
                         LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
 
-                        sender.tell(new RpcResponse(serializedResultNode), self);
+                        sender.tell(new RpcResponse(result.getResult()), self);
                     }
                 }
 
index 55f87851cc7b8168cbd627ad3d66768475f6ce91..7579a674c13f583c15437b4741cd498ff53c452b 100644 (file)
@@ -10,11 +10,15 @@ package org.opendaylight.controller.remote.rpc.messages;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.io.Serializable;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -25,32 +29,30 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 public class ExecuteRpc implements Serializable {
     private static final long serialVersionUID = 1128904894827335676L;
 
-    private final NormalizedNodeMessages.Node inputNormalizedNode;
+    private final NormalizedNode<?, ?> inputNormalizedNode;
     private final QName rpc;
 
-    private ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) {
-        Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
-
+    private ExecuteRpc(@Nullable final NormalizedNode<?, ?> inputNormalizedNode, @Nonnull final QName rpc) {
+        this.rpc = Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
         this.inputNormalizedNode = inputNormalizedNode;
-        this.rpc = rpc;
     }
 
-    public NormalizedNodeMessages.Node getInputNormalizedNode() {
+    public static ExecuteRpc from(@Nonnull final DOMRpcIdentifier rpc, @Nullable final NormalizedNode<?, ?> input) {
+        return new ExecuteRpc(input, rpc.getType().getLastComponent());
+    }
+
+    @Nullable
+    public NormalizedNode<?, ?> getInputNormalizedNode() {
         return inputNormalizedNode;
     }
 
+    @Nonnull
     public QName getRpc() {
         return rpc;
     }
 
-    public static ExecuteRpc from(final DOMRpcIdentifier rpc, final NormalizedNode<?, ?> input) {
-        final Node serializedInput;
-        if(input != null) {
-            serializedInput = NormalizedNodeSerializer.serialize(input);
-        } else {
-            serializedInput = null;
-        }
-        return new ExecuteRpc(serializedInput, rpc.getType().getLastComponent());
+    private Object writeReplace() {
+        return new Proxy(this);
     }
 
     @Override
@@ -60,4 +62,36 @@ public class ExecuteRpc implements Serializable {
                 .add("normalizedNode", inputNormalizedNode)
                 .toString();
     }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private ExecuteRpc executeRpc;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(ExecuteRpc executeRpc) {
+            this.executeRpc = executeRpc;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(executeRpc.getRpc());
+            SerializationUtils.serializeNormalizedNode(executeRpc.getInputNormalizedNode(), out);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            QName qname = (QName) in.readObject();
+            executeRpc = new ExecuteRpc(SerializationUtils.deserializeNormalizedNode(in), qname);
+        }
+
+        private Object readResolve() {
+            return executeRpc;
+        }
+    }
 }
index e5cb488d97183beeaea3dba4d440d42b44f02b03..d46bf6ab32ce5e96889482526d37c802d2099002 100644 (file)
@@ -7,19 +7,60 @@
  */
 package org.opendaylight.controller.remote.rpc.messages;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.io.Serializable;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class RpcResponse implements Serializable {
     private static final long serialVersionUID = -4211279498688989245L;
 
-    private final NormalizedNodeMessages.Node resultNormalizedNode;
+    private final NormalizedNode<?, ?> resultNormalizedNode;
 
-    public RpcResponse(final NormalizedNodeMessages.Node inputNormalizedNode) {
+    public RpcResponse(@Nullable final NormalizedNode<?, ?> inputNormalizedNode) {
         resultNormalizedNode = inputNormalizedNode;
     }
 
-    public NormalizedNodeMessages.Node getResultNormalizedNode() {
+    @Nullable
+    public NormalizedNode<?, ?> getResultNormalizedNode() {
         return resultNormalizedNode;
     }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private RpcResponse rpcResponse;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(RpcResponse rpcResponse) {
+            this.rpcResponse = rpcResponse;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            SerializationUtils.serializeNormalizedNode(rpcResponse.getResultNormalizedNode(), out);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            rpcResponse = new RpcResponse(SerializationUtils.deserializeNormalizedNode(in));
+        }
+
+        private Object readResolve() {
+            return rpcResponse;
+        }
+    }
 }
index 3c8cb65675aa9db81a1b5aac98bbb105b9d3ea5b..431c4a86edef40173a4a4bb1a4fc22541ae1f87c 100644 (file)
@@ -65,7 +65,7 @@ public class AbstractRpcTest {
 
     static final SchemaPath TEST_RPC_TYPE = SchemaPath.create(true, TEST_RPC);
     static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(TEST_RPC));
-    static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH);
+    public static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH);
 
     static ActorSystem node1;
     static ActorSystem node2;
@@ -149,13 +149,13 @@ public class AbstractRpcTest {
         assertEquals(exp, actual);
     }
 
-    static ContainerNode makeRPCInput(final String data) {
+    public static ContainerNode makeRPCInput(final String data) {
         return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_INPUT))
             .withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build();
 
     }
 
-    static ContainerNode makeRPCOutput(final String data) {
+    public static ContainerNode makeRPCOutput(final String data) {
         return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_OUTPUT))
                 .withChild(ImmutableNodes.leafNode(TEST_RPC_OUTPUT, data)).build();
     }
index a70b90b07e58bf830cf972c415ded12e8983ebc7..8d6e8e598d201fc859e6cf90cdf3d8f272cf603d 100644 (file)
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
@@ -232,7 +231,7 @@ public class RemoteRpcImplementationTest extends AbstractRpcTest {
      */
     @Test(expected = DOMRpcImplementationNotAvailableException.class)
     public void testInvokeRpcWithLoopException() throws Exception {
-        final NormalizedNode<?, ?> invokeRpcInput = RemoteRpcInput.from(NormalizedNodeSerializer.serialize(makeRPCInput("foo")));
+        final NormalizedNode<?, ?> invokeRpcInput = RemoteRpcInput.from(makeRPCInput("foo"));
         final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
 
         frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
index 6ecd7dac3adc48630540c64e7f14effa6adc915b..88b65f2708abf2f2ede2586fcb3a3123efac27cd 100644 (file)
@@ -19,7 +19,6 @@ import com.google.common.util.concurrent.Futures;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
@@ -47,8 +46,7 @@ public class RpcBrokerTest extends AbstractRpcTest {
 
                 final RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
 
-                assertEquals(rpcResult.getResult(),
-                        NormalizedNodeSerializer.deSerialize(rpcResponse.getResultNormalizedNode()));
+                assertEquals(rpcResult.getResult(), rpcResponse.getResultNormalizedNode());
             }
         };
     }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpcTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpcTest.java
new file mode 100644 (file)
index 0000000..2aeed83
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.AbstractRpcTest;
+
+/**
+ * Unit tests for ExecuteRpc.
+ *
+ * @author Thomas Pantelis
+ */
+public class ExecuteRpcTest {
+
+    @Test
+    public void testSerialization() {
+        ExecuteRpc expected = ExecuteRpc.from(AbstractRpcTest.TEST_RPC_ID,
+                AbstractRpcTest.makeRPCInput("serialization-test"));
+
+        ExecuteRpc actual = (ExecuteRpc) SerializationUtils.clone(expected);
+
+        assertEquals("getRpc", expected.getRpc(), actual.getRpc());
+        assertEquals("getInputNormalizedNode", expected.getInputNormalizedNode(), actual.getInputNormalizedNode());
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/messages/RpcResponseTest.java
new file mode 100644 (file)
index 0000000..f4ec377
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.AbstractRpcTest;
+
+/**
+ * Unit tests for RpcResponse.
+ *
+ * @author Thomas Pantelis
+ */
+public class RpcResponseTest {
+
+    @Test
+    public void testSerialization() {
+        RpcResponse expected = new RpcResponse(AbstractRpcTest.makeRPCOutput("serialization-test"));
+
+        RpcResponse actual = (RpcResponse) SerializationUtils.clone(expected);
+
+        assertEquals("getResultNormalizedNode", expected.getResultNormalizedNode(), actual.getResultNormalizedNode());
+    }
+}