2 * Copyright (c) 2014 André Martins, Colin Dixon, Evan Zeller and others. All
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.l2switch.hosttracker.plugin.internal;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.ArrayList;
15 import java.util.List;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
23 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.l2switch.hosttracker.plugin.inventory.Host;
29 import org.opendaylight.l2switch.hosttracker.plugin.util.Utilities;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.AddressCapableNodeConnector;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.address.node.connector.Addresses;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostNode;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.host.AttachmentPointsBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
44 import org.opendaylight.yangtools.concepts.ListenerRegistration;
45 import org.opendaylight.yangtools.yang.binding.DataObject;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 public class HostTrackerImpl implements DataChangeListener {
52 private static final int CPUS = Runtime.getRuntime().availableProcessors();
56 * controller/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyProvider.java
58 private static final String TOPOLOGY_NAME = "flow:1";
60 private static final Logger log = LoggerFactory.getLogger(HostTrackerImpl.class);
62 private final DataBroker dataService;
63 private final String topologyId;
65 ExecutorService exec = Executors.newFixedThreadPool(CPUS);
67 private final ConcurrentClusterAwareHostHashMap<HostId, Host> hosts;
68 private final OperationProcessor opProcessor;
69 private ListenerRegistration<DataChangeListener> addrsNodeListerRegistration;
70 private ListenerRegistration<DataChangeListener> hostNodeListerRegistration;
72 public HostTrackerImpl(DataBroker dataService, String topologyId) {
73 Preconditions.checkNotNull(dataService, "dataBrokerService should not be null.");
74 this.dataService = dataService;
75 this.opProcessor = new OperationProcessor(dataService);
76 Thread processorThread = new Thread(opProcessor);
77 processorThread.start();
78 if (topologyId == null || topologyId.isEmpty()) {
79 this.topologyId = TOPOLOGY_NAME;
81 this.topologyId = topologyId;
83 this.hosts = new ConcurrentClusterAwareHostHashMap<>(opProcessor, this.topologyId);
86 public void registerAsDataChangeListener() {
87 InstanceIdentifier<Addresses> addrCapableNodeConnectors = //
88 InstanceIdentifier.builder(Nodes.class) //
89 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class) //
90 .child(NodeConnector.class) //
91 .augmentation(AddressCapableNodeConnector.class)//
92 .child(Addresses.class).build();
93 this.addrsNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, addrCapableNodeConnectors, this, DataChangeScope.SUBTREE);
95 InstanceIdentifier<HostNode> hostNodes = InstanceIdentifier.builder(NetworkTopology.class)//
96 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))//
98 .augmentation(HostNode.class).build();
99 this.hostNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, hostNodes, this, DataChangeScope.SUBTREE);
101 InstanceIdentifier<Link> lIID = InstanceIdentifier.builder(NetworkTopology.class)//
102 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))//
103 .child(Link.class).build();
105 this.addrsNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, lIID, this, DataChangeScope.BASE);
107 //Processing addresses that existed before we register as a data change listener.
108 // ReadOnlyTransaction newReadOnlyTransaction = dataService.newReadOnlyTransaction();
109 // InstanceIdentifier<NodeConnector> iinc = addrCapableNodeConnectors.firstIdentifierOf(NodeConnector.class);
110 // InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin//
111 // = addrCapableNodeConnectors.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
112 // ListenableFuture<Optional<NodeConnector>> dataFuture = newReadOnlyTransaction.read(LogicalDatastoreType.OPERATIONAL, iinc);
114 // NodeConnector get = dataFuture.get().get();
115 // log.trace("test "+get);
116 // } catch (InterruptedException | ExecutionException ex) {
117 // java.util.logging.Logger.getLogger(HostTrackerImpl.class.getName()).log(Level.SEVERE, null, ex);
119 // Futures.addCallback(dataFuture, new FutureCallback<Optional<NodeConnector>>() {
121 // public void onSuccess(final Optional<NodeConnector> result) {
122 // if (result.isPresent()) {
123 // log.trace("Processing NEW NODE? " + result.get().getId().getValue());
124 //// processHost(result, dataObject, node);
129 // public void onFailure(Throwable arg0) {
135 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
137 exec.submit(new Runnable() {
140 if (change == null) {
141 log.info("In onDataChanged: No processing done as change even is null.");
144 Map<InstanceIdentifier<?>, DataObject> updatedData = change.getUpdatedData();
145 Map<InstanceIdentifier<?>, DataObject> createdData = change.getCreatedData();
146 Map<InstanceIdentifier<?>, DataObject> originalData = change.getOriginalData();
147 Set<InstanceIdentifier<?>> deletedData = change.getRemovedPaths();
149 for (InstanceIdentifier<?> iid : deletedData) {
150 if (iid.getTargetType().equals(Node.class)) {
151 Node node = ((Node) originalData.get(iid));
152 InstanceIdentifier<Node> iiN = (InstanceIdentifier<Node>) iid;
153 HostNode hostNode = node.getAugmentation(HostNode.class);
154 if (hostNode != null) {
155 synchronized (hosts) {
157 hosts.removeLocally(iiN);
158 } catch (ClassCastException ex) {
162 } else if (iid.getTargetType().equals(Link.class)) {
163 // TODO performance improvement here
164 linkRemoved((InstanceIdentifier<Link>) iid, (Link) originalData.get(iid));
168 for (Map.Entry<InstanceIdentifier<?>, DataObject> entrySet : updatedData.entrySet()) {
169 InstanceIdentifier<?> iiD = entrySet.getKey();
170 final DataObject dataObject = entrySet.getValue();
171 if (dataObject instanceof Addresses) {
172 packetReceived((Addresses) dataObject, iiD);
173 } else if (dataObject instanceof Node) {
174 synchronized (hosts) {
175 hosts.putLocally((InstanceIdentifier<Node>) iiD, Host.createHost((Node) dataObject));
180 for (Map.Entry<InstanceIdentifier<?>, DataObject> entrySet : createdData.entrySet()) {
181 InstanceIdentifier<?> iiD = entrySet.getKey();
182 final DataObject dataObject = entrySet.getValue();
183 if (dataObject instanceof Addresses) {
184 packetReceived((Addresses) dataObject, iiD);
185 } else if (dataObject instanceof Node) {
186 synchronized (hosts) {
187 hosts.putLocally((InstanceIdentifier<Node>) iiD, Host.createHost((Node) dataObject));
195 public void packetReceived(Addresses addrs, InstanceIdentifier<?> ii) {
196 InstanceIdentifier<NodeConnector> iinc = ii.firstIdentifierOf(NodeConnector.class);
197 InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin//
198 = ii.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
200 ListenableFuture<Optional<NodeConnector>> futureNodeConnector;
201 ListenableFuture<Optional<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node>> futureNode;
202 try (ReadOnlyTransaction readTx = dataService.newReadOnlyTransaction()) {
203 futureNodeConnector = readTx.read(LogicalDatastoreType.OPERATIONAL, iinc);
204 futureNode = readTx.read(LogicalDatastoreType.OPERATIONAL, iin);
207 Optional<NodeConnector> opNodeConnector = null;
208 Optional<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> opNode = null;
210 opNodeConnector = futureNodeConnector.get();
211 opNode = futureNode.get();
212 } catch (ExecutionException | InterruptedException ex) {
213 log.warn(ex.getLocalizedMessage());
215 if (opNode != null && opNode.isPresent()
216 && opNodeConnector != null && opNodeConnector.isPresent()) {
217 processHost(opNode.get(), opNodeConnector.get(), addrs);
221 private void processHost(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node node,
222 NodeConnector nodeConnector,
224 List<Host> hostsToMod = new ArrayList<>();
225 List<Host> hostsToRem = new ArrayList<>();
226 List<Link> linksToRem = new ArrayList<>();
227 List<Link> linksToAdd = new ArrayList<>();
228 synchronized (hosts) {
229 log.trace("Processing nodeConnector " + nodeConnector.getId().toString());
230 HostId hId = Host.createHostId(addrs);
232 if (isNodeConnectorInternal(nodeConnector)) {
233 log.trace("NodeConnector is internal " + nodeConnector.getId().toString());
235 removeNodeConnectorFromHost(hostsToMod, hostsToRem, nodeConnector);
236 hosts.removeAll(hostsToRem);
237 hosts.putAll(hostsToMod);
239 log.trace("NodeConnector is NOT internal " + nodeConnector.getId().toString());
240 Host host = new Host(addrs, nodeConnector);
241 if (hosts.containsKey(host.getId())) {
242 hosts.get(host.getId()).mergeHostWith(host);
244 hosts.put(host.getId(), host);
246 List<Link> newLinks = hosts.get(host.getId()).createLinks(node);
247 if (newLinks != null) {
248 linksToAdd.addAll(newLinks);
250 hosts.submit(host.getId());
254 writeDatatoMDSAL(linksToAdd, linksToRem);
258 * It verifies if a given NodeConnector is *internal*. An *internal*
259 * NodeConnector is considered to be all NodeConnetors that are NOT attached
260 * to hosts created by hosttracker.
262 * @param nodeConnector the nodeConnector to check if it is internal or not.
263 * @return true if it was found a host connected to this nodeConnetor, false
264 * if it was not found a network topology or it was not found a host
265 * connected to this nodeConnetor.
267 private boolean isNodeConnectorInternal(NodeConnector nodeConnector) {
268 TpId tpId = new TpId(nodeConnector.getKey().getId().getValue());
269 InstanceIdentifier<NetworkTopology> ntII
270 = InstanceIdentifier.builder(NetworkTopology.class).build();
271 ListenableFuture<Optional<NetworkTopology>> lfONT;
272 try (ReadOnlyTransaction rot = dataService.newReadOnlyTransaction()) {
273 lfONT = rot.read(LogicalDatastoreType.OPERATIONAL, ntII);
276 Optional<NetworkTopology> oNT;
279 } catch (InterruptedException | ExecutionException ex) {
280 log.warn(ex.getLocalizedMessage());
283 if (oNT != null && oNT.isPresent()) {
284 NetworkTopology networkTopo = oNT.get();
285 for (Topology t : networkTopo.getTopology()) {
286 if (t.getLink() != null) {
287 for (Link l : t.getLink()) {
288 if ((l.getSource().getSourceTp().equals(tpId)
289 && !l.getDestination().getDestTp().getValue().startsWith(Host.NODE_PREFIX))
290 || (l.getDestination().getDestTp().equals(tpId)
291 && !l.getSource().getSourceTp().getValue().startsWith(Host.NODE_PREFIX))) {
301 private void removeLinksFromHosts(List<Host> hostsToMod, List<Host> hostsToRem, Link linkRemoved) {
302 for (Host h : hosts.values()) {
303 h.removeTerminationPoint(linkRemoved.getSource().getSourceTp());
304 h.removeTerminationPoint(linkRemoved.getDestination().getDestTp());
313 private void removeNodeConnectorFromHost(List<Host> hostsToMod, List<Host> hostsToRem, NodeConnector nc) {
314 AttachmentPointsBuilder atStD = Utilities.createAPsfromNodeConnector(nc);
315 for (Host h : hosts.values()) {
316 h.removeAttachmentPoints(atStD);
325 private void linkRemoved(InstanceIdentifier<Link> iiLink, Link linkRemoved) {
326 log.trace("linkRemoved");
327 List<Host> hostsToMod = new ArrayList<>();
328 List<Host> hostsToRem = new ArrayList<>();
329 synchronized (hosts) {
330 removeLinksFromHosts(hostsToMod, hostsToRem, linkRemoved);
331 hosts.removeAll(hostsToRem);
332 hosts.putAll(hostsToMod);
336 private void writeDatatoMDSAL(List<Link> linksToAdd, List<Link> linksToRemove){
337 if (linksToAdd != null) {
338 for (final Link l : linksToAdd) {
339 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.getKey(), topologyId);
340 log.trace("Writing link from MD_SAL: " + lIID.toString());
341 opProcessor.enqueueOperation(new HostTrackerOperation() {
343 public void applyOperation(ReadWriteTransaction tx) {
344 tx.merge(LogicalDatastoreType.OPERATIONAL, lIID, l, true);
349 if (linksToRemove != null) {
350 for (Link l : linksToRemove) {
351 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.getKey(), topologyId);
352 log.trace("Removing link from MD_SAL: " + lIID.toString());
353 opProcessor.enqueueOperation(new HostTrackerOperation() {
355 public void applyOperation(ReadWriteTransaction tx) {
356 tx.delete(LogicalDatastoreType.OPERATIONAL, lIID);
363 public void close() {
364 this.addrsNodeListerRegistration.close();
365 this.hostNodeListerRegistration.close();
366 synchronized (hosts) {