import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
RemoteDOMRpcFuture.this.failNow(error);
} else if (reply instanceof RpcResponse) {
final RpcResponse rpcReply = (RpcResponse) reply;
- final NormalizedNode<?, ?> result;
- if (rpcReply.getResultNormalizedNode() == null) {
- result = null;
- LOG.debug("Received response for rpc {}: result is null", rpcName);
- } else {
- result = NormalizedNodeSerializer.deSerialize(rpcReply.getResultNormalizedNode());
- LOG.debug("Received response for rpc {}: result is {}", rpcName, result);
- }
+ final NormalizedNode<?, ?> result = rpcReply.getResultNormalizedNode();
+
+ LOG.debug("Received response for rpc {}: result is {}", rpcName, result);
+
RemoteDOMRpcFuture.this.set(new DefaultDOMRpcResult(result));
+
LOG.debug("Future {} for rpc {} successfully completed", RemoteDOMRpcFuture.this, rpcName);
+ } else {
+ RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply
+ + "from Akka"));
}
- RemoteDOMRpcFuture.this.failNow(new IllegalStateException("Incorrect reply type " + reply
- + "from Akka"));
}
}
import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.Map;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
+import javax.annotation.Nullable;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
this.delegate = delegate;
}
- protected static RemoteRpcInput from(final Node node) {
+ protected static RemoteRpcInput from(@Nullable final NormalizedNode<?, ?> node) {
if(node == null) {
return null;
}
- final NormalizedNode<?, ?> deserialized = NormalizedNodeSerializer.deSerialize(node);
- Preconditions.checkArgument(deserialized instanceof ContainerNode);
- return new RemoteRpcInput((ContainerNode) deserialized);
+
+ Preconditions.checkArgument(node instanceof ContainerNode);
+ return new RemoteRpcInput((ContainerNode) node);
}
ContainerNode delegate() {
import java.util.Arrays;
import java.util.Collection;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
import org.opendaylight.yangtools.yang.common.RpcError;
Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
@Override
public void onSuccess(final DOMRpcResult result) {
- if (result.getErrors() != null && (!result.getErrors().isEmpty())) {
+ if (result.getErrors() != null && !result.getErrors().isEmpty()) {
final String message = String.format("Execution of RPC %s failed", msg.getRpc());
Collection<RpcError> errors = result.getErrors();
if (errors == null || errors.size() == 0) {
sender.tell(new akka.actor.Status.Failure(new RpcErrorsException(message, errors)), self);
} else {
- final Node serializedResultNode;
- if (result.getResult() == null) {
- serializedResultNode = null;
- } else {
- serializedResultNode = NormalizedNodeSerializer.serialize(result.getResult());
- }
-
LOG.debug("Sending response for execute rpc : {}", msg.getRpc());
- sender.tell(new RpcResponse(serializedResultNode), self);
+ sender.tell(new RpcResponse(result.getResult()), self);
}
}
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.Serializable;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.Node;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class ExecuteRpc implements Serializable {
private static final long serialVersionUID = 1128904894827335676L;
- private final NormalizedNodeMessages.Node inputNormalizedNode;
+ private final NormalizedNode<?, ?> inputNormalizedNode;
private final QName rpc;
- private ExecuteRpc(final NormalizedNodeMessages.Node inputNormalizedNode, final QName rpc) {
- Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
-
+ private ExecuteRpc(@Nullable final NormalizedNode<?, ?> inputNormalizedNode, @Nonnull final QName rpc) {
+ this.rpc = Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
this.inputNormalizedNode = inputNormalizedNode;
- this.rpc = rpc;
}
- public NormalizedNodeMessages.Node getInputNormalizedNode() {
+ public static ExecuteRpc from(@Nonnull final DOMRpcIdentifier rpc, @Nullable final NormalizedNode<?, ?> input) {
+ return new ExecuteRpc(input, rpc.getType().getLastComponent());
+ }
+
+ @Nullable
+ public NormalizedNode<?, ?> getInputNormalizedNode() {
return inputNormalizedNode;
}
+ @Nonnull
public QName getRpc() {
return rpc;
}
- public static ExecuteRpc from(final DOMRpcIdentifier rpc, final NormalizedNode<?, ?> input) {
- final Node serializedInput;
- if(input != null) {
- serializedInput = NormalizedNodeSerializer.serialize(input);
- } else {
- serializedInput = null;
- }
- return new ExecuteRpc(serializedInput, rpc.getType().getLastComponent());
+ private Object writeReplace() {
+ return new Proxy(this);
}
@Override
.add("normalizedNode", inputNormalizedNode)
.toString();
}
+
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private ExecuteRpc executeRpc;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ }
+
+ Proxy(ExecuteRpc executeRpc) {
+ this.executeRpc = executeRpc;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(executeRpc.getRpc());
+ SerializationUtils.serializeNormalizedNode(executeRpc.getInputNormalizedNode(), out);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ QName qname = (QName) in.readObject();
+ executeRpc = new ExecuteRpc(SerializationUtils.deserializeNormalizedNode(in), qname);
+ }
+
+ private Object readResolve() {
+ return executeRpc;
+ }
+ }
}
*/
package org.opendaylight.controller.remote.rpc.messages;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.Serializable;
-import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class RpcResponse implements Serializable {
private static final long serialVersionUID = -4211279498688989245L;
- private final NormalizedNodeMessages.Node resultNormalizedNode;
+ private final NormalizedNode<?, ?> resultNormalizedNode;
- public RpcResponse(final NormalizedNodeMessages.Node inputNormalizedNode) {
+ public RpcResponse(@Nullable final NormalizedNode<?, ?> inputNormalizedNode) {
resultNormalizedNode = inputNormalizedNode;
}
- public NormalizedNodeMessages.Node getResultNormalizedNode() {
+ @Nullable
+ public NormalizedNode<?, ?> getResultNormalizedNode() {
return resultNormalizedNode;
}
+
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private RpcResponse rpcResponse;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ }
+
+ Proxy(RpcResponse rpcResponse) {
+ this.rpcResponse = rpcResponse;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ SerializationUtils.serializeNormalizedNode(rpcResponse.getResultNormalizedNode(), out);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ rpcResponse = new RpcResponse(SerializationUtils.deserializeNormalizedNode(in));
+ }
+
+ private Object readResolve() {
+ return rpcResponse;
+ }
+ }
}
static final SchemaPath TEST_RPC_TYPE = SchemaPath.create(true, TEST_RPC);
static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(TEST_RPC));
- static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH);
+ public static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH);
static ActorSystem node1;
static ActorSystem node2;
assertEquals(exp, actual);
}
- static ContainerNode makeRPCInput(final String data) {
+ public static ContainerNode makeRPCInput(final String data) {
return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_INPUT))
.withChild(ImmutableNodes.leafNode(TEST_RPC_INPUT_DATA, data)).build();
}
- static ContainerNode makeRPCOutput(final String data) {
+ public static ContainerNode makeRPCOutput(final String data) {
return Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(TEST_RPC_OUTPUT))
.withChild(ImmutableNodes.leafNode(TEST_RPC_OUTPUT, data)).build();
}
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
*/
@Test(expected = DOMRpcImplementationNotAvailableException.class)
public void testInvokeRpcWithLoopException() throws Exception {
- final NormalizedNode<?, ?> invokeRpcInput = RemoteRpcInput.from(NormalizedNodeSerializer.serialize(makeRPCInput("foo")));
+ final NormalizedNode<?, ?> invokeRpcInput = RemoteRpcInput.from(makeRPCInput("foo"));
final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationNotAvailableException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
final RpcResponse rpcResponse = expectMsgClass(duration("5 seconds"), RpcResponse.class);
- assertEquals(rpcResult.getResult(),
- NormalizedNodeSerializer.deSerialize(rpcResponse.getResultNormalizedNode()));
+ assertEquals(rpcResult.getResult(), rpcResponse.getResultNormalizedNode());
}
};
}
--- /dev/null
+/*
+ * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.AbstractRpcTest;
+
+/**
+ * Unit tests for ExecuteRpc.
+ *
+ * @author Thomas Pantelis
+ */
+public class ExecuteRpcTest {
+
+ @Test
+ public void testSerialization() {
+ ExecuteRpc expected = ExecuteRpc.from(AbstractRpcTest.TEST_RPC_ID,
+ AbstractRpcTest.makeRPCInput("serialization-test"));
+
+ ExecuteRpc actual = (ExecuteRpc) SerializationUtils.clone(expected);
+
+ assertEquals("getRpc", expected.getRpc(), actual.getRpc());
+ assertEquals("getInputNormalizedNode", expected.getInputNormalizedNode(), actual.getInputNormalizedNode());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.AbstractRpcTest;
+
+/**
+ * Unit tests for RpcResponse.
+ *
+ * @author Thomas Pantelis
+ */
+public class RpcResponseTest {
+
+ @Test
+ public void testSerialization() {
+ RpcResponse expected = new RpcResponse(AbstractRpcTest.makeRPCOutput("serialization-test"));
+
+ RpcResponse actual = (RpcResponse) SerializationUtils.clone(expected);
+
+ assertEquals("getResultNormalizedNode", expected.getResultNormalizedNode(), actual.getResultNormalizedNode());
+ }
+}