Bug 7349 - Flow ID not updated in operational after removing and adding a flow with... 96/49396/4
authorAnil Vishnoi <vishnoianil@gmail.com>
Tue, 13 Dec 2016 03:57:05 +0000 (19:57 -0800)
committerAnil Vishnoi <vishnoianil@gmail.com>
Tue, 20 Dec 2016 03:06:07 +0000 (19:06 -0800)
Change-Id: Ie370cf017bd8642c52f1469cc0b01b10fa38842d
Signed-off-by: Anil Vishnoi <vishnoianil@gmail.com>
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatAbstractListenCommit.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitFlow.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitGroup.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitMeter.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitQueue.java
applications/statistics-manager/src/test/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatAbstractListenCommitTest.java

index e6dfc888f9a971d17484215d30b4e79409d6e613..740f82378ced82b5b905e24d8a0d3d4b1fc0c196 100644 (file)
@@ -10,10 +10,6 @@ package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
@@ -33,6 +29,12 @@ import org.opendaylight.yangtools.yang.binding.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * statistics-manager
  * org.opendaylight.openflowplugin.applications.statistics.manager.impl
@@ -52,10 +54,12 @@ public abstract class StatAbstractListenCommit<T extends DataObject, N extends N
 
     protected final Map<InstanceIdentifier<Node>, Map<InstanceIdentifier<T>, Integer>> mapNodesForDelete = new ConcurrentHashMap<>();
     protected final Map<InstanceIdentifier<Node>, Integer> mapNodeFeautureRepeater = new ConcurrentHashMap<>();
+    protected final Map<InstanceIdentifier<Node>, ArrayList<T>> removedDataBetweenStatsCycle = new
+            ConcurrentHashMap<>();
 
     private final Class<T> clazz;
 
-    private final DataBroker dataBroker;
+    protected final DataBroker dataBroker;
 
     protected final StatNodeRegistration nodeRegistrationManager;
 
@@ -98,6 +102,8 @@ public abstract class StatAbstractListenCommit<T extends DataObject, N extends N
      */
     protected abstract InstanceIdentifier<T> getWildCardedRegistrationPath();
 
+    protected abstract void processDataChange(Collection<DataTreeModification<T>> changes);
+
     @Override
     public void onDataTreeChanged(Collection<DataTreeModification<T>> changes) {
         Preconditions.checkNotNull(changes, "Changes must not be null!");
@@ -107,6 +113,7 @@ public abstract class StatAbstractListenCommit<T extends DataObject, N extends N
          * Latest read transaction will be allocated on another read using readLatestConfiguration
          */
         currentReadTxStale = true;
+        processDataChange(changes);
     }
 
     @SuppressWarnings("unchecked")
@@ -128,6 +135,7 @@ public abstract class StatAbstractListenCommit<T extends DataObject, N extends N
     @Override
     public void cleanForDisconnect(final InstanceIdentifier<Node> nodeIdent) {
         mapNodesForDelete.remove(nodeIdent);
+        removedDataBetweenStatsCycle.remove(nodeIdent);
     }
 
     @Override
index 5e8fd252a60f77f871365bd9f131516cf289b0ea..49068c71611b4dccf6f79be23168359438c9487e 100644 (file)
@@ -11,16 +11,9 @@ package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 import com.google.common.base.Optional;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -57,6 +50,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.f
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.statistics.FlowStatisticsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
@@ -67,6 +61,18 @@ import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * statistics-manager
  * org.opendaylight.openflowplugin.applications.statistics.manager.impl
@@ -172,12 +178,30 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         tx.merge(LogicalDatastoreType.OPERATIONAL, tableRef, tableNew);
     }
 
+    protected void processDataChange(Collection<DataTreeModification<Flow>> changes) {
+        if (!changes.isEmpty()) {
+            for (DataTreeModification<Flow> dataChange : changes) {
+                if (dataChange.getRootNode().getModificationType() == DataObjectModification.ModificationType.DELETE) {
+                    final InstanceIdentifier<Node> nodeIdent = dataChange.getRootPath().getRootIdentifier()
+                            .firstIdentifierOf(Node.class);
+                    if (!removedDataBetweenStatsCycle.containsKey(nodeIdent)) {
+                        removedDataBetweenStatsCycle.put(nodeIdent, new ArrayList<>());
+                    }
+                    Flow data = dataChange.getRootNode().getDataBefore();
+                    removedDataBetweenStatsCycle.get(nodeIdent).add(data);
+                    LOG.debug("Node: {} :: Flow removed {}",nodeIdent.firstKeyOf(Node.class).getId(), data.toString());
+                }
+            }
+        }
+    }
+
     @Override
     public void onFlowsStatisticsUpdate(final FlowsStatisticsUpdate notification) {
         final TransactionId transId = notification.getTransactionId();
         final NodeId nodeId = notification.getId();
         if ( ! isExpectedStatistics(transId, nodeId)) {
-            LOG.debug("STAT-MANAGER - FlowsStatisticsUpdate: unregistred notification detect TransactionId {}", transId);
+            LOG.debug("STAT-MANAGER - FlowsStatisticsUpdate: unregistered notification detect TransactionId {}",
+                    transId);
             return;
         }
         manager.getRpcMsgManager().addNotification(notification, nodeId);
@@ -241,6 +265,15 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
 
         final InstanceIdentifier<FlowCapableNode> fNodeIdent = nodeIdent.augmentation(FlowCapableNode.class);
 
+        //cleanup the hashmap ID for the flows deleted between two stats cycle, also cleanup the
+        // data change cache as well.
+        ArrayList<Flow> deletedFlows = removedDataBetweenStatsCycle.remove(nodeIdent);
+
+        if (deletedFlows != null && !deletedFlows.isEmpty()) {
+            LOG.trace("Number of flows deleted from node {} between two stats cycles are {}", nodeIdent, deletedFlows
+                    .size());
+        }
+
         final Optional<FlowCapableNode> fNode;
         try {
             fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
@@ -259,14 +292,39 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         for (final FlowAndStatisticsMapList flowStat : list) {
             final TableKey tableKey = new TableKey(flowStat.getTableId());
             final TableFlowUpdateState tableState = nodeState.getTable(tableKey, tx);
-            tableState.reportFlow(flowStat,tx);
+            Flow removedConfigFlow = getFlowIfRemoved(flowStat, deletedFlows);
+            if (removedConfigFlow == null) {
+                tableState.reportFlow(flowStat,tx, false);
+            } else {
+                deletedFlows.remove(removedConfigFlow);
+                tableState.reportFlow(flowStat,tx, true);
+            }
         }
 
+        if (deletedFlows != null ) {
+            deletedFlows.clear();
+        }
         for (final TableFlowUpdateState table : nodeState.getTables()) {
             table.removeUnreportedFlows(tx);
         }
     }
 
+    private Flow getFlowIfRemoved(FlowAndStatisticsMapList flowStat, ArrayList<Flow> deletedFlows) {
+        if (deletedFlows != null && !deletedFlows.isEmpty()) {
+            for (Flow flow : deletedFlows) {
+                final FlowAndStatisticsMapList configFlowStats = new FlowAndStatisticsMapListBuilder(flow).build();
+                if ( flowStat.getMatch().equals(configFlowStats.getMatch()) &&
+                        flowStat.getPriority().equals(configFlowStats.getPriority()) &&
+                        flowStat.getCookie().equals(configFlowStats.getCookie()!=null?configFlowStats.getCookie():
+                                new FlowCookie(new BigInteger("0")))) {
+                    LOG.debug("Flow statistics {} are related to flow {}, but it's REMOVED from the config data store" +
+                            "store", flowStat, flow);
+                    return flow;
+                }
+            }
+        }
+        return null;
+    }
     /**
      * Method adds statistics to Flow
      *
@@ -346,12 +404,14 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                             //flowHashId.getKey() too verbose for standard log.
                             if(LOG.isDebugEnabled()) {
                                 final FlowId currData = flowIdByHash.get(flowHashId.getKey());
-                                LOG.debug("flow hashing hit a duplicate for {} -> {}. Curr value: {} Equals:{}. Exception was raised:",
+                                LOG.debug("Flow hashing hit a duplicate for {} -> {}. Curr value: {} Equals:{}. " +
+                                        "Exception was raised:",
                                     flowHashId.getKey(), flowHashId.getFlowId(), currData, flowHashId.getFlowId().equals(currData), e);
                             }
                             else
                             {
-                                LOG.warn("flow hashing hit a duplicate {}. Exception was raised: {}. Enable DEBUG for more detail.",
+                                LOG.warn("Flow hashing hit a duplicate {}. Exception was raised: {}. Enable DEBUG for" +
+                                        " more detail.",
                                     flowHashId.getFlowId().toString().substring(0, Math.min(TRUNCATED_LOG_MESSAGE_LENGTH,flowHashId.getFlowId().toString().length())),
                                     e.getMessage().substring(0,Math.min(TRUNCATED_LOG_MESSAGE_LENGTH,e.getMessage().length())));
                             }
@@ -401,10 +461,9 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
             }
         }
 
-        private FlowKey getFlowKeyAndRemoveHash(final FlowHashIdMapKey key) {
+        private FlowKey getFlowKeyByHash(final FlowHashIdMapKey key) {
             final FlowId ret = flowIdByHash.get(key);
             if(ret != null) {
-                flowIdByHash.remove(key);
                 return new FlowKey(ret);
             }
             return null;
@@ -422,16 +481,20 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
             return flowIdByHash;
         }
 
-        void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
+        void reportFlow(final FlowAndStatisticsMapList flowStat,
+                        final ReadWriteTransaction trans,
+                        boolean wasRemoved) {
             ensureTableFowHashIdMapping(trans);
             final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
-            FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
-            if (flowKey == null) {
+            FlowKey flowKey = getFlowKeyByHash(hashingKey);
+            if (flowKey == null || wasRemoved) {
                 flowKey = searchInConfiguration(flowStat, trans);
                 if ( flowKey == null) {
                     flowKey = makeAlienFlowKey();
                 }
                 updateHashCache(trans,flowKey,hashingKey);
+            } else {
+                flowIdByHash.remove(hashingKey);
             }
             final FlowBuilder flowBuilder = new FlowBuilder(flowStat);
             flowBuilder.setKey(flowKey);
index b4eaa0893fbbf09563065a9d2d90f43e05198d8f..278b5486b51ce3d8b30dc2709c4840422cbfbb0c 100644 (file)
@@ -9,11 +9,14 @@
 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
 import java.util.UUID;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -25,6 +28,7 @@ import org.opendaylight.openflowplugin.applications.statistics.manager.Statistic
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupDescStatsUpdated;
@@ -90,6 +94,11 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
                 .augmentation(FlowCapableNode.class).child(Group.class);
     }
 
+    @Override
+    protected void processDataChange(Collection<DataTreeModification<Group>> changes) {
+        //NO-OP
+    }
+
     @Override
     public void onGroupDescStatsUpdated(final GroupDescStatsUpdated notification) {
         final TransactionId transId = notification.getTransactionId();
index 20eb166586c81acce14f5f2e9dd487f2c85277ce..58b57b78912b61339db4ccced43bcb126b223ea2 100644 (file)
@@ -11,10 +11,12 @@ package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -83,6 +85,11 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
                 .augmentation(FlowCapableNode.class).child(Meter.class);
     }
 
+    @Override
+    protected void processDataChange(Collection<DataTreeModification<Meter>> changes) {
+        //NO-OP
+    }
+
     @Override
     protected OpendaylightMeterStatisticsListener getStatNotificationListener() {
         return this;
index db980dc1ee1a29d618a7a4e1ada2953cfd5eed66..cdfb8ced88426d026dc5d0cb24235efcb193e2f5 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -18,6 +20,7 @@ import java.util.Map.Entry;
 
 import java.util.UUID;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -84,6 +87,11 @@ public class StatListenCommitQueue extends StatAbstractListenCommit<Queue, Opend
             .augmentation(FlowCapableNodeConnector.class).child(Queue.class);
     }
 
+    @Override
+    protected void processDataChange(Collection<DataTreeModification<Queue>> changes) {
+        //NO-OP
+    }
+
     @Override
     public void onQueueStatisticsUpdate(final QueueStatisticsUpdate notification) {
         final TransactionId transId = notification.getTransactionId();
index 122ffd068044c6672fa6fe54d3a0bfee27c05ea2..2eaac49f54a2cbc902ede9c9dd8b1a4816479e10 100644 (file)
@@ -37,6 +37,7 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.NotificationListener;
 
 import java.util.ArrayList;
+import java.util.Collection;
 
 
 /**
@@ -77,6 +78,11 @@ public class StatAbstractListenCommitTest {
                 return InstanceIdentifier.create(DataObject.class);
             }
 
+            @Override
+            protected void processDataChange(Collection changes) {
+
+            }
+
             @Override
             protected NotificationListener getStatNotificationListener() {
                 return mockNotificationListener;