Merge "Unify maven-bundle-plugin version at 2.4.0"
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
index abbbb68265e3209915f32f17e589ff76c4dc2a91..54242cf26da315643d16cb350be9d4a60f300de1 100644 (file)
@@ -21,6 +21,7 @@ import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toF
 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
 
+import com.google.common.base.Preconditions;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -38,6 +39,8 @@ import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
 import org.opendaylight.controller.sal.core.api.Provider;
@@ -47,6 +50,8 @@ import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction
 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -115,10 +120,15 @@ public class NetconfDevice implements Provider, //
 
     SchemaSourceProvider<InputStream> remoteSourceProvider;
 
-    DataBrokerService dataBroker;
+    private volatile DataBrokerService dataBroker;
 
     NetconfDeviceListener listener;
 
+    private boolean rollbackSupported;
+
+    private NetconfClientConfiguration clientConfig;
+    private volatile DataProviderService dataProviderService;
+
     public NetconfDevice(String name) {
         this.name = name;
         this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
@@ -131,11 +141,12 @@ public class NetconfDevice implements Provider, //
         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
         checkState(eventExecutor != null, "Event executor must be set.");
 
-        listener = new NetconfDeviceListener(this);
+        Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener);
+        listener = (NetconfDeviceListener) clientConfig.getSessionListener();
 
         logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
 
-        dispatcher.createClient(socketAddress, listener, reconnectStrategy);
+        dispatcher.createClient(clientConfig);
     }
 
     Optional<SchemaContext> getSchemaContext() {
@@ -172,33 +183,46 @@ public class NetconfDevice implements Provider, //
         }
     }
 
-    void bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
-        remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
-        deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
-        deviceContextProvider.createContextFromCapabilities(capabilities);
-        if (mountInstance != null && getSchemaContext().isPresent()) {
-            mountInstance.setSchemaContext(getSchemaContext().get());
-        }
+    void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
+        // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
+        // Reason: delegate.getSchema blocks thread when waiting for response
+        // however, if the netty thread is blocked, no incoming message can be processed
+        // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
+        // TODO redesign +refactor
+        processingExecutor.submit(new Runnable() {
+            @Override
+            public void run() {
+                NetconfDevice.this.rollbackSupported = rollbackSupported;
+                remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
+                deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
+                deviceContextProvider.createContextFromCapabilities(capabilities);
+                if (mountInstance != null && getSchemaContext().isPresent()) {
+                    mountInstance.setSchemaContext(getSchemaContext().get());
+                }
 
-        updateDeviceState(true, capabilities);
+                updateDeviceState(true, capabilities);
 
-        if (mountInstance != null) {
-            confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
-            operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
-            commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
+                if (mountInstance != null) {
+                    confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
+                    operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
+                    commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
 
-            List<RpcRegistration> rpcs = new ArrayList<>();
-            // TODO same condition twice
-            if (mountInstance != null && getSchemaContext().isPresent()) {
-                for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
-                    rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), this));
+                    List<RpcRegistration> rpcs = new ArrayList<>();
+                    // TODO same condition twice
+                    if (mountInstance != null && getSchemaContext().isPresent()) {
+                        for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
+                            rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
+                        }
+                    }
+                    rpcReg = rpcs;
                 }
             }
-            rpcReg = rpcs;
-        }
+        });
     }
 
     private void updateDeviceState(boolean up, Set<QName> capabilities) {
+        checkDataStoreState();
+
         DataModificationTransaction transaction = dataBroker.beginTransaction();
 
         CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
@@ -208,7 +232,7 @@ public class NetconfDevice implements Provider, //
 
         logger.debug("Client capabilities {}", capabilities);
         for (QName capability : capabilities) {
-            it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability);
+            it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString());
         }
 
         logger.debug("Update device state transaction " + transaction.getIdentifier()
@@ -275,7 +299,7 @@ public class NetconfDevice implements Provider, //
 
     @Override
     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
-        return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()));
+        return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
     }
 
     @Override
@@ -287,6 +311,22 @@ public class NetconfDevice implements Provider, //
     public void onSessionInitiated(ProviderSession session) {
         dataBroker = session.getService(DataBrokerService.class);
 
+        processingExecutor.submit(new Runnable() {
+            @Override
+            public void run() {
+                updateInitialState();
+            }
+        });
+
+        mountService = session.getService(MountProvisionService.class);
+        if (mountService != null) {
+            mountInstance = mountService.createOrGetMountPoint(path);
+        }
+    }
+
+    private void updateInitialState() {
+        checkDataStoreState();
+
         DataModificationTransaction transaction = dataBroker.beginTransaction();
         if (operationalNodeNotExisting(transaction)) {
             transaction.putOperationalData(path, getNodeWithId());
@@ -302,13 +342,13 @@ public class NetconfDevice implements Provider, //
         } catch (ExecutionException e) {
             throw new RuntimeException("Read configuration data " + path + " failed", e);
         }
-
-        mountService = session.getService(MountProvisionService.class);
-        if (mountService != null) {
-            mountInstance = mountService.createOrGetMountPoint(path);
-        }
     }
 
+    private void checkDataStoreState() {
+        // read data from Nodes/Node in order to wait with write until schema for Nodes/Node is present in datastore
+        dataProviderService.readOperationalData(org.opendaylight.yangtools.yang.binding.InstanceIdentifier.builder(
+                Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class).augmentation(NetconfNode.class).build());    }
+
     CompositeNode getNodeWithId() {
         SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
@@ -353,7 +393,7 @@ public class NetconfDevice implements Provider, //
     public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
             DataModification<InstanceIdentifier, CompositeNode> modification) {
         NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
-                modification, true);
+                modification, true, rollbackSupported);
         try {
             twoPhaseCommit.prepare();
         } catch (InterruptedException e) {
@@ -450,6 +490,14 @@ public class NetconfDevice implements Provider, //
     public void setDispatcher(final NetconfClientDispatcher dispatcher) {
         this.dispatcher = dispatcher;
     }
+
+    public void setClientConfig(final NetconfClientConfiguration clientConfig) {
+        this.clientConfig = clientConfig;
+    }
+
+    public void setDataProviderService(final DataProviderService dataProviderService) {
+        this.dataProviderService = dataProviderService;
+    }
 }
 
 class NetconfDeviceSchemaContextProvider {