/** * Copyright (c) 2014 André Martins, Colin Dixon, Evan Zeller and others. All * rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.l2switch.hosttracker.plugin.internal; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.l2switch.hosttracker.plugin.inventory.Host; import org.opendaylight.l2switch.hosttracker.plugin.util.Utilities; import static org.opendaylight.l2switch.hosttracker.plugin.util.Utilities.TOPOLOGY_NAME; import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.AddressCapableNodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.address.node.connector.Addresses; import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostId; import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.host.AttachmentPointsBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HostTrackerImpl implements DataChangeListener { private static final int CPUS = Runtime.getRuntime().availableProcessors(); private static final Logger log = LoggerFactory.getLogger(HostTrackerImpl.class); private final DataBroker dataService; private final ConcurrentClusterAwareHostHashMap hosts; private ListenerRegistration addrsNodeListerRegistration; private ListenerRegistration hostNodeListerRegistration; public HostTrackerImpl(DataBroker dataService) { Preconditions.checkNotNull(dataService, "dataBrokerService should not be null."); this.dataService = dataService; this.hosts = new ConcurrentClusterAwareHostHashMap<>(dataService); } public void registerAsDataChangeListener() { InstanceIdentifier addrCapableNodeConnectors = // InstanceIdentifier.builder(Nodes.class) // .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class) // .child(NodeConnector.class) // .augmentation(AddressCapableNodeConnector.class)// .child(Addresses.class).build(); this.addrsNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, addrCapableNodeConnectors, this, DataChangeScope.SUBTREE); InstanceIdentifier hostNodes = InstanceIdentifier.builder(NetworkTopology.class)// .child(Topology.class, new TopologyKey(new TopologyId(Utilities.TOPOLOGY_NAME)))// .child(Node.class) .augmentation(HostNode.class).build(); this.hostNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, hostNodes, this, DataChangeScope.SUBTREE); InstanceIdentifier lIID = InstanceIdentifier.builder(NetworkTopology.class)// .child(Topology.class, new TopologyKey(new TopologyId(TOPOLOGY_NAME)))// .child(Link.class).build(); this.addrsNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, lIID, this, DataChangeScope.BASE); //Processing addresses that existed before we register as a data change listener. // ReadOnlyTransaction newReadOnlyTransaction = dataService.newReadOnlyTransaction(); // InstanceIdentifier iinc = addrCapableNodeConnectors.firstIdentifierOf(NodeConnector.class); // InstanceIdentifier iin// // = addrCapableNodeConnectors.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class); // ListenableFuture> dataFuture = newReadOnlyTransaction.read(LogicalDatastoreType.OPERATIONAL, iinc); // try { // NodeConnector get = dataFuture.get().get(); // log.trace("test "+get); // } catch (InterruptedException | ExecutionException ex) { // java.util.logging.Logger.getLogger(HostTrackerImpl.class.getName()).log(Level.SEVERE, null, ex); // } // Futures.addCallback(dataFuture, new FutureCallback>() { // @Override // public void onSuccess(final Optional result) { // if (result.isPresent()) { // log.trace("Processing NEW NODE? " + result.get().getId().getValue()); //// processHost(result, dataObject, node); // } // } // // @Override // public void onFailure(Throwable arg0) { // } // }); } @Override public void onDataChanged(final AsyncDataChangeEvent, DataObject> change) { ExecutorService exec = Executors.newFixedThreadPool(CPUS); exec.submit(new Runnable() { @Override public void run() { if (change == null) { log.info("In onDataChanged: No processing done as change even is null."); return; } Map, DataObject> updatedData = change.getUpdatedData(); Map, DataObject> createdData = change.getCreatedData(); Map, DataObject> originalData = change.getOriginalData(); Set> deletedData = change.getRemovedPaths(); for (InstanceIdentifier iid : deletedData) { if (iid.getTargetType().equals(Node.class)) { Node node = ((Node) originalData.get(iid)); InstanceIdentifier iiN = (InstanceIdentifier) iid; HostNode hostNode = node.getAugmentation(HostNode.class); if (hostNode != null) { synchronized (hosts) { try { hosts.removeLocally(iiN); } catch (ClassCastException ex) { } } } } else if (iid.getTargetType().equals(Link.class)) { // TODO performance improvement here linkRemoved((InstanceIdentifier) iid, (Link) originalData.get(iid)); } } for (Map.Entry, DataObject> entrySet : updatedData.entrySet()) { InstanceIdentifier iiD = entrySet.getKey(); final DataObject dataObject = entrySet.getValue(); if (dataObject instanceof Addresses) { packetReceived((Addresses) dataObject, iiD); } else if (dataObject instanceof Node) { synchronized (hosts) { hosts.putLocally((InstanceIdentifier) iiD, Host.createHost((Node) dataObject)); } } } for (Map.Entry, DataObject> entrySet : createdData.entrySet()) { InstanceIdentifier iiD = entrySet.getKey(); final DataObject dataObject = entrySet.getValue(); if (dataObject instanceof Addresses) { packetReceived((Addresses) dataObject, iiD); } else if (dataObject instanceof Node) { synchronized (hosts) { hosts.putLocally((InstanceIdentifier) iiD, Host.createHost((Node) dataObject)); } } } } }); } public void packetReceived(Addresses addrs, InstanceIdentifier ii) { InstanceIdentifier iinc = ii.firstIdentifierOf(NodeConnector.class); InstanceIdentifier iin// = ii.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class); ReadOnlyTransaction readTx = dataService.newReadOnlyTransaction(); ListenableFuture> futureNodeConnector = readTx.read(LogicalDatastoreType.OPERATIONAL, iinc); ListenableFuture> futureNode // = readTx.read(LogicalDatastoreType.OPERATIONAL, iin); try { if (futureNodeConnector.get().isPresent() && futureNode.get().isPresent()) { processHost(futureNode.get().get(), futureNodeConnector.get().get(), addrs); } } catch (ExecutionException | InterruptedException ex) { } } private void processHost(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node node, NodeConnector nodeConnector, Addresses addrs) { List hostsToMod = new ArrayList<>(); List hostsToRem = new ArrayList<>(); List linksToRem = new ArrayList<>(); List linksToAdd = new ArrayList<>(); synchronized (hosts) { log.trace("Processing nodeConnector " + nodeConnector.getId().toString()); HostId hId = Host.createHostId(addrs); if (hId != null) { if (isNodeConnectorInternal(nodeConnector)) { log.trace("NodeConnector is internal " + nodeConnector.getId().toString()); removeNodeConnectorFromHost(hostsToMod, hostsToRem, nodeConnector); hosts.removeAll(hostsToRem); hosts.putAll(hostsToMod); } else { log.trace("NodeConnector is NOT internal " + nodeConnector.getId().toString()); Host host = new Host(addrs, nodeConnector); if (hosts.containsKey(host.getId())) { hosts.get(host.getId()).mergeHostWith(host); } else { hosts.put(host.getId(), host); } List newLinks = hosts.get(host.getId()).createLinks(node); if (newLinks != null) { linksToAdd.addAll(newLinks); } hosts.submit(host.getId()); } } } writeDatatoMDSAL(linksToAdd, linksToRem); } /** * It verifies if a given NodeConnector is *internal*. An *internal* * NodeConnector is considered to be all NodeConnetors that are NOT attached * to hosts created by hosttracker. * * @param nodeConnector the nodeConnector to check if it is internal or not. * @return true if it was found a host connected to this nodeConnetor, false * if it was not found a network topology or it was not found a host * connected to this nodeConnetor. */ private boolean isNodeConnectorInternal(NodeConnector nodeConnector) { TpId tpId = new TpId(nodeConnector.getKey().getId().getValue()); InstanceIdentifier ntII = InstanceIdentifier.builder(NetworkTopology.class).build(); ReadOnlyTransaction rot = dataService.newReadOnlyTransaction(); ListenableFuture> lfONT = rot.read(LogicalDatastoreType.OPERATIONAL, ntII); Optional oNT; try { oNT = lfONT.get(); } catch (InterruptedException | ExecutionException ex) { return false; } if (oNT != null && oNT.isPresent()) { NetworkTopology networkTopo = oNT.get(); for (Topology t : networkTopo.getTopology()) { for (Link l : t.getLink()) { if ((l.getSource().getSourceTp().equals(tpId) && !l.getDestination().getDestTp().getValue().startsWith(Host.NODE_PREFIX)) || (l.getDestination().getDestTp().equals(tpId) && !l.getSource().getSourceTp().getValue().startsWith(Host.NODE_PREFIX))) { return true; } } } } return false; } private void removeLinksFromHosts(List hostsToMod, List hostsToRem, Link linkRemoved) { for (Host h : hosts.values()) { h.removeTerminationPoint(linkRemoved.getSource().getSourceTp()); h.removeTerminationPoint(linkRemoved.getDestination().getDestTp()); if (h.isOrphan()) { hostsToRem.add(h); } else { hostsToMod.add(h); } } } private void removeNodeConnectorFromHost(List hostsToMod, List hostsToRem, NodeConnector nc) { AttachmentPointsBuilder atStD = Utilities.createAPsfromNodeConnector(nc); for (Host h : hosts.values()) { h.removeAttachmentPoints(atStD); if (h.isOrphan()) { hostsToRem.add(h); } else { hostsToMod.add(h); } } } private void linkRemoved(InstanceIdentifier iiLink, Link linkRemoved) { log.trace("linkRemoved"); List hostsToMod = new ArrayList<>(); List hostsToRem = new ArrayList<>(); synchronized (hosts) { removeLinksFromHosts(hostsToMod, hostsToRem, linkRemoved); hosts.removeAll(hostsToRem); hosts.putAll(hostsToMod); } } private void writeDatatoMDSAL(List linksToAdd, List linksToRemove) { final WriteTransaction writeTx = dataService.newWriteOnlyTransaction(); if (linksToAdd != null) { for (Link l : linksToAdd) { InstanceIdentifier lIID = Utilities.buildLinkIID(l.getKey()); log.trace("Writing link from MD_SAL: " + lIID.toString()); writeTx.merge(LogicalDatastoreType.OPERATIONAL, lIID, l, true); } } if (linksToRemove != null) { for (Link l : linksToRemove) { InstanceIdentifier lIID = Utilities.buildLinkIID(l.getKey()); log.trace("Removing link from MD_SAL: " + lIID.toString()); writeTx.delete(LogicalDatastoreType.OPERATIONAL, lIID); } } final CheckedFuture writeTxResultFuture = writeTx.submit(); Futures.addCallback(writeTxResultFuture, new FutureCallback() { @Override public void onSuccess(Object o) { log.debug("Hosttracker write successful for tx :{}", writeTx.getIdentifier()); } @Override public void onFailure(Throwable throwable) { log.error("Hosttracker write transaction {} failed", writeTx.getIdentifier(), throwable.getCause()); } }); } public void close() { this.addrsNodeListerRegistration.close(); this.hostNodeListerRegistration.close(); synchronized (hosts) { this.hosts.clear(); } } }