* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.netconf.topology.singleton.impl;
import akka.actor.ActorSystem;
import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.EventExecutor;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import org.opendaylight.aaa.encrypt.AAAEncryptionService;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
- private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
+ private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
- clusterRegistrations = new HashMap<>();
+ clusterRegistrations = new ConcurrentHashMap<>();
private final DataBroker dataBroker;
- private final RpcProviderRegistry rpcProviderRegistry;
+ private final DOMRpcProviderService rpcProviderRegistry;
private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
- private final ScheduledThreadPool keepaliveExecutor;
- private final ThreadPool processingExecutor;
+ private final ScheduledExecutorService keepaliveExecutor;
+ private final ListeningExecutorService processingExecutor;
private final ActorSystem actorSystem;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher clientDispatcher;
private final String topologyId;
private final Duration writeTxIdleTimeout;
private final DOMMountPointService mountPointService;
-
+ private final AAAEncryptionService encryptionService;
private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
+ private String privateKeyPath;
+ private String privateKeyPassphrase;
- public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
+ public NetconfTopologyManager(final DataBroker dataBroker, final DOMRpcProviderService rpcProviderRegistry,
final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
final ActorSystemProvider actorSystemProvider,
final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
final String topologyId, final Config config,
- final DOMMountPointService mountPointService) {
+ final DOMMountPointService mountPointService,
+ final AAAEncryptionService encryptionService) {
+
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
- this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
- this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
+ this.keepaliveExecutor = keepaliveExecutor.getExecutor();
+ this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
this.topologyId = Preconditions.checkNotNull(topologyId);
this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
this.mountPointService = mountPointService;
+ this.encryptionService = Preconditions.checkNotNull(encryptionService);
+
}
// Blueprint init method
public void init() {
- dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
+ dataChangeListenerRegistration = registerDataTreeChangeListener();
}
@Override
// TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
@SuppressWarnings("checkstyle:IllegalCatch")
private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
- final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+ final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
Preconditions.checkNotNull(netconfNode);
Preconditions.checkNotNull(netconfNode.getHost());
Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
final ServiceGroupIdentifier serviceGroupIdent =
ServiceGroupIdentifier.create(instanceIdentifier.toString());
- final NetconfTopologyContext newNetconfTopologyContext =
- new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
- actorResponseWaitTime, mountPointService);
+ final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
+ createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime);
int tries = 3;
while (true) {
if (--tries <= 0) {
LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
newNetconfTopologyContext, e);
- close();
+ close(newNetconfTopologyContext);
break;
}
}
}
-
}
- @SuppressWarnings("checkstyle:IllegalCatch")
private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
- if (contexts.containsKey(instanceIdentifier)) {
- try {
- clusterRegistrations.get(instanceIdentifier).close();
- contexts.get(instanceIdentifier).closeFinal();
- } catch (final Exception e) {
- LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
- }
- contexts.remove(instanceIdentifier);
- clusterRegistrations.remove(instanceIdentifier);
+ final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
+ if (netconfTopologyContext != null) {
+ close(clusterRegistrations.remove(instanceIdentifier));
+ close(netconfTopologyContext);
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
+ @VisibleForTesting
+ protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
+ final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime) {
+ return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
+ }
+
@Override
public void close() {
if (dataChangeListenerRegistration != null) {
dataChangeListenerRegistration.close();
dataChangeListenerRegistration = null;
}
- contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
- try {
- netconfTopologyContext.closeFinal();
- } catch (final Exception e) {
- LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
- }
- });
- clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
- try {
- clusterSingletonServiceRegistration.close();
- } catch (final Exception e) {
- LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
- }
- });
+
+ contexts.values().forEach(NetconfTopologyManager::close);
+ clusterRegistrations.values().forEach(NetconfTopologyManager::close);
+
contexts.clear();
clusterRegistrations.clear();
}
- private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static void close(final AutoCloseable closeable) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ LOG.warn("Error closing {}", closeable, e);
+ }
+ }
+
+ /**
+ * Sets the private key path from location specified in configuration file using blueprint.
+ */
+ public void setPrivateKeyPath(final String privateKeyPath) {
+ this.privateKeyPath = privateKeyPath;
+ }
+
+ /**
+ * Sets the private key passphrase from location specified in configuration file using blueprint.
+ */
+ public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
+ this.privateKeyPassphrase = privateKeyPassphrase;
+ }
+
+ private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
- initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
- initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
- Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
+ initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
+ initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
+ wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
- public void onSuccess(final Void result) {
+ public void onSuccess(final CommitInfo result) {
LOG.debug("topology initialization successful");
}
@Override
public void onFailure(@Nonnull final Throwable throwable) {
- LOG.error("Unable to initialize netconf-topology, {}", throwable);
+ LOG.error("Unable to initialize netconf-topology", throwable);
}
- });
+ }, MoreExecutors.directExecutor());
LOG.debug("Registering datastore listener");
- return dataBroker.registerDataTreeChangeListener(
- new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
- NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
+ return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
+ NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
}
- private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType,
- final String topologyId) {
+ private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
final InstanceIdentifier<NetworkTopology> networkTopologyId =
InstanceIdentifier.builder(NetworkTopology.class).build();
.setTopologyId(topologyId)
.setNetconfClientDispatcher(clientDispatcher)
.setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
- .setIdleTimeout(writeTxIdleTimeout);
+ .setIdleTimeout(writeTxIdleTimeout)
+ .setPrivateKeyPath(privateKeyPath)
+ .setPrivateKeyPassphrase(privateKeyPassphrase)
+ .setEncryptionService(encryptionService);
return builder.build();
}