Update upstreams to 2023.03 Argon SR3
[l2switch.git] / hosttracker / implementation / src / main / java / org / opendaylight / l2switch / hosttracker / plugin / internal / HostTrackerImpl.java
1 /*
2  * Copyright (c) 2014 AndrĂ© Martins, Colin Dixon, Evan Zeller and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.l2switch.hosttracker.plugin.internal;
10
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FluentFuture;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.Optional;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.l2switch.hosttracker.plugin.inventory.Host;
24 import org.opendaylight.l2switch.hosttracker.plugin.util.Utilities;
25 import org.opendaylight.mdsal.binding.api.DataBroker;
26 import org.opendaylight.mdsal.binding.api.DataObjectModification;
27 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
28 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.mdsal.binding.api.DataTreeModification;
30 import org.opendaylight.mdsal.binding.api.ReadTransaction;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.AddressCapableNodeConnector;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.address.node.connector.Addresses;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.host.AttachmentPointsBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.host.tracker.config.rev140528.HostTrackerConfig;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
47 import org.opendaylight.yangtools.concepts.ListenerRegistration;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 @SuppressWarnings("rawtypes")
54 public class HostTrackerImpl implements DataTreeChangeListener<DataObject> {
55
56     private static final Logger LOG = LoggerFactory.getLogger(HostTrackerImpl.class);
57
58     private static final int CPUS = Runtime.getRuntime().availableProcessors();
59
60     private static final String TOPOLOGY_NAME = "flow:1";
61
62     private final DataBroker dataService;
63     private final String topologyId;
64     private final long hostPurgeInterval;
65     private final long hostPurgeAge;
66
67     private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(CPUS);
68
69     private final ConcurrentClusterAwareHostHashMap hosts;
70     private final ConcurrentClusterAwareLinkHashMap links;
71     private final OperationProcessor opProcessor;
72     private final Thread processorThread;
73     private ListenerRegistration<DataTreeChangeListener> addrsNodeListenerRegistration;
74     private ListenerRegistration<DataTreeChangeListener> hostNodeListenerRegistration;
75     private ListenerRegistration<DataTreeChangeListener> linkNodeListenerRegistration;
76
77     /**
78      * It creates hosts using reference to MD-SAl / toplogy module. For every hostPurgeIntervalInput time interval
79      * it requests to purge hosts that are not seen for hostPurgeAgeInput time interval.
80      *
81      * @param dataService A reference to the MD-SAL
82      * @param config Default configuration
83      */
84     public HostTrackerImpl(final DataBroker dataService, final HostTrackerConfig config) {
85         this.dataService = requireNonNull(dataService);
86         this.opProcessor = new OperationProcessor(dataService);
87         Preconditions.checkArgument(config.getHostPurgeAge() >= 0, "hostPurgeAgeInput must be non-negative");
88         Preconditions.checkArgument(config.getHostPurgeInterval() >= 0, "hostPurgeIntervalInput must be non-negative");
89         this.hostPurgeAge = config.getHostPurgeAge();
90         this.hostPurgeInterval = config.getHostPurgeInterval();
91         processorThread = new Thread(opProcessor);
92         final String maybeTopologyId = config.getTopologyId();
93         if (maybeTopologyId == null || maybeTopologyId.isEmpty()) {
94             this.topologyId = TOPOLOGY_NAME;
95         } else {
96             this.topologyId = maybeTopologyId;
97         }
98         this.hosts = new ConcurrentClusterAwareHostHashMap(opProcessor, this.topologyId);
99         this.links = new ConcurrentClusterAwareLinkHashMap(opProcessor);
100
101         if (hostPurgeInterval > 0) {
102             exec.scheduleWithFixedDelay(() -> purgeHostsNotSeenInLast(hostPurgeAge), 0, hostPurgeInterval,
103                     TimeUnit.SECONDS);
104         }
105     }
106
107     @SuppressWarnings("unchecked")
108     public void init() {
109         processorThread.start();
110
111         InstanceIdentifier<Addresses> addrCapableNodeConnectors =
112                 InstanceIdentifier.builder(Nodes.class)
113                         .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class)
114                         .child(NodeConnector.class)
115                         .augmentation(AddressCapableNodeConnector.class)
116                         .child(Addresses.class).build();
117         this.addrsNodeListenerRegistration = dataService.registerDataTreeChangeListener(
118             DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, addrCapableNodeConnectors),
119             // FIXME: add an specialized object instead of going through raw types!
120             (DataTreeChangeListener)this);
121
122         InstanceIdentifier<HostNode> hostNodes = InstanceIdentifier.builder(NetworkTopology.class)
123                 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
124                 .child(Node.class)
125                 .augmentation(HostNode.class).build();
126         this.hostNodeListenerRegistration = dataService.registerDataTreeChangeListener(
127             DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, hostNodes),
128             // FIXME: add an specialized object instead of going through raw types!
129             (DataTreeChangeListener)this);
130
131         InstanceIdentifier<Link> linkIID = InstanceIdentifier.builder(NetworkTopology.class)
132                 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
133                 .child(Link.class).build();
134
135         this.linkNodeListenerRegistration = dataService.registerDataTreeChangeListener(
136             // FIXME: add an specialized object instead of going through raw types!
137             DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, linkIID), (DataTreeChangeListener)this);
138
139         //Processing addresses that existed before we register as a data change listener.
140 //        ReadOnlyTransaction newReadOnlyTransaction = dataService.newReadOnlyTransaction();
141 //        InstanceIdentifier<NodeConnector> iinc = addrCapableNodeConnectors.firstIdentifierOf(NodeConnector.class);
142 //        InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin
143 //                = addrCapableNodeConnectors.firstIdentifierOf(
144 //                    org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
145 //        ListenableFuture<Optional<NodeConnector>> dataFuture = newReadOnlyTransaction.read(
146 //            LogicalDatastoreType.OPERATIONAL, iinc);
147 //        try {
148 //            NodeConnector get = dataFuture.get().get();
149 //            log.trace("test "+get);
150 //        } catch (InterruptedException | ExecutionException ex) {
151 //            java.util.logging.Logger.getLogger(HostTrackerImpl.class.getName()).log(Level.SEVERE, null, ex);
152 //        }
153 //        Futures.addCallback(dataFuture, new FutureCallback<Optional<NodeConnector>>() {
154 //            @Override
155 //            public void onSuccess(final Optional<NodeConnector> result) {
156 //                if (result.isPresent()) {
157 //                    log.trace("Processing NEW NODE? " + result.get().getId().getValue());
158 ////                    processHost(result, dataObject, node);
159 //                }
160 //            }
161
162 //            @Override
163 //            public void onFailure(Throwable arg0) {
164 //            }
165 //        });
166     }
167
168     @Override
169     public void onDataTreeChanged(Collection<DataTreeModification<DataObject>> changes) {
170         exec.submit(() -> {
171             for (DataTreeModification<?> change: changes) {
172                 DataObjectModification<?> rootNode = change.getRootNode();
173                 final InstanceIdentifier<?> identifier = change.getRootPath().getRootIdentifier();
174                 switch (rootNode.getModificationType()) {
175                     case SUBTREE_MODIFIED:
176                     case WRITE:
177                         onModifiedData(identifier, rootNode);
178                         break;
179                     case DELETE:
180                         onDeletedData(identifier, rootNode);
181                         break;
182                     default:
183                         break;
184                 }
185             }
186         });
187     }
188
189     @SuppressWarnings("unchecked")
190     private void onModifiedData(InstanceIdentifier<?> iid, DataObjectModification<?> rootNode) {
191         final DataObject dataObject = rootNode.getDataAfter();
192         if (dataObject instanceof Addresses) {
193             packetReceived((Addresses) dataObject, iid);
194         } else if (dataObject instanceof Node) {
195             hosts.putLocally((InstanceIdentifier<Node>) iid, Host.createHost((Node) dataObject));
196         } else if (dataObject instanceof  Link) {
197             links.putLocally((InstanceIdentifier<Link>) iid, (Link) dataObject);
198         }
199     }
200
201     @SuppressWarnings("unchecked")
202     private void onDeletedData(InstanceIdentifier<?> iid, DataObjectModification<?> rootNode) {
203         if (iid.getTargetType().equals(Node.class)) {
204             Node node = (Node) rootNode.getDataBefore();
205             InstanceIdentifier<Node> iiN = (InstanceIdentifier<Node>) iid;
206             HostNode hostNode = node.augmentation(HostNode.class);
207             if (hostNode != null) {
208                 hosts.removeLocally(iiN);
209             }
210         } else if (iid.getTargetType().equals(Link.class)) {
211             // TODO performance improvement here
212             InstanceIdentifier<Link> iiL = (InstanceIdentifier<Link>) iid;
213             links.removeLocally(iiL);
214             linkRemoved((InstanceIdentifier<Link>) iid, (Link) rootNode.getDataBefore());
215         }
216     }
217
218     public void packetReceived(Addresses addrs, InstanceIdentifier<?> ii) {
219         InstanceIdentifier<NodeConnector> iinc = ii.firstIdentifierOf(NodeConnector.class);
220         InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin =
221             ii.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
222
223         FluentFuture<Optional<NodeConnector>> futureNodeConnector;
224         FluentFuture<Optional<
225             org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node>> futureNode;
226         try (ReadTransaction readTx = dataService.newReadOnlyTransaction()) {
227             futureNodeConnector = readTx.read(LogicalDatastoreType.OPERATIONAL, iinc);
228             futureNode = readTx.read(LogicalDatastoreType.OPERATIONAL, iin);
229             readTx.close();
230         }
231         Optional<NodeConnector> opNodeConnector = null;
232         Optional<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> opNode = null;
233         try {
234             opNodeConnector = futureNodeConnector.get();
235             opNode = futureNode.get();
236         } catch (ExecutionException | InterruptedException e) {
237             LOG.warn("Failed to get node connector {}", iinc, e);
238         }
239         if (opNode != null && opNode.isPresent()
240                 && opNodeConnector != null && opNodeConnector.isPresent()) {
241             processHost(opNode.orElseThrow(), opNodeConnector.orElseThrow(), addrs);
242         }
243     }
244
245     private void processHost(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node node,
246                              NodeConnector nodeConnector,
247                              Addresses addrs) {
248         List<Host> hostsToMod = new ArrayList<>();
249         List<Host> hostsToRem = new ArrayList<>();
250         List<Link> linksToRem = new ArrayList<>();
251         List<Link> linksToAdd = new ArrayList<>();
252         synchronized (hosts) {
253             LOG.trace("Processing nodeConnector: {} ", nodeConnector.getId());
254             HostId hostId = Host.createHostId(addrs);
255             if (hostId != null) {
256                 if (isNodeConnectorInternal(nodeConnector)) {
257                     LOG.trace("NodeConnector is internal: {} ", nodeConnector.getId());
258                     removeNodeConnectorFromHost(hostsToMod, hostsToRem, nodeConnector);
259                     hosts.removeAll(hostsToRem);
260                     hosts.putAll(hostsToMod);
261                 } else {
262                     LOG.trace("NodeConnector is NOT internal {} ", nodeConnector.getId());
263                     Host host = new Host(addrs, nodeConnector);
264                     if (hosts.containsKey(host.getId())) {
265                         hosts.get(host.getId()).mergeHostWith(host);
266                     } else {
267                         hosts.put(host.getId(), host);
268                     }
269                     List<Link> newLinks = hosts.get(host.getId()).createLinks(node);
270                     if (newLinks != null) {
271                         linksToAdd.addAll(newLinks);
272                     }
273                     hosts.submit(host.getId());
274                 }
275             }
276         }
277         writeDataToDataStore(linksToAdd, linksToRem);
278     }
279
280     /**
281      * It verifies if a given NodeConnector is *internal*. An *internal*
282      * NodeConnector is considered to be all NodeConnetors that are NOT attached
283      * to hosts created by hosttracker.
284      *
285      * @param nodeConnector the nodeConnector to check if it is internal or not.
286      * @return true if it was found a host connected to this nodeConnetor, false
287      *     if it was not found a network topology or it was not found a host connected to this nodeConnetor.
288      */
289     private boolean isNodeConnectorInternal(NodeConnector nodeConnector) {
290         TpId tpId = new TpId(nodeConnector.key().getId().getValue());
291         InstanceIdentifier<NetworkTopology> ntII
292                 = InstanceIdentifier.builder(NetworkTopology.class).build();
293         FluentFuture<Optional<NetworkTopology>> lfONT;
294         try (ReadTransaction rot = dataService.newReadOnlyTransaction()) {
295             lfONT = rot.read(LogicalDatastoreType.OPERATIONAL, ntII);
296         }
297         Optional<NetworkTopology> optionalNT;
298         try {
299             optionalNT = lfONT.get();
300         } catch (InterruptedException | ExecutionException e) {
301             LOG.warn("Failed to get network topology {}", ntII, e);
302             return false;
303         }
304         if (optionalNT.isPresent()) {
305             NetworkTopology networkTopo = optionalNT.orElseThrow();
306             for (Topology t : networkTopo.nonnullTopology().values()) {
307                 if (t.getLink() != null) {
308                     for (Link l : t.nonnullLink().values()) {
309                         if (l.getSource().getSourceTp().equals(tpId)
310                                 && !l.getDestination().getDestTp().getValue().startsWith(Host.NODE_PREFIX)
311                                 || l.getDestination().getDestTp().equals(tpId)
312                                 && !l.getSource().getSourceTp().getValue().startsWith(Host.NODE_PREFIX)) {
313                             return true;
314                         }
315                     }
316                 }
317             }
318         }
319         return false;
320     }
321
322     private void removeLinksFromHosts(List<Host> hostsToMod, List<Host> hostsToRem, Link linkRemoved) {
323         for (Host h : hosts.values()) {
324             h.removeTerminationPoint(linkRemoved.getSource().getSourceTp());
325             h.removeTerminationPoint(linkRemoved.getDestination().getDestTp());
326             if (h.isOrphan()) {
327                 hostsToRem.add(h);
328             } else {
329                 hostsToMod.add(h);
330             }
331         }
332     }
333
334     private void removeNodeConnectorFromHost(List<Host> hostsToMod, List<Host> hostsToRem, NodeConnector nc) {
335         AttachmentPointsBuilder atStD = Utilities.createAPsfromNodeConnector(nc);
336         for (Host h : hosts.values()) {
337             h.removeAttachmentPoints(atStD);
338             if (h.isOrphan()) {
339                 hostsToRem.add(h);
340             } else {
341                 hostsToMod.add(h);
342             }
343         }
344     }
345
346     private void linkRemoved(InstanceIdentifier<Link> iiLink, Link linkRemoved) {
347         LOG.trace("linkRemoved");
348         List<Host> hostsToMod = new ArrayList<>();
349         List<Host> hostsToRem = new ArrayList<>();
350         synchronized (hosts) {
351             removeLinksFromHosts(hostsToMod, hostsToRem, linkRemoved);
352             hosts.removeAll(hostsToRem);
353             hosts.putAll(hostsToMod);
354         }
355     }
356
357     private void writeDataToDataStore(List<Link> linksToAdd, List<Link> linksToRemove) {
358         if (linksToAdd != null) {
359             for (final Link l : linksToAdd) {
360                 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.key(), topologyId);
361                 LOG.trace("Writing link from MD_SAL: {}", lIID.toString());
362                 opProcessor.enqueueOperation(
363                     tx -> tx.mergeParentStructureMerge(LogicalDatastoreType.OPERATIONAL, lIID, l));
364             }
365         }
366         if (linksToRemove != null) {
367             for (Link l : linksToRemove) {
368                 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.key(), topologyId);
369                 LOG.trace("Removing link from MD_SAL: {}", lIID.toString());
370                 opProcessor.enqueueOperation(tx -> tx.delete(LogicalDatastoreType.OPERATIONAL,  lIID));
371             }
372         }
373     }
374
375     /**
376      * Remove all hosts that haven't been observed more recently than the specified number of
377      * hostsPurgeAgeInSeconds.
378      *
379      * @param hostsPurgeAgeInSeconds remove hosts that haven't been observed in longer than this number of
380      *               hostsPurgeAgeInSeconds.
381      */
382     protected void purgeHostsNotSeenInLast(final long hostsPurgeAgeInSeconds) {
383         int numHostsPurged = 0;
384         final long nowInMillis = System.currentTimeMillis();
385         final long nowInSeconds = TimeUnit.MILLISECONDS.toSeconds(nowInMillis);
386         // iterate through all hosts in the local cache
387         for (Host h : hosts.values()) {
388             final HostNode hn = h.getHostNode().augmentation(HostNode.class);
389             if (hn == null) {
390                 LOG.warn("Encountered non-host node {} in hosts during purge", h);
391             } else if (hn.getAddresses() != null) {
392                 boolean purgeHosts = false;
393                 // if the node is a host and has addresses, check to see if it's been seen recently
394                 purgeHosts = hostReadyForPurge(hn, nowInSeconds,hostsPurgeAgeInSeconds);
395                 if (purgeHosts) {
396                     numHostsPurged = removeHosts(h, numHostsPurged);
397                 }
398             } else {
399                 LOG.warn("Encountered host node {} with no address in hosts during purge", hn);
400             }
401         }
402         LOG.debug("Number of purged hosts during current purge interval - {}. ", numHostsPurged);
403     }
404
405     /**
406      * Checks if hosts need to be purged.
407      *
408      * @param hostNode reference to HostNode class
409      * @param currentTimeInSeconds current time in seconds
410      * @param expirationPeriod timelimit set to hosts for expiration
411      * @return boolean - whether the hosts are ready to be purged
412      */
413     private static boolean hostReadyForPurge(final HostNode hostNode, final long currentTimeInSeconds,
414             final long expirationPeriod) {
415         // checks if hosts need to be purged
416         for (Addresses addrs : hostNode.nonnullAddresses().values()) {
417             long lastSeenTimeInSeconds = addrs.getLastSeen() / 1000;
418             if (lastSeenTimeInSeconds > currentTimeInSeconds - expirationPeriod) {
419                 LOG.debug("Host node {} NOT ready for purge", hostNode);
420                 return false;
421             }
422         }
423         LOG.debug("Host node {} ready for purge", hostNode);
424         return true;
425     }
426
427     /**
428      * Removes hosts from locally and MD-SAL. Throws warning message if not removed successfully
429      *
430      * @param host  reference to Host node
431      */
432     private int removeHosts(final Host host, int numHostsPurged) {
433         // remove associated links with the host before removing hosts
434         removeAssociatedLinksFromHosts(host);
435         // purge hosts from local & MD-SAL database
436         final HostId hostId = host.getId();
437         if (hosts.remove(hostId) != null) {
438             numHostsPurged++;
439             LOG.debug("Removed host with id {} during purge.", hostId);
440         } else {
441             LOG.warn("Unexpected error encountered - Failed to remove host {} during purge", host);
442         }
443
444         return numHostsPurged;
445     }
446
447     /**
448      * Removes links associated with the given hosts from local and MD-SAL database.
449      * Throws warning message if not removed successfully.
450      *
451      * @param host  reference to Host node
452      */
453     private void removeAssociatedLinksFromHosts(final Host host) {
454         if (host.getId() != null) {
455             List<Link> linksToRemove = new ArrayList<>();
456             for (Link link: links.values()) {
457                 if (link.toString().contains(host.getId().getValue())) {
458                     linksToRemove.add(link);
459                 }
460             }
461             links.removeAll(linksToRemove);
462         } else {
463             LOG.warn("Encountered host with no id , Unexpected host id {}. ", host);
464         }
465     }
466
467     public void close() {
468         processorThread.interrupt();
469         this.addrsNodeListenerRegistration.close();
470         this.hostNodeListenerRegistration.close();
471         this.linkNodeListenerRegistration.close();
472         this.exec.shutdownNow();
473         this.hosts.clear();
474     }
475 }