2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.List;
18 import java.util.Map.Entry;
19 import java.util.concurrent.atomic.AtomicInteger;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
25 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
26 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
27 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
28 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation;
29 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
30 import org.opendaylight.openflowplugin.applications.statistics.manager.impl.helper.FlowComparator;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowHashIdMapping;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowHashIdMappingBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMap;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapKey;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
61 import org.opendaylight.yangtools.yang.binding.DataObject;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
67 import com.google.common.base.Optional;
68 import com.google.common.collect.BiMap;
69 import com.google.common.collect.HashBiMap;
73 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
75 * StatListenCommitFlow
76 * Class is a NotifyListener for FlowStatistics and DataChangeListener for Config/DataStore for Flow node.
77 * All expected (registered) FlowStatistics will be builded and commit to Operational/DataStore.
78 * DataChangeEven should call create/delete Flow in Operational/DS create process needs to pair
79 * Device Flow HashCode and FlowId from Config/DS
81 * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
84 public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, OpendaylightFlowStatisticsListener>
85 implements OpendaylightFlowStatisticsListener {
87 protected static final Logger LOG = LoggerFactory.getLogger(StatListenCommitFlow.class);
89 private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
91 private static final Integer REMOVE_AFTER_MISSING_COLLECTION = 1;
93 private final AtomicInteger unaccountedFlowsCounter = new AtomicInteger(0);
95 public StatListenCommitFlow (final StatisticsManager manager, final DataBroker db,
96 final NotificationProviderService nps){
97 super(manager, db, nps, Flow.class);
101 protected OpendaylightFlowStatisticsListener getStatNotificationListener() {
106 protected InstanceIdentifier<Flow> getWildCardedRegistrationPath() {
107 return InstanceIdentifier.create(Nodes.class).child(Node.class)
108 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class);
112 public void onAggregateFlowStatisticsUpdate(final AggregateFlowStatisticsUpdate notification) {
113 final TransactionId transId = notification.getTransactionId();
114 final NodeId nodeId = notification.getId();
115 if ( ! isExpectedStatistics(transId, nodeId)) {
116 LOG.debug("STAT-MANAGER - AggregateFlowStatisticsUpdate: unregistred notification detect TransactionId {}", transId);
119 manager.getRpcMsgManager().addNotification(notification, nodeId);
120 if (notification.isMoreReplies()) {
123 /* check flow Capable Node and write statistics */
124 manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
126 public void applyOperation(final ReadWriteTransaction tx) {
128 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
129 if (( ! txContainer.isPresent()) || txContainer.get().getNotifications() == null) {
132 final Optional<? extends DataObject> inputObj = txContainer.get().getConfInput();
133 if (( ! inputObj.isPresent()) || ( ! (inputObj.get() instanceof Table))) {
136 final Table table = (Table) inputObj.get();
137 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
138 for (final TransactionAware notif : cacheNotifs) {
139 if (notif instanceof AggregateFlowStatisticsUpdate) {
140 final AggregateFlowStatisticsData stats = new AggregateFlowStatisticsDataBuilder()
141 .setAggregateFlowStatistics(new AggregateFlowStatisticsBuilder(notification).build()).build();
142 final InstanceIdentifier<FlowCapableNode> fNodeIdent = InstanceIdentifier.create(Nodes.class)
143 .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
144 final InstanceIdentifier<Table> tableRef = fNodeIdent.child(Table.class, table.getKey());
145 final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = tableRef
146 .augmentation(AggregateFlowStatisticsData.class);
147 Optional<FlowCapableNode> fNode = Optional.absent();
149 fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
150 } catch (final ReadFailedException e) {
151 LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
154 if (fNode.isPresent()) {
155 ensureTable(tx, table.getId(), tableRef);
156 tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats);
164 public void ensureTable(final ReadWriteTransaction tx, final Short tableId, final InstanceIdentifier<Table> tableRef) {
165 final Table tableNew = new TableBuilder().setId(tableId).build();
166 tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef, tableNew);
170 public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
171 final TransactionId transId = notification.getTransactionId();
172 final NodeId nodeId = notification.getId();
173 if ( ! isExpectedStatistics(transId, nodeId)) {
174 LOG.debug("STAT-MANAGER - FlowsStatisticsUpdate: unregistred notification detect TransactionId {}", transId);
177 manager.getRpcMsgManager().addNotification(notification, nodeId);
178 if (notification.isMoreReplies()) {
179 LOG.trace("Next notification for join txId {}", transId);
182 /* add flow's statistics */
183 manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
185 public void applyOperation(final ReadWriteTransaction tx) {
186 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
187 if (( ! txContainer.isPresent()) || txContainer.get().getNotifications() == null) {
190 final List<FlowAndStatisticsMapList> flowStats = new ArrayList<FlowAndStatisticsMapList>(10);
191 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
192 .child(Node.class, new NodeKey(nodeId));
193 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
194 for (final TransactionAware notif : cacheNotifs) {
195 if (notif instanceof FlowsStatisticsUpdate) {
196 final List<FlowAndStatisticsMapList> notifList =
197 ((FlowsStatisticsUpdate) notif).getFlowAndStatisticsMapList();
198 if (notifList != null) {
199 flowStats.addAll(notifList);
204 statsFlowCommitAll(flowStats, nodeIdent, tx);
205 /* cleaning all not cached hash collisions */
206 final Map<InstanceIdentifier<Flow>, Integer> listAliens = mapNodesForDelete.get(nodeIdent);
207 if (listAliens != null) {
208 for (final Entry<InstanceIdentifier<Flow>, Integer> nodeForDelete : listAliens.entrySet()) {
209 final Integer lifeIndex = nodeForDelete.getValue();
210 if (nodeForDelete.getValue() > 0) {
211 nodeForDelete.setValue(Integer.valueOf(lifeIndex.intValue() - 1));
213 final InstanceIdentifier<Flow> flowNodeIdent = nodeForDelete.getKey();
214 mapNodesForDelete.get(nodeIdent).remove(flowNodeIdent);
215 tx.delete(LogicalDatastoreType.OPERATIONAL, flowNodeIdent);
219 /* Notification for continue collecting statistics */
220 notifyToCollectNextStatistics(nodeIdent, transId);
226 private void statsFlowCommitAll(final List<FlowAndStatisticsMapList> list,
227 final InstanceIdentifier<Node> nodeIdent, final ReadWriteTransaction tx) {
229 final InstanceIdentifier<FlowCapableNode> fNodeIdent = nodeIdent.augmentation(FlowCapableNode.class);
231 final Optional<FlowCapableNode> fNode;
233 fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
235 catch (final ReadFailedException e) {
236 LOG.debug("Read FlowCapableNode {} in Operational/DS fail! Statistic scan not be updated.", nodeIdent, e);
239 if ( ! fNode.isPresent()) {
240 LOG.trace("FlowCapableNode {} is not presented in Operational/DS. Statisticscan not be updated.", nodeIdent);
244 final NodeUpdateState nodeState = new NodeUpdateState(fNodeIdent,fNode.get());
246 for (final FlowAndStatisticsMapList flowStat : list) {
247 final TableKey tableKey = new TableKey(flowStat.getTableId());
248 final TableFlowUpdateState tableState = nodeState.getTable(tableKey, tx);
249 tableState.reportFlow(flowStat,tx);
252 for (final TableFlowUpdateState table : nodeState.getTables()) {
253 table.removeUnreportedFlows(tx);
258 * Method adds statistics to Flow
263 private void addStatistics(final FlowBuilder flowBuilder, final FlowAndStatisticsMapList deviceFlow) {
264 final FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder(deviceFlow);
265 final FlowStatisticsBuilder flowStatisticsBuilder = new FlowStatisticsBuilder(stats.build());
266 final FlowStatisticsDataBuilder flowStatisticsData =new FlowStatisticsDataBuilder();
267 flowStatisticsData.setFlowStatistics(flowStatisticsBuilder.build());
268 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
272 * build pseudoUnique hashCode for flow in table
273 * for future easy identification
275 * FIXME: we expect same version for YANG models for all clusters and that has to be fix
276 * FIXME: CREATE BETTER KEY - for flow (MATCH is the problem)
278 static String buildFlowIdOperKey(final FlowAndStatisticsMapList deviceFlow) {
279 return new StringBuffer().append(deviceFlow.getMatch())
280 .append(deviceFlow.getPriority()).append(deviceFlow.getCookie().getValue()).toString();
283 private class NodeUpdateState {
284 private final InstanceIdentifier<FlowCapableNode> nodeIdentifier;
285 private final Map<TableKey,TableFlowUpdateState> tables = new HashMap<>();
287 public NodeUpdateState(final InstanceIdentifier<FlowCapableNode> fNodeIdent, final FlowCapableNode flowCapableNode) {
288 nodeIdentifier = fNodeIdent;
289 final List<Table> tableList = flowCapableNode.getTable();
290 if(tableList != null) {
291 for (final Table table : tableList) {
292 final TableKey tableKey = table.getKey();
293 tables.put(tableKey, new TableFlowUpdateState(nodeIdentifier.child(Table.class,tableKey),table));
298 public Iterable<TableFlowUpdateState> getTables() {
299 return tables.values();
302 TableFlowUpdateState getTable(final TableKey key,final ReadWriteTransaction tx) {
303 TableFlowUpdateState table = tables.get(key);
305 table = new TableFlowUpdateState(nodeIdentifier.child(Table.class, key), null);
306 tables.put(key, table);
312 private class TableFlowUpdateState {
314 private boolean tableEnsured = false;
315 final KeyedInstanceIdentifier<Table, TableKey> tableRef;
316 final TableKey tableKey;
317 final BiMap<FlowHashIdMapKey, FlowId> flowIdByHash;
318 List<Flow> configFlows;
320 public TableFlowUpdateState(final KeyedInstanceIdentifier<Table, TableKey> tablePath, final Table table) {
321 tableRef = tablePath;
322 tableKey = tablePath.getKey();
323 flowIdByHash = HashBiMap.create();
325 final FlowHashIdMapping flowHashMapping = table.getAugmentation(FlowHashIdMapping.class);
326 if (flowHashMapping != null) {
327 final List<FlowHashIdMap> flowHashMap = flowHashMapping.getFlowHashIdMap() != null
328 ? flowHashMapping.getFlowHashIdMap() : Collections.<FlowHashIdMap> emptyList();
329 for (final FlowHashIdMap flowHashId : flowHashMap) {
331 flowIdByHash.put(flowHashId.getKey(), flowHashId.getFlowId());
332 } catch (final Exception e) {
333 LOG.warn("flow hashing hit a duplicate for {} -> {}. Exception was raised: {}",
334 flowHashId.getKey(), flowHashId.getFlowId(), e.getMessage());
341 private void ensureTableFowHashIdMapping(final ReadWriteTransaction tx) {
342 if( ! tableEnsured) {
343 ensureTable(tx, tableKey.getId(), tableRef);
344 final FlowHashIdMapping emptyMapping = new FlowHashIdMappingBuilder()
345 .setFlowHashIdMap(Collections.<FlowHashIdMap> emptyList()).build();
346 tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping);
351 private FlowKey searchInConfiguration(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
353 final Iterator<Flow> it = configFlows.iterator();
354 while(it.hasNext()) {
355 final Flow cfgFlow = it.next();
356 final FlowKey cfgKey = cfgFlow.getKey();
357 final FlowId cfgFlowId = cfgKey.getId();
359 if(! flowIdByHash.inverse().containsKey(cfgFlowId)) {
360 if(FlowComparator.flowEquals(flowStat, cfgFlow)) {
368 private void initConfigFlows() {
369 final Optional<Table> table = readLatestConfiguration(tableRef);
370 List<Flow> localList = null;
371 if(table.isPresent()) {
372 localList = table.get().getFlow();
374 if(localList == null) {
375 configFlows = Collections.emptyList();
377 configFlows = new LinkedList<>(localList);
381 private FlowKey getFlowKeyAndRemoveHash(final FlowHashIdMapKey key) {
382 final FlowId ret = flowIdByHash.get(key);
384 flowIdByHash.remove(key);
385 return new FlowKey(ret);
390 /* Returns FlowKey which doesn't exist in any DataStore for now */
391 private FlowKey makeAlienFlowKey() {
392 final StringBuilder sBuilder = new StringBuilder(ALIEN_SYSTEM_FLOW_ID)
393 .append(tableKey.getId()).append("-").append(unaccountedFlowsCounter.incrementAndGet());
394 final FlowId flowId = new FlowId(sBuilder.toString());
395 return new FlowKey(flowId);
398 private Map<FlowHashIdMapKey, FlowId> getRemovalList() {
402 void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
403 ensureTableFowHashIdMapping(trans);
404 final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
405 FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
406 if (flowKey == null) {
407 flowKey = searchInConfiguration(flowStat, trans);
408 if ( flowKey == null) {
409 flowKey = makeAlienFlowKey();
411 updateHashCache(trans,flowKey,hashingKey);
413 final FlowBuilder flowBuilder = new FlowBuilder(flowStat);
414 flowBuilder.setKey(flowKey);
415 addStatistics(flowBuilder, flowStat);
416 final InstanceIdentifier<Flow> flowIdent = tableRef.child(Flow.class, flowKey);
417 trans.put(LogicalDatastoreType.OPERATIONAL, flowIdent, flowBuilder.build());
418 /* check life for Alien flows */
419 if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
420 removeData(flowIdent, REMOVE_AFTER_MISSING_COLLECTION);
424 /* Build and deploy new FlowHashId map */
425 private void updateHashCache(final ReadWriteTransaction trans, final FlowKey flowKey, final FlowHashIdMapKey hashingKey) {
426 final FlowHashIdMapBuilder flHashIdMap = new FlowHashIdMapBuilder();
427 flHashIdMap.setFlowId(flowKey.getId());
428 flHashIdMap.setKey(hashingKey);
429 final KeyedInstanceIdentifier<FlowHashIdMap, FlowHashIdMapKey> flHashIdent = tableRef
430 .augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, hashingKey);
431 /* Add new FlowHashIdMap */
432 trans.put(LogicalDatastoreType.OPERATIONAL, flHashIdent, flHashIdMap.build());
435 void removeUnreportedFlows(final ReadWriteTransaction tx) {
436 final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
437 final List<InstanceIdentifier<Flow>> listMissingConfigFlows = notStatReportedConfigFlows();
438 final Map<InstanceIdentifier<Flow>, Integer> nodeDeleteMap = mapNodesForDelete.get(nodeIdent);
439 final Map<FlowHashIdMapKey, FlowId> listForRemove = getRemovalList();
440 for (final Entry<FlowHashIdMapKey, FlowId> entryForRemove : listForRemove.entrySet()) {
441 final FlowKey flowKey = new FlowKey(entryForRemove.getValue());
442 final InstanceIdentifier<Flow> flowRef = tableRef.child(Flow.class, flowKey);
443 if (nodeDeleteMap != null && flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
444 final Integer lifeIndex = nodeDeleteMap.get(flowRef);
448 nodeDeleteMap.remove(flowRef);
451 if (listMissingConfigFlows.remove(flowRef)) {
452 // it is probable that some multipart message was lost
456 final InstanceIdentifier<FlowHashIdMap> flHashIdent =
457 tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
458 tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
459 tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
463 List<InstanceIdentifier<Flow>> notStatReportedConfigFlows() {
464 if (configFlows != null) {
465 final List<InstanceIdentifier<Flow>> returnList = new ArrayList<>(configFlows.size());
466 for (final Flow confFlow : configFlows) {
467 final InstanceIdentifier<Flow> confFlowIdent = tableRef.child(Flow.class, confFlow.getKey());
468 returnList.add(confFlowIdent);
472 return Collections.emptyList();