X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-netconf-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnect%2Fnetconf%2FNetconfDevice.java;h=54242cf26da315643d16cb350be9d4a60f300de1;hp=abbbb68265e3209915f32f17e589ff76c4dc2a91;hb=325ce8c85b1ed89edd4aed2fc4fd2237ccc3b203;hpb=2c975930426e129f4bd86e9fa5dec257cbe2d6b6 diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java index abbbb68265..54242cf26d 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java @@ -21,6 +21,7 @@ import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toF import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage; import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap; +import com.google.common.base.Preconditions; import java.io.InputStream; import java.net.InetSocketAddress; import java.net.URI; @@ -38,6 +39,8 @@ import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; import org.opendaylight.controller.md.sal.common.api.data.DataModification; import org.opendaylight.controller.md.sal.common.api.data.DataReader; import org.opendaylight.controller.netconf.client.NetconfClientDispatcher; +import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration; +import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; import org.opendaylight.controller.sal.core.api.Provider; @@ -47,6 +50,8 @@ import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance; import org.opendaylight.controller.sal.core.api.mount.MountProvisionService; import org.opendaylight.protocol.framework.ReconnectStrategy; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; @@ -115,10 +120,15 @@ public class NetconfDevice implements Provider, // SchemaSourceProvider remoteSourceProvider; - DataBrokerService dataBroker; + private volatile DataBrokerService dataBroker; NetconfDeviceListener listener; + private boolean rollbackSupported; + + private NetconfClientConfiguration clientConfig; + private volatile DataProviderService dataProviderService; + public NetconfDevice(String name) { this.name = name; this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name); @@ -131,11 +141,12 @@ public class NetconfDevice implements Provider, // checkState(schemaSourceProvider != null, "Schema Source Provider must be set."); checkState(eventExecutor != null, "Event executor must be set."); - listener = new NetconfDeviceListener(this); + Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener); + listener = (NetconfDeviceListener) clientConfig.getSessionListener(); logger.info("Starting NETCONF Client {} for address {}", name, socketAddress); - dispatcher.createClient(socketAddress, listener, reconnectStrategy); + dispatcher.createClient(clientConfig); } Optional getSchemaContext() { @@ -172,33 +183,46 @@ public class NetconfDevice implements Provider, // } } - void bringUp(SchemaSourceProvider delegate, Set capabilities) { - remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); - deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider); - deviceContextProvider.createContextFromCapabilities(capabilities); - if (mountInstance != null && getSchemaContext().isPresent()) { - mountInstance.setSchemaContext(getSchemaContext().get()); - } + void bringUp(final SchemaSourceProvider delegate, final Set capabilities, final boolean rollbackSupported) { + // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener. + // Reason: delegate.getSchema blocks thread when waiting for response + // however, if the netty thread is blocked, no incoming message can be processed + // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html + // TODO redesign +refactor + processingExecutor.submit(new Runnable() { + @Override + public void run() { + NetconfDevice.this.rollbackSupported = rollbackSupported; + remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate); + deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider); + deviceContextProvider.createContextFromCapabilities(capabilities); + if (mountInstance != null && getSchemaContext().isPresent()) { + mountInstance.setSchemaContext(getSchemaContext().get()); + } - updateDeviceState(true, capabilities); + updateDeviceState(true, capabilities); - if (mountInstance != null) { - confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this); - operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this); - commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this); + if (mountInstance != null) { + confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this); + operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this); + commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this); - List rpcs = new ArrayList<>(); - // TODO same condition twice - if (mountInstance != null && getSchemaContext().isPresent()) { - for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) { - rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), this)); + List rpcs = new ArrayList<>(); + // TODO same condition twice + if (mountInstance != null && getSchemaContext().isPresent()) { + for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) { + rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this)); + } + } + rpcReg = rpcs; } } - rpcReg = rpcs; - } + }); } private void updateDeviceState(boolean up, Set capabilities) { + checkDataStoreState(); + DataModificationTransaction transaction = dataBroker.beginTransaction(); CompositeNodeBuilder it = ImmutableCompositeNode.builder(); @@ -208,7 +232,7 @@ public class NetconfDevice implements Provider, // logger.debug("Client capabilities {}", capabilities); for (QName capability : capabilities) { - it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability); + it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString()); } logger.debug("Update device state transaction " + transaction.getIdentifier() @@ -275,7 +299,7 @@ public class NetconfDevice implements Provider, // @Override public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { - return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext())); + return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc); } @Override @@ -287,6 +311,22 @@ public class NetconfDevice implements Provider, // public void onSessionInitiated(ProviderSession session) { dataBroker = session.getService(DataBrokerService.class); + processingExecutor.submit(new Runnable() { + @Override + public void run() { + updateInitialState(); + } + }); + + mountService = session.getService(MountProvisionService.class); + if (mountService != null) { + mountInstance = mountService.createOrGetMountPoint(path); + } + } + + private void updateInitialState() { + checkDataStoreState(); + DataModificationTransaction transaction = dataBroker.beginTransaction(); if (operationalNodeNotExisting(transaction)) { transaction.putOperationalData(path, getNodeWithId()); @@ -302,13 +342,13 @@ public class NetconfDevice implements Provider, // } catch (ExecutionException e) { throw new RuntimeException("Read configuration data " + path + " failed", e); } - - mountService = session.getService(MountProvisionService.class); - if (mountService != null) { - mountInstance = mountService.createOrGetMountPoint(path); - } } + private void checkDataStoreState() { + // read data from Nodes/Node in order to wait with write until schema for Nodes/Node is present in datastore + dataProviderService.readOperationalData(org.opendaylight.yangtools.yang.binding.InstanceIdentifier.builder( + Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class).augmentation(NetconfNode.class).build()); } + CompositeNode getNodeWithId() { SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name); return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.> singletonList(id)); @@ -353,7 +393,7 @@ public class NetconfDevice implements Provider, // public DataCommitTransaction requestCommit( DataModification modification) { NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this, - modification, true); + modification, true, rollbackSupported); try { twoPhaseCommit.prepare(); } catch (InterruptedException e) { @@ -450,6 +490,14 @@ public class NetconfDevice implements Provider, // public void setDispatcher(final NetconfClientDispatcher dispatcher) { this.dispatcher = dispatcher; } + + public void setClientConfig(final NetconfClientConfiguration clientConfig) { + this.clientConfig = clientConfig; + } + + public void setDataProviderService(final DataProviderService dataProviderService) { + this.dataProviderService = dataProviderService; + } } class NetconfDeviceSchemaContextProvider {