import com.google.common.base.Optional;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* statistics-manager
* org.opendaylight.openflowplugin.applications.statistics.manager.impl
*
* StatListenCommitFlow
- * Class is a NotifyListener for FlowStatistics and DataChangeListener for Config/DataStore for Flow node.
+ * Class is a NotifyListener for FlowStatistics and DataTreeChangeListener for Config/DataStore for Flow node.
* All expected (registered) FlowStatistics will be builded and commit to Operational/DataStore.
- * DataChangeEven should call create/delete Flow in Operational/DS create process needs to pair
+ * DataTreeModification should call create/delete Flow in Operational/DS create process needs to pair
* Device Flow HashCode and FlowId from Config/DS
- *
- * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- *
*/
public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, OpendaylightFlowStatisticsListener>
implements OpendaylightFlowStatisticsListener {
private final AtomicInteger unaccountedFlowsCounter = new AtomicInteger(0);
public StatListenCommitFlow (final StatisticsManager manager, final DataBroker db,
- final NotificationProviderService nps){
- super(manager, db, nps, Flow.class);
+ final NotificationProviderService nps,
+ final StatNodeRegistration nrm){
+ super(manager, db, nps, Flow.class,nrm);
}
@Override
if (( ! inputObj.isPresent()) || ( ! (inputObj.get() instanceof Table))) {
return;
}
+
+ if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
final Table table = (Table) inputObj.get();
final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
for (final TransactionAware notif : cacheNotifs) {
}
}
}
+
+ @Override
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
});
}
tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef, tableNew);
}
+ protected void processDataChange(Collection<DataTreeModification<Flow>> changes) {
+ if (!changes.isEmpty()) {
+ for (DataTreeModification<Flow> dataChange : changes) {
+ if (dataChange.getRootNode().getModificationType() == DataObjectModification.ModificationType.DELETE) {
+ final InstanceIdentifier<Node> nodeIdent = dataChange.getRootPath().getRootIdentifier()
+ .firstIdentifierOf(Node.class);
+ if (!removedDataBetweenStatsCycle.containsKey(nodeIdent)) {
+ removedDataBetweenStatsCycle.put(nodeIdent, new ArrayList<>());
+ }
+ Flow data = dataChange.getRootNode().getDataBefore();
+ removedDataBetweenStatsCycle.get(nodeIdent).add(data);
+ LOG.debug("Node: {} :: Flow removed {}",nodeIdent.firstKeyOf(Node.class).getId(), data.toString());
+ }
+ }
+ }
+ }
+
@Override
public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
final TransactionId transId = notification.getTransactionId();
final NodeId nodeId = notification.getId();
if ( ! isExpectedStatistics(transId, nodeId)) {
- LOG.debug("STAT-MANAGER - FlowsStatisticsUpdate: unregistred notification detect TransactionId {}", transId);
+ LOG.debug("STAT-MANAGER - FlowsStatisticsUpdate: unregistered notification detect TransactionId {}",
+ transId);
return;
}
manager.getRpcMsgManager().addNotification(notification, nodeId);
if (( ! txContainer.isPresent()) || txContainer.get().getNotifications() == null) {
return;
}
+ if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
final List<FlowAndStatisticsMapList> flowStats = new ArrayList<FlowAndStatisticsMapList>(10);
final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(nodeId));
notifyToCollectNextStatistics(nodeIdent, transId);
}
+ @Override
+ public UUID generatedUUIDForNode() {
+ return manager.getGeneratedUUIDForNode(getNodeIdentifier());
+ }
+
});
}
final InstanceIdentifier<FlowCapableNode> fNodeIdent = nodeIdent.augmentation(FlowCapableNode.class);
+ //cleanup the hashmap ID for the flows deleted between two stats cycle, also cleanup the
+ // data change cache as well.
+ ArrayList<Flow> deletedFlows = removedDataBetweenStatsCycle.remove(nodeIdent);
+
+ if (deletedFlows != null && !deletedFlows.isEmpty()) {
+ LOG.trace("Number of flows deleted from node {} between two stats cycles are {}", nodeIdent, deletedFlows
+ .size());
+ }
+
final Optional<FlowCapableNode> fNode;
try {
fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
for (final FlowAndStatisticsMapList flowStat : list) {
final TableKey tableKey = new TableKey(flowStat.getTableId());
final TableFlowUpdateState tableState = nodeState.getTable(tableKey, tx);
- tableState.reportFlow(flowStat,tx);
+ Flow removedConfigFlow = getFlowIfRemoved(flowStat, deletedFlows);
+ if (removedConfigFlow == null) {
+ tableState.reportFlow(flowStat,tx, false);
+ } else {
+ deletedFlows.remove(removedConfigFlow);
+ tableState.reportFlow(flowStat,tx, true);
+ }
}
+ if (deletedFlows != null ) {
+ deletedFlows.clear();
+ }
for (final TableFlowUpdateState table : nodeState.getTables()) {
table.removeUnreportedFlows(tx);
}
}
+ private Flow getFlowIfRemoved(FlowAndStatisticsMapList flowStat, ArrayList<Flow> deletedFlows) {
+ if (deletedFlows != null && !deletedFlows.isEmpty()) {
+ for (Flow flow : deletedFlows) {
+ final FlowAndStatisticsMapList configFlowStats = new FlowAndStatisticsMapListBuilder(flow).build();
+ if ( flowStat.getMatch().equals(configFlowStats.getMatch()) &&
+ flowStat.getPriority().equals(configFlowStats.getPriority()) &&
+ flowStat.getCookie().equals(configFlowStats.getCookie()!=null?configFlowStats.getCookie():
+ new FlowCookie(new BigInteger("0")))) {
+ LOG.debug("Flow statistics {} are related to flow {}, but it's REMOVED from the config data store" +
+ "store", flowStat, flow);
+ return flow;
+ }
+ }
+ }
+ return null;
+ }
/**
* Method adds statistics to Flow
*
//flowHashId.getKey() too verbose for standard log.
if(LOG.isDebugEnabled()) {
final FlowId currData = flowIdByHash.get(flowHashId.getKey());
- LOG.debug("flow hashing hit a duplicate for {} -> {}. Curr value: {} Equals:{}. Exception was raised:",
+ LOG.debug("Flow hashing hit a duplicate for {} -> {}. Curr value: {} Equals:{}. " +
+ "Exception was raised:",
flowHashId.getKey(), flowHashId.getFlowId(), currData, flowHashId.getFlowId().equals(currData), e);
}
else
{
- LOG.warn("flow hashing hit a duplicate {}. Exception was raised: {}. Enable DEBUG for more detail.",
+ LOG.warn("Flow hashing hit a duplicate {}. Exception was raised: {}. Enable DEBUG for" +
+ " more detail.",
flowHashId.getFlowId().toString().substring(0, Math.min(TRUNCATED_LOG_MESSAGE_LENGTH,flowHashId.getFlowId().toString().length())),
e.getMessage().substring(0,Math.min(TRUNCATED_LOG_MESSAGE_LENGTH,e.getMessage().length())));
}
}
}
- private FlowKey getFlowKeyAndRemoveHash(final FlowHashIdMapKey key) {
+ private FlowKey getFlowKeyByHash(final FlowHashIdMapKey key) {
final FlowId ret = flowIdByHash.get(key);
if(ret != null) {
- flowIdByHash.remove(key);
return new FlowKey(ret);
}
return null;
return flowIdByHash;
}
- void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
+ void reportFlow(final FlowAndStatisticsMapList flowStat,
+ final ReadWriteTransaction trans,
+ boolean wasRemoved) {
ensureTableFowHashIdMapping(trans);
final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
- FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
- if (flowKey == null) {
+ FlowKey flowKey = getFlowKeyByHash(hashingKey);
+ if (flowKey == null || wasRemoved) {
flowKey = searchInConfiguration(flowStat, trans);
if ( flowKey == null) {
flowKey = makeAlienFlowKey();
}
updateHashCache(trans,flowKey,hashingKey);
+ } else {
+ flowIdByHash.remove(hashingKey);
}
final FlowBuilder flowBuilder = new FlowBuilder(flowStat);
flowBuilder.setKey(flowKey);
final InstanceIdentifier<Flow> flowRef = tableRef.child(Flow.class, flowKey);
if (nodeDeleteMap != null && flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
final Integer lifeIndex = nodeDeleteMap.get(flowRef);
- if (lifeIndex > 0) {
+ if (lifeIndex != null && lifeIndex > 0) {
break;
} else {
nodeDeleteMap.remove(flowRef);