2 * Copyright (c) 2014 André Martins, Colin Dixon, Evan Zeller and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.l2switch.hosttracker.plugin.internal;
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.List;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.Nonnull;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
25 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.l2switch.hosttracker.plugin.inventory.Host;
29 import org.opendaylight.l2switch.hosttracker.plugin.util.Utilities;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.AddressCapableNodeConnector;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.address.node.connector.Addresses;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostNode;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.host.AttachmentPointsBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.host.tracker.config.rev140528.HostTrackerConfig;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
// Data-tree change listener that keeps local host and link caches up to date and periodically
// purges hosts (see purgeHostsNotSeenInLast).
51 @SuppressWarnings("rawtypes")
52 public class HostTrackerImpl implements DataTreeChangeListener<DataObject> {
// Size of the scheduled thread pool below: one thread per available processor.
54 private static final int CPUS = Runtime.getRuntime().availableProcessors();
// Topology id used when the configuration supplies none (null or empty).
56 private static final String TOPOLOGY_NAME = "flow:1";
58 private static final Logger LOG = LoggerFactory.getLogger(HostTrackerImpl.class);
// Broker used to register the listeners and to open read-only transactions.
60 private final DataBroker dataService;
// Id of the topology whose host nodes and links are tracked.
61 private final String topologyId;
// Delay between two purge runs; a value of 0 disables the periodic purge.
62 private final long hostPurgeInterval;
// Age handed to purgeHostsNotSeenInLast, whose parameter is named in seconds.
63 private final long hostPurgeAge;
// Scheduler that runs the periodic purge task.
65 private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(CPUS);
// Local caches of hosts and links; both are constructed with the operation processor.
67 private final ConcurrentClusterAwareHostHashMap hosts;
68 private final ConcurrentClusterAwareLinkHashMap links;
// Queue-style processor for datastore operations and the thread that runs it.
69 private final OperationProcessor opProcessor;
70 private final Thread processorThread;
// Listener registrations; they are assigned when the listeners are registered and closed in close(),
// so they stay null until then.
71 private ListenerRegistration<DataTreeChangeListener> addrsNodeListenerRegistration;
72 private ListenerRegistration<DataTreeChangeListener> hostNodeListenerRegistration;
73 private ListenerRegistration<DataTreeChangeListener> linkNodeListenerRegistration;
76 * Creates the host tracker using a reference to the MD-SAL / topology module. Once every
77 * hostPurgeIntervalInput it requests a purge of the hosts that have not been seen for hostPurgeAgeInput.
79 * @param dataService A reference to the MD-SAL
80 * @param config Default configuration
// Validates the arguments, creates the operation processor (and its thread) and the local host/link
// caches, and schedules the periodic purge when the configured interval is positive.
// NOTE(review): several short lines appear to be missing from this listing (closing braces, the
// `else` of the topology-id fallback and the tail of the scheduleWithFixedDelay call, including its
// time unit) — confirm against the full file before editing.
82 public HostTrackerImpl(final DataBroker dataService, final HostTrackerConfig config) {
// Fail fast on a missing broker or negative purge settings. config itself is not null-checked here.
83 Preconditions.checkNotNull(dataService, "dataBrokerService should not be null.");
84 Preconditions.checkArgument(config.getHostPurgeAge() >= 0, "hostPurgeAgeInput must be non-negative");
85 Preconditions.checkArgument(config.getHostPurgeInterval() >= 0, "hostPurgeIntervalInput must be non-negative");
86 this.dataService = dataService;
87 this.hostPurgeAge = config.getHostPurgeAge();
88 this.hostPurgeInterval = config.getHostPurgeInterval();
89 this.opProcessor = new OperationProcessor(dataService);
// The thread is only created here; it is not started in the constructor code visible in this listing.
90 processorThread = new Thread(opProcessor);
// Use the configured topology id, falling back to TOPOLOGY_NAME when it is null or empty.
91 final String maybeTopologyId = config.getTopologyId();
92 if (maybeTopologyId == null || maybeTopologyId.isEmpty()) {
93 this.topologyId = TOPOLOGY_NAME;
95 this.topologyId = maybeTopologyId;
97 this.hosts = new ConcurrentClusterAwareHostHashMap(opProcessor, this.topologyId);
98 this.links = new ConcurrentClusterAwareLinkHashMap(opProcessor);
// An interval of 0 disables purging. Otherwise the purge runs with fixed delay, starting immediately
// (initial delay 0).
100 if (hostPurgeInterval > 0) {
101 exec.scheduleWithFixedDelay(() -> purgeHostsNotSeenInLast(hostPurgeAge), 0, hostPurgeInterval,
// NOTE(review): the signature of the method this annotation belongs to is not visible in this
// listing — confirm against the full file. The statements below start the operation-processor
// thread and register this object as an OPERATIONAL data-tree change listener on three paths.
106 @SuppressWarnings("unchecked")
108 processorThread.start();
// Path 1: the Addresses children of the AddressCapableNodeConnector augmentation on every inventory
// node connector.
110 InstanceIdentifier<Addresses> addrCapableNodeConnectors = //
111 InstanceIdentifier.builder(Nodes.class) //
112 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class)
113 .child(NodeConnector.class) //
114 .augmentation(AddressCapableNodeConnector.class)//
115 .child(Addresses.class).build();
116 this.addrsNodeListenerRegistration = dataService.registerDataTreeChangeListener(new DataTreeIdentifier<>(
117 LogicalDatastoreType.OPERATIONAL, addrCapableNodeConnectors), (DataTreeChangeListener)this);
// Path 2: the HostNode augmentation inside the configured topology.
// NOTE(review): a builder step between the Topology child and the augmentation seems to be missing
// from this listing — confirm.
119 InstanceIdentifier<HostNode> hostNodes = InstanceIdentifier.builder(NetworkTopology.class)//
120 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))//
122 .augmentation(HostNode.class).build();
123 this.hostNodeListenerRegistration = dataService.registerDataTreeChangeListener(new DataTreeIdentifier<>(
124 LogicalDatastoreType.OPERATIONAL, hostNodes), (DataTreeChangeListener)this);
// Path 3: the links of the configured topology.
126 InstanceIdentifier<Link> linkIID = InstanceIdentifier.builder(NetworkTopology.class)//
127 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
128 .child(Link.class).build();
130 this.linkNodeListenerRegistration = dataService.registerDataTreeChangeListener(new DataTreeIdentifier<>(
131 LogicalDatastoreType.OPERATIONAL, linkIID), (DataTreeChangeListener)this);
// Everything below is disabled (commented-out) code and has no effect at runtime. It sketches
// reading node connectors that already existed before the listeners were registered.
133 //Processing addresses that existed before we register as a data change listener.
134 // ReadOnlyTransaction newReadOnlyTransaction = dataService.newReadOnlyTransaction();
135 // InstanceIdentifier<NodeConnector> iinc = addrCapableNodeConnectors.firstIdentifierOf(NodeConnector.class);
136 // InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin
137 // = addrCapableNodeConnectors.firstIdentifierOf(
138 // org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
139 // ListenableFuture<Optional<NodeConnector>> dataFuture = newReadOnlyTransaction.read(
140 // LogicalDatastoreType.OPERATIONAL, iinc);
142 // NodeConnector get = dataFuture.get().get();
143 // log.trace("test "+get);
144 // } catch (InterruptedException | ExecutionException ex) {
145 // java.util.logging.Logger.getLogger(HostTrackerImpl.class.getName()).log(Level.SEVERE, null, ex);
147 // Futures.addCallback(dataFuture, new FutureCallback<Optional<NodeConnector>>() {
149 // public void onSuccess(final Optional<NodeConnector> result) {
150 // if (result.isPresent()) {
151 // log.trace("Processing NEW NODE? " + result.get().getId().getValue());
152 //// processHost(result, dataObject, node);
157 // public void onFailure(Throwable arg0) {
// Listener callback: walks every reported modification and dispatches it, based on its modification
// type, to onModifiedData or onDeletedData together with the root identifier of the change.
// NOTE(review): several short lines appear to be missing from this listing (the case label that
// leads to onDeletedData, any further labels and break statements, closing braces, and possibly a
// wrapper around the loop) — confirm the exact dispatch against the full file.
163 public void onDataTreeChanged(Collection<DataTreeModification<DataObject>> changes) {
165 for (DataTreeModification<?> change: changes) {
166 DataObjectModification<?> rootNode = change.getRootNode();
167 final InstanceIdentifier<?> identifier = change.getRootPath().getRootIdentifier();
168 switch (rootNode.getModificationType()) {
169 case SUBTREE_MODIFIED:
171 onModifiedData(identifier, rootNode);
174 onDeletedData(identifier, rootNode);
183 @SuppressWarnings("unchecked")
184 private void onModifiedData(InstanceIdentifier<?> iid, DataObjectModification<?> rootNode) {
185 final DataObject dataObject = rootNode.getDataAfter();
186 if (dataObject instanceof Addresses) {
187 packetReceived((Addresses) dataObject, iid);
188 } else if (dataObject instanceof Node) {
189 hosts.putLocally((InstanceIdentifier<Node>) iid, Host.createHost((Node) dataObject));
190 } else if (dataObject instanceof Link) {
191 links.putLocally((InstanceIdentifier<Link>) iid, (Link) dataObject);
195 @SuppressWarnings("unchecked")
196 private void onDeletedData(InstanceIdentifier<?> iid, DataObjectModification<?> rootNode) {
197 if (iid.getTargetType().equals(Node.class)) {
198 Node node = (Node) rootNode.getDataBefore();
199 InstanceIdentifier<Node> iiN = (InstanceIdentifier<Node>) iid;
200 HostNode hostNode = node.getAugmentation(HostNode.class);
201 if (hostNode != null) {
202 hosts.removeLocally(iiN);
204 } else if (iid.getTargetType().equals(Link.class)) {
205 // TODO performance improvement here
206 InstanceIdentifier<Link> iiL = (InstanceIdentifier<Link>) iid;
207 links.removeLocally(iiL);
208 linkRemoved((InstanceIdentifier<Link>) iid, (Link) rootNode.getDataBefore());
213 public void packetReceived(Addresses addrs, InstanceIdentifier<?> ii) {
214 InstanceIdentifier<NodeConnector> iinc = ii.firstIdentifierOf(NodeConnector.class);
215 InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin =
216 ii.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
218 ListenableFuture<Optional<NodeConnector>> futureNodeConnector;
219 ListenableFuture<Optional<
220 org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node>> futureNode;
221 try (ReadOnlyTransaction readTx = dataService.newReadOnlyTransaction()) {
222 futureNodeConnector = readTx.read(LogicalDatastoreType.OPERATIONAL, iinc);
223 futureNode = readTx.read(LogicalDatastoreType.OPERATIONAL, iin);
226 Optional<NodeConnector> opNodeConnector = null;
227 Optional<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> opNode = null;
229 opNodeConnector = futureNodeConnector.get();
230 opNode = futureNode.get();
231 } catch (ExecutionException | InterruptedException ex) {
232 LOG.warn(ex.getLocalizedMessage());
234 if (opNode != null && opNode.isPresent()
235 && opNodeConnector != null && opNodeConnector.isPresent()) {
236 processHost(opNode.get(), opNodeConnector.get(), addrs);
240 private void processHost(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node node,
241 NodeConnector nodeConnector,
243 List<Host> hostsToMod = new ArrayList<>();
244 List<Host> hostsToRem = new ArrayList<>();
245 List<Link> linksToRem = new ArrayList<>();
246 List<Link> linksToAdd = new ArrayList<>();
247 synchronized (hosts) {
248 LOG.trace("Processing nodeConnector: {} ", nodeConnector.getId().toString());
249 HostId hostId = Host.createHostId(addrs);
250 if (hostId != null) {
251 if (isNodeConnectorInternal(nodeConnector)) {
252 LOG.trace("NodeConnector is internal: {} ", nodeConnector.getId().toString());
254 removeNodeConnectorFromHost(hostsToMod, hostsToRem, nodeConnector);
255 hosts.removeAll(hostsToRem);
256 hosts.putAll(hostsToMod);
258 LOG.trace("NodeConnector is NOT internal {} ", nodeConnector.getId().toString());
259 Host host = new Host(addrs, nodeConnector);
260 if (hosts.containsKey(host.getId())) {
261 hosts.get(host.getId()).mergeHostWith(host);
263 hosts.put(host.getId(), host);
265 List<Link> newLinks = hosts.get(host.getId()).createLinks(node);
266 if (newLinks != null) {
267 linksToAdd.addAll(newLinks);
269 hosts.submit(host.getId());
273 writeDataToDataStore(linksToAdd, linksToRem);
277 * It verifies if a given NodeConnector is *internal*. An *internal*
278 * NodeConnector is considered to be all NodeConnetors that are NOT attached
279 * to hosts created by hosttracker.
281 * @param nodeConnector the nodeConnector to check if it is internal or not.
282 * @return true if it was found a host connected to this nodeConnetor, false
283 * if it was not found a network topology or it was not found a host connected to this nodeConnetor.
285 private boolean isNodeConnectorInternal(NodeConnector nodeConnector) {
286 TpId tpId = new TpId(nodeConnector.getKey().getId().getValue());
287 InstanceIdentifier<NetworkTopology> ntII
288 = InstanceIdentifier.builder(NetworkTopology.class).build();
289 ListenableFuture<Optional<NetworkTopology>> lfONT;
290 try (ReadOnlyTransaction rot = dataService.newReadOnlyTransaction()) {
291 lfONT = rot.read(LogicalDatastoreType.OPERATIONAL, ntII);
293 Optional<NetworkTopology> optionalNT;
295 optionalNT = lfONT.get();
296 } catch (InterruptedException | ExecutionException ex) {
297 LOG.warn(ex.getLocalizedMessage());
300 if (optionalNT.isPresent()) {
301 NetworkTopology networkTopo = optionalNT.get();
302 for (Topology t : networkTopo.getTopology()) {
303 if (t.getLink() != null) {
304 for (Link l : t.getLink()) {
305 if (l.getSource().getSourceTp().equals(tpId)
306 && !l.getDestination().getDestTp().getValue().startsWith(Host.NODE_PREFIX)
307 || l.getDestination().getDestTp().equals(tpId)
308 && !l.getSource().getSourceTp().getValue().startsWith(Host.NODE_PREFIX)) {
// For every cached host, removes the source and destination termination points of the removed link.
// NOTE(review): hostsToMod and hostsToRem are not touched in the lines visible here, yet the caller
// applies them to the cache afterwards. The remainder of the loop body appears to be missing from
// this listing — confirm against the full file how the two lists are filled.
318 private void removeLinksFromHosts(List<Host> hostsToMod, List<Host> hostsToRem, Link linkRemoved) {
319 for (Host h : hosts.values()) {
320 h.removeTerminationPoint(linkRemoved.getSource().getSourceTp());
321 h.removeTerminationPoint(linkRemoved.getDestination().getDestTp());
// Builds the attachment points that correspond to the given node connector and removes them from
// every cached host.
// NOTE(review): hostsToMod and hostsToRem are not touched in the lines visible here, yet the caller
// applies them to the cache afterwards. The remainder of the loop body appears to be missing from
// this listing — confirm against the full file how the two lists are filled.
330 private void removeNodeConnectorFromHost(List<Host> hostsToMod, List<Host> hostsToRem, NodeConnector nc) {
331 AttachmentPointsBuilder atStD = Utilities.createAPsfromNodeConnector(nc);
332 for (Host h : hosts.values()) {
333 h.removeAttachmentPoints(atStD);
342 private void linkRemoved(InstanceIdentifier<Link> iiLink, Link linkRemoved) {
343 LOG.trace("linkRemoved");
344 List<Host> hostsToMod = new ArrayList<>();
345 List<Host> hostsToRem = new ArrayList<>();
346 synchronized (hosts) {
347 removeLinksFromHosts(hostsToMod, hostsToRem, linkRemoved);
348 hosts.removeAll(hostsToRem);
349 hosts.putAll(hostsToMod);
353 private void writeDataToDataStore(List<Link> linksToAdd, List<Link> linksToRemove) {
354 if (linksToAdd != null) {
355 for (final Link l : linksToAdd) {
356 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.getKey(), topologyId);
357 LOG.trace("Writing link from MD_SAL: {}", lIID.toString());
358 opProcessor.enqueueOperation(tx -> tx.merge(LogicalDatastoreType.OPERATIONAL, lIID, l, true));
361 if (linksToRemove != null) {
362 for (Link l : linksToRemove) {
363 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.getKey(), topologyId);
364 LOG.trace("Removing link from MD_SAL: {}", lIID.toString());
365 opProcessor.enqueueOperation(tx -> tx.delete(LogicalDatastoreType.OPERATIONAL, lIID));
371 * Remove all hosts that haven't been observed more recently than the specified number of
372 * hostsPurgeAgeInSeconds.
374 * @param hostsPurgeAgeInSeconds remove hosts that haven't been observed in longer than this number of
375 * hostsPurgeAgeInSeconds.
377 protected void purgeHostsNotSeenInLast(final long hostsPurgeAgeInSeconds) {
378 int numHostsPurged = 0;
379 final long nowInMillis = System.currentTimeMillis();
380 final long nowInSeconds = TimeUnit.MILLISECONDS.toSeconds(nowInMillis);
381 // iterate through all hosts in the local cache
382 for (Host h : hosts.values()) {
383 final HostNode hn = h.getHostNode().getAugmentation(HostNode.class);
385 LOG.warn("Encountered non-host node {} in hosts during purge", h);
386 } else if (hn.getAddresses() != null) {
387 boolean purgeHosts = false;
388 // if the node is a host and has addresses, check to see if it's been seen recently
389 purgeHosts = hostReadyForPurge(hn, nowInSeconds,hostsPurgeAgeInSeconds);
391 numHostsPurged = removeHosts(h, numHostsPurged);
394 LOG.warn("Encountered host node {} with no address in hosts during purge", hn);
397 LOG.debug("Number of purged hosts during current purge interval - {}. ", numHostsPurged);
401 * Checks if hosts need to be purged.
403 * @param hostNode reference to HostNode class
404 * @param currentTimeInSeconds current time in seconds
405 * @param expirationPeriod timelimit set to hosts for expiration
406 * @return boolean - whether the hosts are ready to be purged
408 private boolean hostReadyForPurge(final HostNode hostNode, final long currentTimeInSeconds,
409 final long expirationPeriod) {
410 // checks if hosts need to be purged
411 for (Addresses addrs : hostNode.getAddresses()) {
412 long lastSeenTimeInSeconds = addrs.getLastSeen() / 1000;
413 if (lastSeenTimeInSeconds > currentTimeInSeconds - expirationPeriod) {
414 LOG.debug("Host node {} NOT ready for purge", hostNode);
418 LOG.debug("Host node {} ready for purge", hostNode);
423 * Removes hosts from locally and MD-SAL. Throws warning message if not removed successfully
425 * @param host reference to Host node
427 private int removeHosts(@Nonnull final Host host, int numHostsPurged) {
428 // remove associated links with the host before removing hosts
429 removeAssociatedLinksFromHosts(host);
430 // purge hosts from local & MD-SAL database
431 if (hosts.remove(host.getId()) != null) {
433 LOG.debug("Removed host with id {} during purge.", host.getId());
435 LOG.warn("Unexpected error encountered - Failed to remove host {} during purge", host);
438 return numHostsPurged;
442 * Removes links associated with the given hosts from local and MD-SAL database.
443 * Throws warning message if not removed successfully.
445 * @param host reference to Host node
447 private void removeAssociatedLinksFromHosts(@Nonnull final Host host) {
448 if (host.getId() != null) {
449 List<Link> linksToRemove = new ArrayList<>();
450 for (Link link: links.values()) {
451 if (link.toString().contains(host.getId().getValue())) {
452 linksToRemove.add(link);
455 links.removeAll(linksToRemove);
457 LOG.warn("Encountered host with no id , Unexpected host id {}. ", host);
461 public void close() {
462 processorThread.interrupt();
463 this.addrsNodeListenerRegistration.close();
464 this.hostNodeListenerRegistration.close();
465 this.linkNodeListenerRegistration.close();
466 this.exec.shutdownNow();