X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2FNetconfTopologyManager.java;h=7e9d230d9d451c8e966068c68b2f64b55a4206e4;hb=b4e31a7c0cf7ecfdc4160a379e26ded9fedf9ecb;hp=0ca6325f6293308ff8a11582173d713c4b33770a;hpb=0514dcdfb6aab34bd345c15e36ea3dd7fa1ce053;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java index 0ca6325f62..7e9d230d9d 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java @@ -5,37 +5,45 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.topology.singleton.impl; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorSystem; import akka.util.Timeout; -import com.google.common.base.Preconditions; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.util.concurrent.EventExecutor; import java.util.Collection; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; +import org.opendaylight.aaa.encrypt.AAAEncryptionService; import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.config.threadpool.ScheduledThreadPool; import org.opendaylight.controller.config.threadpool.ThreadPool; -import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; -import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; -import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.DataObjectModification; +import org.opendaylight.mdsal.binding.api.DataTreeIdentifier; +import org.opendaylight.mdsal.binding.api.DataTreeModification; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMActionProviderService; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; +import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.netconf.client.NetconfClientDispatcher; +import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory; +import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager; +import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder; @@ -61,53 +69,68 @@ public class NetconfTopologyManager private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class); - private final Map, NetconfTopologyContext> contexts = new HashMap<>(); + private final Map, NetconfTopologyContext> contexts = new ConcurrentHashMap<>(); private final Map, ClusterSingletonServiceRegistration> - clusterRegistrations = new HashMap<>(); - - private ListenerRegistration dataChangeListenerRegistration; + clusterRegistrations = new ConcurrentHashMap<>(); + private final BaseNetconfSchemas baseSchemas; private final DataBroker dataBroker; - private final RpcProviderRegistry rpcProviderRegistry; + private final DOMRpcProviderService rpcProviderRegistry; + private final DOMActionProviderService actionProviderRegistry; private final ClusterSingletonServiceProvider clusterSingletonServiceProvider; - private final BindingAwareBroker bindingAwareBroker; - private final ScheduledThreadPool keepaliveExecutor; - private final ThreadPool processingExecutor; - private final Broker domBroker; + private final ScheduledExecutorService keepaliveExecutor; + private final ListeningExecutorService processingExecutor; private final ActorSystem actorSystem; private final EventExecutor eventExecutor; private final NetconfClientDispatcher clientDispatcher; private final String topologyId; private final Duration writeTxIdleTimeout; + private final DOMMountPointService mountPointService; + private final AAAEncryptionService encryptionService; + private final DeviceActionFactory deviceActionFactory; + private final SchemaResourceManager resourceManager; - public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry, + private ListenerRegistration dataChangeListenerRegistration; + private String privateKeyPath; + private String privateKeyPassphrase; + + public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker, + final DOMRpcProviderService rpcProviderRegistry, + final DOMActionProviderService actionProviderService, final ClusterSingletonServiceProvider clusterSingletonServiceProvider, - final BindingAwareBroker bindingAwareBroker, final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor, - final Broker domBroker, final ActorSystemProvider actorSystemProvider, + final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher, - final String topologyId, final Config config) { - this.dataBroker = Preconditions.checkNotNull(dataBroker); - this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry); - this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider); - this.bindingAwareBroker = Preconditions.checkNotNull(bindingAwareBroker); - this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor); - this.processingExecutor = Preconditions.checkNotNull(processingExecutor); - this.domBroker = Preconditions.checkNotNull(domBroker); - this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem(); - this.eventExecutor = Preconditions.checkNotNull(eventExecutor); - this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher); - this.topologyId = Preconditions.checkNotNull(topologyId); - this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS); + final String topologyId, final Config config, + final DOMMountPointService mountPointService, + final AAAEncryptionService encryptionService, + final DeviceActionFactory deviceActionFactory, + final SchemaResourceManager resourceManager) { + this.baseSchemas = requireNonNull(baseSchemas); + this.dataBroker = requireNonNull(dataBroker); + this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry); + this.actionProviderRegistry = requireNonNull(actionProviderService); + this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider); + this.keepaliveExecutor = keepaliveExecutor.getExecutor(); + this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor()); + this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem(); + this.eventExecutor = requireNonNull(eventExecutor); + this.clientDispatcher = requireNonNull(clientDispatcher); + this.topologyId = requireNonNull(topologyId); + this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout().toJava(), TimeUnit.SECONDS); + this.mountPointService = mountPointService; + this.encryptionService = requireNonNull(encryptionService); + this.deviceActionFactory = requireNonNull(deviceActionFactory); + this.resourceManager = requireNonNull(resourceManager); } // Blueprint init method public void init() { - dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId); + dataChangeListenerRegistration = registerDataTreeChangeListener(); } @Override - public void onDataTreeChanged(@Nonnull final Collection> changes) { + public void onDataTreeChanged(final Collection> changes) { for (final DataTreeModification change : changes) { final DataObjectModification rootNode = change.getRootNode(); final InstanceIdentifier dataModifIdent = change.getRootPath().getRootIdentifier(); @@ -141,89 +164,122 @@ public class NetconfTopologyManager context.refresh(createSetup(instanceIdentifier, node)); } + // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there + // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to + // retry registration several times and log the error. + // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider + @SuppressWarnings("checkstyle:IllegalCatch") private void startNetconfDeviceContext(final InstanceIdentifier instanceIdentifier, final Node node) { - final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class); - Preconditions.checkNotNull(netconfNode); - Preconditions.checkNotNull(netconfNode.getHost()); - Preconditions.checkNotNull(netconfNode.getHost().getIpAddress()); + final NetconfNode netconfNode = node.augmentation(NetconfNode.class); + requireNonNull(netconfNode); + requireNonNull(netconfNode.getHost()); + requireNonNull(netconfNode.getHost().getIpAddress()); - final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(), - "seconds")); + final Timeout actorResponseWaitTime = new Timeout( + Duration.create(netconfNode.getActorResponseWaitTime().toJava(), "seconds")); final ServiceGroupIdentifier serviceGroupIdent = ServiceGroupIdentifier.create(instanceIdentifier.toString()); - final NetconfTopologyContext newNetconfTopologyContext = - new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent, - actorResponseWaitTime); + final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext( + createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory); - final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration = - clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext); + int tries = 3; + while (true) { + try { + final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration = + clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext); + clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration); + contexts.put(instanceIdentifier, newNetconfTopologyContext); + break; + } catch (final RuntimeException e) { + LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e); - clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration); - contexts.put(instanceIdentifier, newNetconfTopologyContext); + if (--tries <= 0) { + LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context", + newNetconfTopologyContext, e); + close(newNetconfTopologyContext); + break; + } + } + } } private void stopNetconfDeviceContext(final InstanceIdentifier instanceIdentifier) { - if (contexts.containsKey(instanceIdentifier)) { - try { - clusterRegistrations.get(instanceIdentifier).close(); - contexts.get(instanceIdentifier).closeFinal(); - } catch (final Exception e) { - LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier); - } - contexts.remove(instanceIdentifier); - clusterRegistrations.remove(instanceIdentifier); + final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier); + if (netconfTopologyContext != null) { + close(clusterRegistrations.remove(instanceIdentifier)); + close(netconfTopologyContext); } } + @VisibleForTesting + protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup, + final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime, + final DeviceActionFactory deviceActionFact) { + return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService, + deviceActionFact); + } + @Override public void close() { if (dataChangeListenerRegistration != null) { dataChangeListenerRegistration.close(); dataChangeListenerRegistration = null; } - contexts.forEach((instanceIdentifier, netconfTopologyContext) -> { - try { - netconfTopologyContext.closeFinal(); - } catch (final Exception e) { - LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e); - } - }); - clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> { - try { - clusterSingletonServiceRegistration.close(); - } catch (final Exception e) { - LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e); - } - }); + + contexts.values().forEach(NetconfTopologyManager::close); + clusterRegistrations.values().forEach(NetconfTopologyManager::close); + contexts.clear(); clusterRegistrations.clear(); } - private ListenerRegistration registerDataTreeChangeListener(final String topologyId) { + @SuppressWarnings("checkstyle:IllegalCatch") + private static void close(final AutoCloseable closeable) { + try { + closeable.close(); + } catch (Exception e) { + LOG.warn("Error closing {}", closeable, e); + } + } + + /** + * Sets the private key path from location specified in configuration file using blueprint. + */ + public void setPrivateKeyPath(final String privateKeyPath) { + this.privateKeyPath = privateKeyPath; + } + + /** + * Sets the private key passphrase from location specified in configuration file using blueprint. + */ + public void setPrivateKeyPassphrase(final String privateKeyPassphrase) { + this.privateKeyPassphrase = privateKeyPassphrase; + } + + private ListenerRegistration registerDataTreeChangeListener() { final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction(); - initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId); - initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId); - Futures.addCallback(wtx.submit(), new FutureCallback() { + initTopology(wtx, LogicalDatastoreType.CONFIGURATION); + initTopology(wtx, LogicalDatastoreType.OPERATIONAL); + wtx.commit().addCallback(new FutureCallback() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final CommitInfo result) { LOG.debug("topology initialization successful"); } @Override - public void onFailure(@Nonnull final Throwable throwable) { - LOG.error("Unable to initialize netconf-topology, {}", throwable); + public void onFailure(final Throwable throwable) { + LOG.error("Unable to initialize netconf-topology", throwable); } - }); + }, MoreExecutors.directExecutor()); LOG.debug("Registering datastore listener"); - return dataBroker.registerDataTreeChangeListener( - new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, - NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this); + return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION, + NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this); } - private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, final String topologyId) { + private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) { final NetworkTopology networkTopology = new NetworkTopologyBuilder().build(); final InstanceIdentifier networkTopologyId = InstanceIdentifier.builder(NetworkTopology.class).build(); @@ -234,22 +290,28 @@ public class NetconfTopologyManager } private NetconfTopologySetup createSetup(final InstanceIdentifier instanceIdentifier, final Node node) { + final NetconfNode netconfNode = node.augmentation(NetconfNode.class); + final RemoteDeviceId deviceId = NetconfTopologyUtils.createRemoteDeviceId(node.getNodeId(), netconfNode); + final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create() .setClusterSingletonServiceProvider(clusterSingletonServiceProvider) + .setBaseSchemas(baseSchemas) .setDataBroker(dataBroker) .setInstanceIdentifier(instanceIdentifier) .setRpcProviderRegistry(rpcProviderRegistry) + .setActionProviderRegistry(actionProviderRegistry) .setNode(node) - .setBindingAwareBroker(bindingAwareBroker) .setActorSystem(actorSystem) .setEventExecutor(eventExecutor) - .setDomBroker(domBroker) .setKeepaliveExecutor(keepaliveExecutor) .setProcessingExecutor(processingExecutor) .setTopologyId(topologyId) .setNetconfClientDispatcher(clientDispatcher) - .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node)) - .setIdleTimeout(writeTxIdleTimeout); + .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode, deviceId)) + .setIdleTimeout(writeTxIdleTimeout) + .setPrivateKeyPath(privateKeyPath) + .setPrivateKeyPassphrase(privateKeyPassphrase) + .setEncryptionService(encryptionService); return builder.build(); }