import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.EventExecutor;
+import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.aaa.encrypt.AAAEncryptionService;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.mdsal.binding.api.DataObjectModification;
import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
-import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
+import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
public class NetconfTopologyManager
implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
private final Duration writeTxIdleTimeout;
private final DOMMountPointService mountPointService;
private final AAAEncryptionService encryptionService;
+ private final RpcProviderService rpcProviderService;
private final DeviceActionFactory deviceActionFactory;
private final SchemaResourceManager resourceManager;
private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
+ private Registration rpcReg;
private String privateKeyPath;
private String privateKeyPassphrase;
final String topologyId, final Config config,
final DOMMountPointService mountPointService,
final AAAEncryptionService encryptionService,
+ final RpcProviderService rpcProviderService,
final DeviceActionFactory deviceActionFactory,
final SchemaResourceManager resourceManager) {
this.baseSchemas = requireNonNull(baseSchemas);
this.dataBroker = requireNonNull(dataBroker);
this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
- this.actionProviderRegistry = requireNonNull(actionProviderService);
+ actionProviderRegistry = requireNonNull(actionProviderService);
this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
this.keepaliveExecutor = keepaliveExecutor.getExecutor();
this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
- this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
+ actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
this.eventExecutor = requireNonNull(eventExecutor);
this.clientDispatcher = requireNonNull(clientDispatcher);
this.topologyId = requireNonNull(topologyId);
- this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout().toJava(), TimeUnit.SECONDS);
+ writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
this.mountPointService = mountPointService;
this.encryptionService = requireNonNull(encryptionService);
+ this.rpcProviderService = requireNonNull(rpcProviderService);
this.deviceActionFactory = requireNonNull(deviceActionFactory);
this.resourceManager = requireNonNull(resourceManager);
}
// Blueprint init method
public void init() {
dataChangeListenerRegistration = registerDataTreeChangeListener();
+ rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
+ new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
}
@Override
// TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
@SuppressWarnings("checkstyle:IllegalCatch")
private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
- final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
- requireNonNull(netconfNode);
- requireNonNull(netconfNode.getHost());
- requireNonNull(netconfNode.getHost().getIpAddress());
+ final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
- final Timeout actorResponseWaitTime = new Timeout(
- Duration.create(netconfNode.getActorResponseWaitTime().toJava(), "seconds"));
+ final Timeout actorResponseWaitTime = Timeout.create(
+ Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
final ServiceGroupIdentifier serviceGroupIdent =
ServiceGroupIdentifier.create(instanceIdentifier.toString());
@Override
public void close() {
+ if (rpcReg != null) {
+ rpcReg.close();
+ rpcReg = null;
+ }
if (dataChangeListenerRegistration != null) {
dataChangeListenerRegistration.close();
dataChangeListenerRegistration = null;
private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
- initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
- initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
+ // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
+ // also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
+ // oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
+ wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
+ .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
}
- private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
- final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
- final InstanceIdentifier<NetworkTopology> networkTopologyId =
- InstanceIdentifier.builder(NetworkTopology.class).build();
- wtx.merge(datastoreType, networkTopologyId, networkTopology);
- final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
- wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
- new TopologyKey(new TopologyId(topologyId))), topology);
- }
-
private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
- final RemoteDeviceId deviceId = NetconfTopologyUtils.createRemoteDeviceId(node.getNodeId(), netconfNode);
+ final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
- final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
+ return NetconfTopologySetupBuilder.create()
.setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
.setBaseSchemas(baseSchemas)
.setDataBroker(dataBroker)
.setProcessingExecutor(processingExecutor)
.setTopologyId(topologyId)
.setNetconfClientDispatcher(clientDispatcher)
- .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode, deviceId))
+ .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
+ deviceId))
.setIdleTimeout(writeTxIdleTimeout)
.setPrivateKeyPath(privateKeyPath)
.setPrivateKeyPassphrase(privateKeyPassphrase)
- .setEncryptionService(encryptionService);
-
- return builder.build();
+ .setEncryptionService(encryptionService)
+ .build();
}
}