Bug 6911 - RPC support in singleton 74/48974/1
authorRudolf Brisuda <rbrisuda@cisco.com>
Mon, 10 Oct 2016 16:13:33 +0000 (18:13 +0200)
committerJakub Morvay <jmorvay@cisco.com>
Mon, 5 Dec 2016 11:18:59 +0000 (12:18 +0100)
- master invoke RPC and send back to slave.

Change-Id: Ica407f800da3d902f722d835bf6d658163c01bb5
Signed-off-by: Rudolf Brisuda <rbrisuda@cisco.com>
17 files changed:
netconf/netconf-topology-singleton/pom.xml
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMRpcService.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/ClusteringRpcException.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/CreateInitialMasterActorData.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/SchemaPathMessage.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessage.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessageReply.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyReadResponse.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyResultResponse.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/TestingRemoteDeviceConnectorImpl.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java

index f4f816f2109f6b0bd4bceeb7614a2d8fe519b591..6c6e762c49701ac10a542032ba4bb117b3f61ae3 100644 (file)
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>sal-distributed-datastore</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-data-codec-xml</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
index 8e45cbc37462965dcb324e75c62c7d470723fc9f..e4911acbf72a974251ddcc8f566c1b3c4c5a8286 100644 (file)
@@ -150,8 +150,8 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessi
                         .collect(Collectors.toList());
 
         // send initial data to master actor and create actor for providing it
-        return Patterns.ask(masterActorRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers),
-                NetconfTopologyUtils.TIMEOUT);
+        return Patterns.ask(masterActorRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
+                        deviceRpc), NetconfTopologyUtils.TIMEOUT);
     }
 
     private void updateDeviceData() {
index c1c843014b00e395ac8697d77080b958dfbea9fe..c4ee7a7ea197a5be4e3994dcb1bd230225af8538 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 
 package org.opendaylight.netconf.topology.singleton.impl;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import com.google.common.base.Function;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collection;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.SchemaPathMessage;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 
 public class ProxyDOMRpcService implements DOMRpcService {
 
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
+
+    private final ActorRef masterActorRef;
+    private final ActorSystem actorSystem;
+    private final RemoteDeviceId id;
+
+    public ProxyDOMRpcService(final ActorSystem actorSystem, final ActorRef masterActorRef,
+                              final RemoteDeviceId remoteDeviceId) {
+        this.actorSystem = actorSystem;
+        this.masterActorRef = masterActorRef;
+        id = remoteDeviceId;
+    }
+
     @Nonnull
     @Override
     public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull final SchemaPath type,
                                                                   @Nullable final NormalizedNode<?, ?> input) {
-        throw new UnsupportedOperationException("InvokeRpc: DOMRpc service not working in cluster.");
+        LOG.trace("{}: Rpc operation invoked with schema type: {} and node: {}.", id, type, input);
+
+        final NormalizedNodeMessage normalizedNodeMessage =
+                new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY, input);
+        final Future<Object> scalaFuture =
+                Patterns.ask(masterActorRef,
+                        new InvokeRpcMessage(new SchemaPathMessage(type), normalizedNodeMessage),
+                        NetconfTopologyUtils.TIMEOUT);
+
+        final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> checkedFuture = Futures.makeChecked(settableFuture,
+                new Function<Exception, DOMRpcException>() {
+
+            @Nullable
+            @Override
+            public DOMRpcException apply(@Nullable final Exception exception) {
+                return new ClusteringRpcException(id + ": Exception during remote rpc invocation.",
+                        exception);
+            }
+        });
+
+        scalaFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) {
+                    settableFuture.setException(failure);
+                    return;
+                }
+                if (success instanceof Throwable) {
+                    settableFuture.setException((Throwable) success);
+                    return;
+                }
+                if (success instanceof EmptyResultResponse || success == null) {
+                    settableFuture.set(null);
+                    return;
+                }
+                final Collection<RpcError> errors = ((InvokeRpcMessageReply) success).getRpcErrors();
+                final NormalizedNodeMessage normalizedNodeMessageResult =
+                        ((InvokeRpcMessageReply) success).getNormalizedNodeMessage();
+                final DOMRpcResult result;
+                if (normalizedNodeMessageResult == null ){
+                    result = new DefaultDOMRpcResult(errors);
+                } else {
+                    if (errors == null) {
+                        result = new DefaultDOMRpcResult(normalizedNodeMessageResult.getNode());
+                    } else {
+                        result = new DefaultDOMRpcResult(normalizedNodeMessageResult.getNode(), errors);
+                    }
+                }
+                settableFuture.set(result);
+            }
+        }, actorSystem.dispatcher());
+
+        return checkedFuture;
+
     }
 
     @Nonnull
     @Override
     public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
             @Nonnull final T listener) {
+        // NOOP, only proxy
         throw new UnsupportedOperationException("RegisterRpcListener: DOMRpc service not working in cluster.");
     }
 }
index f107966b24ea18bdd034586bb1acfd42f8ba419b..db5e3e91b5a2181a9194fcc760e58e89c711a249 100644 (file)
@@ -23,8 +23,8 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
index 75c7e62e8a3f6d1cbc56346d861575755e0bf2ea..e0fe6cc387ada90a917ae8278a440c1def3caebf 100644 (file)
@@ -17,9 +17,12 @@ import com.google.common.util.concurrent.Futures;
 import java.io.IOException;
 import java.util.List;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
@@ -31,19 +34,25 @@ import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySet
 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
@@ -67,6 +76,7 @@ public class NetconfNodeActor extends UntypedActor {
 
     private RemoteOperationTxProcessor operationsProcessor;
     private List<SourceIdentifier> sourceIdentifiers;
+    private DOMRpcService deviceRpc;
     private SlaveSalFacade slaveSalManager;
 
     public static Props props(final NetconfTopologySetup setup,
@@ -93,6 +103,8 @@ public class NetconfNodeActor extends UntypedActor {
             operationsProcessor =
                     new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
                             id);
+            this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc();
+
             sender().tell(new MasterActorDataInitialized(), self());
 
             LOG.debug("{}: Master is ready.", id);
@@ -116,6 +128,11 @@ public class NetconfNodeActor extends UntypedActor {
             final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
             sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
 
+        } else if (message instanceof InvokeRpcMessage) {
+
+            final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
+            invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
+
         } else if (message instanceof RegisterMountPoint) { //slaves
 
             sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
@@ -187,7 +204,35 @@ public class NetconfNodeActor extends UntypedActor {
         });
     }
 
-    private void registerSlaveMountPoint(final ActorRef masterReference) {
+    private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
+                                final ActorRef recipient) {
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
+                deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
+
+        Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
+            @Override
+            public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
+                if (domRpcResult == null) {
+                    recipient.tell(new EmptyResultResponse(), getSender());
+                    return;
+                }
+                NormalizedNodeMessage nodeMessageReply = null;
+                if (domRpcResult.getResult() != null) {
+                    nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
+                            domRpcResult.getResult());
+                }
+                recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                recipient.tell(throwable, getSelf());
+            }
+        });
+    }
+
+    private void registerSlaveMountPoint(ActorRef masterReference) {
         if (this.slaveSalManager != null) {
             slaveSalManager.close();
         }
@@ -195,7 +240,7 @@ public class NetconfNodeActor extends UntypedActor {
 
         final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
                 getSchemaContext(masterReference);
-        final DOMRpcService deviceRpc = getDOMRpcService();
+        final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
 
         Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
             @Override
@@ -211,8 +256,8 @@ public class NetconfNodeActor extends UntypedActor {
         });
     }
 
-    private DOMRpcService getDOMRpcService() {
-        return new ProxyDOMRpcService();
+    private DOMRpcService getDOMRpcService(ActorRef masterReference) {
+        return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id);
     }
 
     private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(ActorRef masterReference) {
index e79f83adf67b24064f026b1a1dec2a0f841651a8..26f25fae1d5b296e395d2d9a70c8a184e2a406f9 100644 (file)
@@ -22,7 +22,6 @@ import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
@@ -30,6 +29,7 @@ import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsR
 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
@@ -84,7 +84,6 @@ public class NetconfProxyDOMTransaction implements NetconfDOMTransaction {
                     promise.success(Optional.absent());
                     return;
                 }
-
                 promise.success(Optional.of((NormalizedNodeMessage) success));
             }
         }, actorSystem.dispatcher());
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/ClusteringRpcException.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/ClusteringRpcException.java
new file mode 100644 (file)
index 0000000..01ea920
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.utils;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+
+public class ClusteringRpcException extends DOMRpcException {
+    public ClusteringRpcException(String message) {
+        super(message);
+    }
+
+    public ClusteringRpcException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
index bab905674254dac1391730393b3480bcb0e609ff..45fd4df0f6e66f2e63ba8633a6ec2c7d78af8fd5 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.netconf.topology.singleton.messages;
 import java.io.Serializable;
 import java.util.List;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
 
 /**
@@ -21,11 +22,14 @@ public class CreateInitialMasterActorData implements Serializable {
 
     private final DOMDataBroker deviceDataBroker;
     private final List<SourceIdentifier> allSourceIdentifiers;
+    private final DOMRpcService deviceRpc;
 
     public CreateInitialMasterActorData(final DOMDataBroker deviceDataBroker,
-                                        final List<SourceIdentifier> allSourceIdentifiers) {
+                                        final List<SourceIdentifier> allSourceIdentifiers,
+                                        final DOMRpcService deviceRpc) {
         this.deviceDataBroker = deviceDataBroker;
         this.allSourceIdentifiers = allSourceIdentifiers;
+        this.deviceRpc = deviceRpc;
     }
 
     public DOMDataBroker getDeviceDataBroker() {
@@ -35,4 +39,8 @@ public class CreateInitialMasterActorData implements Serializable {
     public List<SourceIdentifier> getSourceIndentifiers() {
         return allSourceIdentifiers;
     }
+
+    public DOMRpcService getDeviceRpc() {
+        return deviceRpc;
+    }
 }
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/SchemaPathMessage.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/SchemaPathMessage.java
new file mode 100644 (file)
index 0000000..036f4e2
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import com.google.common.collect.Iterables;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class SchemaPathMessage implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private SchemaPath schemaPath;
+
+    public SchemaPathMessage(final SchemaPath schemaPath) {
+        this.schemaPath = schemaPath;
+    }
+
+    public SchemaPath getSchemaPath() {
+        return schemaPath;
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 2L;
+
+        private SchemaPathMessage schemaPathMessage;
+
+        public Proxy() {
+            //due to Externalizable
+        }
+
+        Proxy(final SchemaPathMessage schemaPathMessage) {
+            this.schemaPathMessage = schemaPathMessage;
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeInt(Iterables.size(schemaPathMessage.getSchemaPath().getPathTowardsRoot()));
+
+            for (final QName qName : schemaPathMessage.getSchemaPath().getPathTowardsRoot()) {
+                out.writeObject(qName);
+            }
+
+            out.writeBoolean(schemaPathMessage.getSchemaPath().isAbsolute());
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            final int sizePath = in.readInt();
+            final QName[] paths = new QName[sizePath];
+            for (int i = 0; i < sizePath; i++) {
+                paths[i] = (QName) in.readObject();
+            }
+            final boolean absolute = in.readBoolean();
+            schemaPathMessage = new SchemaPathMessage(SchemaPath.create(absolute, paths));
+        }
+
+        private Object readResolve() {
+            return schemaPathMessage;
+        }
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessage.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessage.java
new file mode 100644 (file)
index 0000000..f52131e
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.rpc;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.SchemaPathMessage;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class InvokeRpcMessage implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final SchemaPathMessage schemaPathMessage;
+    private final NormalizedNodeMessage normalizedNodeMessage;
+
+    public InvokeRpcMessage(final SchemaPathMessage schemaPathMessage,
+                            final NormalizedNodeMessage normalizedNodeMessage) {
+        this.schemaPathMessage = schemaPathMessage;
+        this.normalizedNodeMessage = normalizedNodeMessage;
+    }
+    private SchemaPathMessage getSchemaPathMessage() {
+        return schemaPathMessage;
+    }
+
+    public SchemaPath getSchemaPath() {
+        return schemaPathMessage.getSchemaPath();
+    }
+
+    public NormalizedNodeMessage getNormalizedNodeMessage() {
+        return normalizedNodeMessage;
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 2L;
+
+        private InvokeRpcMessage invokeRpcMessage;
+
+        public Proxy() {
+            //due to Externalizable
+        }
+
+        Proxy(final InvokeRpcMessage invokeRpcMessage) {
+            this.invokeRpcMessage = invokeRpcMessage;
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeObject(invokeRpcMessage.getSchemaPathMessage());
+            out.writeObject(invokeRpcMessage.getNormalizedNodeMessage());
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            invokeRpcMessage = new InvokeRpcMessage((SchemaPathMessage) in.readObject(),
+                    (NormalizedNodeMessage) in.readObject());
+        }
+
+        private Object readResolve() {
+            return invokeRpcMessage;
+        }
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessageReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessageReply.java
new file mode 100644 (file)
index 0000000..5c60ba0
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.rpc;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.LinkedList;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.common.RpcError;
+
+public class InvokeRpcMessageReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Collection<RpcError> rpcErrors;
+    private final NormalizedNodeMessage normalizedNodeMessage;
+
+    public InvokeRpcMessageReply(final NormalizedNodeMessage normalizedNodeMessage,
+                                 final Collection<RpcError> rpcErrors) {
+        this.normalizedNodeMessage = normalizedNodeMessage;
+        this.rpcErrors = rpcErrors;
+    }
+
+    public NormalizedNodeMessage getNormalizedNodeMessage() {
+        return normalizedNodeMessage;
+    }
+
+    public Collection<RpcError> getRpcErrors() {
+        return rpcErrors;
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 2L;
+
+        private InvokeRpcMessageReply invokeRpcMessageReply;
+
+        public Proxy() {
+            //due to Externalizable
+        }
+
+        Proxy(final InvokeRpcMessageReply invokeRpcMessageReply) {
+            this.invokeRpcMessageReply = invokeRpcMessageReply;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(invokeRpcMessageReply.getRpcErrors().size());
+            for (final RpcError rpcError : invokeRpcMessageReply.getRpcErrors()) {
+                out.writeObject(rpcError);
+            }
+            out.writeObject(invokeRpcMessageReply.getNormalizedNodeMessage());
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            final int size = in.readInt();
+            final Collection<RpcError> rpcErrors = new LinkedList<>();
+            for (int i = 0; i < size; i++) {
+                rpcErrors.add((RpcError) in.readObject());
+            }
+            final NormalizedNodeMessage normalizedNodeMessage = (NormalizedNodeMessage) in.readObject();
+            invokeRpcMessageReply = new InvokeRpcMessageReply(normalizedNodeMessage, rpcErrors);
+        }
+
+        private Object readResolve() {
+            return invokeRpcMessageReply;
+        }
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyResultResponse.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyResultResponse.java
new file mode 100644 (file)
index 0000000..9d752b8
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import java.io.Serializable;
+
+/**
+ * Message is sended when RPC result does not exist or is empty
+ */
+public class EmptyResultResponse implements Serializable {
+    private static final long serialVersionUID = 1L;
+}
+
index a77104e28dde8f96b2c1678ea9001bcdc80a933d..63246e095198515b7ba2a6f582191c59e95f18cd 100644 (file)
@@ -9,9 +9,12 @@
 package org.opendaylight.netconf.topology.singleton.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.MockitoAnnotations.initMocks;
 import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
 
 import akka.actor.ActorContext;
@@ -34,22 +37,38 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
+import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
+import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
@@ -69,12 +88,15 @@ public class NetconfNodeActorTest {
     private ActorRef masterRef;
     private RemoteDeviceId remoteDeviceId;
 
+    @Mock
+    private DOMRpcService domRpcService;
+
     @Before
     public void setup() throws UnknownHostException {
-
+        initMocks
+                (this);
         remoteDeviceId = new RemoteDeviceId("netconf-topology",
                 new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
-
         final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
 
         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
@@ -100,8 +122,8 @@ public class NetconfNodeActorTest {
         /* Test init master data */
 
         final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers),
-                        TIMEOUT);
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
+                                domRpcService), TIMEOUT);
 
         final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
         assertTrue(success instanceof MasterActorDataInitialized);
@@ -128,13 +150,13 @@ public class NetconfNodeActorTest {
 
         final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
         final List<SourceIdentifier> sourceIdentifiers =
-                Lists.newArrayList(SourceIdentifier.create("testID", Optional.absent()));
+                Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent()));
 
         // init master data
 
         final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers),
-                        TIMEOUT);
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
+                                domRpcService), TIMEOUT);
 
         final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration());
 
@@ -156,7 +178,7 @@ public class NetconfNodeActorTest {
     @Test
     public void testYangTextSchemaSourceRequestMessage() throws Exception {
         final SchemaRepository schemaRepository = mock(SchemaRepository.class);
-        final SourceIdentifier sourceIdentifier = SourceIdentifier.create("testID", Optional.absent());
+        final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID", Optional.absent());
         final Props props = NetconfNodeActor.props(mock(NetconfTopologySetup.class), remoteDeviceId,
                 DEFAULT_SCHEMA_REPOSITORY, schemaRepository);
 
@@ -213,6 +235,96 @@ public class NetconfNodeActorTest {
 
     }
 
+    @Test
+    public void testProxyDOMRpcService() throws Exception {
+
+        final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
+        final List<SourceIdentifier> sourceIdentifiers =
+                Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent()));
+
+        // init master data
+
+        final Future<Object> initialDataToActor =
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
+                        domRpcService), TIMEOUT);
+
+        final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration());
+
+        assertTrue(successInit instanceof MasterActorDataInitialized);
+
+        // test if slave get right identifiers from master
+
+        final ProxyDOMRpcService slaveDomRPCService = new ProxyDOMRpcService(system, masterRef, remoteDeviceId);
+
+        final SchemaPath schemaPath = SchemaPath.create(true, QName.create("TestQname"));
+        final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
+                .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
+        final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed.");
+        // EmptyResultResponse message
+
+        doReturn(Futures.immediateCheckedFuture(null)).when(domRpcService).invokeRpc(any(), any());
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureEmpty =
+                slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+
+        final Object resultNull = resultFutureEmpty.checkedGet(2, TimeUnit.SECONDS);
+
+        assertEquals(null, resultNull);
+
+        // InvokeRpcMessageReply message
+
+        doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode))).
+                when(domRpcService).invokeRpc(any(), any());
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureNn =
+                slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+
+        final DOMRpcResult resultNn = resultFutureNn.checkedGet(2, TimeUnit.SECONDS);
+
+        assertEquals(outputNode, resultNn.getResult());
+        assertTrue(resultNn.getErrors().isEmpty());
+
+        // InvokeRpcMessageReply message only error
+
+        doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(rpcError)))
+                .when(domRpcService).invokeRpc(any(), any());
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureError =
+                slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+
+        final DOMRpcResult resultError = resultFutureError.checkedGet(2, TimeUnit.SECONDS);
+
+        assertNull(resultError.getResult());
+        assertEquals(rpcError, resultError.getErrors().iterator().next());
+
+        // InvokeRpcMessageReply message error + result
+
+        doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
+                .when(domRpcService).invokeRpc(any(), any());
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureOutputError =
+                slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+
+        final DOMRpcResult resultOutputError = resultFutureOutputError.checkedGet(2, TimeUnit.SECONDS);
+
+        assertEquals(outputNode, resultOutputError.getResult());
+        assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
+
+        // Throwable message
+
+        exception.expect(DOMRpcException.class);
+
+        doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException("")))
+                .when(domRpcService).invokeRpc(any(), any());
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureThrowable =
+                slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+
+        resultFutureThrowable.checkedGet(2, TimeUnit.SECONDS);
+
+    }
+
     private String convertStreamToString(java.io.InputStream is) {
         java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
         return s.hasNext() ? s.next() : "";
index d4b4ec0492789811379d552c96f805d3c72b8e29..780b14e86f58632b0a96198f65862b737ab67c5f 100644 (file)
@@ -38,9 +38,9 @@ class TestingRemoteDeviceConnectorImpl extends RemoteDeviceConnectorImpl {
     @Override
     public NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node,
                                                         final ActorRef deviceContextActorRef) {
+
         final NetconfConnectorDTO connectorDTO = new NetconfConnectorDTO(communicator, salFacade);
         doReturn(Futures.immediateCheckedFuture(null)).when(communicator).initializeRemoteConnection(any(), any());
-
         return connectorDTO;
     }
 
index c491ee6fe4a41023e18e018bbaedac602cbbdbd9..6297efe963641e5534ce55f78e0942ef8b0429c0 100644 (file)
@@ -41,6 +41,7 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
 import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
@@ -74,6 +75,9 @@ public class ReadOnlyTransactionTest {
     @Mock
     private DOMDataReadOnlyTransaction readTx;
 
+    @Mock
+    private DOMRpcService domRpcService;
+
     @Before
     public void setup() throws UnknownHostException {
         initMocks(this);
@@ -252,8 +256,8 @@ public class ReadOnlyTransactionTest {
 
     private void initializeDataTest() throws Exception {
         final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers),
-                        TIMEOUT);
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers,
+                                domRpcService), TIMEOUT);
 
         final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
 
index c9fa38f4bd6ba06410079732fbcabb0a879fa637..5007657c8a97e282c5cefeab959c6e31ccec6886 100644 (file)
@@ -47,6 +47,7 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
 import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
@@ -80,6 +81,9 @@ public class WriteOnlyTransactionTest {
     @Mock
     private DOMDataWriteTransaction writeTx;
 
+    @Mock
+    private DOMRpcService domRpcService;
+
     @Before
     public void setup() throws UnknownHostException {
         initMocks(this);
@@ -247,8 +251,8 @@ public class WriteOnlyTransactionTest {
 
     private void initializeDataTest() throws Exception {
         final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers),
-                        TIMEOUT);
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers,
+                                domRpcService), TIMEOUT);
 
         final Object success = Await.result(initialDataToActor, TIMEOUT.duration());