From 3e309063bc2e1a9f66dd4217dd033f4a24cf3d90 Mon Sep 17 00:00:00 2001 From: Rudolf Brisuda Date: Mon, 10 Oct 2016 18:13:33 +0200 Subject: [PATCH] Bug 6911 - RPC support in singleton - master invoke RPC and send back to slave. Change-Id: Ica407f800da3d902f722d835bf6d658163c01bb5 Signed-off-by: Rudolf Brisuda --- netconf/netconf-topology-singleton/pom.xml | 4 + .../singleton/impl/MasterSalFacade.java | 4 +- .../singleton/impl/ProxyDOMRpcService.java | 95 ++++++++++++- .../impl/RemoteOperationTxProcessorImpl.java | 2 +- .../impl/actors/NetconfNodeActor.java | 53 +++++++- .../impl/tx/NetconfProxyDOMTransaction.java | 3 +- .../impl/utils/ClusteringRpcException.java | 21 +++ .../CreateInitialMasterActorData.java | 10 +- .../singleton/messages/SchemaPathMessage.java | 77 +++++++++++ .../messages/rpc/InvokeRpcMessage.java | 76 +++++++++++ .../messages/rpc/InvokeRpcMessageReply.java | 83 ++++++++++++ .../transactions/EmptyReadResponse.java | 1 + .../transactions/EmptyResultResponse.java | 19 +++ .../singleton/impl/NetconfNodeActorTest.java | 128 ++++++++++++++++-- .../TestingRemoteDeviceConnectorImpl.java | 2 +- .../impl/tx/ReadOnlyTransactionTest.java | 8 +- .../impl/tx/WriteOnlyTransactionTest.java | 8 +- 17 files changed, 569 insertions(+), 25 deletions(-) create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/ClusteringRpcException.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/SchemaPathMessage.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessage.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessageReply.java create mode 100644 netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyResultResponse.java diff --git a/netconf/netconf-topology-singleton/pom.xml b/netconf/netconf-topology-singleton/pom.xml index 861f745e7c..7be0339a40 100644 --- a/netconf/netconf-topology-singleton/pom.xml +++ b/netconf/netconf-topology-singleton/pom.xml @@ -82,6 +82,10 @@ akka-testkit_2.11 test + + org.opendaylight.yangtools + yang-data-codec-xml + diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java index 8e45cbc374..e4911acbf7 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java @@ -150,8 +150,8 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler invokeRpc(@Nonnull final SchemaPath type, @Nullable final NormalizedNode input) { - throw new UnsupportedOperationException("InvokeRpc: DOMRpc service not working in cluster."); + LOG.trace("{}: Rpc operation invoked with schema type: {} and node: {}.", id, type, input); + + final NormalizedNodeMessage normalizedNodeMessage = + new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY, input); + final Future scalaFuture = + Patterns.ask(masterActorRef, + new InvokeRpcMessage(new SchemaPathMessage(type), normalizedNodeMessage), + NetconfTopologyUtils.TIMEOUT); + + final SettableFuture settableFuture = SettableFuture.create(); + + final CheckedFuture checkedFuture = Futures.makeChecked(settableFuture, + new Function() { + + @Nullable + @Override + public DOMRpcException apply(@Nullable final Exception exception) { + return new ClusteringRpcException(id + ": Exception during remote rpc invocation.", + exception); + } + }); + + scalaFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object success) throws Throwable { + if (failure != null) { + settableFuture.setException(failure); + return; + } + if (success instanceof Throwable) { + settableFuture.setException((Throwable) success); + return; + } + if (success instanceof EmptyResultResponse || success == null) { + settableFuture.set(null); + return; + } + final Collection errors = ((InvokeRpcMessageReply) success).getRpcErrors(); + final NormalizedNodeMessage normalizedNodeMessageResult = + ((InvokeRpcMessageReply) success).getNormalizedNodeMessage(); + final DOMRpcResult result; + if (normalizedNodeMessageResult == null ){ + result = new DefaultDOMRpcResult(errors); + } else { + if (errors == null) { + result = new DefaultDOMRpcResult(normalizedNodeMessageResult.getNode()); + } else { + result = new DefaultDOMRpcResult(normalizedNodeMessageResult.getNode(), errors); + } + } + settableFuture.set(result); + } + }, actorSystem.dispatcher()); + + return checkedFuture; + } @Nonnull @Override public ListenerRegistration registerRpcListener( @Nonnull final T listener) { + // NOOP, only proxy throw new UnsupportedOperationException("RegisterRpcListener: DOMRpc service not working in cluster."); } } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java index f107966b24..db5e3e91b5 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java @@ -23,8 +23,8 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor; import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java index 75c7e62e8a..e0fe6cc387 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java @@ -17,9 +17,12 @@ import com.google.common.util.concurrent.Futures; import java.io.IOException; import java.util.List; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider; import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider; import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor; @@ -31,19 +34,25 @@ import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySet import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint; import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest; +import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage; +import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse; import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory; import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException; @@ -67,6 +76,7 @@ public class NetconfNodeActor extends UntypedActor { private RemoteOperationTxProcessor operationsProcessor; private List sourceIdentifiers; + private DOMRpcService deviceRpc; private SlaveSalFacade slaveSalManager; public static Props props(final NetconfTopologySetup setup, @@ -93,6 +103,8 @@ public class NetconfNodeActor extends UntypedActor { operationsProcessor = new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(), id); + this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc(); + sender().tell(new MasterActorDataInitialized(), self()); LOG.debug("{}: Master is ready.", id); @@ -116,6 +128,11 @@ public class NetconfNodeActor extends UntypedActor { final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message; sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender()); + } else if (message instanceof InvokeRpcMessage) { + + final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message); + invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender()); + } else if (message instanceof RegisterMountPoint) { //slaves sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers(); @@ -187,7 +204,35 @@ public class NetconfNodeActor extends UntypedActor { }); } - private void registerSlaveMountPoint(final ActorRef masterReference) { + private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage, + final ActorRef recipient) { + + final CheckedFuture rpcResult = + deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode()); + + Futures.addCallback(rpcResult, new FutureCallback() { + @Override + public void onSuccess(@Nullable final DOMRpcResult domRpcResult) { + if (domRpcResult == null) { + recipient.tell(new EmptyResultResponse(), getSender()); + return; + } + NormalizedNodeMessage nodeMessageReply = null; + if (domRpcResult.getResult() != null) { + nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY, + domRpcResult.getResult()); + } + recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf()); + } + + @Override + public void onFailure(@Nonnull final Throwable throwable) { + recipient.tell(throwable, getSelf()); + } + }); + } + + private void registerSlaveMountPoint(ActorRef masterReference) { if (this.slaveSalManager != null) { slaveSalManager.close(); } @@ -195,7 +240,7 @@ public class NetconfNodeActor extends UntypedActor { final CheckedFuture remoteSchemaContext = getSchemaContext(masterReference); - final DOMRpcService deviceRpc = getDOMRpcService(); + final DOMRpcService deviceRpc = getDOMRpcService(masterReference); Futures.addCallback(remoteSchemaContext, new FutureCallback() { @Override @@ -211,8 +256,8 @@ public class NetconfNodeActor extends UntypedActor { }); } - private DOMRpcService getDOMRpcService() { - return new ProxyDOMRpcService(); + private DOMRpcService getDOMRpcService(ActorRef masterReference) { + return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id); } private CheckedFuture getSchemaContext(ActorRef masterReference) { diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java index 9a23a71da5..cca67466dd 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java @@ -19,7 +19,6 @@ import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; -import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse; @@ -27,6 +26,7 @@ import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsR import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest; import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest; +import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply; import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; @@ -81,7 +81,6 @@ public class NetconfProxyDOMTransaction implements NetconfDOMTransaction { promise.success(Optional.absent()); return; } - promise.success(Optional.of((NormalizedNodeMessage) success)); } }, actorSystem.dispatcher()); diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/ClusteringRpcException.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/ClusteringRpcException.java new file mode 100644 index 0000000000..01ea9202e5 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/ClusteringRpcException.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.impl.utils; + +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; + +public class ClusteringRpcException extends DOMRpcException { + public ClusteringRpcException(String message) { + super(message); + } + + public ClusteringRpcException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/CreateInitialMasterActorData.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/CreateInitialMasterActorData.java index bab9056742..45fd4df0f6 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/CreateInitialMasterActorData.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/CreateInitialMasterActorData.java @@ -11,6 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages; import java.io.Serializable; import java.util.List; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; /** @@ -21,11 +22,14 @@ public class CreateInitialMasterActorData implements Serializable { private final DOMDataBroker deviceDataBroker; private final List allSourceIdentifiers; + private final DOMRpcService deviceRpc; public CreateInitialMasterActorData(final DOMDataBroker deviceDataBroker, - final List allSourceIdentifiers) { + final List allSourceIdentifiers, + final DOMRpcService deviceRpc) { this.deviceDataBroker = deviceDataBroker; this.allSourceIdentifiers = allSourceIdentifiers; + this.deviceRpc = deviceRpc; } public DOMDataBroker getDeviceDataBroker() { @@ -35,4 +39,8 @@ public class CreateInitialMasterActorData implements Serializable { public List getSourceIndentifiers() { return allSourceIdentifiers; } + + public DOMRpcService getDeviceRpc() { + return deviceRpc; + } } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/SchemaPathMessage.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/SchemaPathMessage.java new file mode 100644 index 0000000000..036f4e2a02 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/SchemaPathMessage.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.messages; + +import com.google.common.collect.Iterables; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +public class SchemaPathMessage implements Serializable { + private static final long serialVersionUID = 1L; + + private SchemaPath schemaPath; + + public SchemaPathMessage(final SchemaPath schemaPath) { + this.schemaPath = schemaPath; + } + + public SchemaPath getSchemaPath() { + return schemaPath; + } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 2L; + + private SchemaPathMessage schemaPathMessage; + + public Proxy() { + //due to Externalizable + } + + Proxy(final SchemaPathMessage schemaPathMessage) { + this.schemaPathMessage = schemaPathMessage; + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeInt(Iterables.size(schemaPathMessage.getSchemaPath().getPathTowardsRoot())); + + for (final QName qName : schemaPathMessage.getSchemaPath().getPathTowardsRoot()) { + out.writeObject(qName); + } + + out.writeBoolean(schemaPathMessage.getSchemaPath().isAbsolute()); + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + final int sizePath = in.readInt(); + final QName[] paths = new QName[sizePath]; + for (int i = 0; i < sizePath; i++) { + paths[i] = (QName) in.readObject(); + } + final boolean absolute = in.readBoolean(); + schemaPathMessage = new SchemaPathMessage(SchemaPath.create(absolute, paths)); + } + + private Object readResolve() { + return schemaPathMessage; + } + } + +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessage.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessage.java new file mode 100644 index 0000000000..f52131e17b --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessage.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.messages.rpc; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.SchemaPathMessage; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +public class InvokeRpcMessage implements Serializable { + private static final long serialVersionUID = 1L; + + private final SchemaPathMessage schemaPathMessage; + private final NormalizedNodeMessage normalizedNodeMessage; + + public InvokeRpcMessage(final SchemaPathMessage schemaPathMessage, + final NormalizedNodeMessage normalizedNodeMessage) { + this.schemaPathMessage = schemaPathMessage; + this.normalizedNodeMessage = normalizedNodeMessage; + } + private SchemaPathMessage getSchemaPathMessage() { + return schemaPathMessage; + } + + public SchemaPath getSchemaPath() { + return schemaPathMessage.getSchemaPath(); + } + + public NormalizedNodeMessage getNormalizedNodeMessage() { + return normalizedNodeMessage; + } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 2L; + + private InvokeRpcMessage invokeRpcMessage; + + public Proxy() { + //due to Externalizable + } + + Proxy(final InvokeRpcMessage invokeRpcMessage) { + this.invokeRpcMessage = invokeRpcMessage; + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeObject(invokeRpcMessage.getSchemaPathMessage()); + out.writeObject(invokeRpcMessage.getNormalizedNodeMessage()); + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + invokeRpcMessage = new InvokeRpcMessage((SchemaPathMessage) in.readObject(), + (NormalizedNodeMessage) in.readObject()); + } + + private Object readResolve() { + return invokeRpcMessage; + } + } +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessageReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessageReply.java new file mode 100644 index 0000000000..5c60ba03fb --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessageReply.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.messages.rpc; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.util.Collection; +import java.util.LinkedList; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.yangtools.yang.common.RpcError; + +public class InvokeRpcMessageReply implements Serializable { + private static final long serialVersionUID = 1L; + + private final Collection rpcErrors; + private final NormalizedNodeMessage normalizedNodeMessage; + + public InvokeRpcMessageReply(final NormalizedNodeMessage normalizedNodeMessage, + final Collection rpcErrors) { + this.normalizedNodeMessage = normalizedNodeMessage; + this.rpcErrors = rpcErrors; + } + + public NormalizedNodeMessage getNormalizedNodeMessage() { + return normalizedNodeMessage; + } + + public Collection getRpcErrors() { + return rpcErrors; + } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 2L; + + private InvokeRpcMessageReply invokeRpcMessageReply; + + public Proxy() { + //due to Externalizable + } + + Proxy(final InvokeRpcMessageReply invokeRpcMessageReply) { + this.invokeRpcMessageReply = invokeRpcMessageReply; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(invokeRpcMessageReply.getRpcErrors().size()); + for (final RpcError rpcError : invokeRpcMessageReply.getRpcErrors()) { + out.writeObject(rpcError); + } + out.writeObject(invokeRpcMessageReply.getNormalizedNodeMessage()); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + final int size = in.readInt(); + final Collection rpcErrors = new LinkedList<>(); + for (int i = 0; i < size; i++) { + rpcErrors.add((RpcError) in.readObject()); + } + final NormalizedNodeMessage normalizedNodeMessage = (NormalizedNodeMessage) in.readObject(); + invokeRpcMessageReply = new InvokeRpcMessageReply(normalizedNodeMessage, rpcErrors); + } + + private Object readResolve() { + return invokeRpcMessageReply; + } + } + +} diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyReadResponse.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyReadResponse.java index 6e1e1bdc9f..046ea2e017 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyReadResponse.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyReadResponse.java @@ -15,4 +15,5 @@ import java.io.Serializable; */ public class EmptyReadResponse implements Serializable { private static final long serialVersionUID = 1L; + } diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyResultResponse.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyResultResponse.java new file mode 100644 index 0000000000..9d752b8059 --- /dev/null +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyResultResponse.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.singleton.messages.transactions; + +import java.io.Serializable; + +/** + * Message is sended when RPC result does not exist or is empty + */ +public class EmptyResultResponse implements Serializable { + private static final long serialVersionUID = 1L; +} + diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java index a77104e28d..63246e0951 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java @@ -9,9 +9,12 @@ package org.opendaylight.netconf.topology.singleton.impl; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.MockitoAnnotations.initMocks; import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY; import akka.actor.ActorContext; @@ -34,22 +37,38 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.Mock; import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor; +import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint; import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized; import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData; import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException; +import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier; import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException; import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; @@ -69,12 +88,15 @@ public class NetconfNodeActorTest { private ActorRef masterRef; private RemoteDeviceId remoteDeviceId; + @Mock + private DOMRpcService domRpcService; + @Before public void setup() throws UnknownHostException { - + initMocks + (this); remoteDeviceId = new RemoteDeviceId("netconf-topology", new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999)); - final NetconfTopologySetup setup = mock(NetconfTopologySetup.class); final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, @@ -100,8 +122,8 @@ public class NetconfNodeActorTest { /* Test init master data */ final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers), - TIMEOUT); + Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers, + domRpcService), TIMEOUT); final Object success = Await.result(initialDataToActor, TIMEOUT.duration()); assertTrue(success instanceof MasterActorDataInitialized); @@ -128,13 +150,13 @@ public class NetconfNodeActorTest { final DOMDataBroker domDataBroker = mock(DOMDataBroker.class); final List sourceIdentifiers = - Lists.newArrayList(SourceIdentifier.create("testID", Optional.absent())); + Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent())); // init master data final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers), - TIMEOUT); + Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers, + domRpcService), TIMEOUT); final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration()); @@ -156,7 +178,7 @@ public class NetconfNodeActorTest { @Test public void testYangTextSchemaSourceRequestMessage() throws Exception { final SchemaRepository schemaRepository = mock(SchemaRepository.class); - final SourceIdentifier sourceIdentifier = SourceIdentifier.create("testID", Optional.absent()); + final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID", Optional.absent()); final Props props = NetconfNodeActor.props(mock(NetconfTopologySetup.class), remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, schemaRepository); @@ -213,6 +235,96 @@ public class NetconfNodeActorTest { } + @Test + public void testProxyDOMRpcService() throws Exception { + + final DOMDataBroker domDataBroker = mock(DOMDataBroker.class); + final List sourceIdentifiers = + Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent())); + + // init master data + + final Future initialDataToActor = + Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers, + domRpcService), TIMEOUT); + + final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration()); + + assertTrue(successInit instanceof MasterActorDataInitialized); + + // test if slave get right identifiers from master + + final ProxyDOMRpcService slaveDomRPCService = new ProxyDOMRpcService(system, masterRef, remoteDeviceId); + + final SchemaPath schemaPath = SchemaPath.create(true, QName.create("TestQname")); + final NormalizedNode outputNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname"))) + .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build(); + final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed."); + // EmptyResultResponse message + + doReturn(Futures.immediateCheckedFuture(null)).when(domRpcService).invokeRpc(any(), any()); + + final CheckedFuture resultFutureEmpty = + slaveDomRPCService.invokeRpc(schemaPath, outputNode); + + final Object resultNull = resultFutureEmpty.checkedGet(2, TimeUnit.SECONDS); + + assertEquals(null, resultNull); + + // InvokeRpcMessageReply message + + doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode))). + when(domRpcService).invokeRpc(any(), any()); + + final CheckedFuture resultFutureNn = + slaveDomRPCService.invokeRpc(schemaPath, outputNode); + + final DOMRpcResult resultNn = resultFutureNn.checkedGet(2, TimeUnit.SECONDS); + + assertEquals(outputNode, resultNn.getResult()); + assertTrue(resultNn.getErrors().isEmpty()); + + // InvokeRpcMessageReply message only error + + doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(rpcError))) + .when(domRpcService).invokeRpc(any(), any()); + + final CheckedFuture resultFutureError = + slaveDomRPCService.invokeRpc(schemaPath, outputNode); + + final DOMRpcResult resultError = resultFutureError.checkedGet(2, TimeUnit.SECONDS); + + assertNull(resultError.getResult()); + assertEquals(rpcError, resultError.getErrors().iterator().next()); + + // InvokeRpcMessageReply message error + result + + doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode, rpcError))) + .when(domRpcService).invokeRpc(any(), any()); + + final CheckedFuture resultFutureOutputError = + slaveDomRPCService.invokeRpc(schemaPath, outputNode); + + final DOMRpcResult resultOutputError = resultFutureOutputError.checkedGet(2, TimeUnit.SECONDS); + + assertEquals(outputNode, resultOutputError.getResult()); + assertEquals(rpcError, resultOutputError.getErrors().iterator().next()); + + // Throwable message + + exception.expect(DOMRpcException.class); + + doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException(""))) + .when(domRpcService).invokeRpc(any(), any()); + + final CheckedFuture resultFutureThrowable = + slaveDomRPCService.invokeRpc(schemaPath, outputNode); + + resultFutureThrowable.checkedGet(2, TimeUnit.SECONDS); + + } + private String convertStreamToString(java.io.InputStream is) { java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A"); return s.hasNext() ? s.next() : ""; diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/TestingRemoteDeviceConnectorImpl.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/TestingRemoteDeviceConnectorImpl.java index d4b4ec0492..780b14e86f 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/TestingRemoteDeviceConnectorImpl.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/TestingRemoteDeviceConnectorImpl.java @@ -38,9 +38,9 @@ class TestingRemoteDeviceConnectorImpl extends RemoteDeviceConnectorImpl { @Override public NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node, final ActorRef deviceContextActorRef) { + final NetconfConnectorDTO connectorDTO = new NetconfConnectorDTO(communicator, salFacade); doReturn(Futures.immediateCheckedFuture(null)).when(communicator).initializeRemoteConnection(any(), any()); - return connectorDTO; } diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java index c491ee6fe4..6297efe963 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java @@ -41,6 +41,7 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction; import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker; @@ -74,6 +75,9 @@ public class ReadOnlyTransactionTest { @Mock private DOMDataReadOnlyTransaction readTx; + @Mock + private DOMRpcService domRpcService; + @Before public void setup() throws UnknownHostException { initMocks(this); @@ -252,8 +256,8 @@ public class ReadOnlyTransactionTest { private void initializeDataTest() throws Exception { final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers), - TIMEOUT); + Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers, + domRpcService), TIMEOUT); final Object success = Await.result(initialDataToActor, TIMEOUT.duration()); diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java index c9fa38f4bd..5007657c8a 100644 --- a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java +++ b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java @@ -47,6 +47,7 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction; import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker; @@ -80,6 +81,9 @@ public class WriteOnlyTransactionTest { @Mock private DOMDataWriteTransaction writeTx; + @Mock + private DOMRpcService domRpcService; + @Before public void setup() throws UnknownHostException { initMocks(this); @@ -247,8 +251,8 @@ public class WriteOnlyTransactionTest { private void initializeDataTest() throws Exception { final Future initialDataToActor = - Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers), - TIMEOUT); + Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers, + domRpcService), TIMEOUT); final Object success = Await.result(initialDataToActor, TIMEOUT.duration()); -- 2.36.6