2 * Copyright (c) 2014 André Martins, Colin Dixon, Evan Zeller and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.l2switch.hosttracker.plugin.internal;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FluentFuture;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.Optional;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.l2switch.hosttracker.plugin.inventory.Host;
24 import org.opendaylight.l2switch.hosttracker.plugin.util.Utilities;
25 import org.opendaylight.mdsal.binding.api.DataBroker;
26 import org.opendaylight.mdsal.binding.api.DataObjectModification;
27 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
28 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.mdsal.binding.api.DataTreeModification;
30 import org.opendaylight.mdsal.binding.api.ReadTransaction;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.AddressCapableNodeConnector;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.address.node.connector.Addresses;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.host.AttachmentPointsBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.host.tracker.config.rev140528.HostTrackerConfig;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
47 import org.opendaylight.yangtools.concepts.ListenerRegistration;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
53 @SuppressWarnings("rawtypes")
54 public class HostTrackerImpl implements DataTreeChangeListener<DataObject> {
56 private static final Logger LOG = LoggerFactory.getLogger(HostTrackerImpl.class);
58 private static final int CPUS = Runtime.getRuntime().availableProcessors();
60 private static final String TOPOLOGY_NAME = "flow:1";
62 private final DataBroker dataService;
63 private final String topologyId;
64 private final long hostPurgeInterval;
65 private final long hostPurgeAge;
67 private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(CPUS);
69 private final ConcurrentClusterAwareHostHashMap hosts;
70 private final ConcurrentClusterAwareLinkHashMap links;
71 private final OperationProcessor opProcessor;
72 private final Thread processorThread;
73 private ListenerRegistration<DataTreeChangeListener> addrsNodeListenerRegistration;
74 private ListenerRegistration<DataTreeChangeListener> hostNodeListenerRegistration;
75 private ListenerRegistration<DataTreeChangeListener> linkNodeListenerRegistration;
78 * It creates hosts using reference to MD-SAl / toplogy module. For every hostPurgeIntervalInput time interval
79 * it requests to purge hosts that are not seen for hostPurgeAgeInput time interval.
81 * @param dataService A reference to the MD-SAL
82 * @param config Default configuration
84 public HostTrackerImpl(final DataBroker dataService, final HostTrackerConfig config) {
85 this.dataService = requireNonNull(dataService);
86 this.opProcessor = new OperationProcessor(dataService);
87 Preconditions.checkArgument(config.getHostPurgeAge() >= 0, "hostPurgeAgeInput must be non-negative");
88 Preconditions.checkArgument(config.getHostPurgeInterval() >= 0, "hostPurgeIntervalInput must be non-negative");
89 this.hostPurgeAge = config.getHostPurgeAge();
90 this.hostPurgeInterval = config.getHostPurgeInterval();
91 processorThread = new Thread(opProcessor);
92 final String maybeTopologyId = config.getTopologyId();
93 if (maybeTopologyId == null || maybeTopologyId.isEmpty()) {
94 this.topologyId = TOPOLOGY_NAME;
96 this.topologyId = maybeTopologyId;
98 this.hosts = new ConcurrentClusterAwareHostHashMap(opProcessor, this.topologyId);
99 this.links = new ConcurrentClusterAwareLinkHashMap(opProcessor);
101 if (hostPurgeInterval > 0) {
102 exec.scheduleWithFixedDelay(() -> purgeHostsNotSeenInLast(hostPurgeAge), 0, hostPurgeInterval,
107 @SuppressWarnings("unchecked")
109 processorThread.start();
111 InstanceIdentifier<Addresses> addrCapableNodeConnectors =
112 InstanceIdentifier.builder(Nodes.class)
113 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class)
114 .child(NodeConnector.class)
115 .augmentation(AddressCapableNodeConnector.class)
116 .child(Addresses.class).build();
117 this.addrsNodeListenerRegistration = dataService.registerDataTreeChangeListener(
118 DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, addrCapableNodeConnectors),
119 // FIXME: add an specialized object instead of going through raw types!
120 (DataTreeChangeListener)this);
122 InstanceIdentifier<HostNode> hostNodes = InstanceIdentifier.builder(NetworkTopology.class)
123 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
125 .augmentation(HostNode.class).build();
126 this.hostNodeListenerRegistration = dataService.registerDataTreeChangeListener(
127 DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, hostNodes),
128 // FIXME: add an specialized object instead of going through raw types!
129 (DataTreeChangeListener)this);
131 InstanceIdentifier<Link> linkIID = InstanceIdentifier.builder(NetworkTopology.class)
132 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
133 .child(Link.class).build();
135 this.linkNodeListenerRegistration = dataService.registerDataTreeChangeListener(
136 // FIXME: add an specialized object instead of going through raw types!
137 DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, linkIID), (DataTreeChangeListener)this);
139 //Processing addresses that existed before we register as a data change listener.
140 // ReadOnlyTransaction newReadOnlyTransaction = dataService.newReadOnlyTransaction();
141 // InstanceIdentifier<NodeConnector> iinc = addrCapableNodeConnectors.firstIdentifierOf(NodeConnector.class);
142 // InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin
143 // = addrCapableNodeConnectors.firstIdentifierOf(
144 // org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
145 // ListenableFuture<Optional<NodeConnector>> dataFuture = newReadOnlyTransaction.read(
146 // LogicalDatastoreType.OPERATIONAL, iinc);
148 // NodeConnector get = dataFuture.get().get();
149 // log.trace("test "+get);
150 // } catch (InterruptedException | ExecutionException ex) {
151 // java.util.logging.Logger.getLogger(HostTrackerImpl.class.getName()).log(Level.SEVERE, null, ex);
153 // Futures.addCallback(dataFuture, new FutureCallback<Optional<NodeConnector>>() {
155 // public void onSuccess(final Optional<NodeConnector> result) {
156 // if (result.isPresent()) {
157 // log.trace("Processing NEW NODE? " + result.get().getId().getValue());
158 //// processHost(result, dataObject, node);
163 // public void onFailure(Throwable arg0) {
169 public void onDataTreeChanged(Collection<DataTreeModification<DataObject>> changes) {
171 for (DataTreeModification<?> change: changes) {
172 DataObjectModification<?> rootNode = change.getRootNode();
173 final InstanceIdentifier<?> identifier = change.getRootPath().getRootIdentifier();
174 switch (rootNode.getModificationType()) {
175 case SUBTREE_MODIFIED:
177 onModifiedData(identifier, rootNode);
180 onDeletedData(identifier, rootNode);
189 @SuppressWarnings("unchecked")
190 private void onModifiedData(InstanceIdentifier<?> iid, DataObjectModification<?> rootNode) {
191 final DataObject dataObject = rootNode.getDataAfter();
192 if (dataObject instanceof Addresses) {
193 packetReceived((Addresses) dataObject, iid);
194 } else if (dataObject instanceof Node) {
195 hosts.putLocally((InstanceIdentifier<Node>) iid, Host.createHost((Node) dataObject));
196 } else if (dataObject instanceof Link) {
197 links.putLocally((InstanceIdentifier<Link>) iid, (Link) dataObject);
201 @SuppressWarnings("unchecked")
202 private void onDeletedData(InstanceIdentifier<?> iid, DataObjectModification<?> rootNode) {
203 if (iid.getTargetType().equals(Node.class)) {
204 Node node = (Node) rootNode.getDataBefore();
205 InstanceIdentifier<Node> iiN = (InstanceIdentifier<Node>) iid;
206 HostNode hostNode = node.augmentation(HostNode.class);
207 if (hostNode != null) {
208 hosts.removeLocally(iiN);
210 } else if (iid.getTargetType().equals(Link.class)) {
211 // TODO performance improvement here
212 InstanceIdentifier<Link> iiL = (InstanceIdentifier<Link>) iid;
213 links.removeLocally(iiL);
214 linkRemoved((InstanceIdentifier<Link>) iid, (Link) rootNode.getDataBefore());
218 public void packetReceived(Addresses addrs, InstanceIdentifier<?> ii) {
219 InstanceIdentifier<NodeConnector> iinc = ii.firstIdentifierOf(NodeConnector.class);
220 InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin =
221 ii.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
223 FluentFuture<Optional<NodeConnector>> futureNodeConnector;
224 FluentFuture<Optional<
225 org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node>> futureNode;
226 try (ReadTransaction readTx = dataService.newReadOnlyTransaction()) {
227 futureNodeConnector = readTx.read(LogicalDatastoreType.OPERATIONAL, iinc);
228 futureNode = readTx.read(LogicalDatastoreType.OPERATIONAL, iin);
231 Optional<NodeConnector> opNodeConnector = null;
232 Optional<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> opNode = null;
234 opNodeConnector = futureNodeConnector.get();
235 opNode = futureNode.get();
236 } catch (ExecutionException | InterruptedException e) {
237 LOG.warn("Failed to get node connector {}", iinc, e);
239 if (opNode != null && opNode.isPresent()
240 && opNodeConnector != null && opNodeConnector.isPresent()) {
241 processHost(opNode.orElseThrow(), opNodeConnector.orElseThrow(), addrs);
245 private void processHost(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node node,
246 NodeConnector nodeConnector,
248 List<Host> hostsToMod = new ArrayList<>();
249 List<Host> hostsToRem = new ArrayList<>();
250 List<Link> linksToRem = new ArrayList<>();
251 List<Link> linksToAdd = new ArrayList<>();
252 synchronized (hosts) {
253 LOG.trace("Processing nodeConnector: {} ", nodeConnector.getId());
254 HostId hostId = Host.createHostId(addrs);
255 if (hostId != null) {
256 if (isNodeConnectorInternal(nodeConnector)) {
257 LOG.trace("NodeConnector is internal: {} ", nodeConnector.getId());
258 removeNodeConnectorFromHost(hostsToMod, hostsToRem, nodeConnector);
259 hosts.removeAll(hostsToRem);
260 hosts.putAll(hostsToMod);
262 LOG.trace("NodeConnector is NOT internal {} ", nodeConnector.getId());
263 Host host = new Host(addrs, nodeConnector);
264 if (hosts.containsKey(host.getId())) {
265 hosts.get(host.getId()).mergeHostWith(host);
267 hosts.put(host.getId(), host);
269 List<Link> newLinks = hosts.get(host.getId()).createLinks(node);
270 if (newLinks != null) {
271 linksToAdd.addAll(newLinks);
273 hosts.submit(host.getId());
277 writeDataToDataStore(linksToAdd, linksToRem);
281 * It verifies if a given NodeConnector is *internal*. An *internal*
282 * NodeConnector is considered to be all NodeConnetors that are NOT attached
283 * to hosts created by hosttracker.
285 * @param nodeConnector the nodeConnector to check if it is internal or not.
286 * @return true if it was found a host connected to this nodeConnetor, false
287 * if it was not found a network topology or it was not found a host connected to this nodeConnetor.
289 private boolean isNodeConnectorInternal(NodeConnector nodeConnector) {
290 TpId tpId = new TpId(nodeConnector.key().getId().getValue());
291 InstanceIdentifier<NetworkTopology> ntII
292 = InstanceIdentifier.builder(NetworkTopology.class).build();
293 FluentFuture<Optional<NetworkTopology>> lfONT;
294 try (ReadTransaction rot = dataService.newReadOnlyTransaction()) {
295 lfONT = rot.read(LogicalDatastoreType.OPERATIONAL, ntII);
297 Optional<NetworkTopology> optionalNT;
299 optionalNT = lfONT.get();
300 } catch (InterruptedException | ExecutionException e) {
301 LOG.warn("Failed to get network topology {}", ntII, e);
304 if (optionalNT.isPresent()) {
305 NetworkTopology networkTopo = optionalNT.orElseThrow();
306 for (Topology t : networkTopo.nonnullTopology().values()) {
307 if (t.getLink() != null) {
308 for (Link l : t.nonnullLink().values()) {
309 if (l.getSource().getSourceTp().equals(tpId)
310 && !l.getDestination().getDestTp().getValue().startsWith(Host.NODE_PREFIX)
311 || l.getDestination().getDestTp().equals(tpId)
312 && !l.getSource().getSourceTp().getValue().startsWith(Host.NODE_PREFIX)) {
322 private void removeLinksFromHosts(List<Host> hostsToMod, List<Host> hostsToRem, Link linkRemoved) {
323 for (Host h : hosts.values()) {
324 h.removeTerminationPoint(linkRemoved.getSource().getSourceTp());
325 h.removeTerminationPoint(linkRemoved.getDestination().getDestTp());
334 private void removeNodeConnectorFromHost(List<Host> hostsToMod, List<Host> hostsToRem, NodeConnector nc) {
335 AttachmentPointsBuilder atStD = Utilities.createAPsfromNodeConnector(nc);
336 for (Host h : hosts.values()) {
337 h.removeAttachmentPoints(atStD);
346 private void linkRemoved(InstanceIdentifier<Link> iiLink, Link linkRemoved) {
347 LOG.trace("linkRemoved");
348 List<Host> hostsToMod = new ArrayList<>();
349 List<Host> hostsToRem = new ArrayList<>();
350 synchronized (hosts) {
351 removeLinksFromHosts(hostsToMod, hostsToRem, linkRemoved);
352 hosts.removeAll(hostsToRem);
353 hosts.putAll(hostsToMod);
357 private void writeDataToDataStore(List<Link> linksToAdd, List<Link> linksToRemove) {
358 if (linksToAdd != null) {
359 for (final Link l : linksToAdd) {
360 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.key(), topologyId);
361 LOG.trace("Writing link from MD_SAL: {}", lIID.toString());
362 opProcessor.enqueueOperation(
363 tx -> tx.mergeParentStructureMerge(LogicalDatastoreType.OPERATIONAL, lIID, l));
366 if (linksToRemove != null) {
367 for (Link l : linksToRemove) {
368 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.key(), topologyId);
369 LOG.trace("Removing link from MD_SAL: {}", lIID.toString());
370 opProcessor.enqueueOperation(tx -> tx.delete(LogicalDatastoreType.OPERATIONAL, lIID));
376 * Remove all hosts that haven't been observed more recently than the specified number of
377 * hostsPurgeAgeInSeconds.
379 * @param hostsPurgeAgeInSeconds remove hosts that haven't been observed in longer than this number of
380 * hostsPurgeAgeInSeconds.
382 protected void purgeHostsNotSeenInLast(final long hostsPurgeAgeInSeconds) {
383 int numHostsPurged = 0;
384 final long nowInMillis = System.currentTimeMillis();
385 final long nowInSeconds = TimeUnit.MILLISECONDS.toSeconds(nowInMillis);
386 // iterate through all hosts in the local cache
387 for (Host h : hosts.values()) {
388 final HostNode hn = h.getHostNode().augmentation(HostNode.class);
390 LOG.warn("Encountered non-host node {} in hosts during purge", h);
391 } else if (hn.getAddresses() != null) {
392 boolean purgeHosts = false;
393 // if the node is a host and has addresses, check to see if it's been seen recently
394 purgeHosts = hostReadyForPurge(hn, nowInSeconds,hostsPurgeAgeInSeconds);
396 numHostsPurged = removeHosts(h, numHostsPurged);
399 LOG.warn("Encountered host node {} with no address in hosts during purge", hn);
402 LOG.debug("Number of purged hosts during current purge interval - {}. ", numHostsPurged);
406 * Checks if hosts need to be purged.
408 * @param hostNode reference to HostNode class
409 * @param currentTimeInSeconds current time in seconds
410 * @param expirationPeriod timelimit set to hosts for expiration
411 * @return boolean - whether the hosts are ready to be purged
413 private static boolean hostReadyForPurge(final HostNode hostNode, final long currentTimeInSeconds,
414 final long expirationPeriod) {
415 // checks if hosts need to be purged
416 for (Addresses addrs : hostNode.nonnullAddresses().values()) {
417 long lastSeenTimeInSeconds = addrs.getLastSeen() / 1000;
418 if (lastSeenTimeInSeconds > currentTimeInSeconds - expirationPeriod) {
419 LOG.debug("Host node {} NOT ready for purge", hostNode);
423 LOG.debug("Host node {} ready for purge", hostNode);
428 * Removes hosts from locally and MD-SAL. Throws warning message if not removed successfully
430 * @param host reference to Host node
432 private int removeHosts(final Host host, int numHostsPurged) {
433 // remove associated links with the host before removing hosts
434 removeAssociatedLinksFromHosts(host);
435 // purge hosts from local & MD-SAL database
436 final HostId hostId = host.getId();
437 if (hosts.remove(hostId) != null) {
439 LOG.debug("Removed host with id {} during purge.", hostId);
441 LOG.warn("Unexpected error encountered - Failed to remove host {} during purge", host);
444 return numHostsPurged;
448 * Removes links associated with the given hosts from local and MD-SAL database.
449 * Throws warning message if not removed successfully.
451 * @param host reference to Host node
453 private void removeAssociatedLinksFromHosts(final Host host) {
454 if (host.getId() != null) {
455 List<Link> linksToRemove = new ArrayList<>();
456 for (Link link: links.values()) {
457 if (link.toString().contains(host.getId().getValue())) {
458 linksToRemove.add(link);
461 links.removeAll(linksToRemove);
463 LOG.warn("Encountered host with no id , Unexpected host id {}. ", host);
467 public void close() {
468 processorThread.interrupt();
469 this.addrsNodeListenerRegistration.close();
470 this.hostNodeListenerRegistration.close();
471 this.linkNodeListenerRegistration.close();
472 this.exec.shutdownNow();