/* * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.netconf.topology.singleton.impl; import akka.actor.ActorSystem; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import io.netty.util.concurrent.EventExecutor; import java.util.Collection; import java.util.HashMap; import java.util.Map; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.config.threadpool.ScheduledThreadPool; import org.opendaylight.controller.config.threadpool.ThreadPool; import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.netconf.client.NetconfClientDispatcher; import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder; import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NetconfTopologyManager implements ClusteredDataTreeChangeListener, NetconfTopologySingletonService, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class); private final Map, NetconfTopologyContext> contexts = new HashMap<>(); private final Map, ClusterSingletonServiceRegistration> clusterRegistrations = new HashMap<>(); private ListenerRegistration dataChangeListenerRegistration; private final DataBroker dataBroker; private final RpcProviderRegistry rpcProviderRegistry; private final ClusterSingletonServiceProvider clusterSingletonServiceProvider; private final BindingAwareBroker bindingAwareBroker; private final ScheduledThreadPool keepaliveExecutor; private final ThreadPool processingExecutor; private final Broker domBroker; private final ActorSystem actorSystem; private final EventExecutor eventExecutor; private final NetconfClientDispatcher clientDispatcher; private final String topologyId; public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry, final ClusterSingletonServiceProvider clusterSingletonServiceProvider, final BindingAwareBroker bindingAwareBroker, final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor, final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher, final String topologyId) { this.dataBroker = Preconditions.checkNotNull(dataBroker); this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry); this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider); this.bindingAwareBroker = Preconditions.checkNotNull(bindingAwareBroker); this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor); this.processingExecutor = Preconditions.checkNotNull(processingExecutor); this.domBroker = Preconditions.checkNotNull(domBroker); this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem(); this.eventExecutor = Preconditions.checkNotNull(eventExecutor); this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher); this.topologyId = Preconditions.checkNotNull(topologyId); } // Blueprint init method public void init() { dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId); } @Override public void onDataTreeChanged(@Nonnull final Collection> changes) { for (DataTreeModification change : changes) { final DataObjectModification rootNode = change.getRootNode(); final InstanceIdentifier dataModifIdent = change.getRootPath().getRootIdentifier(); final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier()); switch (rootNode.getModificationType()) { case SUBTREE_MODIFIED: LOG.debug("Config for node {} updated", nodeId); refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter()); break; case WRITE: if (contexts.containsKey(dataModifIdent)) { LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId); refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter()); } else { LOG.debug("Config for node {} created", nodeId); startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter()); } break; case DELETE: LOG.debug("Config for node {} deleted", nodeId); stopNetconfDeviceContext(dataModifIdent); break; default: LOG.warn("Unknown operation for {}.", nodeId); } } } private void refreshNetconfDeviceContext(InstanceIdentifier instanceIdentifier, Node node) { final NetconfTopologyContext context = contexts.get(instanceIdentifier); context.refresh(createSetup(instanceIdentifier, node)); } private void startNetconfDeviceContext(final InstanceIdentifier instanceIdentifier, final Node node) { final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class); Preconditions.checkNotNull(netconfNode); Preconditions.checkNotNull(netconfNode.getHost()); Preconditions.checkNotNull(netconfNode.getHost().getIpAddress()); final ServiceGroupIdentifier serviceGroupIdent = ServiceGroupIdentifier.create(instanceIdentifier.toString()); final NetconfTopologyContext newNetconfTopologyContext = new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent); final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration = clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext); clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration); contexts.put(instanceIdentifier, newNetconfTopologyContext); } private void stopNetconfDeviceContext(final InstanceIdentifier instanceIdentifier) { if (contexts.containsKey(instanceIdentifier)) { try { clusterRegistrations.get(instanceIdentifier).close(); contexts.get(instanceIdentifier).closeFinal(); } catch (Exception e) { LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier); } contexts.remove(instanceIdentifier); clusterRegistrations.remove(instanceIdentifier); } } @Override public void close() { if (dataChangeListenerRegistration != null) { dataChangeListenerRegistration.close(); dataChangeListenerRegistration = null; } contexts.forEach((instanceIdentifier, netconfTopologyContext) -> { try { netconfTopologyContext.closeFinal(); } catch (Exception e) { LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e); } }); clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> { try { clusterSingletonServiceRegistration.close(); } catch (Exception e) { LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e); } }); contexts.clear(); clusterRegistrations.clear(); } private ListenerRegistration registerDataTreeChangeListener(String topologyId) { final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction(); initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId); initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId); Futures.addCallback(wtx.submit(), new FutureCallback() { @Override public void onSuccess(Void result) { LOG.debug("topology initialization successful"); } @Override public void onFailure(@Nonnull Throwable throwable) { LOG.error("Unable to initialize netconf-topology, {}", throwable); } }); LOG.debug("Registering datastore listener"); return dataBroker.registerDataTreeChangeListener( new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this); } private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, String topologyId) { final NetworkTopology networkTopology = new NetworkTopologyBuilder().build(); final InstanceIdentifier networkTopologyId = InstanceIdentifier.builder(NetworkTopology.class).build(); wtx.merge(datastoreType, networkTopologyId, networkTopology); final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build(); wtx.merge(datastoreType, networkTopologyId.child(Topology.class, new TopologyKey(new TopologyId(topologyId))), topology); } private NetconfTopologySetup createSetup(final InstanceIdentifier instanceIdentifier, final Node node) { final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create() .setClusterSingletonServiceProvider(clusterSingletonServiceProvider) .setDataBroker(dataBroker) .setInstanceIdentifier(instanceIdentifier) .setRpcProviderRegistry(rpcProviderRegistry) .setNode(node) .setBindingAwareBroker(bindingAwareBroker) .setActorSystem(actorSystem) .setEventExecutor(eventExecutor) .setDomBroker(domBroker) .setKeepaliveExecutor(keepaliveExecutor) .setProcessingExecutor(processingExecutor) .setTopologyId(topologyId) .setNetconfClientDispatcher(clientDispatcher); return builder.build(); } }