import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.concurrent.EventExecutor;
+import java.io.File;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.sal.connect.netconf.sal.KeepaliveSalFacade;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalFacade;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.NetconfTopology;
import org.opendaylight.netconf.topology.SchemaRepositoryProvider;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.credentials.Credentials;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.Identifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.opendaylight.yangtools.yang.model.repo.util.FilesystemSchemaSourceCache;
+import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
+import org.opendaylight.yangtools.yang.parser.util.TextToASTTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NetconfTopologyImpl implements NetconfTopology, BindingAwareProvider, Provider, AutoCloseable {
+public class NetconfTopologyImpl implements NetconfTopology, DataTreeChangeListener<Node>, BindingAwareProvider, Provider, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyImpl.class);
+ private static final long DEFAULT_REQUEST_TIMEOUT_MILIS = 60000L;
+ private static final int DEFAULT_KEEPALIVE_DELAY = 0;
+ private static final boolean DEFAULT_RECONNECT_ON_CHANGED_SCHEMA = false;
+ private static final int DEFAULT_MAX_CONNECTION_ATTEMPTS = 0;
+ private static final int DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS = 2000;
+ private static final BigDecimal DEFAULT_SLEEP_FACTOR = new BigDecimal(1.5);
+
+ private static FilesystemSchemaSourceCache<YangTextSchemaSource> CACHE = null;
+ //keep track of already initialized repositories to avoid adding redundant listeners
+ private static final Set<SchemaRepository> INITIALIZED_SCHEMA_REPOSITORIES = new HashSet<>();
+
private final String topologyId;
+ private final boolean listenForConfigChanges;
private final NetconfClientDispatcher clientDispatcher;
private final BindingAwareBroker bindingAwareBroker;
private final Broker domBroker;
private final EventExecutor eventExecutor;
private final ScheduledThreadPool keepaliveExecutor;
private final ThreadPool processingExecutor;
- private final SchemaRepositoryProvider sharedSchemaRepository;
+ private final SharedSchemaRepository sharedSchemaRepository;
- private SchemaSourceRegistry schemaSourceRegistry = null;
+ private SchemaSourceRegistry schemaRegistry = null;
private SchemaContextFactory schemaContextFactory = null;
private DOMMountPointService mountPointService = null;
private DataBroker dataBroker = null;
private final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
- public NetconfTopologyImpl(final String topologyId, final NetconfClientDispatcher clientDispatcher,
+ private ListenerRegistration<NetconfTopologyImpl> listenerRegistration = null;
+
+ public NetconfTopologyImpl(final String topologyId, final boolean listenForConfigChanges, final NetconfClientDispatcher clientDispatcher,
final BindingAwareBroker bindingAwareBroker, final Broker domBroker,
final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
- final ThreadPool processingExecutor, final SchemaRepositoryProvider sharedSchemaRepository) {
+ final ThreadPool processingExecutor, final SchemaRepositoryProvider schemaRepositoryProvider) {
this.topologyId = topologyId;
+ this.listenForConfigChanges = listenForConfigChanges;
this.clientDispatcher = clientDispatcher;
this.bindingAwareBroker = bindingAwareBroker;
this.domBroker = domBroker;
this.eventExecutor = eventExecutor;
this.keepaliveExecutor = keepaliveExecutor;
this.processingExecutor = processingExecutor;
- this.sharedSchemaRepository = sharedSchemaRepository;
+ this.sharedSchemaRepository = schemaRepositoryProvider.getSharedSchemaRepository();
+
+ initFilesystemSchemaSourceCache(sharedSchemaRepository);
registerToSal(this, this);
}
connectorDTO.getCommunicator().disconnect();
}
activeConnectors.clear();
+
+ if (listenerRegistration != null) {
+ listenerRegistration.close();
+ listenerRegistration = null;
+ }
}
@Override
@Override
public ListenableFuture<NetconfDeviceCapabilities> connectNode(NodeId nodeId, Node configNode) {
+ LOG.info("Connecting RemoteDevice{{}} , with config {}", nodeId, configNode);
return setupConnection(nodeId, configNode);
}
@Override
public ListenableFuture<Void> disconnectNode(NodeId nodeId) {
+ LOG.debug("Disconnecting RemoteDevice{{}}", nodeId.getValue());
if (!activeConnectors.containsKey(nodeId)) {
return Futures.immediateFailedFuture(new IllegalStateException("Unable to disconnect device that is not connected"));
}
// retrieve connection, and disconnect it
- activeConnectors.remove(nodeId).getCommunicator().disconnect();
+ final NetconfConnectorDTO connectorDTO = activeConnectors.remove(nodeId);
+ connectorDTO.getCommunicator().close();
+ connectorDTO.getFacade().close();
return Futures.immediateFuture(null);
}
@Override
public void registerConnectionStatusListener(NodeId node, RemoteDeviceHandler<NetconfSessionPreferences> listener) {
- activeConnectors.get(node).getMountPointFacade().registerConnectionStatusListener(listener);
+ if (activeConnectors.get(node).getFacade() instanceof TopologyMountPointFacade) {
+ ((TopologyMountPointFacade) activeConnectors.get(node).getFacade()).registerConnectionStatusListener(listener);
+ } else {
+ LOG.warn("Unable to register a connection status listener on a regular salFacade, reconfigure for topology mountpoint facade");
+ }
}
private ListenableFuture<NetconfDeviceCapabilities> setupConnection(final NodeId nodeId,
final Node configNode) {
final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
+ Preconditions.checkNotNull(netconfNode.getHost());
+ Preconditions.checkNotNull(netconfNode.getPort());
+ Preconditions.checkNotNull(netconfNode.isTcpOnly());
+
final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode);
final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
final NetconfReconnectingClientConfiguration clientConfig = getClientConfig(deviceCommunicator, netconfNode);
private NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId,
final NetconfNode node) {
+ //setup default values since default value is not supported yet in mdsal
+ // TODO remove this when mdsal starts supporting default values
+ final Long defaultRequestTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
+ final Long keepaliveDelay = node.getKeepaliveDelay() == null ? DEFAULT_KEEPALIVE_DELAY : node.getKeepaliveDelay();
+ final Boolean reconnectOnChangedSchema = node.isReconnectOnChangedSchema() == null ? DEFAULT_RECONNECT_ON_CHANGED_SCHEMA : node.isReconnectOnChangedSchema();
+
IpAddress ipAddress = node.getHost().getIpAddress();
InetSocketAddress address = new InetSocketAddress(ipAddress.getIpv4Address() != null ?
ipAddress.getIpv4Address().getValue() : ipAddress.getIpv6Address().getValue(),
RemoteDeviceId remoteDeviceId = new RemoteDeviceId(nodeId.getValue(), address);
// we might need to create a new SalFacade to maintain backwards compatibility with special case loopback connection
- TopologyMountPointFacade mountPointFacade =
- new TopologyMountPointFacade(remoteDeviceId, domBroker, bindingAwareBroker, node.getDefaultRequestTimeoutMillis());
- RemoteDeviceHandler<NetconfSessionPreferences> salFacade = mountPointFacade;
- if (node.getKeepaliveDelay() > 0) {
- salFacade = new KeepaliveSalFacade(remoteDeviceId, mountPointFacade, keepaliveExecutor.getExecutor(), node.getKeepaliveDelay());
+// TopologyMountPointFacade mountPointFacade =
+// new TopologyMountPointFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+ RemoteDeviceHandler<NetconfSessionPreferences> salFacade =
+ new NetconfDeviceSalFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+// new TopologyMountPointFacade(remoteDeviceId, domBroker, bindingAwareBroker, defaultRequestTimeoutMillis);
+
+ if (keepaliveDelay > 0) {
+ LOG.warn("Adding keepalive facade, for device {}", nodeId);
+ salFacade = new KeepaliveSalFacade(remoteDeviceId, salFacade, keepaliveExecutor.getExecutor(), keepaliveDelay);
}
NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
- new NetconfDevice.SchemaResourcesDTO(schemaSourceRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
+ new NetconfDevice.SchemaResourcesDTO(schemaRegistry, schemaContextFactory, new NetconfStateSchemas.NetconfStateSchemasResolverImpl());
NetconfDevice device = new NetconfDevice(schemaResourcesDTO, remoteDeviceId, salFacade,
- processingExecutor.getExecutor(), node.isReconnectOnChangedSchema());
+ processingExecutor.getExecutor(), reconnectOnChangedSchema);
- return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), mountPointFacade);
+ return new NetconfConnectorDTO(new NetconfDeviceCommunicator(remoteDeviceId, device), salFacade);
}
public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener, NetconfNode node) {
+
+ //setup default values since default value is not supported yet in mdsal
+ // TODO remove this when mdsal starts supporting default values
+ final long clientConnectionTimeoutMillis = node.getDefaultRequestTimeoutMillis() == null ? DEFAULT_REQUEST_TIMEOUT_MILIS : node.getDefaultRequestTimeoutMillis();
+ final long maxConnectionAttempts = node.getMaxConnectionAttempts() == null ? DEFAULT_MAX_CONNECTION_ATTEMPTS : node.getMaxConnectionAttempts();
+ final int betweenAttemptsTimeoutMillis = node.getBetweenAttemptsTimeoutMillis() == null ? DEFAULT_BETWEEN_ATTEMPTS_TIMEOUT_MILLIS : node.getBetweenAttemptsTimeoutMillis();
+ final BigDecimal sleepFactor = node.getSleepFactor() == null ? DEFAULT_SLEEP_FACTOR : node.getSleepFactor();
+
final InetSocketAddress socketAddress = getSocketAddress(node.getHost(), node.getPort().getValue());
- final long clientConnectionTimeoutMillis = node.getDefaultRequestTimeoutMillis();
final ReconnectStrategyFactory sf = new TimedReconnectStrategyFactory(eventExecutor,
- node.getMaxConnectionAttempts(), node.getBetweenAttemptsTimeoutMillis(), node.getSleepFactor());
+ maxConnectionAttempts, betweenAttemptsTimeoutMillis, sleepFactor);
final ReconnectStrategy strategy = sf.createReconnectStrategy();
final AuthenticationHandler authHandler;
@Override
public void onSessionInitiated(ProviderContext session) {
dataBroker = session.getSALService(DataBroker.class);
+
+ if (listenForConfigChanges) {
+ LOG.warn("Registering datastore listener");
+ listenerRegistration =
+ dataBroker.registerDataTreeChangeListener(
+ new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, createTopologyId(topologyId).child(Node.class)), this);
+ }
+ }
+
+ @Override
+ public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Node>> collection) {
+ for (DataTreeModification<Node> change : collection) {
+ final DataObjectModification<Node> rootNode = change.getRootNode();
+ switch (rootNode.getModificationType()) {
+ case SUBTREE_MODIFIED:
+ LOG.debug("Config for node {} updated", getNodeId(rootNode.getIdentifier()));
+ disconnectNode(getNodeId(rootNode.getIdentifier()));
+ connectNode(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
+ break;
+ case WRITE:
+ LOG.debug("Config for node {} created", getNodeId(rootNode.getIdentifier()));
+ if (activeConnectors.containsKey(getNodeId(rootNode.getIdentifier()))) {
+ LOG.warn("RemoteDevice{{}} was already configured, reconfiguring..");
+ disconnectNode(getNodeId(rootNode.getIdentifier()));
+ }
+ connectNode(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
+ break;
+ case DELETE:
+ LOG.debug("Config for node {} deleted", getNodeId(rootNode.getIdentifier()));
+ disconnectNode(getNodeId(rootNode.getIdentifier()));
+ break;
+ }
+ }
+ }
+
+ private void initFilesystemSchemaSourceCache(SharedSchemaRepository repository) {
+ LOG.warn("Schema repository used: {}", repository.getIdentifier());
+ if (CACHE == null) {
+ CACHE = new FilesystemSchemaSourceCache<>(repository, YangTextSchemaSource.class, new File("cache/schema"));
+ }
+ if (!INITIALIZED_SCHEMA_REPOSITORIES.contains(repository)) {
+ repository.registerSchemaSourceListener(CACHE);
+ repository.registerSchemaSourceListener(TextToASTTransformer.create(repository, repository));
+ INITIALIZED_SCHEMA_REPOSITORIES.add(repository);
+ }
+ setSchemaRegistry(repository);
+ setSchemaContextFactory(repository.createSchemaContextFactory(SchemaSourceFilter.ALWAYS_ACCEPT));
+ }
+
+ public void setSchemaRegistry(final SchemaSourceRegistry schemaRegistry) {
+ this.schemaRegistry = schemaRegistry;
+ }
+
+ public void setSchemaContextFactory(final SchemaContextFactory schemaContextFactory) {
+ this.schemaContextFactory = schemaContextFactory;
+ }
+
+ //TODO this needs to be an util method, since netconf clustering uses this aswell
+ /**
+ * Determines the Netconf Node Node ID, given the node's instance
+ * identifier.
+ *
+ * @param pathArgument Node's path arument
+ * @return NodeId for the node
+ */
+ private NodeId getNodeId(final PathArgument pathArgument) {
+ if (pathArgument instanceof InstanceIdentifier.IdentifiableItem<?, ?>) {
+
+ final Identifier key = ((InstanceIdentifier.IdentifiableItem) pathArgument).getKey();
+ if(key instanceof NodeKey) {
+ return ((NodeKey) key).getNodeId();
+ }
+ }
+ throw new IllegalStateException("Unable to create NodeId from: " + pathArgument);
+ }
+
+ private static InstanceIdentifier<Topology> createTopologyId(final String topologyId) {
+ final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
+ return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
}
private static final class NetconfConnectorDTO {
private final NetconfDeviceCommunicator communicator;
- private final TopologyMountPointFacade mountPointFacade;
+ private final RemoteDeviceHandler<NetconfSessionPreferences> facade;
- private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final TopologyMountPointFacade mountPointFacade) {
+ private NetconfConnectorDTO(final NetconfDeviceCommunicator communicator, final RemoteDeviceHandler<NetconfSessionPreferences> facade) {
this.communicator = communicator;
- this.mountPointFacade = mountPointFacade;
+ this.facade = facade;
}
public NetconfDeviceCommunicator getCommunicator() {
return communicator;
}
- public TopologyMountPointFacade getMountPointFacade() {
- return mountPointFacade;
+ public RemoteDeviceHandler<NetconfSessionPreferences> getFacade() {
+ return facade;
}
}