2 * Copyright (c) 2014 André Martins, Colin Dixon, Evan Zeller and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.l2switch.hosttracker.plugin.internal;
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.util.ArrayList;
14 import java.util.List;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.TimeUnit;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
23 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.l2switch.hosttracker.plugin.inventory.Host;
29 import org.opendaylight.l2switch.hosttracker.plugin.util.Utilities;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.AddressCapableNodeConnector;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.address.node.connector.Addresses;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostNode;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.host.AttachmentPointsBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.LinkId;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 public class HostTrackerImpl implements DataChangeListener {
53 private static final int CPUS = Runtime.getRuntime().availableProcessors();
57 * controller/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyProvider.java
59 private static final String TOPOLOGY_NAME = "flow:1";
61 private static final Logger LOG = LoggerFactory.getLogger(HostTrackerImpl.class);
63 private final DataBroker dataService;
64 private final String topologyId;
65 private final long hostPurgeInterval;
66 private final long hostPurgeAge;
67 private static int numHostsPurged;
69 private ScheduledExecutorService exec = Executors.newScheduledThreadPool(CPUS);
71 private final ConcurrentClusterAwareHostHashMap<HostId, Host> hosts;
72 private final ConcurrentClusterAwareLinkHashMap<LinkId, Link> links;
73 private final OperationProcessor opProcessor;
74 private ListenerRegistration<DataChangeListener> addrsNodeListerRegistration;
75 private ListenerRegistration<DataChangeListener> hostNodeListerRegistration;
78 * It creates hosts using reference to MD-SAl / toplogy module. For every hostPurgeIntervalInput time interval
79 * it requests to purge hosts that are not seen for hostPurgeAgeInput time interval.
81 * @param dataService A reference to the MD-SAL
82 * @param topologyId The topology on which this host tracker will look for hosts
83 * @param hostPurgeAgeInput how old the last observation of a host must be before it will be purged
84 * @param hostPurgeIntervalInput how often to calculate hosts to be purged and remove them
86 public HostTrackerImpl(final DataBroker dataService, final String topologyId, final long hostPurgeAgeInput,
87 final long hostPurgeIntervalInput) {
88 Preconditions.checkNotNull(dataService, "dataBrokerService should not be null.");
89 Preconditions.checkArgument(hostPurgeAgeInput >= 0, "hostPurgeAgeInput must be non-negative");
90 Preconditions.checkArgument(hostPurgeIntervalInput >= 0, "hostPurgeIntervalInput must be non-negative");
91 this.dataService = dataService;
92 this.hostPurgeAge = hostPurgeAgeInput;
93 this.hostPurgeInterval = hostPurgeIntervalInput;
94 this.opProcessor = new OperationProcessor(dataService);
95 Thread processorThread = new Thread(opProcessor);
96 processorThread.start();
97 if (topologyId == null || topologyId.isEmpty()) {
98 this.topologyId = TOPOLOGY_NAME;
100 this.topologyId = topologyId;
102 this.hosts = new ConcurrentClusterAwareHostHashMap<>(opProcessor, this.topologyId);
103 this.links = new ConcurrentClusterAwareLinkHashMap<>(opProcessor, this.topologyId);
105 if (hostPurgeIntervalInput > 0) {
106 exec.scheduleWithFixedDelay(new Runnable() {
109 purgeHostsNotSeenInLast(hostPurgeAge);
111 }, 0, hostPurgeInterval, TimeUnit.SECONDS);
115 public void registerAsDataChangeListener() {
116 InstanceIdentifier<Addresses> addrCapableNodeConnectors = //
117 InstanceIdentifier.builder(Nodes.class) //
118 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class) //
119 .child(NodeConnector.class) //
120 .augmentation(AddressCapableNodeConnector.class)//
121 .child(Addresses.class).build();
122 this.addrsNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, addrCapableNodeConnectors, this, DataChangeScope.SUBTREE);
124 InstanceIdentifier<HostNode> hostNodes = InstanceIdentifier.builder(NetworkTopology.class)//
125 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))//
127 .augmentation(HostNode.class).build();
128 this.hostNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, hostNodes, this, DataChangeScope.SUBTREE);
130 InstanceIdentifier<Link> lIID = InstanceIdentifier.builder(NetworkTopology.class)//
131 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))//
132 .child(Link.class).build();
134 this.addrsNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, lIID, this, DataChangeScope.BASE);
136 //Processing addresses that existed before we register as a data change listener.
137 // ReadOnlyTransaction newReadOnlyTransaction = dataService.newReadOnlyTransaction();
138 // InstanceIdentifier<NodeConnector> iinc = addrCapableNodeConnectors.firstIdentifierOf(NodeConnector.class);
139 // InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin//
140 // = addrCapableNodeConnectors.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
141 // ListenableFuture<Optional<NodeConnector>> dataFuture = newReadOnlyTransaction.read(LogicalDatastoreType.OPERATIONAL, iinc);
143 // NodeConnector get = dataFuture.get().get();
144 // log.trace("test "+get);
145 // } catch (InterruptedException | ExecutionException ex) {
146 // java.util.logging.Logger.getLogger(HostTrackerImpl.class.getName()).log(Level.SEVERE, null, ex);
148 // Futures.addCallback(dataFuture, new FutureCallback<Optional<NodeConnector>>() {
150 // public void onSuccess(final Optional<NodeConnector> result) {
151 // if (result.isPresent()) {
152 // log.trace("Processing NEW NODE? " + result.get().getId().getValue());
153 //// processHost(result, dataObject, node);
158 // public void onFailure(Throwable arg0) {
164 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
166 exec.submit(new Runnable() {
169 if (change == null) {
170 LOG.info("In onDataChanged: No processing done as change even is null.");
173 Map<InstanceIdentifier<?>, DataObject> updatedData = change.getUpdatedData();
174 Map<InstanceIdentifier<?>, DataObject> createdData = change.getCreatedData();
175 Map<InstanceIdentifier<?>, DataObject> originalData = change.getOriginalData();
176 Set<InstanceIdentifier<?>> deletedData = change.getRemovedPaths();
178 for (InstanceIdentifier<?> iid : deletedData) {
179 if (iid.getTargetType().equals(Node.class)) {
180 Node node = ((Node) originalData.get(iid));
181 InstanceIdentifier<Node> iiN = (InstanceIdentifier<Node>) iid;
182 HostNode hostNode = node.getAugmentation(HostNode.class);
183 if (hostNode != null) {
184 synchronized (hosts) {
186 hosts.removeLocally(iiN);
187 } catch (ClassCastException ex) {
188 LOG.debug("Exception occurred while remove host locally", ex);
192 } else if (iid.getTargetType().equals(Link.class)) {
193 // TODO performance improvement here
194 InstanceIdentifier<Link> iiL = (InstanceIdentifier<Link>) iid;
195 synchronized (links) {
197 links.removeLocally(iiL);
198 } catch (ClassCastException ex) {
199 LOG.debug("Exception occurred while remove link locally", ex);
202 linkRemoved((InstanceIdentifier<Link>) iid, (Link) originalData.get(iid));
206 for (Map.Entry<InstanceIdentifier<?>, DataObject> entrySet : updatedData.entrySet()) {
207 InstanceIdentifier<?> iiD = entrySet.getKey();
208 final DataObject dataObject = entrySet.getValue();
209 if (dataObject instanceof Addresses) {
210 packetReceived((Addresses) dataObject, iiD);
211 } else if (dataObject instanceof Node) {
212 synchronized (hosts) {
213 hosts.putLocally((InstanceIdentifier<Node>) iiD, Host.createHost((Node) dataObject));
215 } else if (dataObject instanceof Link) {
216 synchronized (links) {
217 links.putLocally((InstanceIdentifier<Link>) iiD, (Link) dataObject);
222 for (Map.Entry<InstanceIdentifier<?>, DataObject> entrySet : createdData.entrySet()) {
223 InstanceIdentifier<?> iiD = entrySet.getKey();
224 final DataObject dataObject = entrySet.getValue();
225 if (dataObject instanceof Addresses) {
226 packetReceived((Addresses) dataObject, iiD);
227 } else if (dataObject instanceof Node) {
228 synchronized (hosts) {
229 hosts.putLocally((InstanceIdentifier<Node>) iiD, Host.createHost((Node) dataObject));
231 } else if (dataObject instanceof Link) {
232 synchronized (links) {
233 links.putLocally((InstanceIdentifier<Link>) iiD, (Link) dataObject);
241 public void packetReceived(Addresses addrs, InstanceIdentifier<?> ii) {
242 InstanceIdentifier<NodeConnector> iinc = ii.firstIdentifierOf(NodeConnector.class);
243 InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin//
244 = ii.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
246 ListenableFuture<Optional<NodeConnector>> futureNodeConnector;
247 ListenableFuture<Optional<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node>> futureNode;
248 try (ReadOnlyTransaction readTx = dataService.newReadOnlyTransaction()) {
249 futureNodeConnector = readTx.read(LogicalDatastoreType.OPERATIONAL, iinc);
250 futureNode = readTx.read(LogicalDatastoreType.OPERATIONAL, iin);
253 Optional<NodeConnector> opNodeConnector = null;
254 Optional<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> opNode = null;
256 opNodeConnector = futureNodeConnector.get();
257 opNode = futureNode.get();
258 } catch (ExecutionException | InterruptedException ex) {
259 LOG.warn(ex.getLocalizedMessage());
261 if (opNode != null && opNode.isPresent()
262 && opNodeConnector != null && opNodeConnector.isPresent()) {
263 processHost(opNode.get(), opNodeConnector.get(), addrs);
// Reconciles the host cache with addresses observed on a node connector, then queues the
// resulting link changes via writeDatatoMDSAL.
//
// NOTE(review): this excerpt omits several original lines (the embedded numbering skips 269, 277,
// 280, 284, 289, 291, 295 and 297-299). The comments below describe only the visible statements;
// the exact branch structure must be confirmed against the full file.
267 private void processHost(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node node,
268 NodeConnector nodeConnector,
// Work lists filled while the hosts lock is held. In the visible lines nothing is ever added to
// linksToRem, so it reaches writeDatatoMDSAL empty.
270 List<Host> hostsToMod = new ArrayList<>();
271 List<Host> hostsToRem = new ArrayList<>();
272 List<Link> linksToRem = new ArrayList<>();
273 List<Link> linksToAdd = new ArrayList<>();
274 synchronized (hosts) {
275 LOG.trace("Processing nodeConnector: {} ", nodeConnector.getId().toString());
// hId is not used by any visible line; presumably the omitted line 277 tests it - TODO confirm.
276 HostId hId = Host.createHostId(addrs);
// Internal connector: strip it from the tracked hosts and apply the resulting removals/updates.
278 if (isNodeConnectorInternal(nodeConnector)) {
279 LOG.trace("NodeConnector is internal: {} ", nodeConnector.getId().toString());
281 removeNodeConnectorFromHost(hostsToMod, hostsToRem, nodeConnector);
282 hosts.removeAll(hostsToRem);
283 hosts.putAll(hostsToMod);
// Non-internal connector: build a Host from the observation, then merge it into the cached host
// with the same id or insert it (presumably the else branch of the containsKey test).
285 LOG.trace("NodeConnector is NOT internal {} ", nodeConnector.getId().toString());
286 Host host = new Host(addrs, nodeConnector);
287 if (hosts.containsKey(host.getId())) {
288 hosts.get(host.getId()).mergeHostWith(host);
290 hosts.put(host.getId(), host);
// Links are created from the cached host (not the temporary one) and collected for writing.
292 List<Link> newLinks = hosts.get(host.getId()).createLinks(node);
293 if (newLinks != null) {
294 linksToAdd.addAll(newLinks);
296 hosts.submit(host.getId());
// Queue the collected link additions/removals as datastore operations.
300 writeDatatoMDSAL(linksToAdd, linksToRem);
304 * It verifies if a given NodeConnector is *internal*. An *internal*
305 * NodeConnector is considered to be all NodeConnetors that are NOT attached
306 * to hosts created by hosttracker.
308 * @param nodeConnector the nodeConnector to check if it is internal or not.
309 * @return true if it was found a host connected to this nodeConnetor, false
310 * if it was not found a network topology or it was not found a host
311 * connected to this nodeConnetor.
313 private boolean isNodeConnectorInternal(NodeConnector nodeConnector) {
314 TpId tpId = new TpId(nodeConnector.getKey().getId().getValue());
315 InstanceIdentifier<NetworkTopology> ntII
316 = InstanceIdentifier.builder(NetworkTopology.class).build();
317 ListenableFuture<Optional<NetworkTopology>> lfONT;
318 try (ReadOnlyTransaction rot = dataService.newReadOnlyTransaction()) {
319 lfONT = rot.read(LogicalDatastoreType.OPERATIONAL, ntII);
322 Optional<NetworkTopology> oNT;
325 } catch (InterruptedException | ExecutionException ex) {
326 LOG.warn(ex.getLocalizedMessage());
329 if (oNT != null && oNT.isPresent()) {
330 NetworkTopology networkTopo = oNT.get();
331 for (Topology t : networkTopo.getTopology()) {
332 if (t.getLink() != null) {
333 for (Link l : t.getLink()) {
334 if ((l.getSource().getSourceTp().equals(tpId)
335 && !l.getDestination().getDestTp().getValue().startsWith(Host.NODE_PREFIX))
336 || (l.getDestination().getDestTp().equals(tpId)
337 && !l.getSource().getSourceTp().getValue().startsWith(Host.NODE_PREFIX))) {
// For every host in the local cache, removes the termination points found at the source and the
// destination end of the removed link.
//
// NOTE(review): the embedded numbering jumps from 350 to 359, so the rest of this method is not
// visible in this excerpt. Presumably the omitted lines sort each host into hostsToMod or
// hostsToRem (the caller applies both lists afterwards) - confirm against the full file.
347 private void removeLinksFromHosts(List<Host> hostsToMod, List<Host> hostsToRem, Link linkRemoved) {
348 for (Host h : hosts.values()) {
349 h.removeTerminationPoint(linkRemoved.getSource().getSourceTp());
350 h.removeTerminationPoint(linkRemoved.getDestination().getDestTp());
// Builds the attachment points that correspond to the given node connector and removes them from
// every host in the local cache.
//
// NOTE(review): the embedded numbering jumps from 362 to 371, so the rest of this method is not
// visible in this excerpt. Presumably the omitted lines sort each host into hostsToMod or
// hostsToRem (the caller applies both lists afterwards) - confirm against the full file.
359 private void removeNodeConnectorFromHost(List<Host> hostsToMod, List<Host> hostsToRem, NodeConnector nc) {
360 AttachmentPointsBuilder atStD = Utilities.createAPsfromNodeConnector(nc);
361 for (Host h : hosts.values()) {
362 h.removeAttachmentPoints(atStD);
371 private void linkRemoved(InstanceIdentifier<Link> iiLink, Link linkRemoved) {
372 LOG.trace("linkRemoved");
373 List<Host> hostsToMod = new ArrayList<>();
374 List<Host> hostsToRem = new ArrayList<>();
375 synchronized (hosts) {
376 removeLinksFromHosts(hostsToMod, hostsToRem, linkRemoved);
377 hosts.removeAll(hostsToRem);
378 hosts.putAll(hostsToMod);
382 private void writeDatatoMDSAL(List<Link> linksToAdd, List<Link> linksToRemove) {
383 if (linksToAdd != null) {
384 for (final Link l : linksToAdd) {
385 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.getKey(), topologyId);
386 LOG.trace("Writing link from MD_SAL: {}", lIID.toString());
387 opProcessor.enqueueOperation(new HostTrackerOperation() {
389 public void applyOperation(ReadWriteTransaction tx) {
390 tx.merge(LogicalDatastoreType.OPERATIONAL, lIID, l, true);
395 if (linksToRemove != null) {
396 for (Link l : linksToRemove) {
397 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.getKey(), topologyId);
398 LOG.trace("Removing link from MD_SAL: {}", lIID.toString());
399 opProcessor.enqueueOperation(new HostTrackerOperation() {
401 public void applyOperation(ReadWriteTransaction tx) {
402 tx.delete(LogicalDatastoreType.OPERATIONAL, lIID);
410 * Removes all hosts that have not been observed within the last
411 * hostsPurgeAgeInSeconds seconds.
413 * @param hostsPurgeAgeInSeconds maximum age, in seconds, of a host's last observation; hosts not
414 * observed within this period are removed.
415 * @return the number of purged hosts
// Walks the local host cache and checks, for every host that has addresses, whether it was seen
// within the last hostsPurgeAgeInSeconds seconds; finally logs and returns numHostsPurged.
//
// NOTE(review): this excerpt omits original lines 418, 424, 430-433 and 435-436. Presumably they
// reset the counter, open the null check on hn and remove the host when purgeHosts is true -
// confirm against the full file.
417 protected int purgeHostsNotSeenInLast(final long hostsPurgeAgeInSeconds) {
// Current wall-clock time converted to whole seconds, the unit used for the age comparison.
419 final long nowInMillis = System.currentTimeMillis();
420 final long nowInSeconds = TimeUnit.MILLISECONDS.toSeconds(nowInMillis);
421 // iterate through all hosts in the local cache
422 for (Host h : hosts.values()) {
// Only nodes carrying the HostNode augmentation expose the addresses examined below.
423 final HostNode hn = h.getHostNode().getAugmentation(HostNode.class);
425 LOG.warn("Encountered non-host node {} in hosts during purge", hn);
426 } else if (hn.getAddresses() != null) {
427 boolean purgeHosts = false;
428 // if the node is a host and has addresses, check to see if it's been seen recently
429 purgeHosts = hostReadyForPurge( hn, nowInSeconds,hostsPurgeAgeInSeconds);
434 LOG.warn("Encountered host node {} with no address in hosts during purge", hn);
// numHostsPurged is a static field of the class, not a local variable of this call.
437 LOG.debug("Number of purged hosts during current purge interval - {}. ", numHostsPurged);
438 return numHostsPurged;
442 * Checks if hosts need to be purged
444 * @param hostNode reference to HostNode class
445 * @param currentTimeInSeconds current time in seconds
446 * @param expirationPeriod timelimit set to hosts for expiration
447 * @return boolean - whether the hosts are ready to be purged
449 private boolean hostReadyForPurge(final HostNode hostNode,final long currentTimeInSeconds,final long expirationPeriod) {
450 // checks if hosts need to be purged
451 for (Addresses addrs : hostNode.getAddresses()) {
452 long lastSeenTimeInSeconds = addrs.getLastSeen()/1000;
453 if (lastSeenTimeInSeconds > (currentTimeInSeconds - expirationPeriod)) {
454 LOG.debug("Host node {} NOT ready for purge", hostNode);
458 LOG.debug("Host node {} ready for purge", hostNode);
463 * Removes the host from the local cache and from MD-SAL. Logs a warning if the host could not be removed.
465 * @param host reference to Host node
// Removes one host during a purge: first the links associated with it, then the host itself.
// A debug message is logged when hosts.remove returns a non-null value, a warning otherwise.
//
// NOTE(review): this excerpt omits original line 472 (between the if and the debug log);
// presumably it increments numHostsPurged - confirm against the full file.
467 private void removeHosts(final Host host){
468 // remove associated links with the host before removing hosts
469 removeAssociatedLinksFromHosts(host);
470 // purge hosts from local & MD-SAL database
471 if (hosts.remove(host.getId()) != null) {
473 LOG.debug("Removed host with id {} during purge.", host.getId());
475 LOG.warn("Unexpected error encountered - Failed to remove host {} during purge", host);
480 * Removes links associated with the given hosts from local and MD-SAL database.
481 * Throws warning message if not removed successfully.
483 * @param host reference to Host node
485 private void removeAssociatedLinksFromHosts(final Host host) {
487 if (host.getId() != null) {
488 List<Link> linksToRemove = new ArrayList<>();
489 for (Link link: links.values()) {
490 if (link.toString().contains(host.getId().getValue())) {
491 linksToRemove.add(link);
494 links.removeAll(linksToRemove);
496 LOG.warn("Encountered host with no id , Unexpected host id {}. ", host);
499 LOG.warn("Encountered Host with no value, Unexpected host {}. ", host);
503 public void close() {
504 this.addrsNodeListerRegistration.close();
505 this.hostNodeListerRegistration.close();
506 this.exec.shutdownNow();
507 synchronized (hosts) {