2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.statistics.manager.impl;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.List;
18 import java.util.Map.Entry;
19 import java.util.concurrent.atomic.AtomicInteger;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
25 import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
26 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
27 import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
28 import org.opendaylight.controller.md.statistics.manager.impl.helper.FlowComparator;
29 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowHashIdMapping;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowHashIdMappingBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMap;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapKey;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsData;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsDataBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsDataBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.aggregate.flow.statistics.AggregateFlowStatisticsBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapListBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
59 import org.opendaylight.yangtools.yang.binding.DataObject;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
65 import com.google.common.base.Optional;
66 import com.google.common.collect.BiMap;
67 import com.google.common.collect.HashBiMap;
71 * org.opendaylight.controller.md.statistics.manager.impl
73 * StatListenCommitFlow
74 * Class is a NotifyListener for FlowStatistics and DataChangeListener for Config/DataStore for Flow node.
75 * All expected (registered) FlowStatistics will be builded and commit to Operational/DataStore.
76 * DataChangeEven should call create/delete Flow in Operational/DS create process needs to pair
77 * Device Flow HashCode and FlowId from Config/DS
79 * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
82 public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, OpendaylightFlowStatisticsListener>
83 implements OpendaylightFlowStatisticsListener {
85 private static final Logger LOG = LoggerFactory.getLogger(StatListenCommitFlow.class);
87 private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
89 private final AtomicInteger unaccountedFlowsCounter = new AtomicInteger(0);
91 public StatListenCommitFlow (final StatisticsManager manager, final DataBroker db,
92 final NotificationProviderService nps){
93 super(manager, db, nps, Flow.class);
97 protected OpendaylightFlowStatisticsListener getStatNotificationListener() {
102 protected InstanceIdentifier<Flow> getWildCardedRegistrationPath() {
103 return InstanceIdentifier.create(Nodes.class).child(Node.class)
104 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class);
108 public void onAggregateFlowStatisticsUpdate(final AggregateFlowStatisticsUpdate notification) {
109 final TransactionId transId = notification.getTransactionId();
110 final NodeId nodeId = notification.getId();
111 if ( ! isExpectedStatistics(transId, nodeId)) {
112 LOG.debug("STAT-MANAGER - AggregateFlowStatisticsUpdate: unregistred notification detect TransactionId {}", transId);
115 manager.getRpcMsgManager().addNotification(notification, nodeId);
116 if (notification.isMoreReplies()) {
119 /* check flow Capable Node and write statistics */
120 manager.enqueue(new StatDataStoreOperation() {
122 public void applyOperation(final ReadWriteTransaction tx) {
124 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
125 if (( ! txContainer.isPresent()) || txContainer.get().getNotifications() == null) {
128 final Optional<? extends DataObject> inputObj = txContainer.get().getConfInput();
129 if (( ! inputObj.isPresent()) || ( ! (inputObj.get() instanceof Table))) {
132 final Table table = (Table) inputObj.get();
133 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
134 for (final TransactionAware notif : cacheNotifs) {
135 if (notif instanceof AggregateFlowStatisticsUpdate) {
136 final AggregateFlowStatisticsData stats = new AggregateFlowStatisticsDataBuilder()
137 .setAggregateFlowStatistics(new AggregateFlowStatisticsBuilder(notification).build()).build();
138 final InstanceIdentifier<FlowCapableNode> fNodeIdent = InstanceIdentifier.create(Nodes.class)
139 .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
140 final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = fNodeIdent
141 .child(Table.class, table.getKey()).augmentation(AggregateFlowStatisticsData.class);
142 Optional<FlowCapableNode> fNode = Optional.absent();
144 fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
146 catch (final ReadFailedException e) {
147 LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
150 if (fNode.isPresent()) {
151 tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats, true);
160 public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
161 final TransactionId transId = notification.getTransactionId();
162 final NodeId nodeId = notification.getId();
163 if ( ! isExpectedStatistics(transId, nodeId)) {
164 LOG.debug("STAT-MANAGER - FlowsStatisticsUpdate: unregistred notification detect TransactionId {}", transId);
167 manager.getRpcMsgManager().addNotification(notification, nodeId);
168 if (notification.isMoreReplies()) {
169 LOG.trace("Next notification for join txId {}", transId);
172 /* add flow's statistics */
173 manager.enqueue(new StatDataStoreOperation() {
175 public void applyOperation(final ReadWriteTransaction tx) {
176 final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
177 if (( ! txContainer.isPresent()) || txContainer.get().getNotifications() == null) {
180 final List<FlowAndStatisticsMapList> flowStats = new ArrayList<FlowAndStatisticsMapList>(10);
181 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
182 .child(Node.class, new NodeKey(nodeId));
183 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
184 for (final TransactionAware notif : cacheNotifs) {
185 if (notif instanceof FlowsStatisticsUpdate) {
186 final List<FlowAndStatisticsMapList> notifList =
187 ((FlowsStatisticsUpdate) notif).getFlowAndStatisticsMapList();
188 if (notifList != null) {
189 flowStats.addAll(notifList);
194 statsFlowCommitAll(flowStats, nodeIdent, tx);
195 /* Notification for continue collecting statistics */
196 notifyToCollectNextStatistics(nodeIdent);
201 private void statsFlowCommitAll(final List<FlowAndStatisticsMapList> list,
202 final InstanceIdentifier<Node> nodeIdent, final ReadWriteTransaction tx) {
204 final InstanceIdentifier<FlowCapableNode> fNodeIdent = nodeIdent.augmentation(FlowCapableNode.class);
206 final Optional<FlowCapableNode> fNode;
208 fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
210 catch (final ReadFailedException e) {
211 LOG.debug("Read FlowCapableNode {} in Operational/DS fail! Statistic scan not be updated.", nodeIdent, e);
214 if ( ! fNode.isPresent()) {
215 LOG.trace("FlowCapableNode {} is not presented in Operational/DS. Statisticscan not be updated.", nodeIdent);
219 final NodeUpdateState nodeState = new NodeUpdateState(fNodeIdent,fNode.get());
221 for (final FlowAndStatisticsMapList flowStat : list) {
222 final TableKey tableKey = new TableKey(flowStat.getTableId());
223 final TableFlowUpdateState tableState = nodeState.getTable(tableKey, tx);
224 tableState.reportFlow(flowStat,tx);
227 for (final TableFlowUpdateState table : nodeState.getTables()) {
228 table.removeUnreportedFlows(tx);
233 * Method adds statistics to Flow
238 private void addStatistics(final FlowBuilder flowBuilder, final FlowAndStatisticsMapList deviceFlow) {
239 final FlowAndStatisticsMapListBuilder stats = new FlowAndStatisticsMapListBuilder(deviceFlow);
240 final FlowStatisticsBuilder flowStatisticsBuilder = new FlowStatisticsBuilder(stats.build());
241 final FlowStatisticsDataBuilder flowStatisticsData =new FlowStatisticsDataBuilder();
242 flowStatisticsData.setFlowStatistics(flowStatisticsBuilder.build());
243 flowBuilder.addAugmentation(FlowStatisticsData.class, flowStatisticsData.build());
247 * build pseudoUnique hashCode for flow in table
248 * for future easy identification
250 static String buildHashCode(final FlowAndStatisticsMapList deviceFlow) {
251 final FlowBuilder builder = new FlowBuilder();
252 builder.setMatch(deviceFlow.getMatch());
253 builder.setCookie(deviceFlow.getCookie());
254 builder.setPriority(deviceFlow.getPriority());
255 final Flow flowForHashCode = builder.build();
256 return String.valueOf(flowForHashCode.hashCode());
259 private class NodeUpdateState {
260 private final InstanceIdentifier<FlowCapableNode> nodeIdentifier;
261 private final Map<TableKey,TableFlowUpdateState> tables = new HashMap<>();
263 public NodeUpdateState(final InstanceIdentifier<FlowCapableNode> fNodeIdent, final FlowCapableNode flowCapableNode) {
264 nodeIdentifier = fNodeIdent;
265 final List<Table> tableList = flowCapableNode.getTable();
266 if(tableList != null) {
267 for (final Table table : tableList) {
268 final TableKey tableKey = table.getKey();
269 tables.put(tableKey, new TableFlowUpdateState(nodeIdentifier.child(Table.class,tableKey),table));
274 public Iterable<TableFlowUpdateState> getTables() {
275 return tables.values();
278 TableFlowUpdateState getTable(final TableKey key,final ReadWriteTransaction tx) {
279 TableFlowUpdateState table = tables.get(key);
281 table = new TableFlowUpdateState(nodeIdentifier.child(Table.class, key), null);
282 tables.put(key, table);
288 private class TableFlowUpdateState {
289 private boolean tableEnsured = false;
290 final KeyedInstanceIdentifier<Table, TableKey> tableRef;
291 final TableKey tableKey;
292 final BiMap<FlowHashIdMapKey, FlowId> flowIdByHash;
293 List<Flow> configFlows;
295 public TableFlowUpdateState(final KeyedInstanceIdentifier<Table, TableKey> tablePath, final Table table) {
296 tableRef = tablePath;
297 tableKey = tablePath.getKey();
298 flowIdByHash = HashBiMap.create();
300 final FlowHashIdMapping flowHashMapping = table.getAugmentation(FlowHashIdMapping.class);
301 if (flowHashMapping != null) {
302 final List<FlowHashIdMap> flowHashMap = flowHashMapping.getFlowHashIdMap() != null
303 ? flowHashMapping.getFlowHashIdMap() : Collections.<FlowHashIdMap> emptyList();
304 for (final FlowHashIdMap flowHashId : flowHashMap) {
305 flowIdByHash.put(flowHashId.getKey(), flowHashId.getFlowId());
311 private void ensureTable(final ReadWriteTransaction tx) {
312 if( ! tableEnsured) {
313 final FlowHashIdMapping emptyMapping = new FlowHashIdMappingBuilder()
314 .setFlowHashIdMap(Collections.<FlowHashIdMap> emptyList()).build();
315 tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping, true);
320 private FlowKey searchInConfiguration(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
321 initConfigFlows(trans);
322 final Iterator<Flow> it = configFlows.iterator();
323 while(it.hasNext()) {
324 final Flow cfgFlow = it.next();
325 final FlowKey cfgKey = cfgFlow.getKey();
326 if(flowIdByHash.inverse().containsKey(cfgKey)) {
328 } else if(FlowComparator.flowEquals(flowStat, cfgFlow)) {
336 private void initConfigFlows(final ReadWriteTransaction trans) {
337 Optional<Table> table = readLatestConfiguration(tableRef);
339 table = trans.read(LogicalDatastoreType.CONFIGURATION, tableRef).checkedGet();
340 } catch (final ReadFailedException e) {
341 table = Optional.absent();
343 List<Flow> localList = null;
344 if(table.isPresent()) {
345 localList = table.get().getFlow();
347 if(localList == null) {
348 configFlows = Collections.emptyList();
350 configFlows = new LinkedList<>(localList);
354 private FlowKey getFlowKeyAndRemoveHash(final FlowHashIdMapKey key) {
355 final FlowId ret = flowIdByHash.get(key);
357 flowIdByHash.remove(key);
358 return new FlowKey(ret);
363 /* Returns FlowKey which doesn't exist in any DataStore for now */
364 private FlowKey makeAlienFlowKey() {
365 final StringBuilder sBuilder = new StringBuilder(ALIEN_SYSTEM_FLOW_ID)
366 .append(tableKey.getId()).append("-").append(unaccountedFlowsCounter.incrementAndGet());
367 final FlowId flowId = new FlowId(sBuilder.toString());
368 return new FlowKey(flowId);
371 private Map<FlowHashIdMapKey, FlowId> getRemovalList() {
375 void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
377 final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildHashCode(flowStat));
378 FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
379 if (flowKey == null) {
380 flowKey = searchInConfiguration(flowStat, trans);
381 if ( flowKey == null) {
382 flowKey = makeAlienFlowKey();
384 updateHashCache(trans,flowKey,hashingKey);
386 final FlowBuilder flowBuilder = new FlowBuilder(flowStat);
387 flowBuilder.setKey(flowKey);
388 addStatistics(flowBuilder, flowStat);
389 final InstanceIdentifier<Flow> flowIdent = tableRef.child(Flow.class, flowKey);
390 trans.put(LogicalDatastoreType.OPERATIONAL, flowIdent, flowBuilder.build());
391 /* check life for Alien flows */
392 if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
393 removeData(flowIdent, Integer.valueOf(5));
397 /* Build and deploy new FlowHashId map */
398 private void updateHashCache(final ReadWriteTransaction trans, final FlowKey flowKey, final FlowHashIdMapKey hashingKey) {
399 // TODO Auto-generated method stub
400 final FlowHashIdMapBuilder flHashIdMap = new FlowHashIdMapBuilder();
401 flHashIdMap.setFlowId(flowKey.getId());
402 flHashIdMap.setKey(hashingKey);
403 final KeyedInstanceIdentifier<FlowHashIdMap, FlowHashIdMapKey> flHashIdent = tableRef
404 .augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, hashingKey);
405 /* Add new FlowHashIdMap */
406 trans.put(LogicalDatastoreType.OPERATIONAL, flHashIdent, flHashIdMap.build());
409 void removeUnreportedFlows(final ReadWriteTransaction tx) {
410 final Map<FlowHashIdMapKey, FlowId> listForRemove = getRemovalList();
411 final Optional<Table> configTable = readLatestConfiguration(tableRef);
412 List<Flow> configFlows = Collections.emptyList();
413 if (configTable.isPresent() && configTable.get().getFlow() != null) {
414 configFlows = new ArrayList<>(configTable.get().getFlow());
416 for (final Entry<FlowHashIdMapKey, FlowId> entryForRemove : listForRemove.entrySet()) {
417 final FlowKey flowKey = new FlowKey(entryForRemove.getValue());
418 final InstanceIdentifier<Flow> flowRef = tableRef.child(Flow.class, flowKey);
419 final InstanceIdentifier<FlowStatisticsData> flowStatIdent = flowRef.augmentation(FlowStatisticsData.class);
420 if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
421 final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
422 final Integer lifeIndex = mapNodesForDelete.get(nodeIdent).remove(flowRef);
424 mapNodesForDelete.get(nodeIdent).put(flowRef, Integer.valueOf(lifeIndex.intValue() - 1));
428 if (configFlows.remove(flowRef)) {
429 /* Node is still presented in Config/DataStore - probably lost some multipart msg */
433 final Optional<FlowStatisticsData> flowStatNodeCheck;
435 flowStatNodeCheck = tx.read(LogicalDatastoreType.OPERATIONAL, flowStatIdent).checkedGet();
437 catch (final ReadFailedException e) {
438 LOG.debug("Read FlowStatistics {} in Operational/DS fail! Statisticscan not beupdated.", flowStatIdent, e);
441 if (flowStatNodeCheck.isPresent()) {
442 /* Node isn't new and it has not been removed yet */
443 final InstanceIdentifier<FlowHashIdMap> flHashIdent = tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
444 tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
445 tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);