Merge "Implement transaction chain for hosttracker in an attempt to alleviate BUG2640."
[l2switch.git] / hosttracker / implementation / src / main / java / org / opendaylight / l2switch / hosttracker / plugin / internal / HostTrackerImpl.java
1 /**
2  * Copyright (c) 2014 André Martins, Colin Dixon, Evan Zeller and others. All
3  * rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.l2switch.hosttracker.plugin.internal;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.ArrayList;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Set;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
23 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.l2switch.hosttracker.plugin.inventory.Host;
29 import org.opendaylight.l2switch.hosttracker.plugin.util.Utilities;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.AddressCapableNodeConnector;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.address.tracker.rev140617.address.node.connector.Addresses;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostId;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.HostNode;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.host.tracker.rev140624.host.AttachmentPointsBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
37 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
38 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
44 import org.opendaylight.yangtools.concepts.ListenerRegistration;
45 import org.opendaylight.yangtools.yang.binding.DataObject;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public class HostTrackerImpl implements DataChangeListener {
51
52     private static final int CPUS = Runtime.getRuntime().availableProcessors();
53
54     /**
55      * As defined on
56      * controller/opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyProvider.java
57      */
58     private static final String TOPOLOGY_NAME = "flow:1";
59
60     private static final Logger log = LoggerFactory.getLogger(HostTrackerImpl.class);
61
62     private final DataBroker dataService;
63     private final String topologyId;
64
65     ExecutorService exec = Executors.newFixedThreadPool(CPUS);
66
67     private final ConcurrentClusterAwareHostHashMap<HostId, Host> hosts;
68     private final OperationProcessor opProcessor;
69     private ListenerRegistration<DataChangeListener> addrsNodeListerRegistration;
70     private ListenerRegistration<DataChangeListener> hostNodeListerRegistration;
71
72     public HostTrackerImpl(DataBroker dataService, String topologyId) {
73         Preconditions.checkNotNull(dataService, "dataBrokerService should not be null.");
74         this.dataService = dataService;
75         this.opProcessor = new OperationProcessor(dataService);
76         Thread processorThread = new Thread(opProcessor);
77         processorThread.start();
78         if (topologyId == null || topologyId.isEmpty()) {
79             this.topologyId = TOPOLOGY_NAME;
80         } else {
81             this.topologyId = topologyId;
82         }
83         this.hosts = new ConcurrentClusterAwareHostHashMap<>(opProcessor, this.topologyId);
84     }
85
86     public void registerAsDataChangeListener() {
87         InstanceIdentifier<Addresses> addrCapableNodeConnectors = //
88                 InstanceIdentifier.builder(Nodes.class) //
89                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class) //
90                 .child(NodeConnector.class) //
91                 .augmentation(AddressCapableNodeConnector.class)//
92                 .child(Addresses.class).build();
93         this.addrsNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, addrCapableNodeConnectors, this, DataChangeScope.SUBTREE);
94
95         InstanceIdentifier<HostNode> hostNodes = InstanceIdentifier.builder(NetworkTopology.class)//
96                 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))//
97                 .child(Node.class)
98                 .augmentation(HostNode.class).build();
99         this.hostNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, hostNodes, this, DataChangeScope.SUBTREE);
100
101         InstanceIdentifier<Link> lIID = InstanceIdentifier.builder(NetworkTopology.class)//
102                 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))//
103                 .child(Link.class).build();
104
105         this.addrsNodeListerRegistration = dataService.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, lIID, this, DataChangeScope.BASE);
106
107         //Processing addresses that existed before we register as a data change listener.
108 //        ReadOnlyTransaction newReadOnlyTransaction = dataService.newReadOnlyTransaction();
109 //        InstanceIdentifier<NodeConnector> iinc = addrCapableNodeConnectors.firstIdentifierOf(NodeConnector.class);
110 //        InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin//
111 //                = addrCapableNodeConnectors.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
112 //        ListenableFuture<Optional<NodeConnector>> dataFuture = newReadOnlyTransaction.read(LogicalDatastoreType.OPERATIONAL, iinc);
113 //        try {
114 //            NodeConnector get = dataFuture.get().get();
115 //            log.trace("test "+get);
116 //        } catch (InterruptedException | ExecutionException ex) {
117 //            java.util.logging.Logger.getLogger(HostTrackerImpl.class.getName()).log(Level.SEVERE, null, ex);
118 //        }
119 //        Futures.addCallback(dataFuture, new FutureCallback<Optional<NodeConnector>>() {
120 //            @Override
121 //            public void onSuccess(final Optional<NodeConnector> result) {
122 //                if (result.isPresent()) {
123 //                    log.trace("Processing NEW NODE? " + result.get().getId().getValue());
124 ////                    processHost(result, dataObject, node);
125 //                }
126 //            }
127 //
128 //            @Override
129 //            public void onFailure(Throwable arg0) {
130 //            }
131 //        });
132     }
133
134     @Override
135     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
136
137         exec.submit(new Runnable() {
138             @Override
139             public void run() {
140                 if (change == null) {
141                     log.info("In onDataChanged: No processing done as change even is null.");
142                     return;
143                 }
144                 Map<InstanceIdentifier<?>, DataObject> updatedData = change.getUpdatedData();
145                 Map<InstanceIdentifier<?>, DataObject> createdData = change.getCreatedData();
146                 Map<InstanceIdentifier<?>, DataObject> originalData = change.getOriginalData();
147                 Set<InstanceIdentifier<?>> deletedData = change.getRemovedPaths();
148
149                 for (InstanceIdentifier<?> iid : deletedData) {
150                     if (iid.getTargetType().equals(Node.class)) {
151                         Node node = ((Node) originalData.get(iid));
152                         InstanceIdentifier<Node> iiN = (InstanceIdentifier<Node>) iid;
153                         HostNode hostNode = node.getAugmentation(HostNode.class);
154                         if (hostNode != null) {
155                             synchronized (hosts) {
156                                 try {
157                                     hosts.removeLocally(iiN);
158                                 } catch (ClassCastException ex) {
159                                 }
160                             }
161                         }
162                     } else if (iid.getTargetType().equals(Link.class)) {
163                         // TODO performance improvement here
164                         linkRemoved((InstanceIdentifier<Link>) iid, (Link) originalData.get(iid));
165                     }
166                 }
167
168                 for (Map.Entry<InstanceIdentifier<?>, DataObject> entrySet : updatedData.entrySet()) {
169                     InstanceIdentifier<?> iiD = entrySet.getKey();
170                     final DataObject dataObject = entrySet.getValue();
171                     if (dataObject instanceof Addresses) {
172                         packetReceived((Addresses) dataObject, iiD);
173                     } else if (dataObject instanceof Node) {
174                         synchronized (hosts) {
175                             hosts.putLocally((InstanceIdentifier<Node>) iiD, Host.createHost((Node) dataObject));
176                         }
177                     }
178                 }
179
180                 for (Map.Entry<InstanceIdentifier<?>, DataObject> entrySet : createdData.entrySet()) {
181                     InstanceIdentifier<?> iiD = entrySet.getKey();
182                     final DataObject dataObject = entrySet.getValue();
183                     if (dataObject instanceof Addresses) {
184                         packetReceived((Addresses) dataObject, iiD);
185                     } else if (dataObject instanceof Node) {
186                         synchronized (hosts) {
187                             hosts.putLocally((InstanceIdentifier<Node>) iiD, Host.createHost((Node) dataObject));
188                         }
189                     }
190                 }
191             }
192         });
193     }
194
195     public void packetReceived(Addresses addrs, InstanceIdentifier<?> ii) {
196         InstanceIdentifier<NodeConnector> iinc = ii.firstIdentifierOf(NodeConnector.class);
197         InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> iin//
198                 = ii.firstIdentifierOf(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
199
200         ListenableFuture<Optional<NodeConnector>> futureNodeConnector;
201         ListenableFuture<Optional<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node>> futureNode;
202         try (ReadOnlyTransaction readTx = dataService.newReadOnlyTransaction()) {
203             futureNodeConnector = readTx.read(LogicalDatastoreType.OPERATIONAL, iinc);
204             futureNode = readTx.read(LogicalDatastoreType.OPERATIONAL, iin);
205             readTx.close();
206         }
207         Optional<NodeConnector> opNodeConnector = null;
208         Optional<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> opNode = null;
209         try {
210             opNodeConnector = futureNodeConnector.get();
211             opNode = futureNode.get();
212         } catch (ExecutionException | InterruptedException ex) {
213             log.warn(ex.getLocalizedMessage());
214         }
215         if (opNode != null && opNode.isPresent()
216                 && opNodeConnector != null && opNodeConnector.isPresent()) {
217             processHost(opNode.get(), opNodeConnector.get(), addrs);
218         }
219     }
220
221     private void processHost(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node node,
222             NodeConnector nodeConnector,
223             Addresses addrs) {
224         List<Host> hostsToMod = new ArrayList<>();
225         List<Host> hostsToRem = new ArrayList<>();
226         List<Link> linksToRem = new ArrayList<>();
227         List<Link> linksToAdd = new ArrayList<>();
228         synchronized (hosts) {
229             log.trace("Processing nodeConnector " + nodeConnector.getId().toString());
230             HostId hId = Host.createHostId(addrs);
231             if (hId != null) {
232                 if (isNodeConnectorInternal(nodeConnector)) {
233                     log.trace("NodeConnector is internal " + nodeConnector.getId().toString());
234
235                     removeNodeConnectorFromHost(hostsToMod, hostsToRem, nodeConnector);
236                     hosts.removeAll(hostsToRem);
237                     hosts.putAll(hostsToMod);
238                 } else {
239                     log.trace("NodeConnector is NOT internal " + nodeConnector.getId().toString());
240                     Host host = new Host(addrs, nodeConnector);
241                     if (hosts.containsKey(host.getId())) {
242                         hosts.get(host.getId()).mergeHostWith(host);
243                     } else {
244                         hosts.put(host.getId(), host);
245                     }
246                     List<Link> newLinks = hosts.get(host.getId()).createLinks(node);
247                     if (newLinks != null) {
248                         linksToAdd.addAll(newLinks);
249                     }
250                     hosts.submit(host.getId());
251                 }
252             }
253         }
254         writeDatatoMDSAL(linksToAdd, linksToRem);
255     }
256
257     /**
258      * It verifies if a given NodeConnector is *internal*. An *internal*
259      * NodeConnector is considered to be all NodeConnetors that are NOT attached
260      * to hosts created by hosttracker.
261      *
262      * @param nodeConnector the nodeConnector to check if it is internal or not.
263      * @return true if it was found a host connected to this nodeConnetor, false
264      * if it was not found a network topology or it was not found a host
265      * connected to this nodeConnetor.
266      */
267     private boolean isNodeConnectorInternal(NodeConnector nodeConnector) {
268         TpId tpId = new TpId(nodeConnector.getKey().getId().getValue());
269         InstanceIdentifier<NetworkTopology> ntII
270                 = InstanceIdentifier.builder(NetworkTopology.class).build();
271         ListenableFuture<Optional<NetworkTopology>> lfONT;
272         try (ReadOnlyTransaction rot = dataService.newReadOnlyTransaction()) {
273             lfONT = rot.read(LogicalDatastoreType.OPERATIONAL, ntII);
274             rot.close();
275         }
276         Optional<NetworkTopology> oNT;
277         try {
278             oNT = lfONT.get();
279         } catch (InterruptedException | ExecutionException ex) {
280             log.warn(ex.getLocalizedMessage());
281             return false;
282         }
283         if (oNT != null && oNT.isPresent()) {
284             NetworkTopology networkTopo = oNT.get();
285             for (Topology t : networkTopo.getTopology()) {
286                 if (t.getLink() != null) {
287                     for (Link l : t.getLink()) {
288                         if ((l.getSource().getSourceTp().equals(tpId)
289                                 && !l.getDestination().getDestTp().getValue().startsWith(Host.NODE_PREFIX))
290                                 || (l.getDestination().getDestTp().equals(tpId)
291                                 && !l.getSource().getSourceTp().getValue().startsWith(Host.NODE_PREFIX))) {
292                             return true;
293                         }
294                     }
295                 }
296             }
297         }
298         return false;
299     }
300
301     private void removeLinksFromHosts(List<Host> hostsToMod, List<Host> hostsToRem, Link linkRemoved) {
302         for (Host h : hosts.values()) {
303             h.removeTerminationPoint(linkRemoved.getSource().getSourceTp());
304             h.removeTerminationPoint(linkRemoved.getDestination().getDestTp());
305             if (h.isOrphan()) {
306                 hostsToRem.add(h);
307             } else {
308                 hostsToMod.add(h);
309             }
310         }
311     }
312
313     private void removeNodeConnectorFromHost(List<Host> hostsToMod, List<Host> hostsToRem, NodeConnector nc) {
314         AttachmentPointsBuilder atStD = Utilities.createAPsfromNodeConnector(nc);
315         for (Host h : hosts.values()) {
316             h.removeAttachmentPoints(atStD);
317             if (h.isOrphan()) {
318                 hostsToRem.add(h);
319             } else {
320                 hostsToMod.add(h);
321             }
322         }
323     }
324
325     private void linkRemoved(InstanceIdentifier<Link> iiLink, Link linkRemoved) {
326         log.trace("linkRemoved");
327         List<Host> hostsToMod = new ArrayList<>();
328         List<Host> hostsToRem = new ArrayList<>();
329         synchronized (hosts) {
330             removeLinksFromHosts(hostsToMod, hostsToRem, linkRemoved);
331             hosts.removeAll(hostsToRem);
332             hosts.putAll(hostsToMod);
333         }
334     }
335
336     private void writeDatatoMDSAL(List<Link> linksToAdd, List<Link> linksToRemove){
337         if (linksToAdd != null) {
338             for (final Link l : linksToAdd) {
339                 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.getKey(), topologyId);
340                 log.trace("Writing link from MD_SAL: " + lIID.toString());
341                 opProcessor.enqueueOperation(new HostTrackerOperation() {
342                     @Override
343                     public void applyOperation(ReadWriteTransaction tx) {
344                         tx.merge(LogicalDatastoreType.OPERATIONAL, lIID, l, true);
345                     }
346                 });
347             }
348         }
349         if (linksToRemove != null) {
350             for (Link l : linksToRemove) {
351                 final InstanceIdentifier<Link> lIID = Utilities.buildLinkIID(l.getKey(), topologyId);
352                 log.trace("Removing link from MD_SAL: " + lIID.toString());
353                 opProcessor.enqueueOperation(new HostTrackerOperation() {
354                     @Override
355                     public void applyOperation(ReadWriteTransaction tx) {
356                         tx.delete(LogicalDatastoreType.OPERATIONAL,  lIID);
357                     }
358                 });
359             }
360         }
361     }
362
363     public void close() {
364         this.addrsNodeListerRegistration.close();
365         this.hostNodeListerRegistration.close();
366         synchronized (hosts) {
367             this.hosts.clear();
368         }
369     }
370 }