import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
+import com.google.common.base.Preconditions;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import org.opendaylight.controller.md.sal.common.api.data.DataModification;
import org.opendaylight.controller.md.sal.common.api.data.DataReader;
import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfiguration;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
SchemaSourceProvider<InputStream> remoteSourceProvider;
- DataBrokerService dataBroker;
+ private volatile DataBrokerService dataBroker;
NetconfDeviceListener listener;
+ private boolean rollbackSupported;
+
+ private NetconfReconnectingClientConfiguration clientConfig;
+ private volatile DataProviderService dataProviderService;
+
public NetconfDevice(String name) {
this.name = name;
this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
checkState(eventExecutor != null, "Event executor must be set.");
- listener = new NetconfDeviceListener(this);
+ Preconditions.checkArgument(clientConfig.getSessionListener() instanceof NetconfDeviceListener);
+ listener = (NetconfDeviceListener) clientConfig.getSessionListener();
logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
- dispatcher.createClient(socketAddress, listener, reconnectStrategy);
+ dispatcher.createReconnectingClient(clientConfig);
}
Optional<SchemaContext> getSchemaContext() {
}
}
- void bringUp(SchemaSourceProvider<String> delegate, Set<QName> capabilities) {
- remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
- deviceContextProvider = new NetconfDeviceSchemaContextProvider(this, remoteSourceProvider);
- deviceContextProvider.createContextFromCapabilities(capabilities);
- if (mountInstance != null && getSchemaContext().isPresent()) {
- mountInstance.setSchemaContext(getSchemaContext().get());
- }
+ void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities, final boolean rollbackSupported) {
+ // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
+ // Reason: delegate.getSchema blocks thread when waiting for response
+ // however, if the netty thread is blocked, no incoming message can be processed
+ // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
+ // TODO redesign +refactor
+ processingExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ NetconfDevice.this.rollbackSupported = rollbackSupported;
+ remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
+ deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
+ deviceContextProvider.createContextFromCapabilities(capabilities);
+ if (mountInstance != null && getSchemaContext().isPresent()) {
+ mountInstance.setSchemaContext(getSchemaContext().get());
+ }
- updateDeviceState(true, capabilities);
+ updateDeviceState(true, capabilities);
- if (mountInstance != null) {
- confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
- operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
- commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
+ if (mountInstance != null) {
+ confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
+ operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
+ commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
- List<RpcRegistration> rpcs = new ArrayList<>();
- // TODO same condition twice
- if (mountInstance != null && getSchemaContext().isPresent()) {
- for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
- rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), this));
+ List<RpcRegistration> rpcs = new ArrayList<>();
+ // TODO same condition twice
+ if (mountInstance != null && getSchemaContext().isPresent()) {
+ for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
+ rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
+ }
+ }
+ rpcReg = rpcs;
}
}
- rpcReg = rpcs;
- }
+ });
}
private void updateDeviceState(boolean up, Set<QName> capabilities) {
+ checkDataStoreState();
+
DataModificationTransaction transaction = dataBroker.beginTransaction();
CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
logger.debug("Client capabilities {}", capabilities);
for (QName capability : capabilities) {
- it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability);
+ it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability.toString());
}
logger.debug("Update device state transaction " + transaction.getIdentifier()
@Override
public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
- return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()));
+ return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()), rpc);
}
@Override
public void onSessionInitiated(ProviderSession session) {
dataBroker = session.getService(DataBrokerService.class);
+ processingExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ updateInitialState();
+ }
+ });
+
+ mountService = session.getService(MountProvisionService.class);
+ if (mountService != null) {
+ mountInstance = mountService.createOrGetMountPoint(path);
+ }
+ }
+
+ private void updateInitialState() {
+ checkDataStoreState();
+
DataModificationTransaction transaction = dataBroker.beginTransaction();
if (operationalNodeNotExisting(transaction)) {
transaction.putOperationalData(path, getNodeWithId());
} catch (ExecutionException e) {
throw new RuntimeException("Read configuration data " + path + " failed", e);
}
-
- mountService = session.getService(MountProvisionService.class);
- if (mountService != null) {
- mountInstance = mountService.createOrGetMountPoint(path);
- }
}
+ private void checkDataStoreState() {
+ // read data from Nodes/Node in order to wait with write until schema for Nodes/Node is present in datastore
+ dataProviderService.readOperationalData(org.opendaylight.yangtools.yang.binding.InstanceIdentifier.builder(
+ Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class).augmentation(NetconfNode.class).build()); }
+
CompositeNode getNodeWithId() {
SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
DataModification<InstanceIdentifier, CompositeNode> modification) {
NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
- modification, true);
+ modification, true, rollbackSupported);
try {
twoPhaseCommit.prepare();
} catch (InterruptedException e) {
public void setDispatcher(final NetconfClientDispatcher dispatcher) {
this.dispatcher = dispatcher;
}
+
+ public void setClientConfig(final NetconfReconnectingClientConfiguration clientConfig) {
+ this.clientConfig = clientConfig;
+ }
+
+ public void setDataProviderService(final DataProviderService dataProviderService) {
+ this.dataProviderService = dataProviderService;
+ }
}
class NetconfDeviceSchemaContextProvider {