* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.netconf.topology.singleton.impl;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorSystem;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.EventExecutor;
+import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
+import java.util.concurrent.ScheduledExecutorService;
import org.opendaylight.aaa.encrypt.AAAEncryptionService;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
+import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
+import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
+import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
public class NetconfTopologyManager
implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
clusterRegistrations = new ConcurrentHashMap<>();
+ private final BaseNetconfSchemas baseSchemas;
private final DataBroker dataBroker;
- private final RpcProviderRegistry rpcProviderRegistry;
+ private final DOMRpcProviderService rpcProviderRegistry;
+ private final DOMActionProviderService actionProviderRegistry;
private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
- private final ScheduledThreadPool keepaliveExecutor;
- private final ThreadPool processingExecutor;
+ private final ScheduledExecutorService keepaliveExecutor;
+ private final ListeningExecutorService processingExecutor;
private final ActorSystem actorSystem;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher clientDispatcher;
private final Duration writeTxIdleTimeout;
private final DOMMountPointService mountPointService;
private final AAAEncryptionService encryptionService;
+ private final RpcProviderService rpcProviderService;
+ private final DeviceActionFactory deviceActionFactory;
+ private final SchemaResourceManager resourceManager;
+
private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
+ private Registration rpcReg;
private String privateKeyPath;
private String privateKeyPassphrase;
- public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
+ public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
+ final DOMRpcProviderService rpcProviderRegistry,
+ final DOMActionProviderService actionProviderService,
final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
final ActorSystemProvider actorSystemProvider,
final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
final String topologyId, final Config config,
final DOMMountPointService mountPointService,
- final AAAEncryptionService encryptionService) {
-
- this.dataBroker = Preconditions.checkNotNull(dataBroker);
- this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
- this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
- this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
- this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
- this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
- this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
- this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
- this.topologyId = Preconditions.checkNotNull(topologyId);
- this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
+ final AAAEncryptionService encryptionService,
+ final RpcProviderService rpcProviderService,
+ final DeviceActionFactory deviceActionFactory,
+ final SchemaResourceManager resourceManager) {
+ this.baseSchemas = requireNonNull(baseSchemas);
+ this.dataBroker = requireNonNull(dataBroker);
+ this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
+ actionProviderRegistry = requireNonNull(actionProviderService);
+ this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
+ this.keepaliveExecutor = keepaliveExecutor.getExecutor();
+ this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
+ actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
+ this.eventExecutor = requireNonNull(eventExecutor);
+ this.clientDispatcher = requireNonNull(clientDispatcher);
+ this.topologyId = requireNonNull(topologyId);
+ writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
this.mountPointService = mountPointService;
- this.encryptionService = Preconditions.checkNotNull(encryptionService);
-
+ this.encryptionService = requireNonNull(encryptionService);
+ this.rpcProviderService = requireNonNull(rpcProviderService);
+ this.deviceActionFactory = requireNonNull(deviceActionFactory);
+ this.resourceManager = requireNonNull(resourceManager);
}
// Blueprint init method
public void init() {
dataChangeListenerRegistration = registerDataTreeChangeListener();
+ rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
+ new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
}
@Override
- public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
+ public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
for (final DataTreeModification<Node> change : changes) {
final DataObjectModification<Node> rootNode = change.getRootNode();
final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
// TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
@SuppressWarnings("checkstyle:IllegalCatch")
private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
- final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
- Preconditions.checkNotNull(netconfNode);
- Preconditions.checkNotNull(netconfNode.getHost());
- Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
+ final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
- final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
- "seconds"));
+ final Timeout actorResponseWaitTime = Timeout.create(
+ Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
final ServiceGroupIdentifier serviceGroupIdent =
ServiceGroupIdentifier.create(instanceIdentifier.toString());
final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
- createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime);
+ createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
int tries = 3;
while (true) {
}
@VisibleForTesting
- protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
- ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
- return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
+ protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
+ final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
+ final DeviceActionFactory deviceActionFact) {
+ return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
+ deviceActionFact);
}
@Override
public void close() {
+ if (rpcReg != null) {
+ rpcReg.close();
+ rpcReg = null;
+ }
if (dataChangeListenerRegistration != null) {
dataChangeListenerRegistration.close();
dataChangeListenerRegistration = null;
}
- contexts.values().forEach(netconfTopologyContext -> close(netconfTopologyContext));
-
- clusterRegistrations.values().forEach(
- clusterSingletonServiceRegistration -> close(clusterSingletonServiceRegistration));
+ contexts.values().forEach(NetconfTopologyManager::close);
+ clusterRegistrations.values().forEach(NetconfTopologyManager::close);
contexts.clear();
clusterRegistrations.clear();
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private void close(AutoCloseable closeable) {
+ private static void close(final AutoCloseable closeable) {
try {
closeable.close();
} catch (Exception e) {
/**
* Sets the private key path from location specified in configuration file using blueprint.
*/
- public void setPrivateKeyPath(String privateKeyPath) {
+ public void setPrivateKeyPath(final String privateKeyPath) {
this.privateKeyPath = privateKeyPath;
}
/**
* Sets the private key passphrase from location specified in configuration file using blueprint.
*/
- public void setPrivateKeyPassphrase(String privateKeyPassphrase) {
+ public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
this.privateKeyPassphrase = privateKeyPassphrase;
}
private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
- initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
- initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
- Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
+ // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
+ // also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
+ // oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
+ wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
+ .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
+ wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
- public void onSuccess(final Void result) {
+ public void onSuccess(final CommitInfo result) {
LOG.debug("topology initialization successful");
}
@Override
- public void onFailure(@Nonnull final Throwable throwable) {
- LOG.error("Unable to initialize netconf-topology, {}", throwable);
+ public void onFailure(final Throwable throwable) {
+ LOG.error("Unable to initialize netconf-topology", throwable);
}
}, MoreExecutors.directExecutor());
LOG.debug("Registering datastore listener");
- return dataBroker.registerDataTreeChangeListener(
- new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
- NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
- }
-
- private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
- final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
- final InstanceIdentifier<NetworkTopology> networkTopologyId =
- InstanceIdentifier.builder(NetworkTopology.class).build();
- wtx.merge(datastoreType, networkTopologyId, networkTopology);
- final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
- wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
- new TopologyKey(new TopologyId(topologyId))), topology);
+ return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
+ NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
}
private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
- final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
+ final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
+ final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
+
+ return NetconfTopologySetupBuilder.create()
.setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
+ .setBaseSchemas(baseSchemas)
.setDataBroker(dataBroker)
.setInstanceIdentifier(instanceIdentifier)
.setRpcProviderRegistry(rpcProviderRegistry)
+ .setActionProviderRegistry(actionProviderRegistry)
.setNode(node)
.setActorSystem(actorSystem)
.setEventExecutor(eventExecutor)
.setProcessingExecutor(processingExecutor)
.setTopologyId(topologyId)
.setNetconfClientDispatcher(clientDispatcher)
- .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
+ .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
+ deviceId))
.setIdleTimeout(writeTxIdleTimeout)
.setPrivateKeyPath(privateKeyPath)
.setPrivateKeyPassphrase(privateKeyPassphrase)
- .setEncryptionService(encryptionService);
-
- return builder.build();
+ .setEncryptionService(encryptionService)
+ .build();
}
}