* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.netconf.topology.singleton.impl;
import akka.actor.ActorSystem;
import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.EventExecutor;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import org.opendaylight.aaa.encrypt.AAAEncryptionService;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
- private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
+ private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
- clusterRegistrations = new HashMap<>();
-
- private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
+ clusterRegistrations = new ConcurrentHashMap<>();
private final DataBroker dataBroker;
- private final RpcProviderRegistry rpcProviderRegistry;
+ private final DOMRpcProviderService rpcProviderRegistry;
private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
- private final BindingAwareBroker bindingAwareBroker;
- private final ScheduledThreadPool keepaliveExecutor;
- private final ThreadPool processingExecutor;
- private final Broker domBroker;
+ private final ScheduledExecutorService keepaliveExecutor;
+ private final ListeningExecutorService processingExecutor;
private final ActorSystem actorSystem;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher clientDispatcher;
private final String topologyId;
+ private final Duration writeTxIdleTimeout;
+ private final DOMMountPointService mountPointService;
+ private final AAAEncryptionService encryptionService;
+ private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
+ private String privateKeyPath;
+ private String privateKeyPassphrase;
- public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
+ public NetconfTopologyManager(final DataBroker dataBroker, final DOMRpcProviderService rpcProviderRegistry,
final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
- final BindingAwareBroker bindingAwareBroker,
final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
- final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
- final NetconfClientDispatcher clientDispatcher, final String topologyId) {
+ final ActorSystemProvider actorSystemProvider,
+ final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
+ final String topologyId, final Config config,
+ final DOMMountPointService mountPointService,
+ final AAAEncryptionService encryptionService) {
+
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
- this.bindingAwareBroker = Preconditions.checkNotNull(bindingAwareBroker);
- this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
- this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
- this.domBroker = Preconditions.checkNotNull(domBroker);
+ this.keepaliveExecutor = keepaliveExecutor.getExecutor();
+ this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
this.topologyId = Preconditions.checkNotNull(topologyId);
+ this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
+ this.mountPointService = mountPointService;
+ this.encryptionService = Preconditions.checkNotNull(encryptionService);
+
}
// Blueprint init method
public void init() {
- dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
+ dataChangeListenerRegistration = registerDataTreeChangeListener();
}
@Override
context.refresh(createSetup(instanceIdentifier, node));
}
+ // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
+ // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
+ // retry registration several times and log the error.
+ // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
- final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+ final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
Preconditions.checkNotNull(netconfNode);
Preconditions.checkNotNull(netconfNode.getHost());
Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
final ServiceGroupIdentifier serviceGroupIdent =
ServiceGroupIdentifier.create(instanceIdentifier.toString());
- final NetconfTopologyContext newNetconfTopologyContext =
- new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
- actorResponseWaitTime);
+ final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
+ createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime);
- final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
- clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
+ int tries = 3;
+ while (true) {
+ try {
+ final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
+ clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
+ clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
+ contexts.put(instanceIdentifier, newNetconfTopologyContext);
+ break;
+ } catch (final RuntimeException e) {
+ LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
- clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
- contexts.put(instanceIdentifier, newNetconfTopologyContext);
+ if (--tries <= 0) {
+ LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
+ newNetconfTopologyContext, e);
+ close(newNetconfTopologyContext);
+ break;
+ }
+ }
+ }
}
private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
- if (contexts.containsKey(instanceIdentifier)) {
- try {
- clusterRegistrations.get(instanceIdentifier).close();
- contexts.get(instanceIdentifier).closeFinal();
- } catch (final Exception e) {
- LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
- }
- contexts.remove(instanceIdentifier);
- clusterRegistrations.remove(instanceIdentifier);
+ final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
+ if (netconfTopologyContext != null) {
+ close(clusterRegistrations.remove(instanceIdentifier));
+ close(netconfTopologyContext);
}
}
+ @VisibleForTesting
+ protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
+ final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime) {
+ return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
+ }
+
@Override
public void close() {
if (dataChangeListenerRegistration != null) {
dataChangeListenerRegistration.close();
dataChangeListenerRegistration = null;
}
- contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
- try {
- netconfTopologyContext.closeFinal();
- } catch (final Exception e) {
- LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
- }
- });
- clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
- try {
- clusterSingletonServiceRegistration.close();
- } catch (final Exception e) {
- LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
- }
- });
+
+ contexts.values().forEach(NetconfTopologyManager::close);
+ clusterRegistrations.values().forEach(NetconfTopologyManager::close);
+
contexts.clear();
clusterRegistrations.clear();
}
- private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static void close(final AutoCloseable closeable) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ LOG.warn("Error closing {}", closeable, e);
+ }
+ }
+
+ /**
+ * Sets the private key path from location specified in configuration file using blueprint.
+ */
+ public void setPrivateKeyPath(final String privateKeyPath) {
+ this.privateKeyPath = privateKeyPath;
+ }
+
+ /**
+ * Sets the private key passphrase from location specified in configuration file using blueprint.
+ */
+ public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
+ this.privateKeyPassphrase = privateKeyPassphrase;
+ }
+
+ private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
- initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
- initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
- Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
+ initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
+ initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
+ wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
- public void onSuccess(final Void result) {
+ public void onSuccess(final CommitInfo result) {
LOG.debug("topology initialization successful");
}
@Override
public void onFailure(@Nonnull final Throwable throwable) {
- LOG.error("Unable to initialize netconf-topology, {}", throwable);
+ LOG.error("Unable to initialize netconf-topology", throwable);
}
- });
+ }, MoreExecutors.directExecutor());
LOG.debug("Registering datastore listener");
- return dataBroker.registerDataTreeChangeListener(
- new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
- NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
+ return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
+ NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
}
- private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, final String topologyId) {
+ private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
final InstanceIdentifier<NetworkTopology> networkTopologyId =
InstanceIdentifier.builder(NetworkTopology.class).build();
.setInstanceIdentifier(instanceIdentifier)
.setRpcProviderRegistry(rpcProviderRegistry)
.setNode(node)
- .setBindingAwareBroker(bindingAwareBroker)
.setActorSystem(actorSystem)
.setEventExecutor(eventExecutor)
- .setDomBroker(domBroker)
.setKeepaliveExecutor(keepaliveExecutor)
.setProcessingExecutor(processingExecutor)
.setTopologyId(topologyId)
- .setNetconfClientDispatcher(clientDispatcher);
+ .setNetconfClientDispatcher(clientDispatcher)
+ .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
+ .setIdleTimeout(writeTxIdleTimeout)
+ .setPrivateKeyPath(privateKeyPath)
+ .setPrivateKeyPassphrase(privateKeyPassphrase)
+ .setEncryptionService(encryptionService);
return builder.build();
}