BUG-2637: migration consequence - fix unit test
[controller.git] / opendaylight / md-sal / compatibility / sal-compatibility / src / main / java / org / opendaylight / controller / sal / compatibility / InventoryAndReadAdapter.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.compatibility;
9
10 import com.google.common.base.Optional;
11 import com.google.common.cache.Cache;
12 import com.google.common.cache.CacheBuilder;
13 import com.google.common.collect.Iterables;
14
15 import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
16 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
17 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
18 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
19 import org.opendaylight.controller.sal.core.ConstructionException;
20 import org.opendaylight.controller.sal.core.Edge;
21 import org.opendaylight.controller.sal.core.Node;
22 import org.opendaylight.controller.sal.core.NodeConnector;
23 import org.opendaylight.controller.sal.core.NodeTable;
24 import org.opendaylight.controller.sal.core.NodeTable.NodeTableIDType;
25 import org.opendaylight.controller.sal.core.Property;
26 import org.opendaylight.controller.sal.core.UpdateType;
27 import org.opendaylight.controller.sal.inventory.IPluginInInventoryService;
28 import org.opendaylight.controller.sal.inventory.IPluginOutInventoryService;
29 import org.opendaylight.controller.sal.reader.FlowOnNode;
30 import org.opendaylight.controller.sal.reader.IPluginInReadService;
31 import org.opendaylight.controller.sal.reader.IPluginOutReadService;
32 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
33 import org.opendaylight.controller.sal.reader.NodeDescription;
34 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsData;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.FlowTableStatisticsUpdate;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.GetFlowTablesStatisticsInputBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsService;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.and.statistics.map.FlowTableAndStatisticsMap;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryService;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.Link;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.node.connector.statistics.Bytes;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.node.connector.statistics.Packets;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatistics;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetAllNodeConnectorsStatisticsInputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.GetNodeConnectorStatisticsInputBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
84 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
85 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
86 import org.opendaylight.yangtools.yang.common.RpcResult;
87 import org.slf4j.Logger;
88 import org.slf4j.LoggerFactory;
89
90 import java.util.ArrayList;
91 import java.util.Collections;
92 import java.util.HashSet;
93 import java.util.Iterator;
94 import java.util.List;
95 import java.util.Map;
96 import java.util.Set;
97 import java.util.concurrent.ConcurrentHashMap;
98 import java.util.concurrent.ConcurrentMap;
99 import java.util.concurrent.CopyOnWriteArrayList;
100 import java.util.concurrent.Future;
101 import java.util.concurrent.TimeUnit;
102
103 public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener {
104     private static final Logger LOG = LoggerFactory.getLogger(InventoryAndReadAdapter.class);
105     private static final short OPENFLOWV10_TABLE_ID = 0;
106     private static final int SLEEP_FOR_NOTIFICATIONS_MILLIS = 500;
107
108     private final InventoryNotificationProvider inventoryNotificationProvider = new InventoryNotificationProvider();
109     private final Map<PathArgument,List<PathArgument>> nodeToNodeConnectorsMap = new ConcurrentHashMap<>();
110     private List<IPluginOutInventoryService> inventoryPublisher = new CopyOnWriteArrayList<>();
111     private List<IPluginOutReadService> statisticsPublisher = new CopyOnWriteArrayList<>();
112     private Cache<String, TransactionNotificationList<? extends TransactionAware>> txCache;
113
114     private OpendaylightFlowTableStatisticsService flowTableStatisticsService;
115     private OpendaylightPortStatisticsService nodeConnectorStatisticsService;
116     private OpendaylightFlowStatisticsService flowStatisticsService;
117     private FlowTopologyDiscoveryService topologyDiscovery;
118     private DataProviderService dataProviderService;
119     private DataBrokerService dataService;
120
121     public DataBrokerService getDataService() {
122         return dataService;
123     }
124
125     public void setDataService(final DataBrokerService dataService) {
126         this.dataService = dataService;
127     }
128
129     public DataProviderService getDataProviderService() {
130         return dataProviderService;
131     }
132
133     public void setDataProviderService(final DataProviderService dataProviderService) {
134         this.dataProviderService = dataProviderService;
135     }
136
137     public OpendaylightFlowStatisticsService getFlowStatisticsService() {
138         return flowStatisticsService;
139     }
140
141     public void setFlowStatisticsService(final OpendaylightFlowStatisticsService flowStatisticsService) {
142         this.flowStatisticsService = flowStatisticsService;
143     }
144
145     public OpendaylightPortStatisticsService getNodeConnectorStatisticsService() {
146         return nodeConnectorStatisticsService;
147     }
148
149     public void setNodeConnectorStatisticsService(final OpendaylightPortStatisticsService nodeConnectorStatisticsService) {
150         this.nodeConnectorStatisticsService = nodeConnectorStatisticsService;
151     }
152
153     public OpendaylightFlowTableStatisticsService getFlowTableStatisticsService() {
154         return flowTableStatisticsService;
155     }
156
157     public void setFlowTableStatisticsService(final OpendaylightFlowTableStatisticsService flowTableStatisticsService) {
158         this.flowTableStatisticsService = flowTableStatisticsService;
159     }
160
161     public FlowTopologyDiscoveryService getTopologyDiscovery() {
162         return topologyDiscovery;
163     }
164
165     public void setTopologyDiscovery(final FlowTopologyDiscoveryService topologyDiscovery) {
166         this.topologyDiscovery = topologyDiscovery;
167     }
168
169     public List<IPluginOutReadService> getStatisticsPublisher() {
170         return statisticsPublisher;
171     }
172
173     public void setStatisticsPublisher(final List<IPluginOutReadService> statisticsPublisher) {
174         this.statisticsPublisher = statisticsPublisher;
175     }
176
177     public List<IPluginOutInventoryService> getInventoryPublisher() {
178         return inventoryPublisher;
179     }
180
181     public void setInventoryPublisher(final List<IPluginOutInventoryService> inventoryPublisher) {
182         this.inventoryPublisher = inventoryPublisher;
183     }
184
185     public void startAdapter() {
186         inventoryNotificationProvider.setDataProviderService(getDataProviderService());
187         inventoryNotificationProvider.setInventoryPublisher(getInventoryPublisher());
188         txCache = CacheBuilder.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).maximumSize(10000).build();
189         // inventoryNotificationProvider.start();
190     }
191
192     public boolean setInventoryPublisher(final IPluginOutInventoryService listener) {
193         return getInventoryPublisher().add(listener);
194     }
195
196     public boolean unsetInventoryPublisher(final IPluginOutInventoryService listener) {
197         return getInventoryPublisher().remove(listener);
198     }
199
200     public boolean setReadPublisher(final IPluginOutReadService listener) {
201         return getStatisticsPublisher().add(listener);
202     }
203
204     public Boolean unsetReadPublisher(final IPluginOutReadService listener) {
205         if (listener != null) {
206             return getStatisticsPublisher().remove(listener);
207         }
208         return false;
209     }
210
211     protected DataModificationTransaction startChange() {
212         return getDataProviderService().beginTransaction();
213     }
214
215     @Override
216     public long getTransmitRate(final NodeConnector connector) {
217         final FlowCapableNodeConnector nodeConnector = this.readOperFlowCapableNodeConnector(NodeMapping.toNodeConnectorRef(connector));
218         return nodeConnector.getCurrentSpeed().longValue();
219     }
220
221     private FlowCapableNode readOperFlowCapableNode(final NodeRef ref) {
222         final org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node node =
223                 (org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node)getDataService().readOperationalData(ref.getValue());
224         if (node == null) {
225             return null;
226         }
227
228         return node.getAugmentation(FlowCapableNode.class);
229     }
230
231     private org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node readConfigNode(final Node node) {
232         final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> nodeRef =
233                 InstanceIdentifier.builder(Nodes.class)
234                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, InventoryMapping.toNodeKey(node))
235                 .build();
236
237         return (org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node) startChange().readConfigurationData(nodeRef);
238     }
239
240     private org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector readConfigNodeConnector(final NodeConnector connector) {
241         final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector> nodeConnectorRef =
242                 InstanceIdentifier.builder(Nodes.class)
243                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, InventoryMapping.toNodeKey(connector.getNode()))
244                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector.class, InventoryMapping.toNodeConnectorKey(connector))
245                 .toInstance();
246
247         return((org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector) startChange().readConfigurationData(nodeConnectorRef));
248     }
249
250     /**
251      * Read a table of a node from configuration data store.
252      *
253      * @param node Node id
254      * @param id Table id
255      * @return Table contents, or null if not present
256      */
257     private Table readOperationalTable(final Node node, final short id) {
258         final InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class)
259                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, NodeMapping.toNodeKey(node))
260                 .augmentation(FlowCapableNode.class)
261                 .child(Table.class, new TableKey(id))
262                 .build();
263
264         return (Table) startChange().readOperationalData(tableRef);
265     }
266
267     @Override
268     public List<FlowOnNode> readAllFlow(final Node node, final boolean cached) {
269         final ArrayList<FlowOnNode> ret= new ArrayList<>();
270         if (cached) {
271             final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
272             if (table != null) {
273                 final List<Flow> flows = table.getFlow();
274                 LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
275
276                 for (final Flow flow : flows) {
277                     final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
278                     if (statsFromDataStore != null) {
279                         final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
280                         ret.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
281                     }
282                 }
283             }
284         } else {
285             LOG.debug("readAllFlow cached:{}", cached);
286             GetAllFlowStatisticsFromFlowTableInput input =
287                 new GetAllFlowStatisticsFromFlowTableInputBuilder()
288                     .setNode(NodeMapping.toNodeRef(node))
289                     .setTableId(new TableId(OPENFLOWV10_TABLE_ID))
290                     .build();
291
292             Future<RpcResult<GetAllFlowStatisticsFromFlowTableOutput>> future =
293                 getFlowStatisticsService().getAllFlowStatisticsFromFlowTable(input);
294
295             RpcResult<GetAllFlowStatisticsFromFlowTableOutput> result = null;
296             try {
297                 // having a blocking call is fine here, as we need to join
298                 // the notifications and return the result
299                 result = future.get();
300             } catch (Exception e) {
301                LOG.error("Exception in getAllFlowStatisticsFromFlowTable ", e);
302                return ret;
303             }
304
305             GetAllFlowStatisticsFromFlowTableOutput output = result.getResult();
306             if (output == null) {
307                 return ret;
308             }
309
310             TransactionId transactionId = output.getTransactionId();
311             String cacheKey = buildCacheKey(transactionId, NodeMapping.toNodeId(node));
312             LOG.info("readAllFlow transactionId:{} cacheKey:{}", transactionId, cacheKey);
313
314             // insert an entry in tempcache, will get updated when notification is received
315             txCache.put(cacheKey, new TransactionNotificationList<FlowsStatisticsUpdate>(
316                 transactionId, node.getNodeIDString()));
317
318             TransactionNotificationList<FlowsStatisticsUpdate> txnList =
319                 (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
320
321             // this loop would not be infinite as the cache will remove an entry
322             // after defined time if not written to
323             while (txnList != null && !txnList.areAllNotificationsGathered()) {
324                 LOG.debug("readAllFlow waiting for notification...");
325                 waitForNotification();
326                 txnList = (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
327             }
328
329             if (txnList == null) {
330                 return ret;
331             }
332
333             List<FlowsStatisticsUpdate> notifications = txnList.getNotifications();
334             for (FlowsStatisticsUpdate flowsStatisticsUpdate : notifications) {
335                 List<FlowAndStatisticsMapList> flowAndStatisticsMapList = flowsStatisticsUpdate.getFlowAndStatisticsMapList();
336                 if (flowAndStatisticsMapList != null) {
337                     for (FlowAndStatisticsMapList flowAndStatistics : flowAndStatisticsMapList) {
338                         final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatistics, node));
339                         ret.add(addFlowStats(it, flowAndStatistics));
340                     }
341                 }
342             }
343         }
344         return ret;
345     }
346
347     private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
348         return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
349     }
350
351     private void waitForNotification() {
352         try {
353             // going for a simple sleep approach,as wait-notify on a monitor would require
354             // us to maintain monitors per txn-node combo
355             Thread.sleep(SLEEP_FOR_NOTIFICATIONS_MILLIS);
356             LOG.trace("statCollector is waking up from a wait stat Response sleep");
357         } catch (final InterruptedException e) {
358             LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
359         }
360     }
361
362     @Override
363     public List<NodeConnectorStatistics> readAllNodeConnector(final Node node, final boolean cached) {
364         final ArrayList<NodeConnectorStatistics> ret = new ArrayList<>();
365
366         final org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node dsNode = readConfigNode(node);
367         if (dsNode != null) {
368             for (final org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector dsNodeConnector : dsNode.getNodeConnector()) {
369                 final FlowCapableNodeConnectorStatistics stats = (dsNodeConnector.getAugmentation(FlowCapableNodeConnectorStatisticsData.class));
370                 if (stats != null) {
371                     try {
372                         ret.add(toNodeConnectorStatistics(stats.getFlowCapableNodeConnectorStatistics(), dsNode.getId(), dsNodeConnector.getId()));
373                     } catch (ConstructionException e) {
374                         LOG.warn("Failed to instantiate node connector statistics for node {} connector {}, ignoring it",
375                                 dsNode.getId(), dsNodeConnector.getId(), e);
376                     }
377                 }
378             }
379         }
380
381         //TODO: Refer TODO (main)
382         getNodeConnectorStatisticsService().getAllNodeConnectorsStatistics(
383                 new GetAllNodeConnectorsStatisticsInputBuilder().setNode(NodeMapping.toNodeRef(node)).build());
384         return ret;
385     }
386
387     @Override
388     public List<NodeTableStatistics> readAllNodeTable(final Node node, final boolean cached) {
389         final NodeRef nodeRef = NodeMapping.toNodeRef(node);
390
391         final ArrayList<NodeTableStatistics> ret = new ArrayList<>();
392         final FlowCapableNode dsFlowCapableNode = this.readOperFlowCapableNode(nodeRef);
393         if (dsFlowCapableNode != null) {
394             for (final Table table : dsFlowCapableNode.getTable()) {
395                 final FlowTableStatisticsData tableStats = table.getAugmentation(FlowTableStatisticsData.class);
396                 if (tableStats != null) {
397                     try {
398                         ret.add(toNodeTableStatistics(tableStats.getFlowTableStatistics(), table.getId(), node));
399                     } catch (ConstructionException e) {
400                         LOG.warn("Failed to instantiate table statistics for node {} table {}, ignoring it", node, table.getId(), e);
401                     }
402                 }
403             }
404         }
405
406         //TODO: Refer TODO (main)
407         getFlowTableStatisticsService().getFlowTablesStatistics(new GetFlowTablesStatisticsInputBuilder().setNode(nodeRef).build());
408         return ret;
409     }
410
411     @Override
412     public NodeDescription readDescription(final Node node, final boolean cached) {
413         return this.toNodeDescription(NodeMapping.toNodeRef(node));
414     }
415
416     @Override
417     public FlowOnNode readFlow(final Node node, final org.opendaylight.controller.sal.flowprogrammer.Flow targetFlow, final boolean cached) {
418         FlowOnNode ret = null;
419         final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
420         if (table != null) {
421             final List<Flow> flows = table.getFlow();
422             InventoryAndReadAdapter.LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
423
424             for (final Flow mdsalFlow : flows) {
425                 if(FromSalConversionsUtils.flowEquals(mdsalFlow, MDFlowMapping.toMDSalflow(targetFlow))) {
426                     final FlowStatisticsData statsFromDataStore = mdsalFlow.getAugmentation(FlowStatisticsData.class);
427                     if (statsFromDataStore != null) {
428                         InventoryAndReadAdapter.LOG.debug("Found matching flow in the data store flow table ");
429                         ret = addFlowStats(new FlowOnNode(targetFlow), statsFromDataStore.getFlowStatistics());
430
431                         // FIXME: break; ?
432                     }
433                 }
434             }
435         }
436
437         //TODO: Refer TODO (main)
438         final GetFlowStatisticsFromFlowTableInputBuilder input = new GetFlowStatisticsFromFlowTableInputBuilder().setNode(NodeMapping.toNodeRef(node));
439         input.fieldsFrom(MDFlowMapping.toMDSalflow(targetFlow));
440         getFlowStatisticsService().getFlowStatisticsFromFlowTable(input.build());
441         return ret;
442     }
443
444     @Override
445     public NodeConnectorStatistics readNodeConnector(final NodeConnector connector, final boolean cached) {
446         final NodeConnectorId ncId = InventoryMapping.toNodeConnectorKey(connector).getId();
447
448         NodeConnectorStatistics ret = null;
449         final org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector nodeConnectorFromDS = readConfigNodeConnector(connector);
450         if (nodeConnectorFromDS != null) {
451             final FlowCapableNodeConnectorStatistics stats = nodeConnectorFromDS.getAugmentation(FlowCapableNodeConnectorStatisticsData.class);
452             if (stats != null) {
453                 try {
454                     ret = toNodeConnectorStatistics(stats.getFlowCapableNodeConnectorStatistics(),
455                             InventoryMapping.toNodeKey(connector.getNode()).getId(), ncId);
456                 } catch (ConstructionException e) {
457                     LOG.warn("Failed to instantiate node connector statistics for connector {}, ignoring it",
458                             connector, e);
459                 }
460             }
461         }
462
463         getNodeConnectorStatisticsService().getNodeConnectorStatistics(
464                 new GetNodeConnectorStatisticsInputBuilder().setNode(NodeMapping.toNodeRef(connector.getNode())).setNodeConnectorId(ncId).build());
465         return ret;
466     }
467
468     @Override
469     public NodeTableStatistics readNodeTable(final NodeTable nodeTable, final boolean cached) {
470         NodeTableStatistics nodeStats = null;
471         final Table table = readOperationalTable(nodeTable.getNode(), (short) nodeTable.getID());
472         if (table != null) {
473             final FlowTableStatisticsData tableStats = table.getAugmentation(FlowTableStatisticsData.class);
474             if (tableStats != null) {
475                 try {
476                     nodeStats = toNodeTableStatistics(tableStats.getFlowTableStatistics(), table.getId(), nodeTable.getNode());
477                 } catch (ConstructionException e) {
478                     LOG.warn("Failed to instantiate table statistics for node {} table {}, ignoring it",
479                             nodeTable.getNode(), table.getId(), e);
480                 }
481             }
482         }
483
484         //TODO: Refer TODO (main)
485         getFlowTableStatisticsService().getFlowTablesStatistics(
486                 new GetFlowTablesStatisticsInputBuilder().setNode(NodeMapping.toNodeRef(nodeTable.getNode())).build());
487         return nodeStats;
488     }
489
490     public void onNodeConnectorRemovedInternal(final NodeConnectorRemoved update) {
491         // Never received
492     }
493
494     public void onNodeRemovedInternal(final NodeRemoved notification) {
495         this.removeNodeConnectors(notification.getNodeRef().getValue());
496         try {
497             final Node aDNode = NodeMapping.toADNode(notification.getNodeRef());
498             this.publishNodeUpdate(aDNode, UpdateType.REMOVED, Collections.<Property>emptySet());
499         } catch (ConstructionException e) {
500             LOG.warn("Failed to construct node for {}, not propagating update", notification.getNodeRef(), e);
501         }
502     }
503
504     public void onNodeConnectorUpdatedInternal(final NodeConnectorUpdated update) {
505         final NodeConnectorRef ref = update.getNodeConnectorRef();
506         final UpdateType updateType;
507         if (!this.isKnownNodeConnector(ref.getValue())) {
508             this.recordNodeConnector(ref.getValue());
509             updateType = UpdateType.ADDED;
510         } else {
511             updateType = UpdateType.CHANGED;
512         }
513
514         try {
515             final NodeConnector nodeConnector;
516             nodeConnector = NodeMapping.toADNodeConnector(ref);
517             this.publishNodeConnectorUpdate(nodeConnector, updateType, NodeMapping.toADNodeConnectorProperties(update));
518         } catch (ConstructionException e) {
519             LOG.warn("Failed to construct node connector for {}, not reporting the update", ref, e);
520         }
521     }
522
523     public void onNodeUpdatedInternal(final NodeUpdated notification) {
524         final NodeRef ref = notification.getNodeRef();
525
526         final UpdateType updateType;
527         if (dataService.readOperationalData(ref.getValue()) == null) {
528             updateType = UpdateType.ADDED;
529         } else {
530             updateType = UpdateType.CHANGED;
531         }
532
533         final Node aDNode;
534         try {
535             aDNode = NodeMapping.toADNode(ref);
536         } catch (ConstructionException e) {
537             LOG.warn("Failed to construct node for {}, not reporting the update", ref, e);
538             return;
539         }
540
541         this.publishNodeUpdate(aDNode, updateType, NodeMapping.toADNodeProperties(notification));
542         for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) {
543             final NodeDescription description = this.toNodeDescription(ref);
544             if (description != null) {
545                 final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> nodeRef =
546                         InstanceIdentifier.builder(Nodes.class)
547                         .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, new NodeKey(notification.getId()))
548                         .toInstance();
549                 try {
550                     statsPublisher.descriptionStatisticsUpdated(NodeMapping.toADNode(nodeRef), description);
551                 } catch (ConstructionException e) {
552                     LOG.warn("Failed to construct node for {}, not reporting the update to publisher {}", nodeRef, statsPublisher, e);
553                 }
554             }
555         }
556     }
557
558     @Override
559     public ConcurrentMap<Node,Map<String,Property>> getNodeProps() {
560         final ConcurrentHashMap<Node,Map<String,Property>> props = new ConcurrentHashMap<>();
561         final Nodes nodes = this.readOperAllMDNodes();
562         for (final org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node node : nodes.getNode()) {
563             final FlowCapableNode fcn = node.getAugmentation(FlowCapableNode.class);
564             if (fcn != null) {
565                 final ConcurrentHashMap<String,Property> perNodePropMap = new ConcurrentHashMap<String, Property>();
566                 final HashSet<Property> perNodeProps = NodeMapping.toADNodeProperties(fcn, node.getId());
567                 if (perNodeProps != null) {
568                     for (final Property perNodeProp : perNodeProps) {
569                         perNodePropMap.put(perNodeProp.getName(), perNodeProp);
570                     }
571                 }
572
573                 try {
574                     final Node adNode = NodeMapping.toADNode(node.getId());
575                     props.put(adNode, perNodePropMap);
576                 } catch (ConstructionException e) {
577                     LOG.warn("Failed to construct node for {}, skipping it", node, e);
578                 }
579             }
580         }
581         return props;
582     }
583
584     private Nodes readOperAllMDNodes() {
585         final TypeSafeDataReader reader = TypeSafeDataReader.forReader(getDataService());
586         return reader.readOperationalData(InstanceIdentifier.builder(Nodes.class).build());
587     }
588
589     @Override
590     public ConcurrentMap<NodeConnector,Map<String,Property>> getNodeConnectorProps(final Boolean refresh) {
591         final ConcurrentHashMap<NodeConnector,Map<String,Property>> props = new ConcurrentHashMap<>();
592         for (final org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node node : this.readOperAllMDNodes().getNode()) {
593             for (final org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector nc : node.getNodeConnector()) {
594                 final FlowCapableNodeConnector fcnc = nc.getAugmentation(FlowCapableNodeConnector.class);
595                 if (fcnc != null) {
596                     final ConcurrentHashMap<String,Property> ncpsm = new ConcurrentHashMap<>();
597                     final HashSet<Property> ncps = NodeMapping.toADNodeConnectorProperties(fcnc);
598                     if (ncps != null) {
599                         for (final Property p : ncps) {
600                             ncpsm.put(p.getName(), p);
601                         }
602                     }
603
604                     try {
605                         props.put(NodeMapping.toADNodeConnector(nc.getId(), node.getId()), ncpsm);
606                     } catch (ConstructionException e) {
607                         LOG.warn("Failed to instantiate node {} connector {}, not reporting it", node.getId(), nc.getId(), e);
608                     }
609                 }
610             }
611         }
612         return props;
613     }
614
615     private FlowCapableNodeConnector readOperFlowCapableNodeConnector(final NodeConnectorRef ref) {
616         final org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector nc =
617                 (org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector)
618                 getDataService().readOperationalData(ref.getValue());
619         return nc.getAugmentation(FlowCapableNodeConnector.class);
620     }
621
622     private static NodeConnectorStatistics toNodeConnectorStatistics(final org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.NodeConnectorStatistics nodeConnectorStatistics, final NodeId nodeId, final NodeConnectorId nodeConnectorId) throws ConstructionException {
623         final NodeConnectorStatistics it = new NodeConnectorStatistics();
624
625         final Packets packets = nodeConnectorStatistics.getPackets();
626         it.setReceivePacketCount(packets.getReceived().longValue());
627         it.setTransmitPacketCount(packets.getTransmitted().longValue());
628
629         final Bytes bytes = nodeConnectorStatistics.getBytes();
630         it.setReceiveByteCount(bytes.getReceived().longValue());
631         it.setTransmitByteCount(bytes.getTransmitted().longValue());
632
633         it.setReceiveDropCount(nodeConnectorStatistics.getReceiveDrops().longValue());
634         it.setTransmitDropCount(nodeConnectorStatistics.getTransmitDrops().longValue());
635         it.setReceiveErrorCount(nodeConnectorStatistics.getReceiveErrors().longValue());
636         it.setTransmitErrorCount(nodeConnectorStatistics.getTransmitErrors().longValue());
637         it.setReceiveFrameErrorCount(nodeConnectorStatistics.getReceiveFrameError().longValue());
638         it.setReceiveOverRunErrorCount(nodeConnectorStatistics.getReceiveOverRunError().longValue());
639         it.setReceiveCRCErrorCount(nodeConnectorStatistics.getReceiveCrcError().longValue());
640         it.setCollisionCount(nodeConnectorStatistics.getCollisionCount().longValue());
641
642         final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector> nodeConnectorRef =
643                 InstanceIdentifier.builder(Nodes.class)
644                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, new NodeKey(nodeId))
645                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector.class, new NodeConnectorKey(nodeConnectorId))
646                 .build();
647         it.setNodeConnector(NodeMapping.toADNodeConnector(new NodeConnectorRef(nodeConnectorRef)));
648         return it;
649     }
650
651     private static NodeTableStatistics toNodeTableStatistics(final FlowTableStatistics tableStats, final Short tableId, final Node node) throws ConstructionException {
652         final NodeTableStatistics it = new NodeTableStatistics();
653         it.setActiveCount(tableStats.getActiveFlows().getValue().intValue());
654         it.setLookupCount(tableStats.getPacketsLookedUp().getValue().longValue());
655         it.setMatchedCount(tableStats.getPacketsMatched().getValue().longValue());
656         it.setName(tableId.toString());
657         it.setNodeTable(new NodeTable(NodeTableIDType.OPENFLOW, tableId.byteValue(), node));
658         return it;
659     }
660
661     private NodeDescription toNodeDescription(final NodeRef nodeRef) {
662         final FlowCapableNode capableNode = this.readOperFlowCapableNode(nodeRef);
663         if (capableNode == null) {
664             return null;
665         }
666
667         final NodeDescription it = new NodeDescription();
668         it.setManufacturer(capableNode.getManufacturer());
669         it.setSerialNumber(capableNode.getSerialNumber());
670         it.setSoftware(capableNode.getSoftware());
671         it.setDescription(capableNode.getDescription());
672         return it;
673     }
674
675     public Edge toADEdge(final Link link) throws ConstructionException {
676         NodeConnectorRef _source = link.getSource();
677         NodeConnector _aDNodeConnector = NodeMapping.toADNodeConnector(_source);
678         NodeConnectorRef _destination = link.getDestination();
679         NodeConnector _aDNodeConnector_1 = NodeMapping.toADNodeConnector(_destination);
680         Edge _edge = new Edge(_aDNodeConnector, _aDNodeConnector_1);
681         return _edge;
682     }
683
    /**
     * OpendaylightFlowStatisticsListener interface implementation.
     *
     * Aggregate flow statistics updates are deliberately dropped: the method
     * body is empty and performs no action.
     *
     * @param notification the aggregate statistics update; not used
     */
    @Override
    public void onAggregateFlowStatisticsUpdate(final AggregateFlowStatisticsUpdate notification) {
        // Ignoring this notification as there does not seem to be a way to bubble this up to AD-SAL
    }
691
692     @Override
693     public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
694         final ArrayList<FlowOnNode> adsalFlowsStatistics = new ArrayList<>();
695         final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> nodeRef =
696                 InstanceIdentifier.builder(Nodes.class)
697                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, new NodeKey(notification.getId()))
698                 .build();
699
700         final Node aDNode;
701         try {
702             aDNode = NodeMapping.toADNode(nodeRef);
703         } catch (ConstructionException e) {
704             LOG.warn("Failed to construct node for {}, ignoring it", notification.getId(), e);
705             return;
706         }
707
708         for (final FlowAndStatisticsMapList flowStats : notification.getFlowAndStatisticsMapList()) {
709             if (flowStats.getTableId() == 0) {
710                 adsalFlowsStatistics.add(InventoryAndReadAdapter.toFlowOnNode(flowStats, aDNode));
711             }
712         }
713         for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) {
714             statsPublisher.nodeFlowStatisticsUpdated(aDNode, adsalFlowsStatistics);
715         }
716
717         updateTransactionCache(notification, notification.getId(), !notification.isMoreReplies());
718     }
719
720     /**
721      * OpendaylightFlowTableStatisticsListener interface implementation
722      */
723     @Override
724     public void onFlowTableStatisticsUpdate(final FlowTableStatisticsUpdate notification) {
725         ArrayList<NodeTableStatistics> adsalFlowTableStatistics = new ArrayList<>();
726         for (final FlowTableAndStatisticsMap stats : notification.getFlowTableAndStatisticsMap()) {
727             if (stats.getTableId().getValue() == 0) {
728                 final NodeTableStatistics it = new NodeTableStatistics();
729                 it.setActiveCount(stats.getActiveFlows().getValue().intValue());
730                 it.setLookupCount(stats.getPacketsLookedUp().getValue().longValue());
731                 it.setMatchedCount(stats.getPacketsMatched().getValue().longValue());
732                 adsalFlowTableStatistics.add(it);
733             }
734         }
735
736         final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> nodeRef =
737                 InstanceIdentifier.builder(Nodes.class)
738                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, new NodeKey(notification.getId()))
739                 .build();
740
741         final Node aDNode;
742         try {
743             aDNode = NodeMapping.toADNode(nodeRef);
744         } catch (ConstructionException e) {
745             LOG.warn("Failed to construct node for {}, ignoring it", notification.getId(), e);
746             return;
747         }
748
749         for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) {
750             statsPublisher.nodeTableStatisticsUpdated(aDNode, adsalFlowTableStatistics);
751         }
752     }
753
754     /**
755      * OpendaylightPortStatisticsUpdate interface implementation
756      */
757     @Override
758     public void onNodeConnectorStatisticsUpdate(final NodeConnectorStatisticsUpdate notification) {
759         final ArrayList<NodeConnectorStatistics> adsalPortStatistics = new ArrayList<NodeConnectorStatistics>();
760         for (final NodeConnectorStatisticsAndPortNumberMap nodeConnectorStatistics : notification.getNodeConnectorStatisticsAndPortNumberMap()) {
761             try {
762                 adsalPortStatistics.add(toNodeConnectorStatistics(
763                         nodeConnectorStatistics, notification.getId(), nodeConnectorStatistics.getNodeConnectorId()));
764             } catch (ConstructionException e) {
765                 LOG.warn("Failed to create statistics for node {} connector {}, not updating them",
766                         notification.getId(), nodeConnectorStatistics.getNodeConnectorId(), e);
767             }
768         }
769
770         final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node> nodeRef =
771                 InstanceIdentifier.builder(Nodes.class)
772                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, new NodeKey(notification.getId()))
773                 .build();
774
775         final Node aDNode;
776         try {
777             aDNode = NodeMapping.toADNode(nodeRef);
778         } catch (ConstructionException e) {
779             LOG.warn("Failed to construct node for {}, ignoring it", notification.getId(), e);
780             return;
781         }
782
783         for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) {
784             statsPublisher.nodeConnectorStatisticsUpdated(aDNode, adsalPortStatistics);
785         }
786     }
787
788     private static FlowOnNode toFlowOnNode(final FlowAndStatisticsMapList flowAndStatsMap, final Node node) {
789         final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatsMap, node));
790         return addFlowStats(it, flowAndStatsMap);
791     }
792
793     private static FlowOnNode addFlowStats(final FlowOnNode node, final GenericStatistics stats) {
794         node.setByteCount(stats.getByteCount().getValue().longValue());
795         node.setPacketCount(stats.getPacketCount().getValue().longValue());
796         node.setDurationSeconds(stats.getDuration().getSecond().getValue().intValue());
797         node.setDurationNanoseconds(stats.getDuration().getNanosecond().getValue().intValue());
798         return node;
799     }
800
801     @Override
802     public Set<Node> getConfiguredNotConnectedNodes() {
803         return Collections.emptySet();
804     }
805
806     private void publishNodeUpdate(final Node node, final UpdateType updateType, final Set<Property> properties) {
807         for (final IPluginOutInventoryService publisher : getInventoryPublisher()) {
808             publisher.updateNode(node, updateType, properties);
809         }
810     }
811
812     private void publishNodeConnectorUpdate(final NodeConnector nodeConnector, final UpdateType updateType, final Set<Property> properties) {
813         for (final IPluginOutInventoryService publisher : getInventoryPublisher()) {
814             publisher.updateNodeConnector(nodeConnector, updateType, properties);
815         }
816     }
817
818     private boolean isKnownNodeConnector(final InstanceIdentifier<? extends Object> nodeConnectorIdentifier) {
819         final Iterator<PathArgument> it = nodeConnectorIdentifier.getPathArguments().iterator();
820
821         if (!it.hasNext()) {
822             return false;
823         }
824         it.next();
825
826         if (!it.hasNext()) {
827             return false;
828         }
829         final PathArgument nodePath = it.next();
830
831         if (!it.hasNext()) {
832             return false;
833         }
834         final PathArgument nodeConnectorPath = it.next();
835
836         final List<PathArgument> nodeConnectors = nodeToNodeConnectorsMap.get(nodePath);
837         return nodeConnectors == null ? false :
838             nodeConnectors.contains(nodeConnectorPath);
839     }
840
841     private boolean recordNodeConnector(final InstanceIdentifier<? extends Object> nodeConnectorIdentifier) {
842         final Iterator<PathArgument> it = nodeConnectorIdentifier.getPathArguments().iterator();
843
844         if (!it.hasNext()) {
845             return false;
846         }
847         it.next();
848
849         if (!it.hasNext()) {
850             return false;
851         }
852         final PathArgument nodePath = it.next();
853
854         if (!it.hasNext()) {
855             return false;
856         }
857         final PathArgument nodeConnectorPath = it.next();
858
859         synchronized (this) {
860             List<PathArgument> nodeConnectors = this.nodeToNodeConnectorsMap.get(nodePath);
861             if (nodeConnectors == null) {
862                 nodeConnectors = new ArrayList<>();
863                 this.nodeToNodeConnectorsMap.put(nodePath, nodeConnectors);
864             }
865
866             return nodeConnectors.add(nodeConnectorPath);
867         }
868     }
869
870     private List<PathArgument> removeNodeConnectors(final InstanceIdentifier<? extends Object> nodeIdentifier) {
871         return this.nodeToNodeConnectorsMap.remove(Iterables.get(nodeIdentifier.getPathArguments(), 1));
872     }
873
874     private <T extends TransactionAware> void updateTransactionCache(T notification, NodeId nodeId, boolean lastNotification) {
875
876         String cacheKey = buildCacheKey(notification.getTransactionId(), nodeId);
877         TransactionNotificationList<T> txnList = (TransactionNotificationList<T>) txCache.getIfPresent(cacheKey);
878         final Optional<TransactionNotificationList<T>> optional = Optional.<TransactionNotificationList<T>>fromNullable(txnList);
879         if (optional.isPresent()) {
880             LOG.info("updateTransactionCache cacheKey:{}, lastNotification:{}, txnList-present:{}", cacheKey, lastNotification, optional.isPresent());
881             TransactionNotificationList<T> txn = optional.get();
882             txn.addNotification(notification);
883             txn.setAllNotificationsGathered(lastNotification);
884         }
885     }
886
887     private class TransactionNotificationList<T extends TransactionAware> {
888         private TransactionId id;
889         private String nId;
890         private List<T> notifications;
891         private boolean allNotificationsGathered;
892
893         public TransactionNotificationList(TransactionId id, String nId) {
894             this.nId = nId;
895             this.id = id;
896             notifications = new ArrayList<T>();
897         }
898
899         public void addNotification(T notification) {
900             notifications.add(notification);
901         }
902
903         public void setAllNotificationsGathered(boolean allNotificationsGathered) {
904             this.allNotificationsGathered = allNotificationsGathered;
905         }
906
907         public boolean areAllNotificationsGathered() {
908             return allNotificationsGathered;
909         }
910
911         public List<T> getNotifications() {
912             return notifications;
913         }
914
915     }
916
917 }