Bug 6911 - RPC support in singleton
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / actors / NetconfNodeActor.java
index 75c7e62e8a3f6d1cbc56346d861575755e0bf2ea..e0fe6cc387ada90a917ae8278a440c1def3caebf 100644 (file)
@@ -17,9 +17,12 @@ import com.google.common.util.concurrent.Futures;
 import java.io.IOException;
 import java.util.List;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
@@ -31,19 +34,25 @@ import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySet
 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
@@ -67,6 +76,7 @@ public class NetconfNodeActor extends UntypedActor {
 
     private RemoteOperationTxProcessor operationsProcessor;
     private List<SourceIdentifier> sourceIdentifiers;
+    private DOMRpcService deviceRpc;
     private SlaveSalFacade slaveSalManager;
 
     public static Props props(final NetconfTopologySetup setup,
@@ -93,6 +103,8 @@ public class NetconfNodeActor extends UntypedActor {
             operationsProcessor =
                     new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
                             id);
+            this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc();
+
             sender().tell(new MasterActorDataInitialized(), self());
 
             LOG.debug("{}: Master is ready.", id);
@@ -116,6 +128,11 @@ public class NetconfNodeActor extends UntypedActor {
             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
 
+        } else if (message instanceof InvokeRpcMessage) {
+
+            final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
+            invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
+
         } else if (message instanceof RegisterMountPoint) { //slaves
 
             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
@@ -187,7 +204,35 @@ public class NetconfNodeActor extends UntypedActor {
         });
     }
 
-    private void registerSlaveMountPoint(final ActorRef masterReference) {
+    private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
+                                final ActorRef recipient) {
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
+                deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
+
+        Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
+            @Override
+            public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
+                if (domRpcResult == null) {
+                    recipient.tell(new EmptyResultResponse(), getSender());
+                    return;
+                }
+                NormalizedNodeMessage nodeMessageReply = null;
+                if (domRpcResult.getResult() != null) {
+                    nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
+                            domRpcResult.getResult());
+                }
+                recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                recipient.tell(throwable, getSelf());
+            }
+        });
+    }
+
+    private void registerSlaveMountPoint(ActorRef masterReference) {
         if (this.slaveSalManager != null) {
             slaveSalManager.close();
         }
@@ -195,7 +240,7 @@ public class NetconfNodeActor extends UntypedActor {
 
         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
                 getSchemaContext(masterReference);
-        final DOMRpcService deviceRpc = getDOMRpcService();
+        final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
 
         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
             @Override
@@ -211,8 +256,8 @@ public class NetconfNodeActor extends UntypedActor {
         });
     }
 
-    private DOMRpcService getDOMRpcService() {
-        return new ProxyDOMRpcService();
+    private DOMRpcService getDOMRpcService(ActorRef masterReference) {
+        return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id);
     }
 
     private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(ActorRef masterReference) {