import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.nodes.node.table.FlowHashIdMapKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, OpendaylightFlowStatisticsListener>
implements OpendaylightFlowStatisticsListener {
- private static final Logger LOG = LoggerFactory.getLogger(StatListenCommitFlow.class);
+ protected static final Logger LOG = LoggerFactory.getLogger(StatListenCommitFlow.class);
private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
+ private static final Integer REMOVE_AFTER_MISSING_COLLECTION = 1;
+
private final AtomicInteger unaccountedFlowsCounter = new AtomicInteger(0);
public StatListenCommitFlow (final StatisticsManager manager, final DataBroker db,
.setAggregateFlowStatistics(new AggregateFlowStatisticsBuilder(notification).build()).build();
final InstanceIdentifier<FlowCapableNode> fNodeIdent = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
- final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = fNodeIdent
- .child(Table.class, table.getKey()).augmentation(AggregateFlowStatisticsData.class);
+ final InstanceIdentifier<Table> tableRef = fNodeIdent.child(Table.class, table.getKey());
+ final InstanceIdentifier<AggregateFlowStatisticsData> tableStatRef = tableRef
+ .augmentation(AggregateFlowStatisticsData.class);
Optional<FlowCapableNode> fNode = Optional.absent();
try {
fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
- }
- catch (final ReadFailedException e) {
+ } catch (final ReadFailedException e) {
LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
return;
}
if (fNode.isPresent()) {
- tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats, true);
+ ensureTable(tx, table.getId(), tableRef);
+ tx.put(LogicalDatastoreType.OPERATIONAL, tableStatRef, stats);
}
}
}
});
}
+ public void ensureTable(final ReadWriteTransaction tx, final Short tableId, final InstanceIdentifier<Table> tableRef) {
+ final Table tableNew = new TableBuilder().setId(tableId).build();
+ tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef, tableNew);
+ }
+
@Override
public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
final TransactionId transId = notification.getTransactionId();
}
statsFlowCommitAll(flowStats, nodeIdent, tx);
+ /* cleaning all not cached hash collisions */
+ final Map<InstanceIdentifier<Flow>, Integer> listAliens = mapNodesForDelete.get(nodeIdent);
+ if (listAliens != null) {
+ for (final Entry<InstanceIdentifier<Flow>, Integer> nodeForDelete : listAliens.entrySet()) {
+ final Integer lifeIndex = nodeForDelete.getValue();
+ if (nodeForDelete.getValue() > 0) {
+ nodeForDelete.setValue(Integer.valueOf(lifeIndex.intValue() - 1));
+ } else {
+ final InstanceIdentifier<Flow> flowNodeIdent = nodeForDelete.getKey();
+ mapNodesForDelete.get(nodeIdent).remove(flowNodeIdent);
+ tx.delete(LogicalDatastoreType.OPERATIONAL, flowNodeIdent);
+ }
+ }
+ }
/* Notification for continue collecting statistics */
notifyToCollectNextStatistics(nodeIdent);
}
/**
* build pseudoUnique hashCode for flow in table
* for future easy identification
+ *
+ * FIXME: we expect same version for YANG models for all clusters and that has to be fix
+ * FIXME: CREATE BETTER KEY - for flow (MATCH is the problem)
*/
- static String buildHashCode(final FlowAndStatisticsMapList deviceFlow) {
- final FlowBuilder builder = new FlowBuilder();
- builder.setMatch(deviceFlow.getMatch());
- builder.setCookie(deviceFlow.getCookie());
- builder.setPriority(deviceFlow.getPriority());
- final Flow flowForHashCode = builder.build();
- return String.valueOf(flowForHashCode.hashCode());
+ static String buildFlowIdOperKey(final FlowAndStatisticsMapList deviceFlow) {
+ return new StringBuffer().append(deviceFlow.getMatch())
+ .append(deviceFlow.getPriority()).append(deviceFlow.getCookie().getValue()).toString();
}
private class NodeUpdateState {
}
private class TableFlowUpdateState {
+
private boolean tableEnsured = false;
final KeyedInstanceIdentifier<Table, TableKey> tableRef;
final TableKey tableKey;
final List<FlowHashIdMap> flowHashMap = flowHashMapping.getFlowHashIdMap() != null
? flowHashMapping.getFlowHashIdMap() : Collections.<FlowHashIdMap> emptyList();
for (final FlowHashIdMap flowHashId : flowHashMap) {
- flowIdByHash.put(flowHashId.getKey(), flowHashId.getFlowId());
+ try {
+ flowIdByHash.put(flowHashId.getKey(), flowHashId.getFlowId());
+ } catch (final Exception e) {
+ LOG.warn("flow hashing hit a duplicate for {} -> {}", flowHashId.getKey(), flowHashId.getFlowId());
+ }
}
}
}
}
- private void ensureTable(final ReadWriteTransaction tx) {
+ private void ensureTableFowHashIdMapping(final ReadWriteTransaction tx) {
if( ! tableEnsured) {
+ ensureTable(tx, tableKey.getId(), tableRef);
final FlowHashIdMapping emptyMapping = new FlowHashIdMappingBuilder()
.setFlowHashIdMap(Collections.<FlowHashIdMap> emptyList()).build();
- tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping, true);
+ tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef.augmentation(FlowHashIdMapping.class), emptyMapping);
tableEnsured = true;
}
}
}
private void initConfigFlows(final ReadWriteTransaction trans) {
- Optional<Table> table = readLatestConfiguration(tableRef);
- try {
- table = trans.read(LogicalDatastoreType.CONFIGURATION, tableRef).checkedGet();
- } catch (final ReadFailedException e) {
- table = Optional.absent();
- }
+ final Optional<Table> table = readLatestConfiguration(tableRef);
List<Flow> localList = null;
if(table.isPresent()) {
localList = table.get().getFlow();
}
void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
- ensureTable(trans);
- final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildHashCode(flowStat));
+ ensureTableFowHashIdMapping(trans);
+ final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
if (flowKey == null) {
flowKey = searchInConfiguration(flowStat, trans);
trans.put(LogicalDatastoreType.OPERATIONAL, flowIdent, flowBuilder.build());
/* check life for Alien flows */
if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
- removeData(flowIdent, Integer.valueOf(5));
+ removeData(flowIdent, REMOVE_AFTER_MISSING_COLLECTION);
}
}
/* Build and deploy new FlowHashId map */
private void updateHashCache(final ReadWriteTransaction trans, final FlowKey flowKey, final FlowHashIdMapKey hashingKey) {
- // TODO Auto-generated method stub
final FlowHashIdMapBuilder flHashIdMap = new FlowHashIdMapBuilder();
flHashIdMap.setFlowId(flowKey.getId());
flHashIdMap.setKey(hashingKey);
}
void removeUnreportedFlows(final ReadWriteTransaction tx) {
+ final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
+ final List<InstanceIdentifier<Flow>> listMissingConfigFlows = notStatReportedConfigFlows();
+ final Map<InstanceIdentifier<Flow>, Integer> nodeDeleteMap = mapNodesForDelete.get(nodeIdent);
final Map<FlowHashIdMapKey, FlowId> listForRemove = getRemovalList();
- final Optional<Table> configTable = readLatestConfiguration(tableRef);
- List<Flow> configFlows = Collections.emptyList();
- if (configTable.isPresent() && configTable.get().getFlow() != null) {
- configFlows = new ArrayList<>(configTable.get().getFlow());
- }
for (final Entry<FlowHashIdMapKey, FlowId> entryForRemove : listForRemove.entrySet()) {
final FlowKey flowKey = new FlowKey(entryForRemove.getValue());
final InstanceIdentifier<Flow> flowRef = tableRef.child(Flow.class, flowKey);
- final InstanceIdentifier<FlowStatisticsData> flowStatIdent = flowRef.augmentation(FlowStatisticsData.class);
- if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
- final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
- final Integer lifeIndex = mapNodesForDelete.get(nodeIdent).remove(flowRef);
+ if (nodeDeleteMap != null && flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
+ final Integer lifeIndex = nodeDeleteMap.get(flowRef);
if (lifeIndex > 0) {
- mapNodesForDelete.get(nodeIdent).put(flowRef, Integer.valueOf(lifeIndex.intValue() - 1));
break;
+ } else {
+ nodeDeleteMap.remove(flowRef);
}
} else {
- if (configFlows.remove(flowRef)) {
- /* Node is still presented in Config/DataStore - probably lost some multipart msg */
- break;
+ if (listMissingConfigFlows.remove(flowRef)) {
+ break; // we probably lost some multipart msg
}
}
- final Optional<FlowStatisticsData> flowStatNodeCheck;
- try {
- flowStatNodeCheck = tx.read(LogicalDatastoreType.OPERATIONAL, flowStatIdent).checkedGet();
- }
- catch (final ReadFailedException e) {
- LOG.debug("Read FlowStatistics {} in Operational/DS fail! Statisticscan not beupdated.", flowStatIdent, e);
- break;
- }
- if (flowStatNodeCheck.isPresent()) {
- /* Node isn't new and it has not been removed yet */
- final InstanceIdentifier<FlowHashIdMap> flHashIdent = tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
- tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
- tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
+ final InstanceIdentifier<FlowHashIdMap> flHashIdent =
+ tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
+ tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
+ tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
+ }
+ }
+
+ List<InstanceIdentifier<Flow>> notStatReportedConfigFlows() {
+ if (configFlows != null) {
+ final List<InstanceIdentifier<Flow>> returnList = new ArrayList<>(configFlows.size());
+ for (final Flow confFlow : configFlows) {
+ final InstanceIdentifier<Flow> confFlowIdent = tableRef.child(Flow.class, confFlow.getKey());
+ returnList.add(confFlowIdent);
}
+ return returnList;
}
+ return Collections.emptyList();
}
}
}