Bug 6714 - Use singleton service in clustered netconf topology 51/47351/3
authorRudolf Brisuda <rbrisuda@cisco.com>
Fri, 7 Oct 2016 09:44:23 +0000 (11:44 +0200)
committerRudolf Brisuda <rbrisuda@cisco.com>
Fri, 21 Oct 2016 15:49:10 +0000 (15:49 +0000)
Implemented clustered netconf topology with using clustered
singleton service.

Change-Id: I0e71a9934a70c6bce8eeab0644a5bd05c5548a8c
Signed-off-by: Rudolf Brisuda <rbrisuda@cisco.com>
56 files changed:
features/netconf-connector/pom.xml
features/netconf-connector/src/main/features/features.xml
netconf/netconf-artifacts/pom.xml
netconf/netconf-topology-singleton/pom.xml [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfDOMTransaction.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfTopologySingletonService.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteDeviceConnector.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteOperationTxProcessor.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfDOMDataBroker.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyContext.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMRpcService.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyYangTextSourceProvider.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteDeviceConnectorImpl.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfMasterDOMTransaction.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfReadOnlyTransaction.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfWriteOnlyTransaction.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfConnectorDTO.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologySetup.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtils.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/AskForMasterMountPoint.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/CreateInitialMasterActorData.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/MasterActorDataInitialized.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/NormalizedNodeMessage.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/RefreshSetupMasterActorData.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/RegisterMountPoint.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/SubmitFailedReply.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/UnregisterSlaveMountPoint.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/YangTextSchemaSourceRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/CancelRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/DeleteRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyReadResponse.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ExistsRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/MergeRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/PutRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitReply.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/TransactionRequest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/main/resources/org/opendaylight/blueprint/netconf-topology-singleton.xml [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManagerTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/RemoteDeviceConnectorImplTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/TestingRemoteDeviceConnectorImpl.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtilTest.java [new file with mode: 0644]
netconf/pom.xml
netconf/sal-netconf-connector/src/main/java/org/opendaylight/netconf/sal/connect/netconf/sal/NetconfDeviceTopologyAdapter.java
netconf/sal-netconf-connector/src/main/yang/netconf-node-topology.yang

index 2137bf49a2c813a01e607b70ce6245be9760bdea..623efe53fe0b33e5f3f48f486cc746ea065c3d25 100644 (file)
       <classifier>config</classifier>
       <type>xml</type>
     </dependency>
-    <dependency>
-       <groupId>${project.groupId}</groupId>
-       <artifactId>netconf-topology-config</artifactId>
-       <classifier>clustered-config</classifier>
-       <type>xml</type>
-    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>netconf-tcp</artifactId>
       <groupId>org.bouncycastle</groupId>
       <artifactId>bcprov-jdk15on</artifactId>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>netconf-topology-singleton</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>${project.groupId}</groupId>
index 31eb932c757a51a78ddfc9b05a9f0496ab31f2fc..25b5bf094480fe9863b7a57127ba08f1c6b7085e 100644 (file)
         <configfile finalname='${config.configfile.directory}/${config.netconf.topology.configfile}'>mvn:org.opendaylight.netconf/netconf-topology-config/{{VERSION}}/xml/config</configfile>
     </feature>
 
-    <feature name='odl-netconf-clustered-topology' version='${project.version}' description="OpenDaylight :: Clustered Netconf Topology :: Netconf Connector + Netconf SSH Server + Clustered Netconf configuration via config topology datastore">
-        <feature version='${netconf.version}'>odl-netconf-ssh</feature>
+    <feature name='odl-netconf-clustered-topology' version='${project.version}' description="OpenDaylight :: Clustered Netconf Topology :: Netconf Connector + Netconf SSH Server">
+        <feature version='${project.version}'>odl-netconf-ssh</feature>
         <feature version='${project.version}'>odl-netconf-connector</feature>
-        <configfile finalname='${config.configfile.directory}/${config.netconf.topology.configfile}'>mvn:org.opendaylight.netconf/netconf-topology-config/{{VERSION}}/xml/clustered-config</configfile>
+        <bundle>mvn:org.opendaylight.netconf/netconf-topology-singleton/{{VERSION}}</bundle>
     </feature>
 
     <feature name='odl-netconf-console' version='${project.version}' description="OpenDaylight :: Netconf Console + Karaf CLI for netconf CRUD operations">
index 7ec3258696f9b312a33e7625ebd148a46a8b8512..d90781931ccf9d3d3b58516103a5cacf76758355 100644 (file)
                 <version>${project.version}</version>
                 <type>test-jar</type>
             </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>netconf-topology-singleton</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>${project.groupId}</groupId>
diff --git a/netconf/netconf-topology-singleton/pom.xml b/netconf/netconf-topology-singleton/pom.xml
new file mode 100644 (file)
index 0000000..fb3c450
--- /dev/null
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>config-parent</artifactId>
+        <version>0.5.1-SNAPSHOT</version>
+        <relativePath/>
+    </parent>
+
+    <groupId>org.opendaylight.netconf</groupId>
+    <artifactId>netconf-topology-singleton</artifactId>
+    <version>1.1.1-SNAPSHOT</version>
+    <name>${project.artifactId}</name>
+    <packaging>bundle</packaging>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.opendaylight.netconf</groupId>
+                <artifactId>netconf-subsystem</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.netconf</groupId>
+            <artifactId>mdsal-netconf-notification</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal.model</groupId>
+            <artifactId>ietf-topology</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.netconf</groupId>
+            <artifactId>sal-netconf-connector</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-clustering-commons</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-testkit_2.11</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfDOMTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfDOMTransaction.java
new file mode 100644 (file)
index 0000000..ec9c717
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.api;
+
+import com.google.common.base.Optional;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import scala.concurrent.Future;
+
+/**
+ * Provides API for all operations of read and write transactions
+ */
+public interface NetconfDOMTransaction {
+
+    /**
+     * Read data from particular data-store
+     * @param store data-store type
+     * @param path unique identifier of a particular node instance in the data tree
+     * @return result as future
+     */
+    Future<Optional<NormalizedNodeMessage>> read(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+    /**
+     * Test existence of node in certain data-store
+     * @param store data-store type
+     * @param path unique identifier of a particular node instance in the data tree
+     * @return result as future
+     */
+    Future<Boolean> exists(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+    /**
+     * Put data to particular data-store
+     * @param store data-store type
+     * @param data data for inserting included in NormalizedNodeMessage object
+     */
+    void put(LogicalDatastoreType store, NormalizedNodeMessage data);
+
+    /**
+     * Merge data with existing node in particular data-store
+     * @param store data-store type
+     * @param data data for merging included in NormalizedNodeMessage object
+     */
+    void merge(LogicalDatastoreType store, NormalizedNodeMessage data);
+
+    /**
+     * Delete node in particular data-store in path
+     * @param store data-store type
+     * @param path unique identifier of a particular node instance in the data tree
+     */
+    void delete(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+    /**
+     * Cancel operation
+     * @return success or not
+     */
+    boolean cancel();
+
+    /**
+     * Commit opened transaction.
+     * @return void or raised exception
+     */
+    Future<Void> submit();
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfTopologySingletonService.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfTopologySingletonService.java
new file mode 100644 (file)
index 0000000..37095be
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.api;
+
+/**
+ * Provides API for advertising services for blue print service
+ */
+public interface NetconfTopologySingletonService {
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteDeviceConnector.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteDeviceConnector.java
new file mode 100644 (file)
index 0000000..bb74b75
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.api;
+
+import akka.actor.ActorRef;
+
+/**
+ * Provides API for connection odl (master) with device
+ */
+public interface RemoteDeviceConnector {
+
+    /**
+     * Create device communicator and open device connection
+     * @param masterActorRef master actor reference
+     */
+    void startRemoteDeviceConnection(ActorRef masterActorRef);
+
+    /**
+     * Stop device communicator
+     */
+    void stopRemoteDeviceConnection();
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteOperationTxProcessor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteOperationTxProcessor.java
new file mode 100644 (file)
index 0000000..9023847
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.api;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Provides API for remote calling operations of transactions. Slave sends message of particular
+ * operation to master and master performs it.
+ */
+public interface RemoteOperationTxProcessor {
+
+    /**
+     * Delete node in particular data-store in path
+     * @param store data-store type
+     * @param path unique identifier of a particular node instance in the data tree
+     */
+    void doDelete(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+    /**
+     * Commit opened transaction.
+     * @param recipient recipient of submit result
+     * @param sender sender of submit result
+     */
+    void doSubmit(ActorRef recipient, ActorRef sender);
+
+    /**
+     * Cancel operation
+     * @param recipient recipient of cancel result
+     * @param sender sender of cancel result
+     */
+    void doCancel(ActorRef recipient, ActorRef sender);
+
+    /**
+     * Put data to particular data-store
+     * @param store data-store type
+     * @param data data for inserting included in NormalizedNodeMessage object
+     */
+    void doPut(LogicalDatastoreType store, NormalizedNodeMessage data);
+
+    /**
+     * Merge data with existing node in particular data-store
+     * @param store data-store type
+     * @param data data for merging included in NormalizedNodeMessage object
+     */
+    void doMerge(LogicalDatastoreType store, NormalizedNodeMessage data);
+
+    /**
+     * Read data from particular data-store
+     * @param store data-store type
+     * @param path unique identifier of a particular node instance in the data tree
+     * @param recipient recipient of read result
+     * @param sender sender of read result
+     */
+    void doRead(LogicalDatastoreType store, YangInstanceIdentifier path, ActorRef recipient, ActorRef sender);
+
+    /**
+     * Test existence of node in certain data-store
+     * @param store data-store type
+     * @param path unique identifier of a particular node instance in the data tree
+     * @param recipient recipient of exists result
+     * @param sender sender of exists result
+     */
+    void doExists(LogicalDatastoreType store, YangInstanceIdentifier path, ActorRef recipient, ActorRef sender);
+
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java
new file mode 100644 (file)
index 0000000..7fa8e5a
--- /dev/null
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.cluster.Cluster;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalProvider;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfMasterDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
+import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessionPreferences> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MasterSalFacade.class);
+
+    private final RemoteDeviceId id;
+
+    private SchemaContext remoteSchemaContext = null;
+    private NetconfSessionPreferences netconfSessionPreferences = null;
+    private DOMRpcService deviceRpc = null;
+    private final NetconfDeviceSalProvider salProvider;
+
+    private final ActorRef masterActorRef;
+    private final ActorSystem actorSystem;
+    private DOMDataBroker deviceDataBroker = null;
+
+    MasterSalFacade(final RemoteDeviceId id,
+                           final Broker domBroker,
+                           final BindingAwareBroker bindingBroker,
+                           final ActorSystem actorSystem,
+                           final ActorRef masterActorRef) {
+        this.id = id;
+        this.salProvider = new NetconfDeviceSalProvider(id);
+        this.actorSystem = actorSystem;
+        this.masterActorRef = masterActorRef;
+
+        registerToSal(domBroker, bindingBroker);
+    }
+
+    private void registerToSal(final Broker domRegistryDependency, final BindingAwareBroker bindingBroker) {
+        // TODO: remove use of provider, there is possible directly create mount instance and
+        // TODO: NetconfDeviceTopologyAdapter in constructor = less complexity
+
+        domRegistryDependency.registerProvider(salProvider);
+        bindingBroker.registerProvider(salProvider);
+    }
+
+    @Override
+    public void onDeviceConnected(final SchemaContext remoteSchemaContext,
+                                  final NetconfSessionPreferences netconfSessionPreferences,
+                                  final DOMRpcService deviceRpc) {
+        this.remoteSchemaContext = remoteSchemaContext;
+        this.netconfSessionPreferences = netconfSessionPreferences;
+        this.deviceRpc = deviceRpc;
+
+        registerMasterMountPoint();
+
+        sendInitialDataToActor().onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure == null) {
+                    updateDeviceData();
+                    return;
+                }
+                throw failure;
+            }
+        }, actorSystem.dispatcher());
+
+    }
+
+    @Override
+    public void onDeviceDisconnected() {
+        salProvider.getTopologyDatastoreAdapter().updateDeviceData(false, new NetconfDeviceCapabilities());
+        unregisterMasterMountPoint();
+    }
+
+    @Override
+    public void onDeviceFailed(final Throwable throwable) {
+        salProvider.getTopologyDatastoreAdapter().setDeviceAsFailed(throwable);
+        unregisterMasterMountPoint();
+    }
+
+    @Override
+    public void onNotification(final DOMNotification domNotification) {
+        salProvider.getMountInstance().publish(domNotification);
+    }
+
+    @Override
+    public void close() {
+        unregisterMasterMountPoint();
+        closeGracefully(salProvider);
+    }
+
+    private void registerMasterMountPoint() {
+        Preconditions.checkNotNull(id);
+        Preconditions.checkNotNull(remoteSchemaContext,
+                "Device has no remote schema context yet. Probably not fully connected.");
+        Preconditions.checkNotNull(netconfSessionPreferences,
+                "Device has no capabilities yet. Probably not fully connected.");
+
+        final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
+
+        LOG.info("Creating master data broker for device {}", id);
+
+        final NetconfDOMTransaction masterDOMTransactions =
+                new NetconfMasterDOMTransaction(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences);
+        deviceDataBroker =
+                new NetconfDOMDataBroker(actorSystem, id, masterDOMTransactions);
+        salProvider.getMountInstance()
+                .onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
+    }
+
+    private Future<Object> sendInitialDataToActor() {
+        final List<SourceIdentifier> sourceIdentifiers =
+                remoteSchemaContext.getAllModuleIdentifiers().stream().map(mi ->
+                        RevisionSourceIdentifier.create(mi.getName(),
+                            (SimpleDateFormatUtil.DEFAULT_DATE_REV == mi.getRevision() ? Optional.<String>absent() :
+                                    Optional.of(SimpleDateFormatUtil.getRevisionFormat().format(mi.getRevision())))))
+                        .collect(Collectors.toList());
+
+        // send initial data to master actor and create actor for providing it
+        return Patterns.ask(masterActorRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers),
+                NetconfTopologyUtils.TIMEOUT);
+    }
+
+    private void updateDeviceData() {
+        Cluster cluster = Cluster.get(actorSystem);
+        salProvider.getTopologyDatastoreAdapter().updateClusteredDeviceData(true, cluster.selfAddress().toString(),
+                netconfSessionPreferences.getNetconfDeviceCapabilities());
+    }
+
+    private void unregisterMasterMountPoint() {
+        salProvider.getMountInstance().onTopologyDeviceDisconnected();
+    }
+
+    private void closeGracefully(final AutoCloseable resource) {
+        if (resource != null) {
+            try {
+                resource.close();
+            } catch (final Exception e) {
+                LOG.warn("{}: Ignoring exception while closing {}", id, resource, e);
+            }
+        }
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfDOMDataBroker.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfDOMDataBroker.java
new file mode 100644 (file)
index 0000000..99ddc7f
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import akka.actor.ActorSystem;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfReadOnlyTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfWriteOnlyTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public class NetconfDOMDataBroker implements DOMDataBroker {
+
+    private final RemoteDeviceId id;
+    private final NetconfDOMTransaction masterDataBroker;
+    private final ActorSystem actorSystem;
+
+    public NetconfDOMDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
+                         final NetconfDOMTransaction masterDataBroker) {
+        this.id = id;
+        this.masterDataBroker = masterDataBroker;
+        this.actorSystem = actorSystem;
+    }
+
+    @Override
+    public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+        return new NetconfReadOnlyTransaction(actorSystem, masterDataBroker);
+    }
+
+    @Override
+    public DOMDataReadWriteTransaction newReadWriteTransaction() {
+        return new ReadWriteTx(new NetconfReadOnlyTransaction(actorSystem, masterDataBroker),
+                new NetconfWriteOnlyTransaction(actorSystem, masterDataBroker));
+    }
+
+    @Override
+    public DOMDataWriteTransaction newWriteOnlyTransaction() {
+        return new NetconfWriteOnlyTransaction(actorSystem, masterDataBroker);
+    }
+
+    @Override
+    public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(
+            LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener,
+            DataChangeScope triggeringScope) {
+        throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
+    }
+
+    @Override
+    public DOMTransactionChain createTransactionChain(TransactionChainListener listener) {
+        throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+    }
+
+    @Nonnull
+    @Override
+    public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+        return Collections.emptyMap();
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeManager.java
new file mode 100644 (file)
index 0000000..e90ea66
--- /dev/null
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
+import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
+import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Managing and reacting on data tree changes in specific netconf node when master writes status to the operational
+ * data store (e.g. handling lifecycle of slave mount point).
+ */
+class NetconfNodeManager
+        implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManager.class);
+
+    private NetconfTopologySetup setup;
+    private ListenerRegistration<NetconfNodeManager> dataChangeListenerRegistration;
+    private RemoteDeviceId id;
+    private final SchemaSourceRegistry schemaRegistry;
+    private final SchemaRepository schemaRepository;
+    private ActorRef slaveActorRef;
+
+    NetconfNodeManager(final NetconfTopologySetup setup,
+                       final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
+                       final SchemaRepository schemaRepository) {
+        this.setup = setup;
+        this.id = id;
+        this.schemaRegistry = schemaRegistry;
+        this.schemaRepository = schemaRepository;
+    }
+
+    @Override
+    public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
+        for (final DataTreeModification<Node> change : changes) {
+            final DataObjectModification<Node> rootNode = change.getRootNode();
+            final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
+            switch (rootNode.getModificationType()) {
+                case SUBTREE_MODIFIED:
+                    LOG.debug("Operational for node {} updated. Trying to register slave mount point", nodeId);
+                    handleSlaveMountPoint(rootNode);
+                    break;
+                case WRITE:
+                    if (rootNode.getDataBefore() != null) {
+                        LOG.debug("Operational for node {} rewrited. Trying to register slave mount point", nodeId);
+                    } else {
+                        LOG.debug("Operational for node {} created. Trying to register slave mount point", nodeId);
+                    }
+                    handleSlaveMountPoint(rootNode);
+                    break;
+                case DELETE:
+                    LOG.debug("Operational for node {} deleted. Trying to remove slave mount point", nodeId);
+                    closeActor();
+                    break;
+                default:
+                    LOG.debug("Uknown operation for node: {}", nodeId);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        closeActor();
+
+        if (dataChangeListenerRegistration != null) {
+            dataChangeListenerRegistration.close();
+            dataChangeListenerRegistration = null;
+        }
+    }
+
+    private void closeActor() {
+        if (slaveActorRef != null) {
+            slaveActorRef.tell(new UnregisterSlaveMountPoint(), ActorRef.noSender());
+            slaveActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            slaveActorRef = null;
+        }
+    }
+
+    void registerDataTreeChangeListener(final String topologyId, final NodeKey key) {
+        LOG.debug("Registering data tree change listener on node {}", key);
+        dataChangeListenerRegistration = setup.getDataBroker().registerDataTreeChangeListener(
+                new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
+                        NetconfTopologyUtils.createTopologyNodeListPath(key, topologyId)), this);
+    }
+
+    private void handleSlaveMountPoint(final DataObjectModification<Node> rootNode) {
+        @SuppressWarnings("ConstantConditions")
+        final NetconfNode netconfNodeAfter = rootNode.getDataAfter().getAugmentation(NetconfNode.class);
+
+        if (NetconfNodeConnectionStatus.ConnectionStatus.Connected.equals(netconfNodeAfter.getConnectionStatus())) {
+            createActorRef();
+            final String masterAddress = netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode();
+            final String path = NetconfTopologyUtils.createActorPath(masterAddress,
+                    NetconfTopologyUtils.createMasterActorName(id.getName(),
+                            netconfNodeAfter.getClusteredConnectionStatus().getNetconfMasterNode()));
+            setup.getActorSystem().actorSelection(path).tell(new AskForMasterMountPoint(), slaveActorRef);
+        } else {            ;
+            closeActor();
+        }
+    }
+
+    private void createActorRef() {
+        if (slaveActorRef == null) {
+            slaveActorRef = setup.getActorSystem().actorOf(NetconfNodeActor.props(setup, id, schemaRegistry,
+                    schemaRepository), id.getName());
+        }
+    }
+
+    void refreshDevice(final NetconfTopologySetup netconfTopologyDeviceSetup, final RemoteDeviceId remoteDeviceId) {
+        setup = netconfTopologyDeviceSetup;
+        id = remoteDeviceId;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyContext.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyContext.java
new file mode 100644 (file)
index 0000000..0f8255c
--- /dev/null
@@ -0,0 +1,169 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
+
+import akka.actor.ActorRef;
+import akka.cluster.Cluster;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.api.RemoteDeviceConnector;
+import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+class NetconfTopologyContext implements ClusterSingletonService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyContext.class);
+
+    private final ServiceGroupIdentifier serviceGroupIdent;
+    private NetconfTopologySetup netconfTopologyDeviceSetup;
+    private RemoteDeviceId remoteDeviceId;
+    private RemoteDeviceConnector remoteDeviceConnector;
+    private NetconfNodeManager netconfNodeManager;
+    private boolean finalClose = false;
+    private boolean isMaster;
+
+    private ActorRef masterActorRef;
+
+    NetconfTopologyContext(final NetconfTopologySetup netconfTopologyDeviceSetup,
+                           final ServiceGroupIdentifier serviceGroupIdent) {
+        this.netconfTopologyDeviceSetup = Preconditions.checkNotNull(netconfTopologyDeviceSetup);
+        this.serviceGroupIdent = serviceGroupIdent;
+
+        remoteDeviceId = NetconfTopologyUtils.createRemoteDeviceId(netconfTopologyDeviceSetup.getNode().getNodeId(),
+                netconfTopologyDeviceSetup.getNode().getAugmentation(NetconfNode.class));
+
+        remoteDeviceConnector = new RemoteDeviceConnectorImpl(netconfTopologyDeviceSetup, remoteDeviceId);
+
+        netconfNodeManager = createNodeDeviceManager();
+
+    }
+
+    @Override
+    public void instantiateServiceInstance() {
+        LOG.info("Master was selected: {}", remoteDeviceId.getHost().getIpAddress());
+
+        isMaster = true;
+
+        // master should not listen on netconf-node operational datastore
+        if (netconfNodeManager != null) {
+            netconfNodeManager.close();
+            netconfNodeManager = null;
+        }
+
+        if (!finalClose) {
+            final String masterAddress = Cluster.get(netconfTopologyDeviceSetup.getActorSystem()).selfAddress().toString();
+            masterActorRef = netconfTopologyDeviceSetup.getActorSystem().actorOf(NetconfNodeActor.props(
+                    netconfTopologyDeviceSetup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY, DEFAULT_SCHEMA_REPOSITORY),
+                    NetconfTopologyUtils.createMasterActorName(remoteDeviceId.getName(), masterAddress));
+
+            remoteDeviceConnector.startRemoteDeviceConnection(masterActorRef);
+        }
+
+    }
+
+    // called when master is down/changed to slave
+    @Override
+    public ListenableFuture<Void> closeServiceInstance() {
+
+        if (!finalClose) {
+            // in case that master changes role to slave, new NodeDeviceManager must be created and listener registered
+            netconfNodeManager = createNodeDeviceManager();
+        }
+        if (masterActorRef != null) {
+            netconfTopologyDeviceSetup.getActorSystem().stop(masterActorRef);
+            masterActorRef = null;
+        }
+        if (remoteDeviceConnector != null) {
+            remoteDeviceConnector.stopRemoteDeviceConnection();
+        }
+
+        return Futures.immediateCheckedFuture(null);
+    }
+
+    @Override
+    public ServiceGroupIdentifier getIdentifier() {
+        return serviceGroupIdent;
+    }
+
+    private NetconfNodeManager createNodeDeviceManager() {
+        final NetconfNodeManager ndm =
+                new NetconfNodeManager(netconfTopologyDeviceSetup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
+                        DEFAULT_SCHEMA_REPOSITORY);
+        ndm.registerDataTreeChangeListener(netconfTopologyDeviceSetup.getTopologyId(),
+                netconfTopologyDeviceSetup.getNode().getKey());
+
+        return ndm;
+    }
+
+    void closeFinal() throws Exception {
+        finalClose = true;
+
+        if (netconfNodeManager != null) {
+            netconfNodeManager.close();
+        }
+
+        if (remoteDeviceConnector != null) {
+            remoteDeviceConnector.stopRemoteDeviceConnection();
+        }
+
+        if (masterActorRef != null) {
+            netconfTopologyDeviceSetup.getActorSystem().stop(masterActorRef);
+            masterActorRef = null;
+        }
+    }
+
+    /**
+     * If configuration data was changed
+     * @param setup new setup
+     */
+    void refresh(@Nonnull final NetconfTopologySetup setup) {
+        netconfTopologyDeviceSetup = Preconditions.checkNotNull(setup);
+        remoteDeviceId = NetconfTopologyUtils.createRemoteDeviceId(netconfTopologyDeviceSetup.getNode().getNodeId(),
+                netconfTopologyDeviceSetup.getNode().getAugmentation(NetconfNode.class));
+
+        if (isMaster) {
+            remoteDeviceConnector.stopRemoteDeviceConnection();
+        }
+        if (!isMaster) {
+            netconfNodeManager.refreshDevice(netconfTopologyDeviceSetup, remoteDeviceId);
+        }
+        remoteDeviceConnector = new RemoteDeviceConnectorImpl(netconfTopologyDeviceSetup, remoteDeviceId);
+
+        if (isMaster) {
+            final Future<Object> future = Patterns.ask(masterActorRef, new RefreshSetupMasterActorData(
+                    netconfTopologyDeviceSetup, remoteDeviceId), NetconfTopologyUtils.TIMEOUT);
+
+            future.onComplete(new OnComplete<Object>() {
+                @Override
+                public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                    if (failure != null) {
+                        LOG.error("Failed to refresh master actor data: {}", failure);
+                        return;
+                    }
+                    remoteDeviceConnector.startRemoteDeviceConnection(masterActorRef);
+                }
+            }, netconfTopologyDeviceSetup.getActorSystem().dispatcher());
+        }
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java
new file mode 100644 (file)
index 0000000..6cad5c1
--- /dev/null
@@ -0,0 +1,243 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import akka.actor.ActorSystem;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import io.netty.util.concurrent.EventExecutor;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.controller.config.threadpool.ThreadPool;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetconfTopologyManager
+        implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
+
+    private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
+    private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
+            clusterRegistrations = new HashMap<>();
+
+    private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
+
+    private final DataBroker dataBroker;
+    private final RpcProviderRegistry rpcProviderRegistry;
+    private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
+    private final BindingAwareBroker bindingAwareBroker;
+    private final ScheduledThreadPool keepaliveExecutor;
+    private final ThreadPool processingExecutor;
+    private final Broker domBroker;
+    private final ActorSystem actorSystem;
+    private final EventExecutor eventExecutor;
+    private final NetconfClientDispatcher clientDispatcher;
+    private final String topologyId;
+
+    public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
+                           final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
+                           final BindingAwareBroker bindingAwareBroker,
+                           final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
+                           final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
+                           final NetconfClientDispatcher clientDispatcher, final String topologyId) {
+        this.dataBroker = Preconditions.checkNotNull(dataBroker);
+        this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
+        this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
+        this.bindingAwareBroker = Preconditions.checkNotNull(bindingAwareBroker);
+        this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
+        this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
+        this.domBroker = Preconditions.checkNotNull(domBroker);
+        this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
+        this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
+        this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
+        this.topologyId = Preconditions.checkNotNull(topologyId);
+    }
+
+    // Blueprint init method
+    public void init() {
+        dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
+    }
+
+    @Override
+    public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
+        for (DataTreeModification<Node> change : changes) {
+            final DataObjectModification<Node> rootNode = change.getRootNode();
+            final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
+            final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
+            switch (rootNode.getModificationType()) {
+                case SUBTREE_MODIFIED:
+                    LOG.debug("Config for node {} updated", nodeId);
+                    refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
+                    break;
+                case WRITE:
+                    if (contexts.containsKey(dataModifIdent)) {
+                        LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
+                        refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
+                    } else {
+                        LOG.debug("Config for node {} created", nodeId);
+                        startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
+                    }
+                    break;
+                case DELETE:
+                    LOG.debug("Config for node {} deleted", nodeId);
+                    stopNetconfDeviceContext(dataModifIdent);
+                    break;
+                default:
+                    LOG.warn("Unknown operation for {}.", nodeId);
+            }
+        }
+    }
+
+    private void refreshNetconfDeviceContext(InstanceIdentifier<Node> instanceIdentifier, Node node) {
+        final NetconfTopologyContext context = contexts.get(instanceIdentifier);
+        context.refresh(createSetup(instanceIdentifier, node));
+    }
+
+    private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
+        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+        Preconditions.checkNotNull(netconfNode);
+        Preconditions.checkNotNull(netconfNode.getHost());
+        Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
+
+        final ServiceGroupIdentifier serviceGroupIdent =
+                ServiceGroupIdentifier.create(instanceIdentifier.toString());
+
+        final NetconfTopologyContext newNetconfTopologyContext =
+                new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent);
+
+        final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration  =
+                clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
+
+        clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
+        contexts.put(instanceIdentifier, newNetconfTopologyContext);
+    }
+
+    private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
+        if (contexts.containsKey(instanceIdentifier)) {
+            try {
+                clusterRegistrations.get(instanceIdentifier).close();
+                contexts.get(instanceIdentifier).closeFinal();
+            } catch (Exception e) {
+                LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
+            }
+            contexts.remove(instanceIdentifier);
+            clusterRegistrations.remove(instanceIdentifier);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (dataChangeListenerRegistration != null) {
+            dataChangeListenerRegistration.close();
+            dataChangeListenerRegistration = null;
+        }
+        contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
+            try {
+                netconfTopologyContext.closeFinal();
+            } catch (Exception e) {
+                LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
+            }
+        });
+        clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
+            try {
+                clusterSingletonServiceRegistration.close();
+            } catch (Exception e) {
+                LOG.warn("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier);
+            }
+        });
+        contexts.clear();
+        clusterRegistrations.clear();
+    }
+
+    private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(String topologyId) {
+        final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
+        initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
+        initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
+        Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(Void result) {
+                LOG.debug("topology initialization successful");
+            }
+
+            @Override
+            public void onFailure(@Nonnull Throwable throwable) {
+                LOG.error("Unable to initialize netconf-topology, {}", throwable);
+            }
+        });
+
+        LOG.debug("Registering datastore listener");
+        return dataBroker.registerDataTreeChangeListener(
+                        new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
+                                NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
+    }
+
+    private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, String topologyId) {
+        final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
+        final InstanceIdentifier<NetworkTopology> networkTopologyId =
+                InstanceIdentifier.builder(NetworkTopology.class).build();
+        wtx.merge(datastoreType, networkTopologyId, networkTopology);
+        final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
+        wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
+                new TopologyKey(new TopologyId(topologyId))), topology);
+    }
+
+    private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
+        final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
+                .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
+                .setDataBroker(dataBroker)
+                .setInstanceIdentifier(instanceIdentifier)
+                .setRpcProviderRegistry(rpcProviderRegistry)
+                .setNode(node)
+                .setBindingAwareBroker(bindingAwareBroker)
+                .setActorSystem(actorSystem)
+                .setEventExecutor(eventExecutor)
+                .setDomBroker(domBroker)
+                .setKeepaliveExecutor(keepaliveExecutor)
+                .setProcessingExecutor(processingExecutor)
+                .setTopologyId(topologyId)
+                .setNetconfClientDispatcher(clientDispatcher);
+
+        return builder.build();
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMRpcService.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMRpcService.java
new file mode 100644 (file)
index 0000000..c1c8430
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class ProxyDOMRpcService implements DOMRpcService {
+
+    @Nonnull
+    @Override
+    public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull final SchemaPath type,
+                                                                  @Nullable final NormalizedNode<?, ?> input) {
+        throw new UnsupportedOperationException("InvokeRpc: DOMRpc service not working in cluster.");
+    }
+
+    @Nonnull
+    @Override
+    public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
+            @Nonnull final T listener) {
+        throw new UnsupportedOperationException("RegisterRpcListener: DOMRpc service not working in cluster.");
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyYangTextSourceProvider.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyYangTextSourceProvider.java
new file mode 100644 (file)
index 0000000..27514c7
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import com.google.common.collect.Sets;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
+import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import scala.concurrent.Future;
+import scala.concurrent.impl.Promise;
+
+public class ProxyYangTextSourceProvider implements RemoteYangTextSourceProvider {
+
+    private final ActorRef masterRef;
+    private final ActorContext actorContext;
+
+    public ProxyYangTextSourceProvider(final ActorRef masterRef, final ActorContext actorContext) {
+        this.masterRef = masterRef;
+        this.actorContext = actorContext;
+    }
+
+    @Override
+    public Future<Set<SourceIdentifier>> getProvidedSources() {
+        // NOOP
+        return Futures.successful(Sets.newHashSet());
+    }
+
+    @Override
+    public Future<YangTextSchemaSourceSerializationProxy> getYangTextSchemaSource(
+            @Nonnull final SourceIdentifier sourceIdentifier) {
+
+        final Future<Object> scalaFuture = Patterns.ask(masterRef,
+                new YangTextSchemaSourceRequest(sourceIdentifier), NetconfTopologyUtils.TIMEOUT);
+
+        final Promise.DefaultPromise<YangTextSchemaSourceSerializationProxy> promise = new Promise.DefaultPromise<>();
+
+        scalaFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) {
+                    promise.failure(failure);
+                    return;
+                }
+                if (success instanceof Throwable) {
+                    promise.failure((Throwable) success);
+                    return;
+                }
+                promise.success((YangTextSchemaSourceSerializationProxy) success);
+            }
+        }, actorContext.dispatcher());
+
+        return promise.future();
+
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteDeviceConnectorImpl.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteDeviceConnectorImpl.java
new file mode 100644 (file)
index 0000000..47405b8
--- /dev/null
@@ -0,0 +1,425 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.util.concurrent.EventExecutor;
+import java.io.File;
+import java.math.BigDecimal;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.opendaylight.netconf.api.NetconfMessage;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
+import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
+import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
+import org.opendaylight.netconf.sal.connect.api.RemoteDevice;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.LibraryModulesSchemas;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceBuilder;
+import org.opendaylight.netconf.sal.connect.netconf.NetconfStateSchemasResolverImpl;
+import org.opendaylight.netconf.sal.connect.netconf.SchemalessNetconfDevice;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.listener.UserPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.netconf.schema.YangLibrarySchemaYangSourceProvider;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.api.RemoteDeviceConnector;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfConnectorDTO;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
+import org.opendaylight.protocol.framework.TimedReconnectStrategy;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
+import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
+import org.opendaylight.yangtools.yang.parser.util.TextToASTTransformer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RemoteDeviceConnectorImpl implements RemoteDeviceConnector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteDeviceConnectorImpl.class);
+
+    /**
+     * Keeps track of initialized Schema resources.  A Map is maintained in which the key represents the name
+     * of the schema cache directory, and the value is a corresponding <code>SchemaResourcesDTO</code>.  The
+     * <code>SchemaResourcesDTO</code> is essentially a container that allows for the extraction of the
+     * <code>SchemaRegistry</code> and <code>SchemaContextFactory</code> which should be used for a particular
+     * Netconf mount.  Access to <code>schemaResourcesDTOs</code> should be surrounded by appropriate
+     * synchronization locks.
+     */
+    private static final Map<String, NetconfDevice.SchemaResourcesDTO> schemaResourcesDTOs = new HashMap<>();
+
+    private SchemaSourceRegistry schemaRegistry = NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
+    private SchemaRepository schemaRepository = NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
+
+    private final NetconfTopologySetup netconfTopologyDeviceSetup;
+    private final RemoteDeviceId remoteDeviceId;
+
+    private SchemaContextFactory schemaContextFactory = NetconfTopologyUtils.DEFAULT_SCHEMA_CONTEXT_FACTORY;
+    private NetconfConnectorDTO deviceCommunicatorDTO;
+
+    // Initializes default constant instances for the case when the default schema repository
+    // directory cache/schema is used.
+    static {
+        schemaResourcesDTOs.put(NetconfTopologyUtils.DEFAULT_CACHE_DIRECTORY,
+                new NetconfDevice.SchemaResourcesDTO(NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY,
+                        NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY,
+                        NetconfTopologyUtils.DEFAULT_SCHEMA_CONTEXT_FACTORY,
+                        new NetconfStateSchemasResolverImpl()));
+        NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(NetconfTopologyUtils.DEFAULT_CACHE);
+        NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY.registerSchemaSourceListener(
+                TextToASTTransformer.create(NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY,
+                        NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY));
+    }
+
+    public RemoteDeviceConnectorImpl(final NetconfTopologySetup netconfTopologyDeviceSetup,
+                                     final RemoteDeviceId remoteDeviceId) {
+
+        this.netconfTopologyDeviceSetup = Preconditions.checkNotNull(netconfTopologyDeviceSetup);
+        this.remoteDeviceId = remoteDeviceId;
+    }
+
+    @Override
+    public void startRemoteDeviceConnection(final ActorRef deviceContextActorRef) {
+
+        final NetconfNode netconfNode = netconfTopologyDeviceSetup.getNode().getAugmentation(NetconfNode.class);
+        final NodeId nodeId = netconfTopologyDeviceSetup.getNode().getNodeId();
+        Preconditions.checkNotNull(netconfNode.getHost());
+        Preconditions.checkNotNull(netconfNode.getPort());
+        Preconditions.checkNotNull(netconfNode.isTcpOnly());
+
+        this.deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode, deviceContextActorRef);
+        final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
+        final NetconfClientSessionListener netconfClientSessionListener = deviceCommunicatorDTO.getSessionListener();
+        final NetconfReconnectingClientConfiguration clientConfig =
+                getClientConfig(netconfClientSessionListener, netconfNode);
+        final ListenableFuture<NetconfDeviceCapabilities> future = deviceCommunicator
+                .initializeRemoteConnection(netconfTopologyDeviceSetup.getNetconfClientDispatcher(), clientConfig);
+
+        Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
+            @Override
+            public void onSuccess(NetconfDeviceCapabilities result) {
+                LOG.debug("{}: Connector started succesfully", nodeId.getValue());
+            }
+
+            @Override
+            public void onFailure(@Nullable Throwable throwable) {
+                LOG.error("{}: Connector failed, {}", nodeId.getValue(), throwable);
+            }
+        });
+    }
+
+    @Override
+    public void stopRemoteDeviceConnection() {
+        Preconditions.checkNotNull(deviceCommunicatorDTO, "Device communicator was not created.");
+        try {
+            deviceCommunicatorDTO.close();
+        } catch (Exception e) {
+            LOG.warn("{}: Error at closing device communicator.", remoteDeviceId);
+        }
+    }
+
+    @VisibleForTesting
+    NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node,
+                                                         final ActorRef deviceContextActorRef) {
+        //setup default values since default value is not supported in mdsal
+        final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null
+                ? NetconfTopologyUtils.DEFAULT_REQUEST_TIMEOUT_MILLIS : node.getDefaultRequestTimeoutMillis();
+        final Long keepaliveDelay = node.getKeepaliveDelay() == null
+                ? NetconfTopologyUtils.DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
+        final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null
+                ? NetconfTopologyUtils.DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
+
+        RemoteDeviceHandler<NetconfSessionPreferences> salFacade =  new MasterSalFacade(remoteDeviceId,
+                netconfTopologyDeviceSetup.getDomBroker(), netconfTopologyDeviceSetup.getBindingAwareBroker(),
+                netconfTopologyDeviceSetup.getActorSystem(), deviceContextActorRef);
+        if (keepaliveDelay > 0) {
+            LOG.info("Device: {} , Adding keepalive facade.", nodeId);
+            salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade,
+                    netconfTopologyDeviceSetup.getKeepaliveExecutor().getExecutor(), keepaliveDelay,
+                    defaultRequestTimeoutMillis);
+        }
+
+        // pre register yang library sources as fallback schemas to schema registry
+        List<SchemaSourceRegistration<YangTextSchemaSource>> registeredYangLibSources = Lists.newArrayList();
+        if (node.getYangLibrary() != null) {
+            final String yangLibURL = node.getYangLibrary().getYangLibraryUrl().getValue();
+            final String yangLibUsername = node.getYangLibrary().getUsername();
+            final String yangLigPassword = node.getYangLibrary().getPassword();
+
+            LibraryModulesSchemas libraryModulesSchemas;
+            if (yangLibURL != null) {
+                if (yangLibUsername != null && yangLigPassword != null) {
+                    libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword);
+                } else {
+                    libraryModulesSchemas = LibraryModulesSchemas.create(yangLibURL);
+                }
+
+                for (Map.Entry<SourceIdentifier, URL> sourceIdentifierURLEntry :
+                        libraryModulesSchemas.getAvailableModels().entrySet()) {
+                    registeredYangLibSources
+                            .add(schemaRegistry.registerSchemaSource(
+                                    new YangLibrarySchemaYangSourceProvider(remoteDeviceId,
+                                            libraryModulesSchemas.getAvailableModels()),
+                                    PotentialSchemaSource
+                                            .create(sourceIdentifierURLEntry.getKey(), YangTextSchemaSource.class,
+                                                    PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+                }
+            }
+        }
+
+        final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = setupSchemaCacheDTO(nodeId, node);
+        final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> device;
+        if (node.isSchemaless()) {
+            device = new SchemalessNetconfDevice(remoteDeviceId, salFacade);
+        } else {
+            device = new NetconfDeviceBuilder()
+                    .setReconnectOnSchemasChange(reconnectOnChangedSchema)
+                    .setSchemaResourcesDTO(schemaResourcesDTO)
+                    .setGlobalProcessingExecutor(netconfTopologyDeviceSetup.getProcessingExecutor().getExecutor())
+                    .setId(remoteDeviceId)
+                    .setSalFacade(salFacade)
+                    .build();
+        }
+
+        final Optional<NetconfSessionPreferences> userCapabilities = getUserCapabilities(node);
+        final int rpcMessageLimit =
+                node.getConcurrentRpcLimit() == null
+                        ? NetconfTopologyUtils.DEFAULT_CONCURRENT_RPC_LIMIT : node.getConcurrentRpcLimit();
+
+        if (rpcMessageLimit < 1) {
+            LOG.info("Device: {}, Concurrent rpc limit is smaller than 1, no limit will be enforced.", remoteDeviceId);
+        }
+
+        return new NetconfConnectorDTO(
+                userCapabilities.isPresent()
+                        ? new NetconfDeviceCommunicator(
+                                remoteDeviceId, device, new UserPreferences(userCapabilities.get(),
+                                node.getYangModuleCapabilities().isOverride()), rpcMessageLimit) :
+                        new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit), salFacade);
+    }
+
+    private Optional<NetconfSessionPreferences> getUserCapabilities(final NetconfNode node) {
+        if (node.getYangModuleCapabilities() == null) {
+            return Optional.empty();
+        }
+
+        final List<String> capabilities = node.getYangModuleCapabilities().getCapability();
+        if (capabilities == null || capabilities.isEmpty()) {
+            return Optional.empty();
+        }
+
+        final NetconfSessionPreferences parsedOverrideCapabilities =
+                NetconfSessionPreferences.fromStrings(capabilities);
+        Preconditions.checkState(parsedOverrideCapabilities.getNonModuleCaps().isEmpty(),
+                "Capabilities to override can only contain module based capabilities, non-module capabilities "
+                        + "will be retrieved from the device, configured non-module capabilities: "
+                        + parsedOverrideCapabilities.getNonModuleCaps());
+
+        return Optional.of(parsedOverrideCapabilities);
+    }
+
+    private NetconfDevice.SchemaResourcesDTO setupSchemaCacheDTO(final NodeId nodeId, final NetconfNode node) {
+        // Setup information related to the SchemaRegistry, SchemaResourceFactory, etc.
+        NetconfDevice.SchemaResourcesDTO schemaResourcesDTO = null;
+        final String moduleSchemaCacheDirectory = node.getSchemaCacheDirectory();
+        // Only checks to ensure the String is not empty or null;  further checks related to directory accessibility
+        // and file permissions are handled during the FilesystemSchemaSourceCache initialization.
+        if (!Strings.isNullOrEmpty(moduleSchemaCacheDirectory)) {
+            // If a custom schema cache directory is specified, create the backing DTO; otherwise, the SchemaRegistry
+            // and SchemaContextFactory remain the default values.
+            if (!moduleSchemaCacheDirectory.equals(NetconfTopologyUtils.DEFAULT_CACHE_DIRECTORY)) {
+                // Multiple modules may be created at once;  synchronize to avoid issues with data consistency among
+                // threads.
+                synchronized (schemaResourcesDTOs) {
+                    // Look for the cached DTO to reuse SchemaRegistry and SchemaContextFactory variables if
+                    // they already exist
+                    schemaResourcesDTO = schemaResourcesDTOs.get(moduleSchemaCacheDirectory);
+                    if (schemaResourcesDTO == null) {
+                        schemaResourcesDTO = createSchemaResourcesDTO(moduleSchemaCacheDirectory);
+                        schemaResourcesDTO.getSchemaRegistry().registerSchemaSourceListener(
+                                TextToASTTransformer.create((SchemaRepository) schemaResourcesDTO.getSchemaRegistry(),
+                                        schemaResourcesDTO.getSchemaRegistry())
+                        );
+                        schemaResourcesDTOs.put(moduleSchemaCacheDirectory, schemaResourcesDTO);
+                    }
+                }
+                LOG.info("{} : netconf connector will use schema cache directory {} instead of {}",
+                        nodeId.getValue(), moduleSchemaCacheDirectory, NetconfTopologyUtils.DEFAULT_CACHE_DIRECTORY);
+            }
+        } else {
+            LOG.info("{} : using the default directory {}",
+                    nodeId.getValue(), NetconfTopologyUtils.QUALIFIED_DEFAULT_CACHE_DIRECTORY);
+        }
+
+        if (schemaResourcesDTO == null) {
+            schemaResourcesDTO =
+                    new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaRepository, schemaContextFactory,
+                            new NetconfStateSchemasResolverImpl());
+        }
+
+        return schemaResourcesDTO;
+    }
+
+    /**
+     * Creates the backing Schema classes for a particular directory.
+     *
+     * @param moduleSchemaCacheDirectory The string directory relative to "cache"
+     * @return A DTO containing the Schema classes for the Netconf mount.
+     */
+    private NetconfDevice.SchemaResourcesDTO createSchemaResourcesDTO(final String moduleSchemaCacheDirectory) {
+        final SharedSchemaRepository repository = new SharedSchemaRepository(moduleSchemaCacheDirectory);
+        final SchemaContextFactory schemaContextFactory
+                = repository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
+        this.schemaRegistry = repository;
+        this.schemaContextFactory = schemaContextFactory;
+
+        final FilesystemSchemaSourceCache<YangTextSchemaSource> deviceCache =
+                createDeviceFilesystemCache(moduleSchemaCacheDirectory);
+        repository.registerSchemaSourceListener(deviceCache);
+        return new NetconfDevice.SchemaResourcesDTO(repository, repository, schemaContextFactory,
+                new NetconfStateSchemasResolverImpl());
+    }
+
+    /**
+     * Creates a <code>FilesystemSchemaSourceCache</code> for the custom schema cache directory.
+     *
+     * @param schemaCacheDirectory The custom cache directory relative to "cache"
+     * @return A <code>FilesystemSchemaSourceCache</code> for the custom schema cache directory
+     */
+    private FilesystemSchemaSourceCache<YangTextSchemaSource> createDeviceFilesystemCache(
+            final String schemaCacheDirectory) {
+        final String relativeSchemaCacheDirectory =
+                NetconfTopologyUtils.CACHE_DIRECTORY + File.separator + schemaCacheDirectory;
+        return new FilesystemSchemaSourceCache<>(schemaRegistry, YangTextSchemaSource.class,
+                new File(relativeSchemaCacheDirectory));
+    }
+
+    //TODO: duplicate code
+    private InetSocketAddress getSocketAddress(final Host host, int port) {
+        if (host.getDomainName() != null) {
+            return new InetSocketAddress(host.getDomainName().getValue(), port);
+        } else {
+            final IpAddress ipAddress = host.getIpAddress();
+            final String ip = ipAddress.getIpv4Address() != null ? ipAddress.getIpv4Address().getValue() :
+                    ipAddress.getIpv6Address().getValue();
+            return new InetSocketAddress(ip, port);
+        }
+    }
+
+    private static final class TimedReconnectStrategyFactory implements ReconnectStrategyFactory {
+        private final Long connectionAttempts;
+        private final EventExecutor executor;
+        private final double sleepFactor;
+        private final int minSleep;
+
+        TimedReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts,
+                                      final int minSleep, final BigDecimal sleepFactor) {
+            if (maxConnectionAttempts != null && maxConnectionAttempts > 0) {
+                connectionAttempts = maxConnectionAttempts;
+            } else {
+                connectionAttempts = null;
+            }
+
+            this.sleepFactor = sleepFactor.doubleValue();
+            this.executor = executor;
+            this.minSleep = minSleep;
+        }
+
+        @Override
+        public ReconnectStrategy createReconnectStrategy() {
+            final Long maxSleep = null;
+            final Long deadline = null;
+
+            return new TimedReconnectStrategy(executor, minSleep,
+                    minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
+        }
+    }
+
+    @VisibleForTesting
+    NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener,
+                                                                   final NetconfNode node) {
+
+        //setup default values since default value is not supported in mdsal
+        final long clientConnectionTimeoutMillis = node.getConnectionTimeoutMillis() == null
+                ? NetconfTopologyUtils.DEFAULT_CONNECTION_TIMEOUT_MILLIS : node.getConnectionTimeoutMillis();
+        final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null
+                ? NetconfTopologyUtils.DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
+        final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null
+                ? NetconfTopologyUtils.DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
+        final BigDecimal sleepFactor = node.getSleepFactor() == null
+                ? NetconfTopologyUtils.DEFAULT_SLEEP_FACTOR : node.getSleepFactor();
+
+        final InetSocketAddress socketAddress = getSocketAddress(node.getHost(), node.getPort().getValue());
+
+        final ReconnectStrategyFactory sf =
+                new TimedReconnectStrategyFactory(netconfTopologyDeviceSetup.getEventExecutor(), maxConnectionAttempts,
+                        betweenAttemptsTimeoutMillis, sleepFactor);
+        final ReconnectStrategy strategy = sf.createReconnectStrategy();
+
+        final AuthenticationHandler authHandler;
+        final Credentials credentials = node.getCredentials();
+        if (credentials instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) {
+            authHandler = new LoginPassword(
+                    ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getUsername(),
+                    ((org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPassword) credentials).getPassword());
+        } else {
+            throw new IllegalStateException("Only login/password authentification is supported");
+        }
+
+        return NetconfReconnectingClientConfigurationBuilder.create()
+                .withAddress(socketAddress)
+                .withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
+                .withReconnectStrategy(strategy)
+                .withAuthHandler(authHandler)
+                .withProtocol(node.isTcpOnly()
+                        ? NetconfClientConfiguration.NetconfClientProtocol.TCP
+                        : NetconfClientConfiguration.NetconfClientProtocol.SSH)
+                .withConnectStrategyFactory(sf)
+                .withSessionListener(listener)
+                .build();
+    }
+
+    @VisibleForTesting
+    Map<String, NetconfDevice.SchemaResourcesDTO> getSchemaResourcesDTOs() {
+        return schemaResourcesDTOs;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java
new file mode 100644 (file)
index 0000000..da95e9f
--- /dev/null
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.SubmitFailedReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RemoteOperationTxProcessorImpl implements RemoteOperationTxProcessor, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteOperationTxProcessorImpl.class);
+
+    private final DOMDataBroker dataBroker;
+    private final RemoteDeviceId id;
+    private DOMDataWriteTransaction writeTx;
+    private DOMDataReadOnlyTransaction readTx;
+
+    public RemoteOperationTxProcessorImpl(final DOMDataBroker dataBroker, final RemoteDeviceId id) {
+        this.dataBroker = dataBroker;
+        this.id = id;
+        this.readTx = dataBroker.newReadOnlyTransaction();
+    }
+
+    @Override
+    public void doDelete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        if (writeTx == null) {
+            writeTx = dataBroker.newWriteOnlyTransaction();
+        }
+        writeTx.delete(store, path);
+    }
+
+    @Override
+    public void doSubmit(final ActorRef recipient, final ActorRef sender) {
+        if (writeTx != null) {
+            CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
+            Futures.addCallback(submitFuture, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                    recipient.tell(new SubmitReply(), sender);
+                }
+
+                @Override
+                public void onFailure(@Nonnull Throwable throwable) {
+                    recipient.tell(throwable, sender);
+                }
+            });
+        } else {
+            recipient.tell(new SubmitFailedReply(), sender);
+            LOG.warn("{}: Couldn't submit transaction because it was already closed.", id);
+        }
+    }
+
+    @Override
+    public void doCancel(final ActorRef recipient, final ActorRef sender) {
+        boolean cancel = false;
+        if (writeTx != null) {
+            cancel = writeTx.cancel();
+        }
+        recipient.tell(cancel, sender);
+    }
+
+    @Override
+    public void doPut(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+        if (writeTx == null) {
+            writeTx = dataBroker.newWriteOnlyTransaction();
+        }
+        writeTx.put(store, data.getIdentifier(), data.getNode());
+    }
+
+    @Override
+    public void doMerge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+        if (writeTx == null) {
+            writeTx = dataBroker.newWriteOnlyTransaction();
+        }
+        writeTx.merge(store, data.getIdentifier(), data.getNode());
+    }
+
+    @Override
+    public void doRead(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
+                       final ActorRef sender) {
+        final CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> readFuture =
+                readTx.read(store, path);
+
+        Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+
+            @Override
+            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
+                if (!result.isPresent()) {
+                    recipient.tell(new EmptyReadResponse(), sender);
+                    return;
+                }
+                recipient.tell(new NormalizedNodeMessage(path, result.get()), sender);
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                recipient.tell(throwable, sender);
+            }
+        });
+    }
+
+    @Override
+    public void doExists(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
+                         final ActorRef sender) {
+        final CheckedFuture<Boolean, ReadFailedException> readFuture =
+                readTx.exists(store, path);
+        Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(final Boolean result) {
+                if (result == null) {
+                    recipient.tell(false, sender);
+                } else {
+                    recipient.tell(result, sender);
+                }
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                recipient.tell(throwable, sender);
+            }
+        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (readTx != null) {
+            readTx.close();
+        }
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/SlaveSalFacade.java
new file mode 100644 (file)
index 0000000..772020b
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalProvider;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfProxyDOMTransaction;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlaveSalFacade {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SlaveSalFacade.class);
+
+    private final RemoteDeviceId id;
+    private final NetconfDeviceSalProvider salProvider;
+
+    private final ActorSystem actorSystem;
+
+    public SlaveSalFacade(final RemoteDeviceId id,
+                          final Broker domBroker,
+                          final ActorSystem actorSystem) {
+        this.id = id;
+        this.salProvider = new NetconfDeviceSalProvider(id);
+        this.actorSystem = actorSystem;
+
+        registerToSal(domBroker);
+    }
+
+    private void registerToSal(final Broker domRegistryDependency) {
+        domRegistryDependency.registerProvider(salProvider);
+
+    }
+
+    public void registerSlaveMountPoint(final SchemaContext remoteSchemaContext, final DOMRpcService deviceRpc,
+                                        final ActorRef masterActorRef) {
+        final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
+
+        final NetconfDOMTransaction proxyDOMTransactions =
+                new NetconfProxyDOMTransaction(actorSystem, masterActorRef);
+
+        final NetconfDOMDataBroker netconfDeviceDataBroker =
+                new NetconfDOMDataBroker(actorSystem, id, proxyDOMTransactions);
+
+        salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker,
+                deviceRpc, notificationService);
+
+        LOG.info("{}: Slave mount point registered.", id);
+    }
+
+    public void unregisterSlaveMountPoint() {
+        salProvider.getMountInstance().onTopologyDeviceDisconnected();
+    }
+
+    public void close() {
+        unregisterSlaveMountPoint();
+        try {
+            salProvider.getMountInstance().close();
+        } catch (Exception exception) {
+            LOG.warn("{}: Exception in closing slave sal facade: {}", id, exception);
+        }
+
+    }
+
+
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
new file mode 100644 (file)
index 0000000..75c7e62
--- /dev/null
@@ -0,0 +1,235 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
+import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
+import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
+import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
+import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
+import org.opendaylight.netconf.topology.singleton.impl.RemoteOperationTxProcessorImpl;
+import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
+import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
+import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
+import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
+import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
+import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetconfNodeActor extends UntypedActor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeActor.class);
+
+    private NetconfTopologySetup setup;
+    private RemoteDeviceId id;
+    private final SchemaSourceRegistry schemaRegistry;
+    private final SchemaRepository schemaRepository;
+
+    private RemoteOperationTxProcessor operationsProcessor;
+    private List<SourceIdentifier> sourceIdentifiers;
+    private SlaveSalFacade slaveSalManager;
+
+    public static Props props(final NetconfTopologySetup setup,
+                              final RemoteDeviceId id, final SchemaSourceRegistry schemaRegistry,
+                              final SchemaRepository schemaRepository) {
+        return Props.create(NetconfNodeActor.class, () ->
+                new NetconfNodeActor(setup, id, schemaRegistry, schemaRepository));
+    }
+
+    private NetconfNodeActor(final NetconfTopologySetup setup,
+                             final RemoteDeviceId id, SchemaSourceRegistry schemaRegistry,
+                             final SchemaRepository schemaRepository) {
+        this.setup = setup;
+        this.id = id;
+        this.schemaRegistry = schemaRegistry;
+        this.schemaRepository = schemaRepository;
+    }
+
+    @Override
+    public void onReceive(final Object message) throws Exception {
+        if (message instanceof CreateInitialMasterActorData) { // master
+
+            sourceIdentifiers = ((CreateInitialMasterActorData) message).getSourceIndentifiers();
+            operationsProcessor =
+                    new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
+                            id);
+            sender().tell(new MasterActorDataInitialized(), self());
+
+            LOG.debug("{}: Master is ready.", id);
+
+        } else if (message instanceof  RefreshSetupMasterActorData) {
+            setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
+            id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
+            sender().tell(new MasterActorDataInitialized(), self());
+        } else if (message instanceof AskForMasterMountPoint) { // master
+            // only master contains reference to operations processor
+            if (operationsProcessor != null) {
+                getSender().tell(new RegisterMountPoint(sourceIdentifiers), getSelf());
+            }
+
+        } else if (message instanceof TransactionRequest) { // master
+
+            resolveProxyCalls(message, sender(), getSelf());
+
+        } else if (message instanceof YangTextSchemaSourceRequest) { // master
+
+            final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
+            sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
+
+        } else if (message instanceof RegisterMountPoint) { //slaves
+
+            sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
+            registerSlaveMountPoint(getSender());
+
+        } else if (message instanceof UnregisterSlaveMountPoint) { //slaves
+            if (slaveSalManager != null) {
+                slaveSalManager.close();
+                slaveSalManager = null;
+            }
+
+        }
+    }
+
+    private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
+        if (message instanceof ReadRequest) {
+
+            final ReadRequest readRequest = (ReadRequest) message;
+            operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
+
+        } else if (message instanceof ExistsRequest) {
+
+            final ExistsRequest readRequest = (ExistsRequest) message;
+            operationsProcessor.doExists(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
+
+        } else if (message instanceof MergeRequest) {
+
+            final MergeRequest mergeRequest = (MergeRequest) message;
+            operationsProcessor.doMerge(mergeRequest.getStore(), mergeRequest.getNormalizedNodeMessage());
+
+        } else if (message instanceof PutRequest) {
+
+            final PutRequest putRequest = (PutRequest) message;
+            operationsProcessor.doPut(putRequest.getStore(), putRequest.getNormalizedNodeMessage());
+
+        } else if (message instanceof DeleteRequest) {
+
+            final DeleteRequest deleteRequest = (DeleteRequest) message;
+            operationsProcessor.doDelete(deleteRequest.getStore(), deleteRequest.getPath());
+
+        } else if (message instanceof CancelRequest) {
+
+            operationsProcessor.doCancel(recipient, futureSender);
+
+        } else if (message instanceof SubmitRequest) {
+
+            operationsProcessor.doSubmit(recipient, futureSender);
+        }
+    }
+
+    private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
+        final CheckedFuture<YangTextSchemaSource, SchemaSourceException> yangTextSchemaSource =
+                schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
+
+        Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
+            @Override
+            public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
+                try {
+                    sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
+                } catch (IOException exception) {
+                    sender.tell(exception.getCause(), getSelf());
+                }
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                sender.tell(throwable, getSelf());
+            }
+        });
+    }
+
+    private void registerSlaveMountPoint(final ActorRef masterReference) {
+        if (this.slaveSalManager != null) {
+            slaveSalManager.close();
+        }
+        slaveSalManager = new SlaveSalFacade(id, setup.getDomBroker(), setup.getActorSystem());
+
+        final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
+                getSchemaContext(masterReference);
+        final DOMRpcService deviceRpc = getDOMRpcService();
+
+        Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
+            @Override
+            public void onSuccess(final SchemaContext result) {
+                LOG.info("{}: Schema context resolved: {}", id, result.getModules());
+                slaveSalManager.registerSlaveMountPoint(result, deviceRpc, masterReference);
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                LOG.error("{}: Failed to register mount point: {}", id, throwable);
+            }
+        });
+    }
+
+    private DOMRpcService getDOMRpcService() {
+        return new ProxyDOMRpcService();
+    }
+
+    private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(ActorRef masterReference) {
+
+        final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
+                new ProxyYangTextSourceProvider(masterReference, getContext());
+        final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
+                getContext().dispatcher());
+
+        sourceIdentifiers.forEach(sourceId ->
+                schemaRegistry.registerSchemaSource(remoteProvider, PotentialSchemaSource.create(sourceId,
+                        YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+
+        final SchemaContextFactory schemaContextFactory
+                = schemaRepository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
+
+        return schemaContextFactory.createSchemaContext(sourceIdentifiers);
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfMasterDOMTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfMasterDOMTransaction.java
new file mode 100644 (file)
index 0000000..602eb75
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Future;
+import scala.concurrent.impl.Promise.DefaultPromise;
+
+public class NetconfMasterDOMTransaction implements NetconfDOMTransaction {
+
+    private final DOMDataBroker delegateBroker;
+
+    private DOMDataReadOnlyTransaction readTx;
+    private DOMDataWriteTransaction writeTx;
+
+    public NetconfMasterDOMTransaction(final RemoteDeviceId id,
+                                       final SchemaContext schemaContext, final DOMRpcService rpc,
+                                       final NetconfSessionPreferences netconfSessionPreferences) {
+
+        delegateBroker = new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences);
+
+        // only ever need 1 readTx since it doesnt need to be closed
+        readTx = delegateBroker.newReadOnlyTransaction();
+    }
+
+    public NetconfMasterDOMTransaction(final DOMDataBroker delegateBroker) {
+        this.delegateBroker = delegateBroker;
+
+        // only ever need 1 readTx since it doesnt need to be closed
+        readTx = delegateBroker.newReadOnlyTransaction();
+    }
+
+    @Override
+    public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
+                                                        final YangInstanceIdentifier path) {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture = readTx.read(store, path);
+
+        final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
+        Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+            @Override
+            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
+                if (!result.isPresent()) {
+                    promise.success(Optional.absent());
+                } else {
+                    promise.success(Optional.of(new NormalizedNodeMessage(path, result.get())));
+                }
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                promise.failure(throwable);
+            }
+        });
+        return promise.future();
+    }
+
+    @Override
+    public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        final CheckedFuture<Boolean, ReadFailedException> existsFuture = readTx.exists(store, path);
+
+        final DefaultPromise<Boolean> promise = new DefaultPromise<>();
+        Futures.addCallback(existsFuture, new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(final Boolean result) {
+                promise.success(result);
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                promise.failure(throwable);
+            }
+        });
+        return promise.future();
+    }
+
+    @Override
+    public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+        if (writeTx == null) {
+            writeTx = delegateBroker.newWriteOnlyTransaction();
+        }
+        writeTx.put(store, data.getIdentifier(), data.getNode());
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+        if (writeTx == null) {
+            writeTx = delegateBroker.newWriteOnlyTransaction();
+        }
+        writeTx.merge(store, data.getIdentifier(), data.getNode());
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        if (writeTx == null) {
+            writeTx = delegateBroker.newWriteOnlyTransaction();
+        }
+        writeTx.delete(store, path);
+    }
+
+    @Override
+    public boolean cancel() {
+        return writeTx.cancel();
+    }
+
+    @Override
+    public Future<Void> submit() {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
+        final DefaultPromise<Void> promise = new DefaultPromise<>();
+        Futures.addCallback(submitFuture, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                promise.success(result);
+                writeTx = null;
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable throwable) {
+                promise.failure(throwable);
+                writeTx = null;
+            }
+        });
+        return promise.future();
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java
new file mode 100644 (file)
index 0000000..ad9cdd8
--- /dev/null
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import com.google.common.base.Optional;
+import org.opendaylight.controller.config.util.xml.DocumentedException;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.SubmitFailedReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.impl.Promise.DefaultPromise;
+
+
+public class NetconfProxyDOMTransaction implements NetconfDOMTransaction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfProxyDOMTransaction.class);
+
+    private final ActorSystem actorSystem;
+    private final ActorRef masterContextRef;
+
+    public NetconfProxyDOMTransaction(final ActorSystem actorSystem, final ActorRef masterContextRef) {
+        this.actorSystem = actorSystem;
+        this.masterContextRef = masterContextRef;
+    }
+
+    @Override
+    public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
+                                                        final YangInstanceIdentifier path) {
+
+        final Future<Object> readScalaFuture =
+                Patterns.ask(masterContextRef, new ReadRequest(store, path), NetconfTopologyUtils.TIMEOUT);
+
+        final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
+
+        readScalaFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    Exception exception = new DocumentedException("Master is down. Please try again.",
+                            DocumentedException.ErrorType.application, DocumentedException.ErrorTag.operation_failed,
+                            DocumentedException.ErrorSeverity.warning);
+                    promise.failure(exception);
+                    return;
+                }
+                if (success instanceof Throwable) { // Error sended by master
+                    promise.failure((Throwable) success);
+                    return;
+                }
+                if (success instanceof EmptyReadResponse) {
+                    promise.success(Optional.absent());
+                    return;
+                }
+
+                promise.success(Optional.of((NormalizedNodeMessage) success));
+            }
+        }, actorSystem.dispatcher());
+
+        return promise.future();
+    }
+
+    @Override
+    public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        final Future<Object> existsScalaFuture =
+                Patterns.ask(masterContextRef, new ExistsRequest(store, path), NetconfTopologyUtils.TIMEOUT);
+
+        final DefaultPromise<Boolean> promise = new DefaultPromise<>();
+        existsScalaFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    Exception exception = new DocumentedException("Master is down. Please try again.",
+                            DocumentedException.ErrorType.application, DocumentedException.ErrorTag.operation_failed,
+                            DocumentedException.ErrorSeverity.warning);
+                    promise.failure(exception);
+                    return;
+                }
+                if (success instanceof Throwable) {
+                    promise.failure((Throwable) success);
+                    return;
+                }
+                promise.success((Boolean) success);
+            }
+        }, actorSystem.dispatcher());
+        return promise.future();
+    }
+
+    @Override
+    public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+        masterContextRef.tell(new PutRequest(store, data), ActorRef.noSender());
+
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+        masterContextRef.tell(new MergeRequest(store, data), ActorRef.noSender());
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        masterContextRef.tell(new DeleteRequest(store, path), ActorRef.noSender());
+    }
+
+    @Override
+    public boolean cancel() {
+        final Future<Object> cancelScalaFuture =
+                Patterns.ask(masterContextRef, new CancelRequest(), NetconfTopologyUtils.TIMEOUT);
+        try {
+            // here must be Await because AsyncWriteTransaction do not return future
+            return (boolean) Await.result(cancelScalaFuture, NetconfTopologyUtils.TIMEOUT.duration());
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    @Override
+    public Future<Void> submit() {
+        final Future<Object> submitScalaFuture =
+                Patterns.ask(masterContextRef, new SubmitRequest(), NetconfTopologyUtils.TIMEOUT);
+
+        final DefaultPromise<Void> promise = new DefaultPromise<>();
+
+        submitScalaFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) { // ask timeout
+                    Exception exception = new DocumentedException("Master is down. Please try again.",
+                            DocumentedException.ErrorType.application, DocumentedException.ErrorTag.operation_failed,
+                            DocumentedException.ErrorSeverity.warning);
+                    promise.failure(exception);
+                    return;
+                }
+                if (success instanceof Throwable) {
+                    promise.failure((Throwable) success);
+                } else {
+                    if (success instanceof SubmitFailedReply) {
+                        LOG.error("Transaction was not submitted.");
+                    }
+                    promise.success(null);
+                }
+            }
+        }, actorSystem.dispatcher());
+
+        return promise.future();
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfReadOnlyTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfReadOnlyTransaction.java
new file mode 100644 (file)
index 0000000..3af25c6
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.Future;
+
+public class NetconfReadOnlyTransaction implements DOMDataReadOnlyTransaction {
+
+    private final NetconfDOMTransaction delegate;
+    private final ActorSystem actorSystem;
+
+    public NetconfReadOnlyTransaction(final ActorSystem actorSystem, final NetconfDOMTransaction delegate) {
+        this.delegate = delegate;
+        this.actorSystem = actorSystem;
+    }
+
+    @Override
+    public void close() {
+        //NOOP
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
+                                                                                   final YangInstanceIdentifier path) {
+        final Future<Optional<NormalizedNodeMessage>> future = delegate.read(store, path);
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> checkedFuture;
+        checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
+            @Nullable
+            @Override
+            public ReadFailedException apply(Exception cause) {
+                return new ReadFailedException("Read from transaction failed", cause);
+            }
+        });
+        future.onComplete(new OnComplete<Optional<NormalizedNodeMessage>>() {
+            @Override
+            public void onComplete(final Throwable throwable,
+                                   final Optional<NormalizedNodeMessage> normalizedNodeMessage) throws Throwable {
+                if (throwable == null) {
+                    if (normalizedNodeMessage.isPresent()) {
+                        settableFuture.set(normalizedNodeMessage.transform(new Function<NormalizedNodeMessage,
+                                NormalizedNode<?, ?>>() {
+
+                            @Nullable
+                            @Override
+                            public NormalizedNode<?, ?> apply(final NormalizedNodeMessage input) {
+                                return input.getNode();
+                            }
+                        }));
+                    } else {
+                        settableFuture.set(Optional.absent());
+                    }
+                } else {
+                    settableFuture.setException(throwable);
+                }
+            }
+        }, actorSystem.dispatcher());
+        return checkedFuture;
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
+                                                              final YangInstanceIdentifier path) {
+        final Future<Boolean> existsFuture = delegate.exists(store, path);
+        final SettableFuture<Boolean> settableFuture = SettableFuture.create();
+        final CheckedFuture<Boolean, ReadFailedException> checkedFuture;
+        checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
+            @Nullable
+            @Override
+            public ReadFailedException apply(Exception cause) {
+                return new ReadFailedException("Read from transaction failed", cause);
+            }
+        });
+        existsFuture.onComplete(new OnComplete<Boolean>() {
+            @Override
+            public void onComplete(final Throwable throwable, final Boolean result) throws Throwable {
+                if (throwable == null) {
+                    settableFuture.set(result);
+                } else {
+                    settableFuture.setException(throwable);
+                }
+            }
+        }, actorSystem.dispatcher());
+        return checkedFuture;
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return this;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfWriteOnlyTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfWriteOnlyTransaction.java
new file mode 100644 (file)
index 0000000..bef72a2
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.Future;
+
+public class NetconfWriteOnlyTransaction implements DOMDataWriteTransaction {
+
+    private final NetconfDOMTransaction delegate;
+    private final ActorSystem actorSystem;
+
+    public NetconfWriteOnlyTransaction(final ActorSystem actorSystem, final NetconfDOMTransaction delegate) {
+        this.delegate = delegate;
+        this.actorSystem = actorSystem;
+    }
+
+    @Override
+    public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+                    final NormalizedNode<?,?> data) {
+        delegate.put(store, new NormalizedNodeMessage(path, data));
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+                      final NormalizedNode<?,?> data) {
+        delegate.merge(store, new NormalizedNodeMessage(path, data));
+    }
+
+    @Override
+    public boolean cancel() {
+        return delegate.cancel();
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        delegate.delete(store, path);
+    }
+
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+        final Future<Void> submit = delegate.submit();
+        final SettableFuture<Void> settFuture = SettableFuture.create();
+        final CheckedFuture<Void, TransactionCommitFailedException> checkedFuture;
+        checkedFuture = Futures.makeChecked(settFuture, new Function<Exception, TransactionCommitFailedException>() {
+            @Nullable
+            @Override
+            public TransactionCommitFailedException apply(Exception input) {
+                return new TransactionCommitFailedException("Transaction commit failed", input);
+            }
+        });
+        submit.onComplete(new OnComplete<Void>() {
+            @Override
+            public void onComplete(Throwable throwable, Void object) throws Throwable {
+                if (throwable == null) {
+                    settFuture.set(object);
+                } else {
+                    settFuture.setException(throwable);
+                }
+            }
+        }, actorSystem.dispatcher());
+        return checkedFuture;
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+        final Future<Void> commit = delegate.submit();
+        final SettableFuture<RpcResult<TransactionStatus>> settFuture = SettableFuture.create();
+        commit.onComplete(new OnComplete<Void>() {
+            @Override
+            public void onComplete(final Throwable throwable, final Void result) throws Throwable {
+                if (throwable == null) {
+                    TransactionStatus status = TransactionStatus.SUBMITED;
+                    RpcResult<TransactionStatus> rpcResult = RpcResultBuilder.success(status).build();
+                    settFuture.set(rpcResult);
+                } else {
+                    settFuture.setException(throwable);
+                }
+            }
+        }, actorSystem.dispatcher());
+        return settFuture;
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return this;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfConnectorDTO.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfConnectorDTO.java
new file mode 100644 (file)
index 0000000..0657878
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.utils;
+
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+
+public class NetconfConnectorDTO implements AutoCloseable {
+
+    private final NetconfDeviceCommunicator communicator;
+    private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
+
+    public NetconfConnectorDTO(final NetconfDeviceCommunicator communicator,
+                               final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
+        this.communicator = communicator;
+        this.facade = facade;
+    }
+
+    public NetconfDeviceCommunicator getCommunicator() {
+        return communicator;
+    }
+
+    public RemoteDeviceHandler<NetconfSessionPreferences> getFacade() {
+        return facade;
+    }
+
+    public NetconfClientSessionListener getSessionListener() {
+        return communicator;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (communicator != null) {
+            communicator.close();
+        }
+        if (facade != null) {
+            facade.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologySetup.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologySetup.java
new file mode 100644 (file)
index 0000000..d607f33
--- /dev/null
@@ -0,0 +1,254 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.utils;
+
+import akka.actor.ActorSystem;
+import io.netty.util.concurrent.EventExecutor;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.controller.config.threadpool.ThreadPool;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class NetconfTopologySetup {
+
+    private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
+    private final RpcProviderRegistry rpcProviderRegistry;
+    private final DataBroker dataBroker;
+    private final InstanceIdentifier<Node> instanceIdentifier;
+    private final Node node;
+    private final BindingAwareBroker bindingAwareBroker;
+    private final ScheduledThreadPool keepaliveExecutor;
+    private final ThreadPool processingExecutor;
+    private final Broker domBroker;
+    private final ActorSystem actorSystem;
+    private final EventExecutor eventExecutor;
+    private final NetconfClientDispatcher netconfClientDispatcher;
+    private final String topologyId;
+    private NetconfTopologySetup(final NetconfTopologySetupBuilder builder) {
+        this.clusterSingletonServiceProvider = builder.getClusterSingletonServiceProvider();
+        this.rpcProviderRegistry = builder.getRpcProviderRegistry();
+        this.dataBroker = builder.getDataBroker();
+        this.instanceIdentifier = builder.getInstanceIdentifier();
+        this.node = builder.getNode();
+        this.bindingAwareBroker = builder.getBindingAwareBroker();
+        this.keepaliveExecutor = builder.getKeepaliveExecutor();
+        this.processingExecutor = builder.getProcessingExecutor();
+        this.domBroker = builder.getDomBroker();
+        this.actorSystem = builder.getActorSystem();
+        this.eventExecutor = builder.getEventExecutor();
+        this.netconfClientDispatcher = builder.getNetconfClientDispatcher();
+        this.topologyId = builder.getTopologyId();
+    }
+
+    public ClusterSingletonServiceProvider getClusterSingletonServiceProvider() {
+        return clusterSingletonServiceProvider;
+    }
+
+    public RpcProviderRegistry getRpcProviderRegistry() {
+        return rpcProviderRegistry;
+    }
+
+    public DataBroker getDataBroker() {
+        return dataBroker;
+    }
+
+    public InstanceIdentifier<Node> getInstanceIdentifier() {
+        return instanceIdentifier;
+    }
+
+    public Node getNode() {
+        return node;
+    }
+
+    public BindingAwareBroker getBindingAwareBroker() {
+        return bindingAwareBroker;
+    }
+
+    public ThreadPool getProcessingExecutor() {
+        return processingExecutor;
+    }
+
+    public ScheduledThreadPool getKeepaliveExecutor() {
+        return keepaliveExecutor;
+    }
+
+    public Broker getDomBroker() {
+        return domBroker;
+    }
+
+    public ActorSystem getActorSystem() {
+        return actorSystem;
+    }
+
+    public EventExecutor getEventExecutor() {
+        return eventExecutor;
+    }
+
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    public NetconfClientDispatcher getNetconfClientDispatcher() {
+        return netconfClientDispatcher;
+    }
+
+    public static class NetconfTopologySetupBuilder {
+
+        private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
+        private RpcProviderRegistry rpcProviderRegistry;
+        private DataBroker dataBroker;
+        private InstanceIdentifier<Node> instanceIdentifier;
+        private Node node;
+        private BindingAwareBroker bindingAwareBroker;
+        private ScheduledThreadPool keepaliveExecutor;
+        private ThreadPool processingExecutor;
+        private Broker domBroker;
+        private ActorSystem actorSystem;
+        private EventExecutor eventExecutor;
+        private String topologyId;
+        private NetconfClientDispatcher netconfClientDispatcher;
+
+        public NetconfTopologySetupBuilder(){
+        }
+
+        private ClusterSingletonServiceProvider getClusterSingletonServiceProvider() {
+            return clusterSingletonServiceProvider;
+        }
+
+        public NetconfTopologySetupBuilder setClusterSingletonServiceProvider(
+                final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
+            this.clusterSingletonServiceProvider = clusterSingletonServiceProvider;
+            return this;
+        }
+
+        private RpcProviderRegistry getRpcProviderRegistry() {
+            return rpcProviderRegistry;
+        }
+
+        public NetconfTopologySetupBuilder setRpcProviderRegistry(final RpcProviderRegistry rpcProviderRegistry) {
+            this.rpcProviderRegistry = rpcProviderRegistry;
+            return this;
+        }
+
+        private DataBroker getDataBroker() {
+            return dataBroker;
+        }
+
+        public NetconfTopologySetupBuilder setDataBroker(final DataBroker dataBroker) {
+            this.dataBroker = dataBroker;
+            return this;
+        }
+
+        private InstanceIdentifier<Node> getInstanceIdentifier() {
+            return instanceIdentifier;
+        }
+
+        public NetconfTopologySetupBuilder setInstanceIdentifier(final InstanceIdentifier<Node> instanceIdentifier) {
+            this.instanceIdentifier = instanceIdentifier;
+            return this;
+        }
+
+        public Node getNode() {
+            return node;
+        }
+
+        public NetconfTopologySetupBuilder setNode(final Node node) {
+            this.node = node;
+            return this;
+        }
+
+        public NetconfTopologySetup build() {
+            return new NetconfTopologySetup(this);
+        }
+
+        private BindingAwareBroker getBindingAwareBroker() {
+            return bindingAwareBroker;
+        }
+
+        public NetconfTopologySetupBuilder setBindingAwareBroker(BindingAwareBroker bindingAwareBroker) {
+            this.bindingAwareBroker = bindingAwareBroker;
+            return this;
+        }
+
+        private ScheduledThreadPool getKeepaliveExecutor() {
+            return keepaliveExecutor;
+        }
+
+        public NetconfTopologySetupBuilder setKeepaliveExecutor(ScheduledThreadPool keepaliveExecutor) {
+            this.keepaliveExecutor = keepaliveExecutor;
+            return this;
+        }
+
+        private ThreadPool getProcessingExecutor() {
+            return processingExecutor;
+        }
+
+        public NetconfTopologySetupBuilder setProcessingExecutor(ThreadPool processingExecutor) {
+            this.processingExecutor = processingExecutor;
+            return this;
+        }
+
+        private Broker getDomBroker() {
+            return domBroker;
+        }
+
+        public NetconfTopologySetupBuilder setDomBroker(Broker domBroker) {
+            this.domBroker = domBroker;
+            return this;
+        }
+
+        private ActorSystem getActorSystem() {
+            return actorSystem;
+        }
+
+        public NetconfTopologySetupBuilder setActorSystem(ActorSystem actorSystem) {
+            this.actorSystem = actorSystem;
+            return this;
+        }
+
+        private EventExecutor getEventExecutor() {
+            return eventExecutor;
+        }
+
+        public NetconfTopologySetupBuilder setEventExecutor(EventExecutor eventExecutor) {
+            this.eventExecutor = eventExecutor;
+            return this;
+        }
+
+        private String getTopologyId() {
+            return topologyId;
+        }
+
+        public NetconfTopologySetupBuilder setTopologyId(String topologyId) {
+            this.topologyId = topologyId;
+            return this;
+        }
+
+        private NetconfClientDispatcher getNetconfClientDispatcher() {
+            return netconfClientDispatcher;
+        }
+
+        public NetconfTopologySetupBuilder setNetconfClientDispatcher(NetconfClientDispatcher clientDispatcher) {
+            this.netconfClientDispatcher = clientDispatcher;
+            return this;
+        }
+
+        public static NetconfTopologySetupBuilder create() {
+            return new NetconfTopologySetupBuilder();
+        }
+    }
+
+
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtils.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtils.java
new file mode 100644 (file)
index 0000000..5114fae
--- /dev/null
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.utils;
+
+import akka.cluster.Member;
+import akka.util.Timeout;
+import java.io.File;
+import java.math.BigDecimal;
+import java.net.InetSocketAddress;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.Identifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
+import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
+import scala.concurrent.duration.Duration;
+
+public class NetconfTopologyUtils {
+
+    private static final String DEFAULT_SCHEMA_REPOSITORY_NAME = "sal-netconf-connector";
+
+    public static final Timeout TIMEOUT = new Timeout(Duration.create(10, "seconds"));
+
+    public static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000L;
+    public static final int DEFAULT_KEEPALIVE_DELAY = 0;
+    public static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+    public static final int DEFAULT_CONCURRENT_RPC_LIMIT = 0;
+    public static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
+    public static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
+    public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 20000L;
+    public static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
+
+
+    // The default cache directory relative to <code>CACHE_DIRECTORY</code>
+
+    public static final String DEFAULT_CACHE_DIRECTORY = "schema";
+
+    // Filesystem based caches are stored relative to the cache directory.
+    public static final String CACHE_DIRECTORY = "cache";
+
+    // The qualified schema cache directory <code>cache/schema</code>
+    public static final String QUALIFIED_DEFAULT_CACHE_DIRECTORY =
+            CACHE_DIRECTORY + File.separator + DEFAULT_CACHE_DIRECTORY;
+
+    // The default schema repository in the case that one is not specified.
+    public static final SharedSchemaRepository DEFAULT_SCHEMA_REPOSITORY =
+            new SharedSchemaRepository(DEFAULT_SCHEMA_REPOSITORY_NAME);
+
+
+     // The default <code>FilesystemSchemaSourceCache</code>, which stores cached files in <code>cache/schema</code>.
+    public static final FilesystemSchemaSourceCache<YangTextSchemaSource> DEFAULT_CACHE =
+            new FilesystemSchemaSourceCache<>(DEFAULT_SCHEMA_REPOSITORY, YangTextSchemaSource.class,
+                    new File(QUALIFIED_DEFAULT_CACHE_DIRECTORY));
+
+    // The default factory for creating <code>SchemaContext</code> instances.
+    public static final SchemaContextFactory DEFAULT_SCHEMA_CONTEXT_FACTORY =
+            DEFAULT_SCHEMA_REPOSITORY.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT);
+
+    public static RemoteDeviceId createRemoteDeviceId(final NodeId nodeId, final NetconfNode node) {
+        IpAddress ipAddress = node.getHost().getIpAddress();
+        InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null
+                ? ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
+                node.getPort().getValue());
+        return new RemoteDeviceId(nodeId.getValue(), address);
+    }
+
+    public static String createActorPath(String masterMember, String name) {
+        return  masterMember + "/user/" + name;
+    }
+
+    public static String createMasterActorName(String name, String masterAddress) {
+        return masterAddress.replaceAll("//", "") + "_" + name;
+    }
+
+    public static NodeId getNodeId(final InstanceIdentifier.PathArgument pathArgument) {
+        if (pathArgument instanceof InstanceIdentifier.IdentifiableItem<?, ?>) {
+
+            final Identifier key = ((InstanceIdentifier.IdentifiableItem) pathArgument).getKey();
+            if (key instanceof NodeKey) {
+                return ((NodeKey) key).getNodeId();
+            }
+        }
+        throw new IllegalStateException("Unable to create NodeId from: " + pathArgument);
+    }
+
+    public static KeyedInstanceIdentifier<Topology, TopologyKey> createTopologyListPath(final String topologyId) {
+        final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
+        return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
+    }
+
+    public static InstanceIdentifier<Node> createTopologyNodeListPath(final NodeKey key, final String topologyId) {
+        return createTopologyListPath(topologyId)
+                .child(Node.class, new NodeKey(new NodeId(key.getNodeId().getValue())));
+    }
+
+    public static InstanceIdentifier<Node> createTopologyNodePath(final String topologyId) {
+        return createTopologyListPath(topologyId).child(Node.class);
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/AskForMasterMountPoint.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/AskForMasterMountPoint.java
new file mode 100644 (file)
index 0000000..9fb3ac2
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import java.io.Serializable;
+
+/**
+ * After master is connected, slaves send the message to master and master triggers registering slave mount point
+ * with reply 'RegisterMountPoint' which includes needed parameters.
+ */
+public class AskForMasterMountPoint implements Serializable {
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/CreateInitialMasterActorData.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/CreateInitialMasterActorData.java
new file mode 100644 (file)
index 0000000..2117f69
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import java.io.Serializable;
+import java.util.List;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+
+/**
+ * Master sends this message to the own actor to set necessary parameters.
+ */
+public class CreateInitialMasterActorData implements Serializable {
+
+    private final DOMDataBroker deviceDataBroker;
+    private final List<SourceIdentifier> allSourceIdentifiers;
+
+    public CreateInitialMasterActorData(final DOMDataBroker deviceDataBroker,
+                                        final List<SourceIdentifier> allSourceIdentifiers) {
+        this.deviceDataBroker = deviceDataBroker;
+        this.allSourceIdentifiers = allSourceIdentifiers;
+    }
+
+    public DOMDataBroker getDeviceDataBroker() {
+        return deviceDataBroker;
+    }
+
+    public List<SourceIdentifier> getSourceIndentifiers() {
+        return allSourceIdentifiers;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/MasterActorDataInitialized.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/MasterActorDataInitialized.java
new file mode 100644 (file)
index 0000000..51c698b
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import java.io.Serializable;
+
+/**
+ * Due to possibility of race condition (when data-store is updated before data are initialized in master actor), only
+ * when this message is received by master, operational data-store is changed.
+ */
+public class MasterActorDataInitialized implements Serializable {
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/NormalizedNodeMessage.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/NormalizedNodeMessage.java
new file mode 100644 (file)
index 0000000..48a8674
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+
+/**
+ * Message which holds node data, prepared to sending between remote hosts with serialization.
+ */
+public class NormalizedNodeMessage implements Externalizable {
+    private static final long serialVersionUID = 1L;
+
+    private YangInstanceIdentifier identifier = null;
+    private NormalizedNode<?, ?> node = null;
+
+    public NormalizedNodeMessage() {
+        // empty constructor needed for Externalizable
+    }
+
+    public NormalizedNodeMessage(final YangInstanceIdentifier identifier, final NormalizedNode<?, ?> node) {
+        this.identifier = identifier;
+        this.node = node;
+    }
+
+    public YangInstanceIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    public NormalizedNode<?, ?> getNode() {
+        return node;
+    }
+
+    @Override
+    public void writeExternal(final ObjectOutput out) throws IOException {
+        final NormalizedNodeDataOutput dataOutput = NormalizedNodeInputOutput.newDataOutput(out);
+        final NormalizedNodeWriter normalizedNodeWriter =
+                NormalizedNodeWriter.forStreamWriter((NormalizedNodeStreamWriter) dataOutput);
+
+        dataOutput.writeYangInstanceIdentifier(identifier);
+
+        normalizedNodeWriter.write(node);
+    }
+
+    @Override
+    public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+        final NormalizedNodeDataInput dataInput = NormalizedNodeInputOutput.newDataInput(in);
+
+        identifier = dataInput.readYangInstanceIdentifier();
+        node = dataInput.readNormalizedNode();
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/RefreshSetupMasterActorData.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/RefreshSetupMasterActorData.java
new file mode 100644 (file)
index 0000000..f75b034
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import java.io.Serializable;
+import java.util.List;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+
+/**
+ * Master sends this message to the own actor to refresh setup data
+ */
+public class RefreshSetupMasterActorData implements Serializable {
+
+    private final NetconfTopologySetup netconfTopologyDeviceSetup;
+    private final RemoteDeviceId remoteDeviceId;
+
+    public RefreshSetupMasterActorData(final NetconfTopologySetup netconfTopologyDeviceSetup,
+                                       final RemoteDeviceId remoteDeviceId) {
+        this.netconfTopologyDeviceSetup = netconfTopologyDeviceSetup;
+        this.remoteDeviceId = remoteDeviceId;
+    }
+
+    public NetconfTopologySetup getNetconfTopologyDeviceSetup() {
+        return netconfTopologyDeviceSetup;
+    }
+
+    public RemoteDeviceId getRemoteDeviceId() {
+        return remoteDeviceId;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/RegisterMountPoint.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/RegisterMountPoint.java
new file mode 100644 (file)
index 0000000..26c6348
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import java.io.Serializable;
+import java.util.List;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+
+/**
+ * Master sends the message to slave with necessary parameters for creating slave mount point.
+ */
+public class RegisterMountPoint implements Serializable {
+
+    private final List<SourceIdentifier> allSourceIdentifiers;
+
+    public RegisterMountPoint(final List<SourceIdentifier> allSourceIdentifiers) {
+        this.allSourceIdentifiers = allSourceIdentifiers;
+    }
+
+    public List<SourceIdentifier> getSourceIndentifiers() {
+        return allSourceIdentifiers;
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/SubmitFailedReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/SubmitFailedReply.java
new file mode 100644 (file)
index 0000000..0aa4f7f
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import java.io.Serializable;
+
+/**
+ * Message sent from master back to the slave when submit is not performed, tx is closed
+ */
+public class SubmitFailedReply implements Serializable {
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/UnregisterSlaveMountPoint.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/UnregisterSlaveMountPoint.java
new file mode 100644 (file)
index 0000000..7c05b6a
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import java.io.Serializable;
+
+/**
+ * Slave sends the message when unregisters slave mount point (in NetconfNodeManager
+ * close method). Message must be sended before slave actor is poisoned.
+ */
+public class UnregisterSlaveMountPoint implements Serializable {
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/YangTextSchemaSourceRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/YangTextSchemaSourceRequest.java
new file mode 100644 (file)
index 0000000..f8dab73
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import java.io.Serializable;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+
+/**
+ * Slave sends message to master when tries to resolve schema with particular sourceIdentifier (proxy call).
+ * Master responds with resolved schema source.
+ */
+public class YangTextSchemaSourceRequest implements Serializable {
+
+    private final SourceIdentifier sourceIdentifier;
+
+    public YangTextSchemaSourceRequest(final SourceIdentifier sourceIdentifier) {
+        this.sourceIdentifier = sourceIdentifier;
+    }
+
+    public SourceIdentifier getSourceIdentifier() {
+        return sourceIdentifier;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/CancelRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/CancelRequest.java
new file mode 100644 (file)
index 0000000..902dfdc
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+public class CancelRequest implements TransactionRequest {
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/DeleteRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/DeleteRequest.java
new file mode 100644 (file)
index 0000000..03b3db0
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public class DeleteRequest implements TransactionRequest {
+
+    private final LogicalDatastoreType store;
+    private final YangInstanceIdentifier path;
+
+    public DeleteRequest(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        this.store = store;
+        this.path = path;
+    }
+
+    public LogicalDatastoreType getStore() {
+        return store;
+    }
+
+    public YangInstanceIdentifier getPath() {
+        return path;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyReadResponse.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/EmptyReadResponse.java
new file mode 100644 (file)
index 0000000..6eaea93
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import java.io.Serializable;
+
+/**
+ * Message is sended when read result do not present any value.
+ */
+public class EmptyReadResponse implements Serializable {
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ExistsRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ExistsRequest.java
new file mode 100644 (file)
index 0000000..92266c1
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public class ExistsRequest implements TransactionRequest {
+
+    private final LogicalDatastoreType store;
+    private final YangInstanceIdentifier path;
+
+    public ExistsRequest(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        this.store = store;
+        this.path = path;
+    }
+
+    public LogicalDatastoreType getStore() {
+        return store;
+    }
+
+    public YangInstanceIdentifier getPath() {
+        return path;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/MergeRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/MergeRequest.java
new file mode 100644 (file)
index 0000000..2176626
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+
+public class MergeRequest implements TransactionRequest {
+
+    private final NormalizedNodeMessage data;
+    private final LogicalDatastoreType store;
+
+    public MergeRequest(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+        this.store = store;
+        this.data = data;
+    }
+
+    public NormalizedNodeMessage getNormalizedNodeMessage() {
+        return data;
+    }
+
+    public LogicalDatastoreType getStore() {
+        return store;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/PutRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/PutRequest.java
new file mode 100644 (file)
index 0000000..61294d5
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+
+public class PutRequest implements TransactionRequest {
+
+    private final LogicalDatastoreType store;
+    private final NormalizedNodeMessage data;
+
+    public PutRequest(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+        this.store = store;
+        this.data = data;
+    }
+
+    public NormalizedNodeMessage getNormalizedNodeMessage() {
+        return data;
+    }
+
+    public LogicalDatastoreType getStore() {
+        return store;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/ReadRequest.java
new file mode 100644 (file)
index 0000000..49c6a40
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public class ReadRequest implements TransactionRequest {
+
+    private final LogicalDatastoreType store;
+    private final YangInstanceIdentifier path;
+
+    public ReadRequest(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        this.store = store;
+        this.path = path;
+    }
+
+    public LogicalDatastoreType getStore() {
+        return store;
+    }
+
+    public YangInstanceIdentifier getPath() {
+        return path;
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitReply.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitReply.java
new file mode 100644 (file)
index 0000000..46e6ee3
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import java.io.Serializable;
+
+/**
+ * Message sent from master back to the slave when submit is successfully performed.
+ */
+public class SubmitReply implements Serializable {
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/SubmitRequest.java
new file mode 100644 (file)
index 0000000..d764aa4
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+public class SubmitRequest implements TransactionRequest {
+}
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/TransactionRequest.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/TransactionRequest.java
new file mode 100644 (file)
index 0000000..b5ef9f1
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import java.io.Serializable;
+
+/**
+ * API for transaction request messages, slave sends these message types to master for performing required operation.
+ * This interface helps better handle request messages in actor. All messages are send with operations defined in
+ * NetconfProxyDOMTransaction. Messages requiring response are send by ask otherwise with tell.
+ */
+public interface TransactionRequest extends Serializable {
+}
diff --git a/netconf/netconf-topology-singleton/src/main/resources/org/opendaylight/blueprint/netconf-topology-singleton.xml b/netconf/netconf-topology-singleton/src/main/resources/org/opendaylight/blueprint/netconf-topology-singleton.xml
new file mode 100644 (file)
index 0000000..64471a1
--- /dev/null
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+Copyright Â© 2016 Cisco Systems, Inc. and others. All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0"
+           odl:use-default-for-reference-types="true">
+
+    <reference id="dataBroker"
+               interface="org.opendaylight.controller.md.sal.binding.api.DataBroker"/>
+
+    <reference id="rpcRegistry"
+               interface="org.opendaylight.controller.sal.binding.api.RpcProviderRegistry"/>
+
+    <reference id="clusterSingletonService"
+               interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"/>
+
+    <reference id="bindingAwareBroker"
+               interface="org.opendaylight.controller.sal.binding.api.BindingAwareBroker"
+                />
+
+    <reference id="keepAliveExecutor"
+               interface="org.opendaylight.controller.config.threadpool.ScheduledThreadPool"
+                />
+
+    <reference id="processingExecutor"
+               interface="org.opendaylight.controller.config.threadpool.ThreadPool"
+                />
+
+    <reference id="domBroker"
+               interface="org.opendaylight.controller.sal.core.api.Broker"
+                />
+
+    <reference id="actorSystemProvider"
+               interface="org.opendaylight.controller.cluster.ActorSystemProvider"
+                />
+
+    <reference id="eventExecutor"
+               interface="io.netty.util.concurrent.EventExecutor"
+               odl:type="global-event-executor"/>
+
+    <reference id="clientDispatcherDependency"
+               interface="org.opendaylight.netconf.client.NetconfClientDispatcher"
+               />
+
+    <bean id="netconfTopologyServiceProvider"
+          class="org.opendaylight.netconf.topology.singleton.impl.NetconfTopologyManager"
+          init-method="init" destroy-method="close">
+        <argument ref="dataBroker" />
+        <argument ref="rpcRegistry" />
+        <argument ref="clusterSingletonService" />
+        <argument ref="bindingAwareBroker" />
+        <argument ref="keepAliveExecutor" />
+        <argument ref="processingExecutor" />
+        <argument ref="domBroker" />
+        <argument ref="actorSystemProvider" />
+        <argument ref="eventExecutor" />
+        <argument ref="clientDispatcherDependency" />
+        <argument value="topology-netconf" />
+    </bean>
+
+    <service ref="netconfTopologyServiceProvider" interface="org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService"
+            />
+
+</blueprint>
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java
new file mode 100644 (file)
index 0000000..a77104e
--- /dev/null
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
+import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
+import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
+import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
+import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+public class NetconfNodeActorTest {
+
+    private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
+    private static ActorSystem system;
+
+    @Rule
+    public final ExpectedException exception = ExpectedException.none();
+
+    private ActorRef masterRef;
+    private RemoteDeviceId remoteDeviceId;
+
+    @Before
+    public void setup() throws UnknownHostException {
+
+        remoteDeviceId = new RemoteDeviceId("netconf-topology",
+                new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
+
+        final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
+
+        final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
+                DEFAULT_SCHEMA_REPOSITORY);
+
+        system = ActorSystem.create();
+
+        masterRef = TestActorRef.create(system, props, "master_messages");
+    }
+
+    @After
+    public void teardown() {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
+
+    @Test
+    public void testInitDataMessages() throws Exception {
+
+        final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
+        final List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList();
+
+        /* Test init master data */
+
+        final Future<Object> initialDataToActor =
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers),
+                        TIMEOUT);
+
+        final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
+        assertTrue(success instanceof MasterActorDataInitialized);
+
+
+        /* Test refresh master data */
+
+        final RemoteDeviceId remoteDeviceId2 = new RemoteDeviceId("netconf-topology2",
+                new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 9999));
+
+        final NetconfTopologySetup setup2 = mock(NetconfTopologySetup.class);
+
+        final Future<Object> refreshDataToActor =
+                Patterns.ask(masterRef, new RefreshSetupMasterActorData(setup2, remoteDeviceId2),
+                        TIMEOUT);
+
+        final Object success2 = Await.result(refreshDataToActor, TIMEOUT.duration());
+        assertTrue(success2 instanceof MasterActorDataInitialized);
+
+    }
+
+    @Test
+    public void testRegisterMountPointMessage() throws Exception {
+
+        final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
+        final List<SourceIdentifier> sourceIdentifiers =
+                Lists.newArrayList(SourceIdentifier.create("testID", Optional.absent()));
+
+        // init master data
+
+        final Future<Object> initialDataToActor =
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers),
+                        TIMEOUT);
+
+        final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration());
+
+        assertTrue(successInit instanceof MasterActorDataInitialized);
+
+        // test if slave get right identifiers from master
+
+        final Future<Object> registerMountPointFuture =
+                Patterns.ask(masterRef, new AskForMasterMountPoint(),
+                        TIMEOUT);
+
+        final RegisterMountPoint success =
+                (RegisterMountPoint) Await.result(registerMountPointFuture, TIMEOUT.duration());
+
+        assertEquals(sourceIdentifiers, success.getSourceIndentifiers());
+
+    }
+
+    @Test
+    public void testYangTextSchemaSourceRequestMessage() throws Exception {
+        final SchemaRepository schemaRepository = mock(SchemaRepository.class);
+        final SourceIdentifier sourceIdentifier = SourceIdentifier.create("testID", Optional.absent());
+        final Props props = NetconfNodeActor.props(mock(NetconfTopologySetup.class), remoteDeviceId,
+                DEFAULT_SCHEMA_REPOSITORY, schemaRepository);
+
+        final ActorRef actorRefSchemaRepo = TestActorRef.create(system, props, "master_mocked_schema_repository");
+        final ActorContext actorContext = mock(ActorContext.class);
+        doReturn(system.dispatcher()).when(actorContext).dispatcher();
+
+        final ProxyYangTextSourceProvider proxyYang =
+                new ProxyYangTextSourceProvider(actorRefSchemaRepo, actorContext);
+        // test if asking for source is resolved and sended back
+
+        final YangTextSchemaSource yangTextSchemaSource = new YangTextSchemaSource(sourceIdentifier) {
+            @Override
+            protected MoreObjects.ToStringHelper addToStringAttributes(MoreObjects.ToStringHelper toStringHelper) {
+                return null;
+            }
+
+            @Override
+            public InputStream openStream() throws IOException {
+                return new ByteArrayInputStream("YANG".getBytes());
+            }
+        };
+
+
+        final CheckedFuture<YangTextSchemaSource, SchemaSourceException> result =
+                Futures.immediateCheckedFuture(yangTextSchemaSource);
+
+        doReturn(result).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
+
+        final Future<YangTextSchemaSourceSerializationProxy> resolvedSchema =
+                proxyYang.getYangTextSchemaSource(sourceIdentifier);
+
+        final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchema, TIMEOUT.duration());
+
+        assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
+        assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
+
+
+        // test if asking for source is missing
+        exception.expect(MissingSchemaSourceException.class);
+
+        final SchemaSourceException schemaSourceException =
+                new MissingSchemaSourceException("Fail", sourceIdentifier);
+
+        final CheckedFuture<YangTextSchemaSource, SchemaSourceException> resultFail =
+                Futures.immediateFailedCheckedFuture(schemaSourceException);
+
+        doReturn(resultFail).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
+
+        final Future<YangTextSchemaSourceSerializationProxy> failedSchema =
+                proxyYang.getYangTextSchemaSource(sourceIdentifier);
+
+        Await.result(failedSchema, TIMEOUT.duration());
+
+    }
+
+    private String convertStreamToString(java.io.InputStream is) {
+        java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
+        return s.hasNext() ? s.next() : "";
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManagerTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManagerTest.java
new file mode 100644 (file)
index 0000000..9a3749c
--- /dev/null
@@ -0,0 +1,271 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import static junit.framework.TestCase.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.DELETE;
+import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.WRITE;
+
+import com.google.common.util.concurrent.Futures;
+import io.netty.util.concurrent.EventExecutor;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.controller.config.threadpool.ThreadPool;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.Identifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class NetconfTopologyManagerTest {
+
+    private final String topologyId = "topologyID";
+    private NetconfTopologyManager netconfTopologyManager;
+
+    @Mock
+    private DataBroker dataBroker;
+
+    @Mock
+    private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
+
+    @Before
+    public void setUp() {
+        initMocks(this);
+
+        final RpcProviderRegistry rpcProviderRegistry = mock(RpcProviderRegistry.class);
+        final BindingAwareBroker bindingAwareBroker = mock(BindingAwareBroker.class);
+        final ScheduledThreadPool keepaliveExecutor = mock(ScheduledThreadPool.class);
+        final ThreadPool processingExecutor = mock(ThreadPool.class);
+        final Broker domBroker = mock(Broker.class);
+        final ActorSystemProvider actorSystemProvider = mock(ActorSystemProvider.class);
+        final EventExecutor eventExecutor = mock(EventExecutor.class);
+        final NetconfClientDispatcher clientDispatcher = mock(NetconfClientDispatcher.class);
+
+        netconfTopologyManager = new NetconfTopologyManager(dataBroker, rpcProviderRegistry,
+                clusterSingletonServiceProvider, bindingAwareBroker, keepaliveExecutor, processingExecutor, domBroker,
+                actorSystemProvider, eventExecutor, clientDispatcher, topologyId);
+    }
+    @Test
+    public void testWriteConfiguration() throws Exception {
+
+        final ClusterSingletonServiceRegistration clusterRegistration = mock(ClusterSingletonServiceRegistration.class);
+
+        final Field fieldContexts = NetconfTopologyManager.class.getDeclaredField("contexts");
+        fieldContexts.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts =
+                (Map<InstanceIdentifier<Node>, NetconfTopologyContext>) fieldContexts.get(netconfTopologyManager);
+
+        final Field fieldClusterRegistrations = NetconfTopologyManager.class.getDeclaredField("clusterRegistrations");
+        fieldClusterRegistrations.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration> clusterRegistrations =
+                (Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>)
+                        fieldClusterRegistrations.get(netconfTopologyManager);
+
+        final Collection<DataTreeModification<Node>> changes = new ArrayList<>();
+
+        final InstanceIdentifier<Node> instanceIdentifier = NetconfTopologyUtils.createTopologyNodeListPath(
+                new NodeKey(new NodeId("node-id-1")),"topology-1");
+
+        final InstanceIdentifier<Node> instanceIdentifierDiferent = NetconfTopologyUtils.createTopologyNodeListPath(
+                new NodeKey(new NodeId("node-id-2")),"topology-2");
+
+        final DataTreeIdentifier<Node> rootIdentifier =
+                new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, instanceIdentifier);
+
+        final DataTreeIdentifier<Node> rootIdentifierDifferent =
+                new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, instanceIdentifierDiferent);
+
+        @SuppressWarnings("unchecked")
+        final DataObjectModification<Node> objectModification = mock(DataObjectModification.class);
+
+        final NetconfNode netconfNode = new NetconfNodeBuilder()
+                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+                .setPort(new PortNumber(9999))
+                .setReconnectOnChangedSchema(true)
+                .setDefaultRequestTimeoutMillis(1000L)
+                .setBetweenAttemptsTimeoutMillis(100)
+                .setSchemaless(false)
+                .setTcpOnly(false)
+                .build();
+        final Node node = new NodeBuilder().setNodeId(new NodeId("node-id"))
+                .addAugmentation(NetconfNode.class, netconfNode).build();
+
+        final Identifier key = new NodeKey(new NodeId("node-id"));
+
+        @SuppressWarnings("unchecked")
+        final InstanceIdentifier.IdentifiableItem<Node, NodeKey> pathArgument =
+                new InstanceIdentifier.IdentifiableItem(Node.class, key);
+
+
+        // testing WRITE on two identical rootIdentifiers and one different
+
+        changes.add(new CustomTreeModification(rootIdentifier, objectModification));
+        changes.add(new CustomTreeModification(rootIdentifier, objectModification));
+        changes.add(new CustomTreeModification(rootIdentifierDifferent, objectModification));
+
+        doReturn(WRITE).when(objectModification).getModificationType();
+        doReturn(node).when(objectModification).getDataAfter();
+        doReturn(pathArgument).when(objectModification).getIdentifier();
+        doReturn(clusterRegistration).when(clusterSingletonServiceProvider).registerClusterSingletonService(any());
+
+        netconfTopologyManager.onDataTreeChanged(changes);
+
+        verify(clusterSingletonServiceProvider, times(2)).registerClusterSingletonService(any());
+
+        // only two created contexts
+        assertEquals(2, contexts.size());
+        assertTrue(contexts.containsKey(rootIdentifier.getRootIdentifier()));
+        assertTrue(contexts.containsKey(rootIdentifierDifferent.getRootIdentifier()));
+
+        // only two created cluster registrations
+        assertEquals(2, contexts.size());
+        assertTrue(clusterRegistrations.containsKey(rootIdentifier.getRootIdentifier()));
+        assertTrue(clusterRegistrations.containsKey(rootIdentifierDifferent.getRootIdentifier()));
+
+        // after delete there should be no context and clustered registrations
+        doReturn(DELETE).when(objectModification).getModificationType();
+
+        doNothing().when(clusterRegistration).close();
+
+        netconfTopologyManager.onDataTreeChanged(changes);
+
+        verify(clusterRegistration, times(2)).close();
+
+        // empty map of contexts
+        assertTrue(contexts.isEmpty());
+        assertFalse(contexts.containsKey(rootIdentifier.getRootIdentifier()));
+        assertFalse(contexts.containsKey(rootIdentifierDifferent.getRootIdentifier()));
+
+        // empty map of clustered registrations
+        assertTrue(clusterRegistrations.isEmpty());
+        assertFalse(clusterRegistrations.containsKey(rootIdentifier.getRootIdentifier()));
+        assertFalse(clusterRegistrations.containsKey(rootIdentifierDifferent.getRootIdentifier()));
+
+    }
+
+    @Test
+    public void testRegisterDataTreeChangeListener() {
+
+        final WriteTransaction wtx = mock(WriteTransaction.class);
+
+        doReturn(wtx).when(dataBroker).newWriteOnlyTransaction();
+        doNothing().when(wtx).merge(any(), any(), any());
+        doReturn(Futures.immediateCheckedFuture(null)).when(wtx).submit();
+        doReturn(null).when(dataBroker).registerDataChangeListener(any(), any(), any(), any());
+
+        netconfTopologyManager.init();
+
+        // verify if listener is called with right parameters = registered on right path
+
+        verify(dataBroker, times(1)).registerDataTreeChangeListener(
+                new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, NetconfTopologyUtils
+                        .createTopologyListPath(topologyId).child(Node.class)), netconfTopologyManager);
+
+    }
+
+    @Test
+    public void testClose() throws Exception {
+
+        final Field fieldContexts = NetconfTopologyManager.class.getDeclaredField("contexts");
+        fieldContexts.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts =
+                (Map<InstanceIdentifier<Node>, NetconfTopologyContext>) fieldContexts.get(netconfTopologyManager);
+
+        final Field fieldClusterRegistrations = NetconfTopologyManager.class.getDeclaredField("clusterRegistrations");
+        fieldClusterRegistrations.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration> clusterRegistrations =
+                (Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>)
+                        fieldClusterRegistrations.get(netconfTopologyManager);
+
+        final InstanceIdentifier<Node> instanceIdentifier = NetconfTopologyUtils.createTopologyNodeListPath(
+                new NodeKey(new NodeId("node-id-1")),"topology-1");
+
+
+        final NetconfTopologyContext context = mock(NetconfTopologyContext.class);
+        final ClusterSingletonServiceRegistration clusterRegistration =
+                mock(ClusterSingletonServiceRegistration.class);
+        contexts.put(instanceIdentifier, context);
+        clusterRegistrations.put(instanceIdentifier, clusterRegistration);
+
+        doNothing().when(context).closeFinal();
+        doNothing().when(clusterRegistration).close();
+
+        netconfTopologyManager.close();
+        verify(context, times(1)).closeFinal();
+        verify(clusterRegistration, times(1)).close();
+
+        assertTrue(contexts.isEmpty());
+        assertTrue(clusterRegistrations.isEmpty());
+
+    }
+
+    private class CustomTreeModification  implements DataTreeModification<Node> {
+
+        private final DataTreeIdentifier<Node> rootPath;
+        private final DataObjectModification<Node> rootNode;
+
+        CustomTreeModification(DataTreeIdentifier<Node> rootPath, DataObjectModification<Node> rootNode) {
+            this.rootPath = rootPath;
+            this.rootNode = rootNode;
+        }
+
+        @Nonnull
+        @Override
+        public DataTreeIdentifier<Node> getRootPath() {
+            return rootPath;
+        }
+
+        @Nonnull
+        @Override
+        public DataObjectModification<Node> getRootNode() {
+            return rootNode;
+        }
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/RemoteDeviceConnectorImplTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/RemoteDeviceConnectorImplTest.java
new file mode 100644 (file)
index 0000000..c1515ab
--- /dev/null
@@ -0,0 +1,271 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import io.netty.util.concurrent.EventExecutor;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutorService;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.controller.config.threadpool.ThreadPool;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.NetconfClientSessionListener;
+import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfConnectorDTO;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.LoginPasswordBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+
+public class RemoteDeviceConnectorImplTest {
+
+    private static final NodeId NODE_ID = new NodeId("testing-node");
+    private static final String TOPOLOGY_ID = "testing-topology";
+
+    @Mock
+    private DataBroker dataBroker;
+
+    @Mock
+    private RpcProviderRegistry rpcProviderRegistry;
+
+    @Mock
+    private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
+
+    @Mock
+    private BindingAwareBroker bindingAwareBroker;
+
+    @Mock
+    private ScheduledThreadPool keepaliveExecutor;
+
+    @Mock
+    private ThreadPool processingExecutor;
+
+    @Mock
+    private Broker domBroker;
+
+    @Mock
+    private ActorSystem actorSystem;
+
+    @Mock
+    private EventExecutor eventExecutor;
+
+    @Mock
+    private NetconfClientDispatcher clientDispatcher;
+
+    private NetconfTopologySetup.NetconfTopologySetupBuilder builder;
+    private RemoteDeviceId remoteDeviceId;
+
+    @Before
+    public void setUp() throws UnknownHostException {
+        initMocks(this);
+
+        remoteDeviceId = new RemoteDeviceId(TOPOLOGY_ID,
+                new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
+
+        builder = new NetconfTopologySetup.NetconfTopologySetupBuilder();
+        builder.setDataBroker(dataBroker);
+        builder.setRpcProviderRegistry(rpcProviderRegistry);
+        builder.setClusterSingletonServiceProvider(clusterSingletonServiceProvider);
+        builder.setBindingAwareBroker(bindingAwareBroker);
+        builder.setKeepaliveExecutor(keepaliveExecutor);
+        builder.setProcessingExecutor(processingExecutor);
+        builder.setDomBroker(domBroker);
+        builder.setActorSystem(actorSystem);
+        builder.setEventExecutor(eventExecutor);
+        builder.setNetconfClientDispatcher(clientDispatcher);
+        builder.setTopologyId(TOPOLOGY_ID);
+
+    }
+
+    @Test
+    public void testStopRemoteDeviceConnection() {
+        final Credentials credentials = new LoginPasswordBuilder().setPassword("admin").setUsername("admin").build();
+        final NetconfNode netconfNode = new NetconfNodeBuilder()
+                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+                .setPort(new PortNumber(9999))
+                .setReconnectOnChangedSchema(true)
+                .setDefaultRequestTimeoutMillis(1000L)
+                .setBetweenAttemptsTimeoutMillis(100)
+                .setSchemaless(false)
+                .setTcpOnly(false)
+                .setCredentials(credentials)
+                .build();
+        final Node node = new NodeBuilder().setNodeId(NODE_ID).addAugmentation(NetconfNode.class, netconfNode).build();
+
+        builder.setNode(node);
+
+
+        final NetconfDeviceCommunicator communicator = mock (NetconfDeviceCommunicator.class);
+        final RemoteDeviceHandler salFacade = mock(RemoteDeviceHandler.class);
+
+        final TestingRemoteDeviceConnectorImpl remoteDeviceConnection =
+                new TestingRemoteDeviceConnectorImpl(builder.build(), remoteDeviceId, communicator, salFacade);
+
+        final ActorRef masterRef = mock(ActorRef.class);
+
+        remoteDeviceConnection.startRemoteDeviceConnection(masterRef);
+
+        remoteDeviceConnection.stopRemoteDeviceConnection();
+
+        verify(communicator, times(1)).close();
+        verify(salFacade, times(1)).close();
+
+    }
+
+    @Test
+    public void testMasterSalFacade() throws UnknownHostException {
+        final ExecutorService executorService = mock(ExecutorService.class);
+        doReturn(executorService).when(processingExecutor).getExecutor();
+
+        final Credentials credentials = new LoginPasswordBuilder().setPassword("admin").setUsername("admin").build();
+        final NetconfNode netconfNode = new NetconfNodeBuilder()
+                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+                .setPort(new PortNumber(9999))
+                .setReconnectOnChangedSchema(true)
+                .setDefaultRequestTimeoutMillis(1000L)
+                .setBetweenAttemptsTimeoutMillis(100)
+                .setSchemaless(false)
+                .setTcpOnly(false)
+                .setCredentials(credentials)
+                .build();
+
+        final RemoteDeviceConnectorImpl remoteDeviceConnection =
+                new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId);
+
+        final ActorRef masterRef = mock(ActorRef.class);
+
+        final NetconfConnectorDTO connectorDTO =
+                remoteDeviceConnection.createDeviceCommunicator(NODE_ID, netconfNode, masterRef);
+
+        assertTrue(connectorDTO.getFacade() instanceof MasterSalFacade);
+    }
+
+    @Test
+    public void testKeapAliveFacade() throws UnknownHostException {
+        final ExecutorService executorService = mock(ExecutorService.class);
+        doReturn(executorService).when(processingExecutor).getExecutor();
+
+        final Credentials credentials = new LoginPasswordBuilder().setPassword("admin").setUsername("admin").build();
+        final NetconfNode netconfNode = new NetconfNodeBuilder()
+                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+                .setPort(new PortNumber(9999))
+                .setReconnectOnChangedSchema(true)
+                .setDefaultRequestTimeoutMillis(1000L)
+                .setBetweenAttemptsTimeoutMillis(100)
+                .setSchemaless(false)
+                .setTcpOnly(false)
+                .setCredentials(credentials)
+                .setKeepaliveDelay(1L)
+                .build();
+
+        final RemoteDeviceId remoteDeviceId = new RemoteDeviceId(TOPOLOGY_ID,
+                new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
+
+        final RemoteDeviceConnectorImpl remoteDeviceConnection =
+                new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId);
+
+        final ActorRef masterRef = mock(ActorRef.class);
+
+        final NetconfConnectorDTO connectorDTO =
+                remoteDeviceConnection.createDeviceCommunicator(NODE_ID, netconfNode, masterRef);
+
+        assertTrue(connectorDTO.getFacade() instanceof KeepaliveSalFacade);
+    }
+
+    @Test
+    public void testGetClientConfig() throws UnknownHostException {
+        final NetconfClientSessionListener listener = mock(NetconfClientSessionListener.class);
+        final Host host = new Host(new IpAddress(new Ipv4Address("127.0.0.1")));
+        final PortNumber portNumber = new PortNumber(9999);
+        final NetconfNode testingNode = new NetconfNodeBuilder()
+                .setConnectionTimeoutMillis(1000L)
+                .setDefaultRequestTimeoutMillis(2000L)
+                .setHost(host)
+                .setPort(portNumber)
+                .setCredentials(new LoginPasswordBuilder()
+                        .setUsername("testuser")
+                        .setPassword("testpassword").build())
+                .setTcpOnly(true)
+                .build();
+
+        final RemoteDeviceConnectorImpl remoteDeviceConnection =
+                new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId);
+
+        final NetconfReconnectingClientConfiguration defaultClientConfig =
+                remoteDeviceConnection.getClientConfig(listener, testingNode);
+
+        assertEquals(defaultClientConfig.getConnectionTimeoutMillis().longValue(), 1000L);
+        assertEquals(defaultClientConfig.getAddress(), new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
+        assertSame(defaultClientConfig.getSessionListener(), listener);
+        assertEquals(defaultClientConfig.getAuthHandler().getUsername(), "testuser");
+        assertEquals(defaultClientConfig.getProtocol(), NetconfClientConfiguration.NetconfClientProtocol.TCP);
+    }
+
+    @Test
+    public void testSchemaResourceDTO() throws UnknownHostException {
+        final ExecutorService executorService = mock(ExecutorService.class);
+        doReturn(executorService).when(processingExecutor).getExecutor();
+
+        final Credentials credentials = new LoginPasswordBuilder().setPassword("admin").setUsername("admin").build();
+        final NetconfNode netconfNode = new NetconfNodeBuilder()
+                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+                .setPort(new PortNumber(9999))
+                .setReconnectOnChangedSchema(true)
+                .setDefaultRequestTimeoutMillis(1000L)
+                .setBetweenAttemptsTimeoutMillis(100)
+                .setSchemaless(false)
+                .setTcpOnly(false)
+                .setCredentials(credentials)
+                .setSchemaCacheDirectory("schemas-test")
+                .build();
+
+        final RemoteDeviceConnectorImpl remoteDeviceConnection =
+                new RemoteDeviceConnectorImpl(builder.build(), remoteDeviceId);
+
+        final ActorRef masterRef = mock(ActorRef.class);
+
+        remoteDeviceConnection.createDeviceCommunicator(NODE_ID, netconfNode, masterRef);
+
+        assertTrue(remoteDeviceConnection.getSchemaResourcesDTOs().containsKey("schemas-test"));
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/TestingRemoteDeviceConnectorImpl.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/TestingRemoteDeviceConnectorImpl.java
new file mode 100644 (file)
index 0000000..d4b4ec0
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.Futures;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfConnectorDTO;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+
+class TestingRemoteDeviceConnectorImpl extends RemoteDeviceConnectorImpl {
+
+    private final NetconfDeviceCommunicator communicator;
+    private final RemoteDeviceHandler salFacade;
+
+    TestingRemoteDeviceConnectorImpl(final NetconfTopologySetup netconfTopologyDeviceSetup,
+                                            final RemoteDeviceId remoteDeviceId,
+                                            final NetconfDeviceCommunicator communicator,
+                                            final RemoteDeviceHandler salFacade) {
+        super(netconfTopologyDeviceSetup, remoteDeviceId);
+        this.communicator = communicator;
+        this.salFacade = salFacade;
+    }
+
+    @Override
+    public NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node,
+                                                        final ActorRef deviceContextActorRef) {
+        final NetconfConnectorDTO connectorDTO = new NetconfConnectorDTO(communicator, salFacade);
+        doReturn(Futures.immediateCheckedFuture(null)).when(communicator).initializeRemoteConnection(any(), any());
+
+        return connectorDTO;
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/ReadOnlyTransactionTest.java
new file mode 100644 (file)
index 0000000..661c511
--- /dev/null
@@ -0,0 +1,262 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
+import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
+import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+public class ReadOnlyTransactionTest {
+    private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
+    private static final int TIMEOUT_SEC = 5;
+    private static ActorSystem system;
+
+    @Rule
+    public final ExpectedException exception = ExpectedException.none();
+
+    private ActorRef masterRef;
+    private NetconfDOMDataBroker slaveDataBroker;
+    private DOMDataBroker masterDataBroker;
+    private List<SourceIdentifier> sourceIdentifiers;
+
+    @Mock
+    private DOMDataReadOnlyTransaction readTx;
+
+    @Before
+    public void setup() throws UnknownHostException {
+        initMocks(this);
+
+        system = ActorSystem.create();
+
+        final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology",
+                new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
+
+        final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
+        final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
+                DEFAULT_SCHEMA_REPOSITORY);
+
+        masterRef = TestActorRef.create(system, props, "master_read");
+
+        sourceIdentifiers = Lists.newArrayList();
+
+        // Create master data broker
+
+        final DOMDataBroker delegateDataBroker = mock(DOMDataBroker.class);
+        readTx = mock(DOMDataReadOnlyTransaction.class);
+
+        doReturn(readTx).when(delegateDataBroker).newReadOnlyTransaction();
+
+        final NetconfDOMTransaction masterDOMTransactions =
+                new NetconfMasterDOMTransaction(delegateDataBroker);
+
+        masterDataBroker =
+                new NetconfDOMDataBroker(system, remoteDeviceId, masterDOMTransactions);
+
+        // Create slave data broker for testing proxy
+
+        final NetconfDOMTransaction proxyDOMTransactions =
+                new NetconfProxyDOMTransaction(system, masterRef);
+
+        slaveDataBroker = new NetconfDOMDataBroker(system, remoteDeviceId, proxyDOMTransactions);
+
+
+    }
+
+    @After
+    public void teardown() {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
+
+    @Test
+    public void testRead() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
+
+        final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY;
+        final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION;
+
+        // Message: EmptyReadResponse
+
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmpty =
+                Futures.immediateCheckedFuture(Optional.absent());
+
+        doReturn(resultEmpty).when(readTx).read(storeType, instanceIdentifier);
+
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultEmptyResponse =
+                slaveDataBroker.newReadOnlyTransaction().read(storeType,
+                        instanceIdentifier);
+
+        final Optional<NormalizedNode<?, ?>> resultEmptyMessage =
+                resultEmptyResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertEquals(resultEmptyMessage, Optional.absent());
+
+        // Message: NormalizedNodeMessage
+
+        final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
+                .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
+
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNormalizedNodeMessage =
+                Futures.immediateCheckedFuture(Optional.of(outputNode));
+
+        doReturn(resultNormalizedNodeMessage).when(readTx).read(storeType, instanceIdentifier);
+
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultNodeMessageResponse =
+                slaveDataBroker.newReadOnlyTransaction().read(storeType, instanceIdentifier);
+
+        final Optional<NormalizedNode<?, ?>> resultNodeMessage =
+                resultNodeMessageResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertTrue(resultNodeMessage.isPresent());
+        assertEquals(resultNodeMessage.get(), outputNode);
+
+        // Message: Throwable
+
+        final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowable =
+                Futures.immediateFailedCheckedFuture(readFailedException);
+
+        doReturn(resultThrowable).when(readTx).read(storeType, instanceIdentifier);
+
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> resultThrowableResponse =
+                slaveDataBroker.newReadOnlyTransaction().read(storeType, instanceIdentifier);
+
+        exception.expect(ReadFailedException.class);
+        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+    }
+
+    @Test
+    public void testExist() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
+
+        final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY;
+        final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION;
+
+        // Message: True
+
+        final CheckedFuture<Boolean, ReadFailedException> resultTrue =
+                Futures.immediateCheckedFuture(true);
+
+        doReturn(resultTrue).when(readTx).exists(storeType, instanceIdentifier);
+
+        final CheckedFuture<Boolean, ReadFailedException> trueResponse =
+                slaveDataBroker.newReadOnlyTransaction().exists(storeType, instanceIdentifier);
+
+        final Boolean trueMessage = trueResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertEquals(true, trueMessage);
+
+        // Message: False
+
+        final CheckedFuture<Boolean, ReadFailedException> resultFalse = Futures.immediateCheckedFuture(false);
+
+        doReturn(resultFalse).when(readTx).exists(storeType, instanceIdentifier);
+
+        final CheckedFuture<Boolean, ReadFailedException> falseResponse =
+                slaveDataBroker.newReadOnlyTransaction().exists(storeType,
+                        instanceIdentifier);
+
+        final Boolean falseMessage = falseResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertEquals(false, falseMessage);
+
+        // Message: False, result null
+
+        final CheckedFuture<Boolean, ReadFailedException> resultNull = Futures.immediateCheckedFuture(null);
+
+        doReturn(resultNull).when(readTx).exists(storeType, instanceIdentifier);
+
+        final CheckedFuture<Boolean, ReadFailedException> nullResponse =
+                slaveDataBroker.newReadOnlyTransaction().exists(storeType,
+                        instanceIdentifier);
+
+        final Boolean nullFalseMessage = nullResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertEquals(false, nullFalseMessage);
+
+        // Message: Throwable
+
+        final ReadFailedException readFailedException = new ReadFailedException("Fail", null);
+        final CheckedFuture<Boolean, ReadFailedException> resultThrowable =
+                Futures.immediateFailedCheckedFuture(readFailedException);
+
+        doReturn(resultThrowable).when(readTx).exists(storeType, instanceIdentifier);
+
+        final CheckedFuture<Boolean, ReadFailedException> resultThrowableResponse =
+                slaveDataBroker.newReadOnlyTransaction().exists(storeType, instanceIdentifier);
+
+        exception.expect(ReadFailedException.class);
+        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+    }
+
+    private void initializeDataTest() throws Exception {
+        final Future<Object> initialDataToActor =
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers),
+                        TIMEOUT);
+
+        final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
+
+        assertTrue(success instanceof MasterActorDataInitialized);
+    }
+}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java
new file mode 100644 (file)
index 0000000..97b0cec
--- /dev/null
@@ -0,0 +1,257 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.tx;
+
+import static junit.framework.TestCase.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
+import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
+import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+public class WriteOnlyTransactionTest {
+    private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
+    private static final int TIMEOUT_SEC = 5;
+    private static ActorSystem system;
+
+    @Rule
+    public final ExpectedException exception = ExpectedException.none();
+
+    private ActorRef masterRef;
+    private NetconfDOMDataBroker slaveDataBroker;
+    private DOMDataBroker masterDataBroker;
+    private List<SourceIdentifier> sourceIdentifiers;
+
+    @Mock
+    private DOMDataWriteTransaction writeTx;
+
+    @Before
+    public void setup() throws UnknownHostException {
+        initMocks(this);
+
+        system = ActorSystem.create();
+
+        final RemoteDeviceId remoteDeviceId = new RemoteDeviceId("netconf-topology",
+                new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
+
+        final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
+        final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
+                DEFAULT_SCHEMA_REPOSITORY);
+
+        masterRef = TestActorRef.create(system, props, "master_read");
+
+        sourceIdentifiers = Lists.newArrayList();
+
+        // Create master data broker
+
+        final DOMDataBroker delegateDataBroker = mock(DOMDataBroker.class);
+        writeTx = mock(DOMDataWriteTransaction.class);
+        final DOMDataReadOnlyTransaction readTx = mock(DOMDataReadOnlyTransaction.class);
+
+        doReturn(writeTx).when(delegateDataBroker).newWriteOnlyTransaction();
+        doReturn(readTx).when(delegateDataBroker).newReadOnlyTransaction();
+
+        final NetconfDOMTransaction masterDOMTransactions =
+                new NetconfMasterDOMTransaction(delegateDataBroker);
+
+        masterDataBroker =
+                new NetconfDOMDataBroker(system, remoteDeviceId, masterDOMTransactions);
+
+        // Create slave data broker for testing proxy
+
+        final NetconfDOMTransaction proxyDOMTransactions =
+                new NetconfProxyDOMTransaction(system, masterRef);
+
+        slaveDataBroker = new NetconfDOMDataBroker(system, remoteDeviceId, proxyDOMTransactions);
+
+
+    }
+
+    @After
+    public void teardown() {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
+
+    @Test
+    public void testPutMergeDeleteCalls() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
+
+        final YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier.EMPTY;
+        final LogicalDatastoreType storeType = LogicalDatastoreType.CONFIGURATION;
+        final NormalizedNode<?, ?> testNode = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
+                .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
+
+        // Test of invoking put on master through slave proxy
+
+        doNothing().when(writeTx).put(storeType, instanceIdentifier, testNode);
+        slaveDataBroker.newWriteOnlyTransaction().put(storeType, instanceIdentifier, testNode);
+
+        verify(writeTx, times(1)).put(storeType, instanceIdentifier, testNode);
+
+        // Test of invoking merge on master through slave proxy
+
+        doNothing().when(writeTx).merge(storeType, instanceIdentifier, testNode);
+        slaveDataBroker.newWriteOnlyTransaction().merge(storeType, instanceIdentifier, testNode);
+
+        verify(writeTx, times(1)).merge(storeType, instanceIdentifier, testNode);
+
+        // Test of invoking delete on master through slave proxy
+
+        doNothing().when(writeTx).delete(storeType, instanceIdentifier);
+        slaveDataBroker.newWriteOnlyTransaction().delete(storeType, instanceIdentifier);
+
+        verify(writeTx, times(1)).delete(storeType, instanceIdentifier);
+
+    }
+
+    @Test
+    public void testSubmit() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
+
+        // Without Tx
+
+        final CheckedFuture<Void,TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
+        doReturn(resultSubmit).when(writeTx).submit();
+
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse =
+                slaveDataBroker.newWriteOnlyTransaction().submit();
+
+        final Object result= resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertNull(result);
+
+        // With Tx
+
+        doNothing().when(writeTx).delete(any(), any());
+        slaveDataBroker.newWriteOnlyTransaction().delete(LogicalDatastoreType.CONFIGURATION,
+                YangInstanceIdentifier.EMPTY);
+
+        final CheckedFuture<Void,TransactionCommitFailedException> resultSubmitTx = Futures.immediateCheckedFuture(null);
+        doReturn(resultSubmitTx).when(writeTx).submit();
+
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse =
+                slaveDataBroker.newWriteOnlyTransaction().submit();
+
+        final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        assertNull(resultTx);
+
+        slaveDataBroker.newWriteOnlyTransaction().delete(LogicalDatastoreType.CONFIGURATION,
+                YangInstanceIdentifier.EMPTY);
+
+        final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null);
+        final CheckedFuture<Void,TransactionCommitFailedException> resultThrowable =
+                Futures.immediateFailedCheckedFuture(throwable);
+
+        doReturn(resultThrowable).when(writeTx).submit();
+
+        final CheckedFuture<Void, TransactionCommitFailedException> resultThrowableResponse =
+                slaveDataBroker.newWriteOnlyTransaction().submit();
+
+        exception.expect(TransactionCommitFailedException.class);
+        resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testCancel() throws Exception {
+
+        /* Initialize data on master */
+
+        initializeDataTest();
+
+        // Without Tx
+
+        final Boolean resultFalseNoTx = slaveDataBroker.newWriteOnlyTransaction().cancel();
+        assertEquals(false, resultFalseNoTx);
+
+        // With Tx, readWriteTx test
+
+        doNothing().when(writeTx).delete(any(), any());
+        slaveDataBroker.newReadWriteTransaction().delete(LogicalDatastoreType.CONFIGURATION,
+                YangInstanceIdentifier.EMPTY);
+
+        doReturn(true).when(writeTx).cancel();
+
+        final Boolean resultTrue = slaveDataBroker.newWriteOnlyTransaction().cancel();
+        assertEquals(true, resultTrue);
+
+        doReturn(false).when(writeTx).cancel();
+
+        final Boolean resultFalse = slaveDataBroker.newWriteOnlyTransaction().cancel();
+        assertEquals(false, resultFalse);
+
+    }
+
+    private void initializeDataTest() throws Exception {
+        final Future<Object> initialDataToActor =
+                Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers),
+                        TIMEOUT);
+
+        final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
+
+        assertTrue(success instanceof MasterActorDataInitialized);
+    }
+
+}
diff --git a/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtilTest.java b/netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/utils/NetconfTopologyUtilTest.java
new file mode 100644 (file)
index 0000000..9f8d153
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class NetconfTopologyUtilTest {
+
+    @Test
+    public void testCreateRemoteDeviceId() {
+        final Host host = new Host(new IpAddress(new Ipv4Address("127.0.0.1")));
+        final NetconfNode netconfNode = new NetconfNodeBuilder().setHost(host).setPort(new PortNumber(9999)).build();
+        final NodeId nodeId = new NodeId("testing-node");
+        final RemoteDeviceId id = NetconfTopologyUtils.createRemoteDeviceId(nodeId, netconfNode);
+
+        assertEquals("testing-node", id.getName());
+        assertEquals(host, id.getHost());
+        assertEquals(9999, id.getAddress().getPort());
+    }
+
+    @Test
+    public void testCreateActorPath() {
+        final String actorPath = NetconfTopologyUtils.createActorPath("member", "name");
+        assertEquals("member/user/name", actorPath);
+    }
+
+    @Test
+    public void testCreateListPath() {
+        final InstanceIdentifier<Node> listPath =
+                NetconfTopologyUtils.createTopologyNodeListPath(new NodeKey(new NodeId("nodeId")), "topologyId");
+
+        assertEquals("nodeId", listPath.firstKeyOf(Node.class).getNodeId().getValue());
+        assertEquals("topologyId", listPath.firstKeyOf(Topology.class).getTopologyId().getValue());
+
+        assertEquals("topologyId",  NetconfTopologyUtils.createTopologyNodePath("topologyId").
+                firstKeyOf(Topology.class).getTopologyId().getValue());
+    }
+
+}
index 0eed113225cd51cb1ee986330975adc3865908c2..db4a1c24d6a1aa991da1fb32554c22097a2b5fba 100644 (file)
@@ -47,6 +47,7 @@
     <module>netconf-notifications-impl</module>
     <module>netconf-notifications-api</module>
     <module>netconf-topology</module>
+    <module>netconf-topology-singleton</module>
     <module>abstract-topology</module>
     <module>netconf-topology-config</module>
     <module>sal-netconf-connector</module>
index b440b1d900fa7b81c9419ae68c2e2fa05aa5fad2..1fe3fafccfff59a71e0203f92da94c0cb35812f7 100644 (file)
@@ -18,6 +18,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -29,6 +30,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev15
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
@@ -51,7 +53,7 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class NetconfDeviceTopologyAdapter implements AutoCloseable {
+public final class NetconfDeviceTopologyAdapter implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfDeviceTopologyAdapter.class);
     public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
@@ -134,6 +136,21 @@ final class NetconfDeviceTopologyAdapter implements AutoCloseable {
         commitTransaction(writeTx, "update");
     }
 
+    public void updateClusteredDeviceData(boolean up, String masterAddress, NetconfDeviceCapabilities capabilities) {
+        final NetconfNode data = buildDataForNetconfClusteredNode(up, masterAddress, capabilities);
+
+        final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        LOG.trace(
+                "{}: Update device state transaction {} merging operational data started.",
+                id, writeTx.getIdentifier());
+        writeTx.put(LogicalDatastoreType.OPERATIONAL, id.getTopologyBindingPath().augmentation(NetconfNode.class), data, true);
+        LOG.trace(
+                "{}: Update device state transaction {} merging operational data ended.",
+                id, writeTx.getIdentifier());
+
+        commitTransaction(writeTx, "update");
+    }
+
     public void setDeviceAsFailed(Throwable throwable) {
         String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
 
@@ -172,6 +189,30 @@ final class NetconfDeviceTopologyAdapter implements AutoCloseable {
         return netconfNodeBuilder.build();
     }
 
+    private NetconfNode buildDataForNetconfClusteredNode(boolean up, String masterNodeAddress, NetconfDeviceCapabilities capabilities) {
+        List<String> capabilityList = new ArrayList<>();
+        capabilityList.addAll(capabilities.getNonModuleBasedCapabilities());
+        capabilityList.addAll(FluentIterable.from(capabilities.getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
+        final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
+        avCapabalitiesBuilder.setAvailableCapability(capabilityList);
+
+        final UnavailableCapabilities unavailableCapabilities =
+                new UnavailableCapabilitiesBuilder().setUnavailableCapability(capabilities.getUnresolvedCapabilites()
+                        .entrySet().stream().map(UNAVAILABLE_CAPABILITY_TRANSFORMER::apply)
+                        .collect(Collectors.toList())).build();
+
+        final NetconfNodeBuilder netconfNodeBuilder = new NetconfNodeBuilder()
+                .setHost(id.getHost())
+                .setPort(new PortNumber(id.getAddress().getPort()))
+                .setConnectionStatus(up ? ConnectionStatus.Connected : ConnectionStatus.Connecting)
+                .setAvailableCapabilities(avCapabalitiesBuilder.build())
+                .setUnavailableCapabilities(unavailableCapabilities)
+                .setClusteredConnectionStatus(
+                        new ClusteredConnectionStatusBuilder().setNetconfMasterNode(masterNodeAddress).build());
+
+        return netconfNodeBuilder.build();
+    }
+
     public void removeDeviceConfiguration() {
         final WriteTransaction writeTx = txChain.newWriteOnlyTransaction();
 
index 62b88b974f2091e575b6a4f70b59ad4b1b7b1e55..b3a5f3ff845456086211f147bbfddf92281e208a 100644 (file)
@@ -153,6 +153,10 @@ module netconf-node-topology {
                     }
                 }
             }
+            leaf netconf-master-node {
+                config false;
+                type string;
+            }
         }
 
         leaf connected-message {