2fd37c3367e26c2e1e442cef19589018c8e44ee7
[l2switch.git] / hosttracker / implementation / src / main / java / org / opendaylight / l2switch / hosttracker / plugin / internal / HostTrackerImpl.java
1 /**
2  * Copyright (c) 2014 AndrĂ© Martins, Colin Dixon, Evan Zeller and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.l2switch.hosttracker.plugin.internal;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.List;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.Nonnull;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
25 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.l2switch.hosttracker.plugin.inventory.Host;
29 import org.opendaylight.l2switch.hosttracker.plugin.util.Utilities;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.AddressCapableNodeConnector;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.address.node.connector.Addresses;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostNode;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.host.AttachmentPointsBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.host.tracker.config.rev140528.HostTrackerConfig;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.binding.DataObject;
47 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 @SuppressWarnings("rawtypes")
52 public class HostTrackerImpl implements DataTreeChangeListener<DataObject> {
53
54     private static final int CPUS = Runtime.getRuntime().availableProcessors();
55
56     private static final String TOPOLOGY_NAME = "flow:1";
57
58     private static final Logger LOG = LoggerFactory.getLogger(HostTrackerImpl.class);
59
60     private final DataBroker dataService;
61     private final String topologyId;
62     private final long hostPurgeInterval;
63     private final long hostPurgeAge;
64
65     private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(CPUS);
66
67     private final ConcurrentClusterAwareHostHashMap hosts;
68     private final ConcurrentClusterAwareLinkHashMap links;
69     private final OperationProcessor opProcessor;
70     private final Thread processorThread;
71     private ListenerRegistration<DataTreeChangeListener> addrsNodeListenerRegistration;
72     private ListenerRegistration<DataTreeChangeListener> hostNodeListenerRegistration;
73     private ListenerRegistration<DataTreeChangeListener> linkNodeListenerRegistration;
74
75     /**
76      * It creates hosts using reference to MD-SAl / toplogy module. For every hostPurgeIntervalInput time interval
77      * it requests to purge hosts that are not seen for hostPurgeAgeInput time interval.
78      *
79      * @param dataService A reference to the MD-SAL
80      * @param config Default configuration
81      */
82     public HostTrackerImpl(final DataBroker dataService, final HostTrackerConfig config) {
83         Preconditions.checkNotNull(dataService, "dataBrokerService should not be null.");
84         Preconditions.checkArgument(config.getHostPurgeAge() >= 0, "hostPurgeAgeInput must be non-negative");
85         Preconditions.checkArgument(config.getHostPurgeInterval() >= 0, "hostPurgeIntervalInput must be non-negative");
86         this.dataService = dataService;
87         this.hostPurgeAge = config.getHostPurgeAge();
88         this.hostPurgeInterval = config.getHostPurgeInterval();
89         this.opProcessor = new OperationProcessor(dataService);
90         processorThread = new Thread(opProcessor);
91         final String maybeTopologyId = config.getTopologyId();
92         if (maybeTopologyId == null || maybeTopologyId.isEmpty()) {
93             this.topologyId = TOPOLOGY_NAME;
94         } else {
95             this.topologyId = maybeTopologyId;
96         }
97         this.hosts = new ConcurrentClusterAwareHostHashMap(opProcessor, this.topologyId);
98         this.links = new ConcurrentClusterAwareLinkHashMap(opProcessor);
99
100         if (hostPurgeInterval > 0) {
101             exec.scheduleWithFixedDelay(() -> purgeHostsNotSeenInLast(hostPurgeAge), 0, hostPurgeInterval,
102                     TimeUnit.SECONDS);
103         }
104     }
105
106     @SuppressWarnings("unchecked")
107     public void init() {
108         processorThread.start();
109
110         InstanceIdentifier<Addresses> addrCapableNodeConnectors = //
111                 InstanceIdentifier.builder(Nodes.class) //
112                         .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class)
113                         .child(NodeConnector.class) //
114                         .augmentation(AddressCapableNodeConnector.class)//
115                         .child(Addresses.class).build();
116         this.addrsNodeListenerRegistration = dataService.registerDataTreeChangeListener(new DataTreeIdentifier<>(
117                 LogicalDatastoreType.OPERATIONAL, addrCapableNodeConnectors), (DataTreeChangeListener)this);
118
119         InstanceIdentifier<HostNode> hostNodes = InstanceIdentifier.builder(NetworkTopology.class)//
120                 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))//
121                 .child(Node.class)
122                 .augmentation(HostNode.class).build();
123         this.hostNodeListenerRegistration = dataService.registerDataTreeChangeListener(new DataTreeIdentifier<>(
124                 LogicalDatastoreType.OPERATIONAL, hostNodes), (DataTreeChangeListener)this);
125
126         InstanceIdentifier<Link> linkIID = InstanceIdentifier.builder(NetworkTopology.class)//
127                 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
128                 .child(Link.class).build();
129
130         this.linkNodeListenerRegistration = dataService.registerDataTreeChangeListener(new DataTreeIdentifier<>(
131                 LogicalDatastoreType.OPERATIONAL, linkIID), (DataTreeChangeListener)this);
132
133         //Processing addresses that existed before we register as a data change listener.
134 //        ReadOnlyTransaction newReadOnlyTransaction = dataService.newReadOnlyTransaction();
135 //        InstanceIdentifier<NodeConnector> iinc = addrCapableNodeConnectors.firstIdentifierOf(NodeConnector.class);
136 //        InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin
137 //                = addrCapableNodeConnectors.firstIdentifierOf(
138 //                    org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
139 //        ListenableFuture<Optional<NodeConnector>> dataFuture = newReadOnlyTransaction.read(
140 //            LogicalDatastoreType.OPERATIONAL, iinc);
141 //        try {
142 //            NodeConnector get = dataFuture.get().get();
143 //            log.trace("test "+get);
144 //        } catch (InterruptedException | ExecutionException ex) {
145 //            java.util.logging.Logger.getLogger(HostTrackerImpl.class.getName()).log(Level.SEVERE, null, ex);
146 //        }
147 //        Futures.addCallback(dataFuture, new FutureCallback<Optional<NodeConnector>>() {
148 //            @Override
149 //            public void onSuccess(final Optional<NodeConnector> result) {
150 //                if (result.isPresent()) {
151 //                    log.trace("Processing NEW NODE? " + result.get().getId().getValue());
152 ////                    processHost(result, dataObject, node);
153 //                }
154 //            }
155 //
156 //            @Override
157 //            public void onFailure(Throwable arg0) {
158 //            }
159 //        });
160     }
161
162     @Override
163     public void onDataTreeChanged(Collection<DataTreeModification<DataObject>> changes) {
164         exec.submit(() -> {
165             for (DataTreeModification<?> change: changes) {
166                 DataObjectModification<?> rootNode = change.getRootNode();
167                 final InstanceIdentifier<?> identifier = change.getRootPath().getRootIdentifier();
168                 switch (rootNode.getModificationType()) {
169                     case SUBTREE_MODIFIED:
170                     case WRITE:
171                         onModifiedData(identifier, rootNode);
172                         break;
173                     case DELETE:
174                         onDeletedData(identifier, rootNode);
175                         break;
176                     default:
177                         break;
178                 }
179             }
180         });
181     }
182
183     @SuppressWarnings("unchecked")
184     private void onModifiedData(InstanceIdentifier<?> iid, DataObjectModification<?> rootNode) {
185         final DataObject dataObject = rootNode.getDataAfter();
186         if (dataObject instanceof Addresses) {
187             packetReceived((Addresses) dataObject, iid);
188         } else if (dataObject instanceof Node) {
189             hosts.putLocally((InstanceIdentifier<Node>) iid, Host.createHost((Node) dataObject));
190         } else if (dataObject instanceof  Link) {
191             links.putLocally((InstanceIdentifier<Link>) iid, (Link) dataObject);
192         }
193     }
194
195     @SuppressWarnings("unchecked")
196     private void onDeletedData(InstanceIdentifier<?> iid, DataObjectModification<?> rootNode) {
197         if (iid.getTargetType().equals(Node.class)) {
198             Node node = (Node) rootNode.getDataBefore();
199             InstanceIdentifier<Node> iiN = (InstanceIdentifier<Node>) iid;
200             HostNode hostNode = node.getAugmentation(HostNode.class);
201             if (hostNode != null) {
202                 hosts.removeLocally(iiN);
203             }
204         } else if (iid.getTargetType().equals(Link.class)) {
205             // TODO performance improvement here
206             InstanceIdentifier<Link> iiL = (InstanceIdentifier<Link>) iid;
207             links.removeLocally(iiL);
208             linkRemoved((InstanceIdentifier<Link>) iid, (Link) rootNode.getDataBefore());
209         }
210
211     }
212
213     public void packetReceived(Addresses addrs, InstanceIdentifier<?> ii) {
214         InstanceIdentifier<NodeConnector> iinc = ii.firstIdentifierOf(NodeConnector.class);
215         InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin =
216             ii.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
217
218         ListenableFuture<Optional<NodeConnector>> futureNodeConnector;
219         ListenableFuture<Optional<
220             org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node>> futureNode;
221         try (ReadOnlyTransaction readTx = dataService.newReadOnlyTransaction()) {
222             futureNodeConnector = readTx.read(LogicalDatastoreType.OPERATIONAL, iinc);
223             futureNode = readTx.read(LogicalDatastoreType.OPERATIONAL, iin);
224             readTx.close();
225         }
226         Optional<NodeConnector> opNodeConnector = null;
227         Optional<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> opNode = null;
228         try {
229             opNodeConnector = futureNodeConnector.get();
230             opNode = futureNode.get();
231         } catch (ExecutionException | InterruptedException ex) {
232             LOG.warn(ex.getLocalizedMessage());
233         }
234         if (opNode != null && opNode.isPresent()
235                 && opNodeConnector != null && opNodeConnector.isPresent()) {
236             processHost(opNode.get(), opNodeConnector.get(), addrs);
237         }
238     }
239
240     private void processHost(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node node,
241                              NodeConnector nodeConnector,
242                              Addresses addrs) {
243         List<Host> hostsToMod = new ArrayList<>();
244         List<Host> hostsToRem = new ArrayList<>();
245         List<Link> linksToRem = new ArrayList<>();
246         List<Link> linksToAdd = new ArrayList<>();
247         synchronized (hosts) {
248             LOG.trace("Processing nodeConnector: {} ", nodeConnector.getId().toString());
249             HostId hostId = Host.createHostId(addrs);
250             if (hostId != null) {
251                 if (isNodeConnectorInternal(nodeConnector)) {
252                     LOG.trace("NodeConnector is internal: {} ", nodeConnector.getId().toString());
253
254                     removeNodeConnectorFromHost(hostsToMod, hostsToRem, nodeConnector);
255                     hosts.removeAll(hostsToRem);
256                     hosts.putAll(hostsToMod);
257                 } else {
258                     LOG.trace("NodeConnector is NOT internal {} ", nodeConnector.getId().toString());
259                     Host host = new Host(addrs, nodeConnector);
260                     if (hosts.containsKey(host.getId())) {
261                         hosts.get(host.getId()).mergeHostWith(host);
262                     } else {
263                         hosts.put(host.getId(), host);
264                     }
265                     List<Link> newLinks = hosts.get(host.getId()).createLinks(node);
266                     if (newLinks != null) {
267                         linksToAdd.addAll(newLinks);
268                     }
269                     hosts.submit(host.getId());
270                 }
271             }
272         }
273         writeDataToDataStore(linksToAdd, linksToRem);
274     }
275
276     /**
277      * It verifies if a given NodeConnector is *internal*. An *internal*
278      * NodeConnector is considered to be all NodeConnetors that are NOT attached
279      * to hosts created by hosttracker.
280      *
281      * @param nodeConnector the nodeConnector to check if it is internal or not.
282      * @return true if it was found a host connected to this nodeConnetor, false
283      *     if it was not found a network topology or it was not found a host connected to this nodeConnetor.
284      */
285     private boolean isNodeConnectorInternal(NodeConnector nodeConnector) {
286         TpId tpId = new TpId(nodeConnector.getKey().getId().getValue());
287         InstanceIdentifier<NetworkTopology> ntII
288                 = InstanceIdentifier.builder(NetworkTopology.class).build();
289         ListenableFuture<Optional<NetworkTopology>> lfONT;
290         try (ReadOnlyTransaction rot = dataService.newReadOnlyTransaction()) {
291             lfONT = rot.read(LogicalDatastoreType.OPERATIONAL, ntII);
292         }
293         Optional<NetworkTopology> optionalNT;
294         try {
295             optionalNT = lfONT.get();
296         } catch (InterruptedException | ExecutionException ex) {
297             LOG.warn(ex.getLocalizedMessage());
298             return false;
299         }
300         if (optionalNT.isPresent()) {
301             NetworkTopology networkTopo = optionalNT.get();
302             for (Topology t : networkTopo.getTopology()) {
303                 if (t.getLink() != null) {
304                     for (Link l : t.getLink()) {
305                         if (l.getSource().getSourceTp().equals(tpId)
306                                 && !l.getDestination().getDestTp().getValue().startsWith(Host.NODE_PREFIX)
307                                 || l.getDestination().getDestTp().equals(tpId)
308                                 && !l.getSource().getSourceTp().getValue().startsWith(Host.NODE_PREFIX)) {
309                             return true;
310                         }
311                     }
312                 }
313             }
314         }
315         return false;
316     }
317
318     private void removeLinksFromHosts(List<Host> hostsToMod, List<Host> hostsToRem, Link linkRemoved) {
319         for (Host h : hosts.values()) {
320             h.removeTerminationPoint(linkRemoved.getSource().getSourceTp());
321             h.removeTerminationPoint(linkRemoved.getDestination().getDestTp());
322             if (h.isOrphan()) {
323                 hostsToRem.add(h);
324             } else {
325                 hostsToMod.add(h);
326             }
327         }
328     }
329
330     private void removeNodeConnectorFromHost(List<Host> hostsToMod, List<Host> hostsToRem, NodeConnector nc) {
331         AttachmentPointsBuilder atStD = Utilities.createAPsfromNodeConnector(nc);
332         for (Host h : hosts.values()) {
333             h.removeAttachmentPoints(atStD);
334             if (h.isOrphan()) {
335                 hostsToRem.add(h);
336             } else {
337                 hostsToMod.add(h);
338             }
339         }
340     }
341
342     private void linkRemoved(InstanceIdentifier<Link> iiLink, Link linkRemoved) {
343         LOG.trace("linkRemoved");
344         List<Host> hostsToMod = new ArrayList<>();
345         List<Host> hostsToRem = new ArrayList<>();
346         synchronized (hosts) {
347             removeLinksFromHosts(hostsToMod, hostsToRem, linkRemoved);
348             hosts.removeAll(hostsToRem);
349             hosts.putAll(hostsToMod);
350         }
351     }
352
353     private void writeDataToDataStore(List<Link> linksToAdd, List<Link> linksToRemove) {
354         if (linksToAdd != null) {
355             for (final Link l : linksToAdd) {
356                 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.getKey(), topologyId);
357                 LOG.trace("Writing link from MD_SAL: {}", lIID.toString());
358                 opProcessor.enqueueOperation(tx -> tx.merge(LogicalDatastoreType.OPERATIONAL, lIID, l, true));
359             }
360         }
361         if (linksToRemove != null) {
362             for (Link l : linksToRemove) {
363                 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.getKey(), topologyId);
364                 LOG.trace("Removing link from MD_SAL: {}", lIID.toString());
365                 opProcessor.enqueueOperation(tx -> tx.delete(LogicalDatastoreType.OPERATIONAL,  lIID));
366             }
367         }
368     }
369
370     /**
371      * Remove all hosts that haven't been observed more recently than the specified number of
372      * hostsPurgeAgeInSeconds.
373      *
374      * @param hostsPurgeAgeInSeconds remove hosts that haven't been observed in longer than this number of
375      *               hostsPurgeAgeInSeconds.
376      */
377     protected void purgeHostsNotSeenInLast(final long hostsPurgeAgeInSeconds) {
378         int numHostsPurged = 0;
379         final long nowInMillis = System.currentTimeMillis();
380         final long nowInSeconds = TimeUnit.MILLISECONDS.toSeconds(nowInMillis);
381         // iterate through all hosts in the local cache
382         for (Host h : hosts.values()) {
383             final HostNode hn = h.getHostNode().getAugmentation(HostNode.class);
384             if (hn == null) {
385                 LOG.warn("Encountered non-host node {} in hosts during purge", h);
386             } else if (hn.getAddresses() != null) {
387                 boolean purgeHosts = false;
388                 // if the node is a host and has addresses, check to see if it's been seen recently
389                 purgeHosts = hostReadyForPurge(hn, nowInSeconds,hostsPurgeAgeInSeconds);
390                 if (purgeHosts) {
391                     numHostsPurged = removeHosts(h, numHostsPurged);
392                 }
393             } else {
394                 LOG.warn("Encountered host node {} with no address in hosts during purge", hn);
395             }
396         }
397         LOG.debug("Number of purged hosts during current purge interval - {}. ", numHostsPurged);
398     }
399
400     /**
401      * Checks if hosts need to be purged.
402      *
403      * @param hostNode reference to HostNode class
404      * @param currentTimeInSeconds current time in seconds
405      * @param expirationPeriod timelimit set to hosts for expiration
406      * @return boolean - whether the hosts are ready to be purged
407      */
408     private boolean hostReadyForPurge(final HostNode hostNode, final long currentTimeInSeconds,
409             final long expirationPeriod) {
410         // checks if hosts need to be purged
411         for (Addresses addrs : hostNode.getAddresses()) {
412             long lastSeenTimeInSeconds = addrs.getLastSeen() / 1000;
413             if (lastSeenTimeInSeconds > currentTimeInSeconds - expirationPeriod) {
414                 LOG.debug("Host node {} NOT ready for purge", hostNode);
415                 return false;
416             }
417         }
418         LOG.debug("Host node {} ready for purge", hostNode);
419         return true;
420     }
421
422     /**
423      * Removes hosts from locally and MD-SAL. Throws warning message if not removed successfully
424      *
425      * @param host  reference to Host node
426      */
427     private int removeHosts(@Nonnull final Host host, int numHostsPurged) {
428         // remove associated links with the host before removing hosts
429         removeAssociatedLinksFromHosts(host);
430         // purge hosts from local & MD-SAL database
431         if (hosts.remove(host.getId()) != null) {
432             numHostsPurged++;
433             LOG.debug("Removed host with id {} during purge.", host.getId());
434         } else {
435             LOG.warn("Unexpected error encountered - Failed to remove host {} during purge", host);
436         }
437
438         return numHostsPurged;
439     }
440
441     /**
442      * Removes links associated with the given hosts from local and MD-SAL database.
443      * Throws warning message if not removed successfully.
444      *
445      * @param host  reference to Host node
446      */
447     private void removeAssociatedLinksFromHosts(@Nonnull final Host host) {
448         if (host.getId() != null) {
449             List<Link> linksToRemove = new ArrayList<>();
450             for (Link link: links.values()) {
451                 if (link.toString().contains(host.getId().getValue())) {
452                     linksToRemove.add(link);
453                 }
454             }
455             links.removeAll(linksToRemove);
456         } else {
457             LOG.warn("Encountered host with no id , Unexpected host id {}. ", host);
458         }
459     }
460
461     public void close() {
462         processorThread.interrupt();
463         this.addrsNodeListenerRegistration.close();
464         this.hostNodeListenerRegistration.close();
465         this.linkNodeListenerRegistration.close();
466         this.exec.shutdownNow();
467         this.hosts.clear();
468     }
469 }