import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import org.opendaylight.aaa.encrypt.AAAEncryptionService;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
-import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.netconf.client.NetconfClientSessionListener;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceBuilder;
import org.opendaylight.netconf.sal.connect.netconf.SchemalessNetconfDevice;
import org.opendaylight.netconf.sal.connect.netconf.auth.DatastoreBackedPublicKeyAuth;
-import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.sal.connect.netconf.listener.UserPreferences;
import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalFacade;
import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfKeystoreAdapter;
import org.opendaylight.netconf.sal.connect.netconf.schema.YangLibrarySchemaYangSourceProvider;
import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.login.pw.LoginPassword;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.credentials.login.pw.unencrypted.LoginPasswordUnencrypted;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.schema.storage.YangLibrary;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
final AAAEncryptionService encryptionService,
final DeviceActionFactory deviceActionFactory,
final BaseNetconfSchemas baseSchemas) {
- this.topologyId = topologyId;
+ this.topologyId = requireNonNull(topologyId);
this.clientDispatcher = clientDispatcher;
this.eventExecutor = eventExecutor;
this.keepaliveExecutor = keepaliveExecutor;
this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
this.schemaManager = requireNonNull(schemaManager);
this.deviceActionFactory = deviceActionFactory;
- this.dataBroker = dataBroker;
+ this.dataBroker = requireNonNull(dataBroker);
this.mountPointService = mountPointService;
this.encryptionService = encryptionService;
this.baseSchemas = requireNonNull(baseSchemas);
- this.keystoreAdapter = new NetconfKeystoreAdapter(dataBroker);
+ keystoreAdapter = new NetconfKeystoreAdapter(dataBroker);
+
+ // FIXME: this should be a put(), as we are initializing and will be re-populating the datastore with all the
+ // devices. Whatever has been there before should be nuked to properly re-align lifecycle.
+ final var wtx = dataBroker.newWriteOnlyTransaction();
+ wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
+ .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
+ final var future = wtx.commit();
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Unable to initialize topology {}", topologyId, e);
+ throw new IllegalStateException(e);
+ }
+
+ LOG.debug("Topology {} initialized", topologyId);
}
@Override
- public ListenableFuture<NetconfDeviceCapabilities> connectNode(final NodeId nodeId, final Node configNode) {
+ public ListenableFuture<Empty> connectNode(final NodeId nodeId, final Node configNode) {
LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, hideCredentials(configNode));
return setupConnection(nodeId, configNode);
}
}
@Override
- public ListenableFuture<Void> disconnectNode(final NodeId nodeId) {
- LOG.debug("Disconnecting RemoteDevice{{}}", nodeId.getValue());
+ public ListenableFuture<Empty> disconnectNode(final NodeId nodeId) {
+ final var nodeName = nodeId.getValue();
+ LOG.debug("Disconnecting RemoteDevice{{}}", nodeName);
final NetconfConnectorDTO connectorDTO = activeConnectors.remove(nodeId);
if (connectorDTO == null) {
return Futures.immediateFailedFuture(
- new IllegalStateException("Unable to disconnect device that is not connected"));
+ new IllegalStateException("Cannot disconnect " + nodeName + " as it is not connected"));
}
connectorDTO.close();
- return Futures.immediateFuture(null);
+ return Futures.immediateFuture(Empty.value());
}
- protected ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
- final Node configNode) {
+ protected ListenableFuture<Empty> setupConnection(final NodeId nodeId, final Node configNode) {
final NetconfNode netconfNode = configNode.augmentation(NetconfNode.class);
final NetconfNodeAugmentedOptional nodeOptional = configNode.augmentation(NetconfNodeAugmentedOptional.class);
final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
final NetconfClientSessionListener netconfClientSessionListener = deviceCommunicatorDTO.getSessionListener();
final NetconfReconnectingClientConfiguration clientConfig =
- getClientConfig(netconfClientSessionListener, netconfNode);
- final ListenableFuture<NetconfDeviceCapabilities> future =
+ getClientConfig(netconfClientSessionListener, netconfNode, nodeId);
+ final ListenableFuture<Empty> future =
deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
activeConnectors.put(nodeId, deviceCommunicatorDTO);
- Futures.addCallback(future, new FutureCallback<NetconfDeviceCapabilities>() {
+ Futures.addCallback(future, new FutureCallback<>() {
@Override
- public void onSuccess(final NetconfDeviceCapabilities result) {
+ public void onSuccess(final Empty result) {
LOG.debug("Connector for {} started succesfully", nodeId.getValue());
}
protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node,
final NetconfNodeAugmentedOptional nodeOptional) {
- final RemoteDeviceId remoteDeviceId = NetconfNodeUtils.toRemoteDeviceId(nodeId, node);
-
+ final var deviceId = NetconfNodeUtils.toRemoteDeviceId(nodeId, node);
final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
- RemoteDeviceHandler<NetconfSessionPreferences> salFacade = createSalFacade(remoteDeviceId);
+
+ final var deviceSalFacade = new NetconfDeviceSalFacade(deviceId, mountPointService, dataBroker, topologyId);
+ // The facade we are going it present to NetconfDevice
+ RemoteDeviceHandler salFacade;
+ final KeepaliveSalFacade keepAliveFacade;
if (keepaliveDelay > 0) {
LOG.info("Adding keepalive facade, for device {}", nodeId);
- salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, this.keepaliveExecutor.getExecutor(),
- keepaliveDelay, node.requireDefaultRequestTimeoutMillis().toJava());
+ salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, deviceSalFacade,
+ keepaliveExecutor.getExecutor(), keepaliveDelay, node.requireDefaultRequestTimeoutMillis().toJava());
+ } else {
+ salFacade = deviceSalFacade;
+ keepAliveFacade = null;
}
- final RemoteDevice<NetconfSessionPreferences, NetconfMessage, NetconfDeviceCommunicator> device;
+ final RemoteDevice<NetconfDeviceCommunicator> device;
final List<SchemaSourceRegistration<?>> yanglibRegistrations;
if (node.requireSchemaless()) {
- device = new SchemalessNetconfDevice(baseSchemas, remoteDeviceId, salFacade);
+ device = new SchemalessNetconfDevice(baseSchemas, deviceId, salFacade);
yanglibRegistrations = List.of();
} else {
final boolean reconnectOnChangedSchema = node.requireReconnectOnChangedSchema();
- final SchemaResourcesDTO resources = schemaManager.getSchemaResources(node, nodeId.getValue());
+ final SchemaResourcesDTO resources = schemaManager.getSchemaResources(node.getSchemaCacheDirectory(),
+ nodeId.getValue());
device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(reconnectOnChangedSchema)
.setSchemaResourcesDTO(resources)
- .setGlobalProcessingExecutor(this.processingExecutor)
- .setId(remoteDeviceId)
+ .setGlobalProcessingExecutor(processingExecutor)
+ .setId(deviceId)
.setSalFacade(salFacade)
.setNode(node)
.setEventExecutor(eventExecutor)
.setDeviceActionFactory(deviceActionFactory)
.setBaseSchemas(baseSchemas)
.build();
- yanglibRegistrations = registerDeviceSchemaSources(remoteDeviceId, node, resources);
+ yanglibRegistrations = registerDeviceSchemaSources(deviceId, node, resources);
}
final Optional<UserPreferences> userCapabilities = getUserCapabilities(node);
final int rpcMessageLimit = node.requireConcurrentRpcLimit().toJava();
if (rpcMessageLimit < 1) {
- LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", remoteDeviceId);
+ LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", deviceId);
}
final NetconfDeviceCommunicator netconfDeviceCommunicator =
- userCapabilities.isPresent() ? new NetconfDeviceCommunicator(remoteDeviceId, device,
+ userCapabilities.isPresent() ? new NetconfDeviceCommunicator(deviceId, device,
userCapabilities.get(), rpcMessageLimit)
- : new NetconfDeviceCommunicator(remoteDeviceId, device, rpcMessageLimit);
+ : new NetconfDeviceCommunicator(deviceId, device, rpcMessageLimit);
- if (salFacade instanceof KeepaliveSalFacade) {
- ((KeepaliveSalFacade)salFacade).setListener(netconfDeviceCommunicator);
+ if (keepAliveFacade != null) {
+ keepAliveFacade.setListener(netconfDeviceCommunicator);
}
return new NetconfConnectorDTO(netconfDeviceCommunicator, salFacade, yanglibRegistrations);
}
public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener,
- final NetconfNode node) {
+ final NetconfNode node, final NodeId nodeId) {
final ReconnectStrategyFactory sf = new TimedReconnectStrategyFactory(eventExecutor,
node.requireMaxConnectionAttempts().toJava(), node.requireBetweenAttemptsTimeoutMillis().toJava(),
node.requireSleepFactor().decimalValue());
}
return reconnectingClientConfigurationBuilder
+ .withName(nodeId.getValue())
.withAddress(NetconfNodeUtils.toInetSocketAddress(node))
.withConnectionTimeoutMillis(node.requireConnectionTimeoutMillis().toJava())
.withReconnectStrategy(sf.createReconnectStrategy())
}
private AuthenticationHandler getHandlerFromCredentials(final Credentials credentials) {
- if (credentials instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology
- .rev150114.netconf.node.credentials.credentials.LoginPassword) {
- final org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology
- .rev150114.netconf.node.credentials.credentials.LoginPassword loginPassword
- = (org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology
- .rev150114.netconf.node.credentials.credentials.LoginPassword) credentials;
+ if (credentials
+ instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology
+ .rev150114.netconf.node.credentials.credentials.LoginPassword loginPassword) {
return new LoginPasswordHandler(loginPassword.getUsername(), loginPassword.getPassword());
}
if (credentials instanceof LoginPwUnencrypted) {
throw new IllegalStateException("Unsupported credential type: " + credentials.getClass());
}
- protected abstract RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(RemoteDeviceId id);
-
private static Optional<UserPreferences> getUserCapabilities(final NetconfNode node) {
// if none of yang-module-capabilities or non-module-capabilities is specified
// just return absent