Add slave/master end-to-end test 17/72117/8
authorTom Pantelis <tompantelis@gmail.com>
Mon, 21 May 2018 14:26:52 +0000 (10:26 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Thu, 24 May 2018 03:16:29 +0000 (03:16 +0000)
Added a MountPointEndToEndTest that tests master and slave
mount point operations end-to-end with as little mocking as
possible. Hooks were added in NetconfTopologyContext and MasterSalFacade
for the MountPointEndToEndTest to get a hold of the MasterSalFacade
to invoke it manually to simulate backend device connectivity and to
inject the backend DOMDataBroker.

Change-Id: I5cc59987570ba4e99980af4e2f4c5adf47b6a81e
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
13 files changed:
netconf/netconf-topology-singleton/pom.xml
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteDeviceConnector.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyContext.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMRpcService.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteDeviceConnectorImpl.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/ClusteringRpcException.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessage.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/MountPointEndToEndTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/RemoteDeviceConnectorImplTest.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/TestingRemoteDeviceConnectorImpl.java

index 9f4fa8f7edd8fa3403dcd44121887d4fe2fcfc5c..625e3c30a7409f85c1197ea6afda8c23d6a3e3c2 100644 (file)
             <groupId>org.opendaylight.mdsal.model</groupId>
             <artifactId>ietf-topology</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-eos-dom-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>sal-binding-broker-impl</artifactId>
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-test-model</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.mockito</groupId>
index 4e9cc8924e4bfdcc384764449a6f4446808c84b8..69e2dd20bd494660d3b40f3703824f1189e28a05 100644 (file)
@@ -8,7 +8,8 @@
 
 package org.opendaylight.netconf.topology.singleton.api;
 
-import akka.actor.ActorRef;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 
 /**
  * Provides API for connection odl (master) with device.
@@ -17,9 +18,8 @@ public interface RemoteDeviceConnector {
 
     /**
      * Create device communicator and open device connection.
-     * @param masterActorRef master actor reference
      */
-    void startRemoteDeviceConnection(ActorRef masterActorRef);
+    void startRemoteDeviceConnection(RemoteDeviceHandler<NetconfSessionPreferences> deviceHandler);
 
     /**
      * Stop device communicator.
index 1ebbad07001604248f0d4778b30669baa973d6a5..ac77b7b32adbd6687aa0392ffb9aaa2762e70b33 100644 (file)
@@ -67,24 +67,26 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessi
     }
 
     @Override
-    @SuppressWarnings("checkstyle:hiddenField")
     public void onDeviceConnected(final SchemaContext remoteSchemaContext,
-                                  final NetconfSessionPreferences netconfSessionPreferences,
-                                  final DOMRpcService deviceRpc) {
+                                  final NetconfSessionPreferences sessionPreferences,
+                                  final DOMRpcService domRpcService) {
         this.currentSchemaContext = remoteSchemaContext;
-        this.netconfSessionPreferences = netconfSessionPreferences;
-        this.deviceRpc = deviceRpc;
+        this.netconfSessionPreferences = sessionPreferences;
+        this.deviceRpc = domRpcService;
+
+        LOG.info("Device {} connected - registering master mount point", id);
 
         registerMasterMountPoint();
 
         sendInitialDataToActor().onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+            public void onComplete(final Throwable failure, final Object success) {
                 if (failure == null) {
                     updateDeviceData();
                     return;
                 }
-                throw failure;
+
+                LOG.error("{}: CreateInitialMasterActorData to {} failed", id, masterActorRef, failure);
             }
         }, actorSystem.dispatcher());
 
@@ -92,6 +94,7 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessi
 
     @Override
     public void onDeviceDisconnected() {
+        LOG.info("Device {} disconnected - unregistering master mount point", id);
         salProvider.getTopologyDatastoreAdapter().updateDeviceData(false, new NetconfDeviceCapabilities());
         unregisterMasterMountPoint();
     }
@@ -122,9 +125,8 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessi
 
         final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
 
-        LOG.info("{}: Creating master data broker for device", id);
+        deviceDataBroker = newDeviceDataBroker();
 
-        deviceDataBroker = new NetconfDeviceDataBroker(id, currentSchemaContext, deviceRpc, netconfSessionPreferences);
         // We need to create ProxyDOMDataBroker so accessing mountpoint
         // on leader node would be same as on follower node
         final ProxyDOMDataBroker proxyDataBroker =
@@ -133,20 +135,28 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessi
                 .onTopologyDeviceConnected(currentSchemaContext, proxyDataBroker, deviceRpc, notificationService);
     }
 
+    protected DOMDataBroker newDeviceDataBroker() {
+        return new NetconfDeviceDataBroker(id, currentSchemaContext, deviceRpc, netconfSessionPreferences);
+    }
+
     private Future<Object> sendInitialDataToActor() {
         final List<SourceIdentifier> sourceIdentifiers =
                 SchemaContextUtil.getConstituentModuleIdentifiers(currentSchemaContext).stream()
                 .map(mi -> RevisionSourceIdentifier.create(mi.getName(), mi.getRevision()))
                 .collect(Collectors.toList());
 
-        // send initial data to master actor and create actor for providing it
+        LOG.debug("{}: Sending CreateInitialMasterActorData with sourceIdentifiers {} to {}",
+                id, sourceIdentifiers, masterActorRef);
+
+        // send initial data to master actor
         return Patterns.ask(masterActorRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
                 deviceRpc), actorResponseWaitTime);
     }
 
     private void updateDeviceData() {
-        final Cluster cluster = Cluster.get(actorSystem);
-        salProvider.getTopologyDatastoreAdapter().updateClusteredDeviceData(true, cluster.selfAddress().toString(),
+        final String masterAddress = Cluster.get(actorSystem).selfAddress().toString();
+        LOG.debug("{}: updateDeviceData with master address {}", id, masterAddress);
+        salProvider.getTopologyDatastoreAdapter().updateClusteredDeviceData(true, masterAddress,
                 netconfSessionPreferences.getNetconfDeviceCapabilities());
     }
 
index 529a538fe76b5f7f86fe45a2ecb5aef8c26c98c3..b749d011ad4db3ec53a5b1ead10349b14cab351e 100644 (file)
@@ -38,6 +38,7 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
 import org.slf4j.Logger;
@@ -145,10 +146,10 @@ class NetconfNodeManager
     }
 
     void registerDataTreeChangeListener(final String topologyId, final NodeKey key) {
-        LOG.debug("{}: Registering data tree change listener on node {}", id, key);
+        final InstanceIdentifier<Node> path = NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId);
+        LOG.debug("{}: Registering data tree change listener on path {}", id, path);
         dataChangeListenerRegistration = setup.getDataBroker().registerDataTreeChangeListener(
-                new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
-                        NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId)), this);
+                new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, path), this);
     }
 
     private synchronized void handleSlaveMountPoint(final DataObjectModification<Node> rootNode) {
index d8df4c36f5f97a4672e9563cce20a011c2710872..2560d7bd2f031758b44a1b71eef3336ba894a0e9 100644 (file)
@@ -62,8 +62,7 @@ class NetconfTopologyContext implements ClusterSingletonService, AutoCloseable {
         remoteDeviceId = NetconfTopologyUtils.createRemoteDeviceId(netconfTopologyDeviceSetup.getNode().getNodeId(),
                 netconfTopologyDeviceSetup.getNode().getAugmentation(NetconfNode.class));
 
-        remoteDeviceConnector = new RemoteDeviceConnectorImpl(netconfTopologyDeviceSetup, remoteDeviceId,
-                actorResponseWaitTime, mountService);
+        remoteDeviceConnector = new RemoteDeviceConnectorImpl(netconfTopologyDeviceSetup, remoteDeviceId);
 
         netconfNodeManager = createNodeDeviceManager();
     }
@@ -88,7 +87,7 @@ class NetconfTopologyContext implements ClusterSingletonService, AutoCloseable {
                     actorResponseWaitTime, mountService),
                     NetconfTopologyUtils.createMasterActorName(remoteDeviceId.getName(), masterAddress));
 
-            remoteDeviceConnector.startRemoteDeviceConnection(masterActorRef);
+            remoteDeviceConnector.startRemoteDeviceConnection(newMasterSalFacade());
         }
 
     }
@@ -148,8 +147,7 @@ class NetconfTopologyContext implements ClusterSingletonService, AutoCloseable {
         if (!isMaster) {
             netconfNodeManager.refreshDevice(netconfTopologyDeviceSetup, remoteDeviceId);
         }
-        remoteDeviceConnector = new RemoteDeviceConnectorImpl(netconfTopologyDeviceSetup, remoteDeviceId,
-                actorResponseWaitTime, mountService);
+        remoteDeviceConnector = new RemoteDeviceConnectorImpl(netconfTopologyDeviceSetup, remoteDeviceId);
 
         if (isMaster) {
             final Future<Object> future = Patterns.ask(masterActorRef, new RefreshSetupMasterActorData(
@@ -162,7 +160,7 @@ class NetconfTopologyContext implements ClusterSingletonService, AutoCloseable {
                         LOG.error("Failed to refresh master actor data: {}", failure);
                         return;
                     }
-                    remoteDeviceConnector.startRemoteDeviceConnection(masterActorRef);
+                    remoteDeviceConnector.startRemoteDeviceConnection(newMasterSalFacade());
                 }
             }, netconfTopologyDeviceSetup.getActorSystem().dispatcher());
         }
@@ -181,4 +179,9 @@ class NetconfTopologyContext implements ClusterSingletonService, AutoCloseable {
             masterActorRef = null;
         }
     }
+
+    protected MasterSalFacade newMasterSalFacade() {
+        return new MasterSalFacade(remoteDeviceId, netconfTopologyDeviceSetup.getActorSystem(), masterActorRef,
+                actorResponseWaitTime, mountService, netconfTopologyDeviceSetup.getDataBroker());
+    }
 }
index f972e4568454e32207e24980d97aa2f6ab6db527..b6e56fbef4e1b867aa02d88bba327a9615f39267 100644 (file)
@@ -32,6 +32,7 @@ import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage
 import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -44,6 +45,14 @@ public class ProxyDOMRpcService implements DOMRpcService {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
 
+    private final ExceptionMapper<DOMRpcException> domRpcExceptionMapper =
+        new ExceptionMapper<DOMRpcException>("invokeRpc", DOMRpcException.class) {
+            @Override
+            protected DOMRpcException newWithCause(String message, Throwable cause) {
+                return new ClusteringRpcException(id + ": Exception during remote rpc invocation.", cause);
+            }
+        };
+
     private final ActorRef masterActorRef;
     private final ActorSystem actorSystem;
     private final RemoteDeviceId id;
@@ -63,8 +72,8 @@ public class ProxyDOMRpcService implements DOMRpcService {
                                                                   @Nullable final NormalizedNode<?, ?> input) {
         LOG.trace("{}: Rpc operation invoked with schema type: {} and node: {}.", id, type, input);
 
-        final NormalizedNodeMessage normalizedNodeMessage =
-                new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY, input);
+        final NormalizedNodeMessage normalizedNodeMessage = input != null
+                ? new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY, input) : null;
         final Future<Object> scalaFuture = Patterns.ask(masterActorRef,
                 new InvokeRpcMessage(new SchemaPathMessage(type), normalizedNodeMessage), actorResponseWaitTime);
 
@@ -96,8 +105,7 @@ public class ProxyDOMRpcService implements DOMRpcService {
             }
         }, actorSystem.dispatcher());
 
-        return Futures.makeChecked(settableFuture,
-            ex -> new ClusteringRpcException(id + ": Exception during remote rpc invocation.", ex));
+        return Futures.makeChecked(settableFuture, domRpcExceptionMapper);
     }
 
     @Nonnull
index e4fd08b77f268c2586206b1388587db3e383a3cf..eb99f5cf52c77e8913ba0583b516a5d1894c9042 100644 (file)
@@ -8,8 +8,6 @@
 
 package org.opendaylight.netconf.topology.singleton.impl;
 
-import akka.actor.ActorRef;
-import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -28,7 +26,6 @@ import java.util.Objects;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
@@ -88,8 +85,6 @@ public class RemoteDeviceConnectorImpl implements RemoteDeviceConnector {
 
     private final NetconfTopologySetup netconfTopologyDeviceSetup;
     private final RemoteDeviceId remoteDeviceId;
-    private final DOMMountPointService mountService;
-    private final Timeout actorResponseWaitTime;
     private final String privateKeyPath;
     private final String privateKeyPassphrase;
     private final AAAEncryptionService encryptionService;
@@ -97,13 +92,10 @@ public class RemoteDeviceConnectorImpl implements RemoteDeviceConnector {
     private final NetconfKeystoreAdapter keystoreAdapter;
 
     public RemoteDeviceConnectorImpl(final NetconfTopologySetup netconfTopologyDeviceSetup,
-                                     final RemoteDeviceId remoteDeviceId, final Timeout actorResponseWaitTime,
-                                     final DOMMountPointService mountService) {
+                                     final RemoteDeviceId remoteDeviceId) {
 
         this.netconfTopologyDeviceSetup = Preconditions.checkNotNull(netconfTopologyDeviceSetup);
         this.remoteDeviceId = remoteDeviceId;
-        this.actorResponseWaitTime = actorResponseWaitTime;
-        this.mountService = mountService;
         this.privateKeyPath = netconfTopologyDeviceSetup.getPrivateKeyPath();
         this.privateKeyPassphrase = netconfTopologyDeviceSetup.getPrivateKeyPassphrase();
         this.encryptionService = netconfTopologyDeviceSetup.getEncryptionService();
@@ -111,7 +103,7 @@ public class RemoteDeviceConnectorImpl implements RemoteDeviceConnector {
     }
 
     @Override
-    public void startRemoteDeviceConnection(final ActorRef deviceContextActorRef) {
+    public void startRemoteDeviceConnection(final RemoteDeviceHandler<NetconfSessionPreferences> deviceHandler) {
 
         final NetconfNode netconfNode = netconfTopologyDeviceSetup.getNode().getAugmentation(NetconfNode.class);
         final NodeId nodeId = netconfTopologyDeviceSetup.getNode().getNodeId();
@@ -119,7 +111,7 @@ public class RemoteDeviceConnectorImpl implements RemoteDeviceConnector {
         Preconditions.checkNotNull(netconfNode.getPort());
         Preconditions.checkNotNull(netconfNode.isTcpOnly());
 
-        this.deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode, deviceContextActorRef);
+        this.deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode, deviceHandler);
         final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
         final NetconfClientSessionListener netconfClientSessionListener = deviceCommunicatorDTO.getSessionListener();
         final NetconfReconnectingClientConfiguration clientConfig =
@@ -143,17 +135,18 @@ public class RemoteDeviceConnectorImpl implements RemoteDeviceConnector {
     @SuppressWarnings("checkstyle:IllegalCatch")
     @Override
     public void stopRemoteDeviceConnection() {
-        Preconditions.checkNotNull(deviceCommunicatorDTO, remoteDeviceId + ": Device communicator was not created.");
-        try {
-            deviceCommunicatorDTO.close();
-        } catch (final Exception e) {
-            LOG.error("{}: Error at closing device communicator.", remoteDeviceId, e);
+        if (deviceCommunicatorDTO != null) {
+            try {
+                deviceCommunicatorDTO.close();
+            } catch (final Exception e) {
+                LOG.error("{}: Error at closing device communicator.", remoteDeviceId, e);
+            }
         }
     }
 
     @VisibleForTesting
     NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node,
-                                                 final ActorRef deviceContextActorRef) {
+                                                 final RemoteDeviceHandler<NetconfSessionPreferences> deviceHandler) {
         //setup default values since default value is not supported in mdsal
         final long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null
                 ? NetconfTopologyUtils.DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
@@ -162,9 +155,7 @@ public class RemoteDeviceConnectorImpl implements RemoteDeviceConnector {
         final boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null
                 ? NetconfTopologyUtils.DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
 
-        RemoteDeviceHandler<NetconfSessionPreferences> salFacade = new MasterSalFacade(remoteDeviceId,
-                netconfTopologyDeviceSetup.getActorSystem(), deviceContextActorRef, actorResponseWaitTime,
-                mountService, netconfTopologyDeviceSetup.getDataBroker());
+        RemoteDeviceHandler<NetconfSessionPreferences> salFacade = deviceHandler;
         if (keepaliveDelay > 0) {
             LOG.info("{}: Adding keepalive facade.", remoteDeviceId);
             salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade,
@@ -174,7 +165,6 @@ public class RemoteDeviceConnectorImpl implements RemoteDeviceConnector {
 
         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = netconfTopologyDeviceSetup.getSchemaResourcesDTO();
 
-
         // pre register yang library sources as fallback schemas to schema registry
         final List<SchemaSourceRegistration<YangTextSchemaSource>> registeredYangLibSources = Lists.newArrayList();
         if (node.getYangLibrary() != null) {
index 2af981783e4e0882dae9bf28e55c5c44adc4e625..67d0b52a7c9bb668181aa3da72a6f6d020c46255 100644 (file)
@@ -239,12 +239,17 @@ public class NetconfNodeActor extends AbstractUntypedActor {
     private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
                                 final ActorRef recipient) {
 
-        final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
-                deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
+        LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage,
+                deviceRpc);
+
+        final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult = deviceRpc.invokeRpc(schemaPath,
+                normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
 
         Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
             @Override
             public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
+                LOG.debug("{}: invokeSlaveRpc for {}, domRpcResult: {}", id, schemaPath, domRpcResult);
+
                 if (domRpcResult == null) {
                     recipient.tell(new EmptyResultResponse(), getSender());
                     return;
index 01ea9202e51e9485369f3fc72c4900e9ba4178b2..fc0405b83778f243f288fefc4b2cedd849afc13b 100644 (file)
@@ -11,6 +11,8 @@ package org.opendaylight.netconf.topology.singleton.impl.utils;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 
 public class ClusteringRpcException extends DOMRpcException {
+    private static final long serialVersionUID = 1L;
+
     public ClusteringRpcException(String message) {
         super(message);
     }
index e8ef986f36a8c7de1094fe460fc3ed77f857abdf..2cea27172ae590005890df55f634992503ae0b31 100644 (file)
@@ -13,6 +13,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
 import org.opendaylight.netconf.topology.singleton.messages.SchemaPathMessage;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
@@ -24,7 +25,7 @@ public class InvokeRpcMessage implements Serializable {
     private final NormalizedNodeMessage normalizedNodeMessage;
 
     public InvokeRpcMessage(final SchemaPathMessage schemaPathMessage,
-                            final NormalizedNodeMessage normalizedNodeMessage) {
+                            @Nullable final NormalizedNodeMessage normalizedNodeMessage) {
         this.schemaPathMessage = schemaPathMessage;
         this.normalizedNodeMessage = normalizedNodeMessage;
     }
@@ -37,6 +38,7 @@ public class InvokeRpcMessage implements Serializable {
         return schemaPathMessage.getSchemaPath();
     }
 
+    @Nullable
     public NormalizedNodeMessage getNormalizedNodeMessage() {
         return normalizedNodeMessage;
     }
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/MountPointEndToEndTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/MountPointEndToEndTest.java
new file mode 100644 (file)
index 0000000..b19a2cc
--- /dev/null
@@ -0,0 +1,702 @@
+/*
+ * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import akka.actor.ActorSystem;
+import akka.testkit.javadsl.TestKit;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import com.typesafe.config.ConfigFactory;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.SucceededFuture;
+import java.io.File;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.opendaylight.aaa.encrypt.AAAEncryptionService;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.controller.config.threadpool.ThreadPool;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
+import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.api.DOMService;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMRpcRouter;
+import org.opendaylight.controller.md.sal.dom.broker.impl.mount.DOMMountPointServiceImpl;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionListener;
+import org.opendaylight.mdsal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.mdsal.eos.dom.simple.SimpleDOMEntityOwnershipService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl;
+import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.keystore.rev171017.Keystore;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPwUnencryptedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.login.pw.unencrypted.LoginPasswordUnencryptedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.ConfigBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.GetTopOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.PutTopInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelListBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
+import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests netconf mount points end-to-end.
+ *
+ * @author Thomas Pantelis
+ */
+public class MountPointEndToEndTest {
+    private static Logger LOG = LoggerFactory.getLogger(MountPointEndToEndTest.class);
+
+    private static final String TOP_MODULE_NAME = "opendaylight-mdsal-list-test";
+    private static final String ACTOR_SYSTEM_NAME = "test";
+    private static final String TOPOLOGY_ID = TopologyNetconf.QNAME.getLocalName();
+    private static final NodeId NODE_ID = new NodeId("node-id");
+    private static final InstanceIdentifier<Node> NODE_INSTANCE_ID = NetconfTopologyUtils.createTopologyNodeListPath(
+            new NodeKey(NODE_ID), TOPOLOGY_ID);
+
+    @Mock private RpcProviderRegistry mockRpcProviderRegistry;
+    @Mock private NetconfClientDispatcher mockClientDispatcher;
+    @Mock private AAAEncryptionService mockEncryptionService;
+    @Mock private ThreadPool mockThreadPool;
+    @Mock private ScheduledThreadPool mockKeepaliveExecutor;
+
+    @Mock private ActorSystemProvider mockMasterActorSystemProvider;
+    @Mock private MountProvisionListener masterMountPointListener;
+    private final DOMMountPointService masterMountPointService = new DOMMountPointServiceImpl();
+    private final DOMRpcRouter deviceRpcService = new DOMRpcRouter();
+    private DOMClusterSingletonServiceProviderImpl masterClusterSingletonServiceProvider;
+    private DataBroker masterDataBroker;
+    private DOMDataBroker deviceDOMDataBroker;
+    private ActorSystem masterSystem;
+    private NetconfTopologyManager masterNetconfTopologyManager;
+    private volatile SettableFuture<MasterSalFacade> masterSalFacadeFuture = SettableFuture.create();
+
+    @Mock private ActorSystemProvider mockSlaveActorSystemProvider;
+    @Mock private ClusterSingletonServiceProvider mockSlaveClusterSingletonServiceProvider;
+    @Mock private ClusterSingletonServiceRegistration mockSlaveClusterSingletonServiceReg;
+    @Mock private MountProvisionListener slaveMountPointListener;
+    private final DOMMountPointService slaveMountPointService = new DOMMountPointServiceImpl();
+    private DataBroker slaveDataBroker;
+    private ActorSystem slaveSystem;
+    private NetconfTopologyManager slaveNetconfTopologyManager;
+    private final SettableFuture<NetconfTopologyContext> slaveNetconfTopologyContextFuture = SettableFuture.create();
+    private BindingTransactionChain slaveTxChain;
+
+    private final EventExecutor eventExecutor = GlobalEventExecutor.INSTANCE;
+    private final Config config = new ConfigBuilder().setWriteTransactionIdleTimeout(0).build();
+    private SchemaContext deviceSchemaContext;
+    private YangModuleInfo topModuleInfo;
+    private SchemaPath putTopRpcSchemaPath;
+    private SchemaPath getTopRpcSchemaPath;
+    private BindingToNormalizedNodeCodec bindingToNormalized;
+    private YangInstanceIdentifier yangNodeInstanceId;
+    private final TopDOMRpcImplementation topRpcImplementation = new TopDOMRpcImplementation();
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Before
+    public void setUp() throws Exception {
+        initMocks(this);
+
+        deleteCacheDir();
+
+        topModuleInfo = BindingReflections.getModuleInfo(Top.class);
+
+        final ModuleInfoBackedContext moduleContext = ModuleInfoBackedContext.create();
+        moduleContext.addModuleInfos(Arrays.asList(topModuleInfo));
+        deviceSchemaContext = moduleContext.tryToCreateSchemaContext().get();
+
+        deviceRpcService.onGlobalContextUpdated(deviceSchemaContext);
+
+        putTopRpcSchemaPath = findRpcDefinition("put-top").getPath();
+        getTopRpcSchemaPath = findRpcDefinition("get-top").getPath();
+
+        deviceRpcService.registerRpcImplementation(topRpcImplementation,
+                DOMRpcIdentifier.create(putTopRpcSchemaPath), DOMRpcIdentifier.create(getTopRpcSchemaPath));
+
+        setupMaster();
+
+        setupSlave();
+
+        yangNodeInstanceId = bindingToNormalized.toYangInstanceIdentifier(NODE_INSTANCE_ID);
+
+        doReturn(new SucceededFuture(GlobalEventExecutor.INSTANCE, null)).when(mockClientDispatcher)
+                .createReconnectingClient(any());
+
+        LOG.info("****** Setup complete");
+    }
+
+    private void deleteCacheDir() {
+        FileUtils.deleteQuietly(new File(NetconfTopologyUtils.CACHE_DIRECTORY));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        deleteCacheDir();
+        TestKit.shutdownActorSystem(slaveSystem, Boolean.TRUE);
+        TestKit.shutdownActorSystem(masterSystem, Boolean.TRUE);
+    }
+
+    private void setupMaster() throws Exception {
+        AbstractConcurrentDataBrokerTest dataBrokerTest = newDataBrokerTest();
+        masterDataBroker = dataBrokerTest.getDataBroker();
+        deviceDOMDataBroker = dataBrokerTest.getDomBroker();
+        bindingToNormalized = dataBrokerTest.getDataBrokerTestCustomizer().getBindingToNormalized();
+
+        masterSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Master"));
+
+        masterClusterSingletonServiceProvider = new DOMClusterSingletonServiceProviderImpl(
+                new SimpleDOMEntityOwnershipService());
+        masterClusterSingletonServiceProvider.initializeProvider();
+
+        doReturn(masterSystem).when(mockMasterActorSystemProvider).getActorSystem();
+
+        doReturn(MoreExecutors.newDirectExecutorService()).when(mockThreadPool).getExecutor();
+
+        NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY.registerSchemaSource(
+            id -> Futures.immediateFuture(YangTextSchemaSource.delegateForByteSource(id,
+                    topModuleInfo.getYangTextByteSource())),
+            PotentialSchemaSource.create(RevisionSourceIdentifier.create(TOP_MODULE_NAME,
+                    topModuleInfo.getName().getRevision()), YangTextSchemaSource.class, 1));
+
+        masterNetconfTopologyManager = new NetconfTopologyManager(masterDataBroker, mockRpcProviderRegistry,
+                masterClusterSingletonServiceProvider, mockKeepaliveExecutor, mockThreadPool,
+                mockMasterActorSystemProvider, eventExecutor, mockClientDispatcher, TOPOLOGY_ID, config,
+                masterMountPointService, mockEncryptionService) {
+            @Override
+            protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
+                    ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
+                NetconfTopologyContext context =
+                        super.newNetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime);
+                NetconfTopologyContext spiedContext = spy(context);
+                doAnswer(invocation -> {
+                    final MasterSalFacade spiedFacade = (MasterSalFacade) spy(invocation.callRealMethod());
+                    doReturn(deviceDOMDataBroker).when(spiedFacade).newDeviceDataBroker();
+                    masterSalFacadeFuture.set(spiedFacade);
+                    return spiedFacade;
+                }).when(spiedContext).newMasterSalFacade();
+
+                return spiedContext;
+            }
+        };
+
+        masterNetconfTopologyManager.init();
+
+        verifyTopologyNodesCreated(masterDataBroker);
+    }
+
+    private void setupSlave() throws Exception {
+        AbstractConcurrentDataBrokerTest dataBrokerTest = newDataBrokerTest();
+        slaveDataBroker = dataBrokerTest.getDataBroker();
+
+        slaveSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Slave"));
+
+        doReturn(slaveSystem).when(mockSlaveActorSystemProvider).getActorSystem();
+
+        doReturn(mockSlaveClusterSingletonServiceReg).when(mockSlaveClusterSingletonServiceProvider)
+                .registerClusterSingletonService(any());
+
+        slaveNetconfTopologyManager = new NetconfTopologyManager(slaveDataBroker, mockRpcProviderRegistry,
+                mockSlaveClusterSingletonServiceProvider, mockKeepaliveExecutor, mockThreadPool,
+                mockSlaveActorSystemProvider, eventExecutor, mockClientDispatcher, TOPOLOGY_ID, config,
+                slaveMountPointService, mockEncryptionService)  {
+            @Override
+            protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
+                    ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
+                NetconfTopologyContext spiedContext =
+                        spy(super.newNetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime));
+                slaveNetconfTopologyContextFuture.set(spiedContext);
+                return spiedContext;
+            }
+        };
+
+        slaveNetconfTopologyManager.init();
+
+        verifyTopologyNodesCreated(slaveDataBroker);
+
+        slaveTxChain = slaveDataBroker.createTransactionChain(new TransactionChainListener() {
+            @Override
+            public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+            }
+
+            @Override
+            public void onTransactionChainFailed(TransactionChain<?, ?> chain, AsyncTransaction<?, ?> transaction,
+                    Throwable cause) {
+                LOG.error("Slave transaction chain failed", cause);
+            }
+        });
+    }
+
+    @Test
+    public void test() throws Exception {
+        testMaster();
+
+        testSlave();
+
+        final MasterSalFacade masterSalFacade = testMasterNodeUpdated();
+
+        testMasterDisconnected(masterSalFacade);
+
+        testCleanup();
+    }
+
+    private MasterSalFacade testMaster() throws InterruptedException, ExecutionException, TimeoutException {
+        LOG.info("****** Testing master");
+
+        writeNetconfNode(NetconfTopologyUtils.DEFAULT_CACHE_DIRECTORY, masterDataBroker);
+
+        final MasterSalFacade masterSalFacade = masterSalFacadeFuture.get(5, TimeUnit.SECONDS);
+
+        masterSalFacade.onDeviceConnected(deviceSchemaContext,
+                NetconfSessionPreferences.fromStrings(Collections.emptyList()), deviceRpcService);
+
+        DOMMountPoint masterMountPoint = awaitMountPoint(masterMountPointService);
+
+        LOG.info("****** Testing master DOMDataBroker operations");
+
+        testDOMDataBrokerOperations(getDOMDataBroker(masterMountPoint));
+
+        LOG.info("****** Testing master DOMRpcService");
+
+        testDOMRpcService(getDOMRpcService(masterMountPoint));
+        return masterSalFacade;
+    }
+
+    private void testSlave() throws InterruptedException, ExecutionException, TimeoutException {
+        LOG.info("****** Testing slave");
+
+        writeNetconfNode("slave", slaveDataBroker);
+
+        verify(mockSlaveClusterSingletonServiceProvider, timeout(5000)).registerClusterSingletonService(any());
+
+        // Since the master and slave use separate DataBrokers we need to copy the master's oper node to the slave.
+        // This is essentially what happens in a clustered environment but we'll use a DTCL here.
+
+        masterDataBroker.registerDataTreeChangeListener(
+            new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NODE_INSTANCE_ID), changes -> {
+                final WriteTransaction slaveTx = slaveTxChain.newWriteOnlyTransaction();
+                for (DataTreeModification<Node> dataTreeModification : changes) {
+                    DataObjectModification<Node> rootNode = dataTreeModification.getRootNode();
+                    InstanceIdentifier<Node> path = dataTreeModification.getRootPath().getRootIdentifier();
+                    switch (rootNode.getModificationType()) {
+                        case WRITE:
+                        case SUBTREE_MODIFIED:
+                            slaveTx.merge(LogicalDatastoreType.OPERATIONAL, path, rootNode.getDataAfter());
+                            break;
+                        case DELETE:
+                            slaveTx.delete(LogicalDatastoreType.OPERATIONAL, path);
+                            break;
+                        default:
+                            break;
+                    }
+                }
+
+                slaveTx.commit();
+            });
+
+        DOMMountPoint slaveMountPoint = awaitMountPoint(slaveMountPointService);
+
+        final NetconfTopologyContext slaveNetconfTopologyContext =
+                slaveNetconfTopologyContextFuture.get(5, TimeUnit.SECONDS);
+        verify(slaveNetconfTopologyContext, never()).newMasterSalFacade();
+
+        LOG.info("****** Testing slave DOMDataBroker operations");
+
+        testDOMDataBrokerOperations(getDOMDataBroker(slaveMountPoint));
+
+        LOG.info("****** Testing slave DOMRpcService");
+
+        testDOMRpcService(getDOMRpcService(slaveMountPoint));
+    }
+
+    private MasterSalFacade testMasterNodeUpdated() throws InterruptedException, ExecutionException, TimeoutException {
+        LOG.info("****** Testing update master node");
+
+        masterMountPointService.registerProvisionListener(masterMountPointListener);
+        slaveMountPointService.registerProvisionListener(slaveMountPointListener);
+
+        masterSalFacadeFuture = SettableFuture.create();
+        writeNetconfNode(NetconfTopologyUtils.DEFAULT_CACHE_DIRECTORY, masterDataBroker);
+
+        verify(masterMountPointListener, timeout(5000)).onMountPointRemoved(yangNodeInstanceId);
+
+        MasterSalFacade masterSalFacade = masterSalFacadeFuture.get(5, TimeUnit.SECONDS);
+
+        masterSalFacade.onDeviceConnected(deviceSchemaContext,
+                NetconfSessionPreferences.fromStrings(Collections.emptyList()), deviceRpcService);
+
+        verify(masterMountPointListener, timeout(5000)).onMountPointCreated(yangNodeInstanceId);
+
+        verify(slaveMountPointListener, timeout(5000)).onMountPointRemoved(yangNodeInstanceId);
+        verify(slaveMountPointListener, timeout(5000)).onMountPointCreated(yangNodeInstanceId);
+
+        return masterSalFacade;
+    }
+
+    private void testMasterDisconnected(final MasterSalFacade masterSalFacade)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        LOG.info("****** Testing master disconnected");
+
+        masterSalFacade.onDeviceDisconnected();
+
+        awaitMountPointNotPresent(masterMountPointService);
+
+        await().atMost(5, TimeUnit.SECONDS).until(() -> {
+            try (ReadOnlyTransaction readTx = masterDataBroker.newReadOnlyTransaction()) {
+                Optional<Node> node = readTx.read(LogicalDatastoreType.OPERATIONAL,
+                        NODE_INSTANCE_ID).get(5, TimeUnit.SECONDS);
+                assertTrue(node.isPresent());
+                final NetconfNode netconfNode = node.get().getAugmentation(NetconfNode.class);
+                return netconfNode.getConnectionStatus() != NetconfNodeConnectionStatus.ConnectionStatus.Connected;
+            }
+        });
+
+        awaitMountPointNotPresent(slaveMountPointService);
+    }
+
+    private void testCleanup() throws Exception {
+        LOG.info("****** Testing cleanup");
+
+        slaveNetconfTopologyManager.close();
+        verify(mockSlaveClusterSingletonServiceReg).close();
+    }
+
+    private void testDOMRpcService(DOMRpcService domRpcService)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        testPutTopRpc(domRpcService, new DefaultDOMRpcResult((NormalizedNode<?, ?>)null));
+        testPutTopRpc(domRpcService, null);
+        testPutTopRpc(domRpcService, new DefaultDOMRpcResult(ImmutableList.of(
+                RpcResultBuilder.newError(ErrorType.APPLICATION, "tag1", "error1"),
+                RpcResultBuilder.newError(ErrorType.APPLICATION, "tag2", "error2"))));
+
+        testGetTopRpc(domRpcService, new DefaultDOMRpcResult(bindingToNormalized.toNormalizedNodeRpcData(
+                new GetTopOutputBuilder().setTopLevelList(Arrays.asList(new TopLevelListBuilder().setName("one")
+                        .build())).build())));
+
+        testFailedRpc(domRpcService, getTopRpcSchemaPath, null);
+    }
+
+    private void testPutTopRpc(DOMRpcService domRpcService, DOMRpcResult result)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        ContainerNode putTopInput = bindingToNormalized.toNormalizedNodeRpcData(
+                new PutTopInputBuilder().setTopLevelList(Arrays.asList(new TopLevelListBuilder().setName("one")
+                        .build())).build());
+        testRpc(domRpcService, putTopRpcSchemaPath, putTopInput, result);
+    }
+
+    private void testGetTopRpc(DOMRpcService domRpcService, DOMRpcResult result)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        testRpc(domRpcService, getTopRpcSchemaPath, null, result);
+    }
+
+    private void testRpc(DOMRpcService domRpcService, SchemaPath schemaPath, NormalizedNode<?, ?> input,
+            DOMRpcResult result) throws InterruptedException, ExecutionException, TimeoutException {
+        final DOMRpcResult actual = invokeRpc(domRpcService, schemaPath, input, Futures.immediateCheckedFuture(result));
+        if (result == null) {
+            assertNull(actual);
+            return;
+        }
+
+        assertNotNull(actual);
+        assertEquals(result.getResult(), actual.getResult());
+
+        assertEquals(result.getErrors().size(), actual.getErrors().size());
+        Iterator<RpcError> iter1 = result.getErrors().iterator();
+        Iterator<RpcError> iter2 = actual.getErrors().iterator();
+        while (iter1.hasNext() && iter2.hasNext()) {
+            RpcError err1 = iter1.next();
+            RpcError err2 = iter2.next();
+            assertEquals(err1.getErrorType(), err2.getErrorType());
+            assertEquals(err1.getTag(), err2.getTag());
+            assertEquals(err1.getMessage(), err2.getMessage());
+            assertEquals(err1.getSeverity(), err2.getSeverity());
+            assertEquals(err1.getApplicationTag(), err2.getApplicationTag());
+            assertEquals(err1.getInfo(), err2.getInfo());
+        }
+    }
+
+    private void testFailedRpc(DOMRpcService domRpcService, SchemaPath schemaPath, NormalizedNode<?, ?> input)
+            throws InterruptedException, TimeoutException  {
+        try {
+            invokeRpc(domRpcService, schemaPath, input, Futures.immediateFailedCheckedFuture(
+                    new ClusteringRpcException("mock")));
+            fail("Expected exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof ClusteringRpcException);
+            assertEquals("mock", e.getCause().getMessage());
+        }
+    }
+
+    private DOMRpcResult invokeRpc(DOMRpcService domRpcService, SchemaPath schemaPath, NormalizedNode<?, ?> input,
+            CheckedFuture<DOMRpcResult, DOMRpcException> returnFuture)
+                    throws InterruptedException, ExecutionException, TimeoutException {
+        topRpcImplementation.init(returnFuture);
+        final ListenableFuture<DOMRpcResult> resultFuture = domRpcService.invokeRpc(schemaPath, input);
+
+        topRpcImplementation.verify(DOMRpcIdentifier.create(schemaPath), input);
+
+        return resultFuture.get(5, TimeUnit.SECONDS);
+    }
+
+    private static void testDOMDataBrokerOperations(DOMDataBroker dataBroker)
+            throws InterruptedException, ExecutionException, TimeoutException {
+
+        DOMDataWriteTransaction writeTx = dataBroker.newWriteOnlyTransaction();
+
+        final ContainerNode topNode = Builders.containerBuilder()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(Top.QNAME)).build();
+        final YangInstanceIdentifier topPath = YangInstanceIdentifier.of(Top.QNAME);
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, topPath, topNode);
+
+        final QName name = QName.create(TopLevelList.QNAME, "name");
+        final YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(topPath)
+                .node(TopLevelList.QNAME).build();
+        final MapEntryNode listEntryNode = ImmutableNodes.mapEntry(TopLevelList.QNAME, name, "one");
+        final MapNode listNode = ImmutableNodes.mapNodeBuilder(TopLevelList.QNAME).addChild(listEntryNode).build();
+        writeTx.merge(LogicalDatastoreType.CONFIGURATION, listPath, listNode);
+        writeTx.commit().get(5, TimeUnit.SECONDS);
+
+        verifyDataInStore(dataBroker.newReadWriteTransaction(), YangInstanceIdentifier.builder(listPath)
+                .nodeWithKey(TopLevelList.QNAME, name, "one").build(), listEntryNode);
+
+        writeTx = dataBroker.newWriteOnlyTransaction();
+        writeTx.delete(LogicalDatastoreType.CONFIGURATION, topPath);
+        writeTx.commit().get(5, TimeUnit.SECONDS);
+
+        DOMDataReadWriteTransaction readTx = dataBroker.newReadWriteTransaction();
+        assertFalse(readTx.exists(LogicalDatastoreType.CONFIGURATION, topPath).get(5, TimeUnit.SECONDS));
+        assertTrue(readTx.cancel());
+    }
+
+    private static void writeNetconfNode(String cacheDir, DataBroker databroker)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final NetconfNode netconfNode = new NetconfNodeBuilder()
+                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+                .setPort(new PortNumber(1234))
+                .setActorResponseWaitTime(10)
+                .setTcpOnly(Boolean.TRUE)
+                .setSchemaless(Boolean.FALSE)
+                .setKeepaliveDelay(0L)
+                .setConnectionTimeoutMillis(5000L)
+                .setDefaultRequestTimeoutMillis(5000L)
+                .setMaxConnectionAttempts(1L)
+                .setCredentials(new LoginPwUnencryptedBuilder().setLoginPasswordUnencrypted(
+                        new LoginPasswordUnencryptedBuilder().setUsername("user").setPassword("pass").build()).build())
+                .setSchemaCacheDirectory(cacheDir)
+                .build();
+        final Node node = new NodeBuilder().setNodeId(NODE_ID).addAugmentation(NetconfNode.class, netconfNode).build();
+
+        final WriteTransaction writeTx = databroker.newWriteOnlyTransaction();
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, NODE_INSTANCE_ID, node);
+        writeTx.commit().get(5, TimeUnit.SECONDS);
+    }
+
+    private static void verifyDataInStore(DOMDataReadTransaction readTx, YangInstanceIdentifier path,
+            NormalizedNode<?, ?> expNode) throws InterruptedException, ExecutionException, TimeoutException {
+        final Optional<NormalizedNode<?, ?>> read = readTx.read(LogicalDatastoreType.CONFIGURATION, path)
+                .get(5, TimeUnit.SECONDS);
+        assertTrue(read.isPresent());
+        assertEquals(expNode, read.get());
+
+        final Boolean exists = readTx.exists(LogicalDatastoreType.CONFIGURATION, path).get(5, TimeUnit.SECONDS);
+        assertTrue(exists);
+    }
+
+    private static void verifyTopologyNodesCreated(DataBroker dataBroker) {
+        await().atMost(5, TimeUnit.SECONDS).until(() -> {
+            try (ReadOnlyTransaction readTx = dataBroker.newReadOnlyTransaction()) {
+                Optional<Topology> configTopology = readTx.read(LogicalDatastoreType.CONFIGURATION,
+                        NetconfTopologyUtils.createTopologyListPath(TOPOLOGY_ID)).get(3, TimeUnit.SECONDS);
+                Optional<Topology> operTopology = readTx.read(LogicalDatastoreType.OPERATIONAL,
+                        NetconfTopologyUtils.createTopologyListPath(TOPOLOGY_ID)).get(3, TimeUnit.SECONDS);
+                return configTopology.isPresent() && operTopology.isPresent();
+            }
+        });
+    }
+
+    private AbstractConcurrentDataBrokerTest newDataBrokerTest() throws Exception {
+        AbstractConcurrentDataBrokerTest dataBrokerTest = new AbstractConcurrentDataBrokerTest(true) {
+            @Override
+            protected Iterable<YangModuleInfo> getModuleInfos() throws Exception {
+                return ImmutableSet.of(BindingReflections.getModuleInfo(NetconfNode.class),
+                        BindingReflections.getModuleInfo(NetworkTopology.class),
+                        BindingReflections.getModuleInfo(Topology.class),
+                        BindingReflections.getModuleInfo(Keystore.class),
+                        topModuleInfo);
+            }
+        };
+
+        dataBrokerTest.setup();
+        return dataBrokerTest;
+    }
+
+    private void awaitMountPointNotPresent(DOMMountPointService mountPointService) {
+        await().atMost(5, TimeUnit.SECONDS).until(
+            () -> !mountPointService.getMountPoint(yangNodeInstanceId).isPresent());
+    }
+
+    private static DOMDataBroker getDOMDataBroker(DOMMountPoint mountPoint) {
+        return getMountPointService(mountPoint, DOMDataBroker.class);
+    }
+
+    private static DOMRpcService getDOMRpcService(DOMMountPoint mountPoint) {
+        return getMountPointService(mountPoint, DOMRpcService.class);
+    }
+
+    private static <T extends DOMService> T getMountPointService(DOMMountPoint mountPoint, Class<T> serviceClass) {
+        final Optional<T> maybeService = mountPoint.getService(serviceClass);
+        assertTrue(maybeService.isPresent());
+        return maybeService.get();
+    }
+
+    private DOMMountPoint awaitMountPoint(DOMMountPointService mountPointService) {
+        await().atMost(5, TimeUnit.SECONDS).until(() -> {
+            return mountPointService.getMountPoint(yangNodeInstanceId).isPresent();
+        });
+
+        return mountPointService.getMountPoint(yangNodeInstanceId).get();
+    }
+
+    private RpcDefinition findRpcDefinition(String rpc) {
+        Module topModule = deviceSchemaContext.findModule(TOP_MODULE_NAME, topModuleInfo.getName().getRevision()).get();
+        RpcDefinition rpcDefinition = null;
+        for (RpcDefinition def: topModule.getRpcs()) {
+            if (def.getQName().getLocalName().equals(rpc)) {
+                rpcDefinition = def;
+                break;
+            }
+        }
+
+        assertNotNull(rpc + " rpc not found in " + topModule.getRpcs(), rpcDefinition);
+        return rpcDefinition;
+    }
+
+    private static class TopDOMRpcImplementation implements DOMRpcImplementation {
+        private volatile SettableFuture<Entry<DOMRpcIdentifier, NormalizedNode<?, ?>>> rpcInvokedFuture;
+        private volatile CheckedFuture<DOMRpcResult, DOMRpcException> returnFuture;
+
+        @Override
+        public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(DOMRpcIdentifier rpc,
+                NormalizedNode<?, ?> input) {
+            rpcInvokedFuture.set(new SimpleEntry<>(rpc, input));
+            return returnFuture;
+        }
+
+        void init(CheckedFuture<DOMRpcResult, DOMRpcException> retFuture) {
+            this.returnFuture = retFuture;
+            rpcInvokedFuture = SettableFuture.create();
+        }
+
+        void verify(DOMRpcIdentifier expRpc, NormalizedNode<?, ?> expInput)
+                throws InterruptedException, ExecutionException, TimeoutException {
+            final Entry<DOMRpcIdentifier, NormalizedNode<?, ?>> actual = rpcInvokedFuture.get(5, TimeUnit.SECONDS);
+            assertEquals(expRpc, actual.getKey());
+            assertEquals(expInput, actual.getValue());
+        }
+    }
+}
index d4432d185d11339474dc36538cd0750b67745473..5249788b4d85dae7330afc5eab3d0f551704965a 100644 (file)
@@ -12,22 +12,20 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.MockitoAnnotations.initMocks;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
 import com.google.common.net.InetAddresses;
 import com.google.common.util.concurrent.Futures;
 import io.netty.util.concurrent.EventExecutor;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.concurrent.ExecutorService;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,6 +46,7 @@ import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfConnectorDTO;
@@ -132,6 +131,7 @@ public class RemoteDeviceConnectorImplTest {
         builder.setTopologyId(TOPOLOGY_ID);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testStopRemoteDeviceConnection() {
         final Credentials credentials = new LoginPasswordBuilder()
@@ -152,15 +152,12 @@ public class RemoteDeviceConnectorImplTest {
 
 
         final NetconfDeviceCommunicator communicator = mock(NetconfDeviceCommunicator.class);
-        final RemoteDeviceHandler salFacade = mock(RemoteDeviceHandler.class);
+        final RemoteDeviceHandler<NetconfSessionPreferences> salFacade = mock(RemoteDeviceHandler.class);
 
         final TestingRemoteDeviceConnectorImpl remoteDeviceConnection =
-                new TestingRemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, communicator, salFacade, TIMEOUT,
-                        mountPointService);
+                new TestingRemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, communicator);
 
-        final ActorRef masterRef = mock(ActorRef.class);
-
-        remoteDeviceConnection.startRemoteDeviceConnection(masterRef);
+        remoteDeviceConnection.startRemoteDeviceConnection(salFacade);
 
         remoteDeviceConnection.stopRemoteDeviceConnection();
 
@@ -169,37 +166,7 @@ public class RemoteDeviceConnectorImplTest {
 
     }
 
-    @Test
-    public void testMasterSalFacade() throws UnknownHostException {
-        final ExecutorService executorService = mock(ExecutorService.class);
-        doReturn(executorService).when(processingExecutor).getExecutor();
-
-        final Credentials credentials = new LoginPasswordBuilder()
-                .setPassword("admin").setUsername("admin").build();
-        final NetconfNode netconfNode = new NetconfNodeBuilder()
-                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
-                .setPort(new PortNumber(9999))
-                .setReconnectOnChangedSchema(true)
-                .setDefaultRequestTimeoutMillis(1000L)
-                .setBetweenAttemptsTimeoutMillis(100)
-                .setSchemaless(false)
-                .setTcpOnly(false)
-                .setCredentials(credentials)
-                .build();
-
-        final Node node = new NodeBuilder().setNodeId(NODE_ID).addAugmentation(NetconfNode.class, netconfNode).build();
-        builder.setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node));
-        final RemoteDeviceConnectorImpl remoteDeviceConnection =
-                new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, TIMEOUT, mountPointService);
-
-        final ActorRef masterRef = mock(ActorRef.class);
-
-        final NetconfConnectorDTO connectorDTO =
-                remoteDeviceConnection.createDeviceCommunicator(NODE_ID, netconfNode, masterRef);
-
-        assertTrue(connectorDTO.getFacade() instanceof MasterSalFacade);
-    }
-
+    @SuppressWarnings("unchecked")
     @Test
     public void testKeapAliveFacade() {
         final ExecutorService executorService = mock(ExecutorService.class);
@@ -223,12 +190,12 @@ public class RemoteDeviceConnectorImplTest {
         builder.setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node));
 
         final RemoteDeviceConnectorImpl remoteDeviceConnection =
-                new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, TIMEOUT, mountPointService);
+                new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId);
 
-        final ActorRef masterRef = mock(ActorRef.class);
+        final RemoteDeviceHandler<NetconfSessionPreferences> salFacade = mock(RemoteDeviceHandler.class);
 
         final NetconfConnectorDTO connectorDTO =
-                remoteDeviceConnection.createDeviceCommunicator(NODE_ID, netconfNode, masterRef);
+                remoteDeviceConnection.createDeviceCommunicator(NODE_ID, netconfNode, salFacade);
 
         assertTrue(connectorDTO.getFacade() instanceof KeepaliveSalFacade);
     }
@@ -250,7 +217,7 @@ public class RemoteDeviceConnectorImplTest {
                 .build();
 
         final RemoteDeviceConnectorImpl remoteDeviceConnection =
-                new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, TIMEOUT, mountPointService);
+                new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId);
 
         final NetconfReconnectingClientConfiguration defaultClientConfig =
                 remoteDeviceConnection.getClientConfig(listener, testingNode);
index 96eb505844f9fa8d83743bc7ab6837fc3b6cc8e9..fc4a6a7593d9d73edfc4b8902da5c144dbd92c7b 100644 (file)
@@ -11,12 +11,10 @@ package org.opendaylight.netconf.topology.singleton.impl;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
 
-import akka.actor.ActorRef;
-import akka.util.Timeout;
 import com.google.common.util.concurrent.Futures;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfConnectorDTO;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
@@ -26,22 +24,17 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 class TestingRemoteDeviceConnectorImpl extends RemoteDeviceConnectorImpl {
 
     private final NetconfDeviceCommunicator communicator;
-    private final RemoteDeviceHandler salFacade;
 
     TestingRemoteDeviceConnectorImpl(final NetconfTopologySetup netconfTopologyDeviceSetup,
                                      final RemoteDeviceId remoteDeviceId,
-                                     final NetconfDeviceCommunicator communicator,
-                                     final RemoteDeviceHandler salFacade,
-                                     final Timeout actorResponseWaitTime,
-                                     final DOMMountPointService mountPointService) {
-        super(netconfTopologyDeviceSetup, remoteDeviceId, actorResponseWaitTime, mountPointService);
+                                     final NetconfDeviceCommunicator communicator) {
+        super(netconfTopologyDeviceSetup, remoteDeviceId);
         this.communicator = communicator;
-        this.salFacade = salFacade;
     }
 
     @Override
     public NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node,
-                                                        final ActorRef deviceContextActorRef) {
+            RemoteDeviceHandler<NetconfSessionPreferences> salFacade) {
         final NetconfConnectorDTO connectorDTO = new NetconfConnectorDTO(communicator, salFacade);
         doReturn(Futures.immediateCheckedFuture(null)).when(communicator).initializeRemoteConnection(any(), any());
         return connectorDTO;