BUG 2049 DataStore failure in StatisticsManager 98/11598/13
authorVaclav Demcak <vdemcak@cisco.com>
Thu, 25 Sep 2014 21:03:30 +0000 (23:03 +0200)
committerTony Tkacik <ttkacik@cisco.com>
Sat, 27 Sep 2014 15:38:32 +0000 (17:38 +0200)
BUG 2101  When L2 switch installed, its flows are interpreted as new flows with each stats request

* hashCode is not collision-safe in general, so we want to prevent collisions as far as we can
* we replace hash building with key building based on the toString method
* toString is generated at compile time
* it cannot change during a JVM run
* everyone uses the same YANG model version and jar files
* and the same approach could be used across the cluster
* add cleaning of disconnected Nodes from StatListeningCommiter implementers

Change-Id: I645e9f07382af0b293bc43698446b7b84b95bbde
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatListeningCommiter.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatAbstractListenCommit.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatListenCommitFlow.java
opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java

index 7589c72a45f6da933c9ee7f7d8b2f49e46df2199..be3d40246bddb6fb88c90bd269b22a70637c273b 100644 (file)
@@ -9,7 +9,9 @@
 package org.opendaylight.controller.md.statistics.manager;
 
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.NotificationListener;
 
 /**
@@ -30,5 +32,13 @@ import org.opendaylight.yangtools.yang.binding.NotificationListener;
 public interface StatListeningCommiter<T extends DataObject, N extends NotificationListener> extends DataChangeListener, StatNotifyCommiter<N> {
 
 
+    /**
+     * Every StatListeningCommiter implementer has to clean its current state,
+     * i.e. all cached data related to the disconnected node.
+     * The method prevents unwanted dataStore changes.
+     *
+     * @param nodeIdent
+     */
+    void cleanForDisconnect(InstanceIdentifier<Node> nodeIdent);
 }
 
index 6ebf944b22b8a879903526df542d48f0a1c0fbc9..6db73d5ddc0551da6c800e3bfe6b2db79d70bd59 100644 (file)
@@ -108,6 +108,11 @@ public abstract class StatAbstractListenCommit<T extends DataObject, N extends N
         }
     }
 
+    @Override
+    public void cleanForDisconnect(final InstanceIdentifier<Node> nodeIdent) {
+        mapNodesForDelete.remove(nodeIdent);
+    }
+
     @Override
     public void close() {
         if (listenerRegistration != null) {
index e54fcc6fa2100cb21af3c41b0f78fc9bb51a872c..a19081db2527f7e2462633a783d8fcbab0e60b50 100644 (file)
@@ -86,6 +86,8 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
 
     private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
 
+    private static final Integer REMOVE_AFTER_MISSING_COLLECTION = 1;
+
     private final AtomicInteger unaccountedFlowsCounter = new AtomicInteger(0);
 
     public StatListenCommitFlow (final StatisticsManager manager, final DataBroker db,
@@ -142,8 +144,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                         Optional<FlowCapableNode> fNode = Optional.absent();
                         try {
                             fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
-                        }
-                        catch (final ReadFailedException e) {
+                        } catch (final ReadFailedException e) {
                             LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
                             return;
                         }
@@ -192,6 +193,20 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                 }
 
                 statsFlowCommitAll(flowStats, nodeIdent, tx);
+                /* cleaning all not cached hash collisions */
+                final Map<InstanceIdentifier<Flow>, Integer> listAliens = mapNodesForDelete.get(nodeIdent);
+                if (listAliens != null) {
+                    for (final Entry<InstanceIdentifier<Flow>, Integer> nodeForDelete : listAliens.entrySet()) {
+                        final Integer lifeIndex = nodeForDelete.getValue();
+                        if (nodeForDelete.getValue() > 0) {
+                            nodeForDelete.setValue(Integer.valueOf(lifeIndex.intValue() - 1));
+                        } else {
+                            final InstanceIdentifier<Flow> flowNodeIdent = nodeForDelete.getKey();
+                            mapNodesForDelete.get(nodeIdent).remove(flowNodeIdent);
+                            tx.delete(LogicalDatastoreType.OPERATIONAL, flowNodeIdent);
+                        }
+                    }
+                }
                 /* Notification for continue collecting statistics */
                 notifyToCollectNextStatistics(nodeIdent);
             }
@@ -246,14 +261,13 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
     /**
      * build pseudoUnique hashCode for flow in table
      * for future easy identification
+     *
+     * FIXME: we expect the same version of the YANG models across the whole cluster, and that has to be fixed
+     * FIXME: CREATE BETTER KEY - for flow (MATCH is the problem)
      */
-    static String buildHashCode(final FlowAndStatisticsMapList deviceFlow) {
-        final FlowBuilder builder = new FlowBuilder();
-        builder.setMatch(deviceFlow.getMatch());
-        builder.setCookie(deviceFlow.getCookie());
-        builder.setPriority(deviceFlow.getPriority());
-        final Flow flowForHashCode = builder.build();
-        return String.valueOf(flowForHashCode.hashCode());
+    static String buildFlowIdOperKey(final FlowAndStatisticsMapList deviceFlow) {
+        return new StringBuffer().append(deviceFlow.getMatch())
+                .append(deviceFlow.getPriority()).append(deviceFlow.getCookie().getValue()).toString();
     }
 
     private class NodeUpdateState {
@@ -286,6 +300,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
     }
 
     private class TableFlowUpdateState {
+
         private boolean tableEnsured = false;
         final KeyedInstanceIdentifier<Table, TableKey> tableRef;
         final TableKey tableKey;
@@ -304,7 +319,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                     for (final FlowHashIdMap flowHashId : flowHashMap) {
                         try {
                             flowIdByHash.put(flowHashId.getKey(), flowHashId.getFlowId());
-                        } catch (Exception e) {
+                        } catch (final Exception e) {
                             LOG.warn("flow hashing hit a duplicate for {} -> {}", flowHashId.getKey(), flowHashId.getFlowId());
                         }
                     }
@@ -338,12 +353,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         }
 
         private void initConfigFlows(final ReadWriteTransaction trans) {
-            Optional<Table> table = readLatestConfiguration(tableRef);
-            try {
-                table = trans.read(LogicalDatastoreType.CONFIGURATION, tableRef).checkedGet();
-            } catch (final ReadFailedException e) {
-                table = Optional.absent();
-            }
+            final Optional<Table> table = readLatestConfiguration(tableRef);
             List<Flow> localList = null;
             if(table.isPresent()) {
                 localList = table.get().getFlow();
@@ -378,7 +388,7 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
 
         void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
             ensureTable(trans);
-            final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildHashCode(flowStat));
+            final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
             FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
             if (flowKey == null) {
                 flowKey = searchInConfiguration(flowStat, trans);
@@ -394,13 +404,12 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
             trans.put(LogicalDatastoreType.OPERATIONAL, flowIdent, flowBuilder.build());
             /* check life for Alien flows */
             if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
-                removeData(flowIdent, Integer.valueOf(5));
+                removeData(flowIdent, REMOVE_AFTER_MISSING_COLLECTION);
             }
         }
 
         /* Build and deploy new FlowHashId map */
         private void updateHashCache(final ReadWriteTransaction trans, final FlowKey flowKey, final FlowHashIdMapKey hashingKey) {
-            // TODO Auto-generated method stub
             final FlowHashIdMapBuilder flHashIdMap = new FlowHashIdMapBuilder();
             flHashIdMap.setFlowId(flowKey.getId());
             flHashIdMap.setKey(hashingKey);
@@ -411,43 +420,24 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
         }
 
         void removeUnreportedFlows(final ReadWriteTransaction tx) {
+            final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
+            final Map<InstanceIdentifier<Flow>, Integer> nodeDeleteMap = mapNodesForDelete.get(nodeIdent);
             final Map<FlowHashIdMapKey, FlowId> listForRemove = getRemovalList();
-            final Optional<Table> configTable = readLatestConfiguration(tableRef);
-            List<Flow> configFlows = Collections.emptyList();
-            if (configTable.isPresent() && configTable.get().getFlow() != null) {
-                configFlows = new ArrayList<>(configTable.get().getFlow());
-            }
             for (final Entry<FlowHashIdMapKey, FlowId> entryForRemove : listForRemove.entrySet()) {
                 final FlowKey flowKey = new FlowKey(entryForRemove.getValue());
                 final InstanceIdentifier<Flow> flowRef = tableRef.child(Flow.class, flowKey);
-                final InstanceIdentifier<FlowStatisticsData> flowStatIdent = flowRef.augmentation(FlowStatisticsData.class);
-                if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
-                    final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
-                    final Integer lifeIndex = mapNodesForDelete.get(nodeIdent).remove(flowRef);
+                if (nodeDeleteMap != null && flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
+                    final Integer lifeIndex = nodeDeleteMap.get(flowRef);
                     if (lifeIndex > 0) {
-                        mapNodesForDelete.get(nodeIdent).put(flowRef, Integer.valueOf(lifeIndex.intValue() - 1));
-                        break;
-                    }
-                } else {
-                    if (configFlows.remove(flowRef)) {
-                        /* Node is still presented in Config/DataStore - probably lost some multipart msg */
                         break;
+                    } else {
+                        nodeDeleteMap.remove(flowRef);
                     }
                 }
-                final Optional<FlowStatisticsData> flowStatNodeCheck;
-                try {
-                    flowStatNodeCheck = tx.read(LogicalDatastoreType.OPERATIONAL, flowStatIdent).checkedGet();
-                }
-                catch (final ReadFailedException e) {
-                    LOG.debug("Read FlowStatistics {} in Operational/DS fail! Statisticscan not beupdated.", flowStatIdent, e);
-                    break;
-                }
-                if (flowStatNodeCheck.isPresent()) {
-                    /* Node isn't new and it has not been removed yet */
-                    final InstanceIdentifier<FlowHashIdMap> flHashIdent = tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
-                    tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
-                    tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
-                }
+                final InstanceIdentifier<FlowHashIdMap> flHashIdent =
+                        tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
+                tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
+                tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
             }
         }
     }
index 8430549be177fce7b84522d723f2ff6d7ddf0a14..247703f8acb3c60bb24df0f8ffc367867e13e1bc 100644 (file)
@@ -22,7 +22,6 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
 import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
 import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
@@ -68,7 +67,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
    private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
 
-   private static final int QUEUE_DEPTH = 500;
+   private static final int QUEUE_DEPTH = 1000;
    private static final int MAX_BATCH = 1;
 
    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
@@ -205,31 +204,22 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
                LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
 
-               try {
                    tx.submit().checkedGet();
-               } catch (final TransactionCommitFailedException e) {
-                   LOG.warn("Stat DataStoreOperation unexpected State!", e);
-                   txChain.close();
-                   txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
-                   cleanDataStoreOperQueue();
-               }
-           }
-           catch (final IllegalStateException e) {
-               LOG.warn("Stat DataStoreOperation unexpected State!", e);
-           }
-           catch (final InterruptedException e) {
+           } catch (final InterruptedException e) {
                LOG.warn("Stat Manager DS Operation thread interupted!", e);
                finishing = true;
-           }
-           catch (final Exception e) {
-               LOG.warn("Stat DataStore Operation executor fail!", e);
+           } catch (final Exception e) {
+               LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
+               txChain.close();
+               txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
+               cleanDataStoreOperQueue();
            }
        }
        // Drain all events, making sure any blocked threads are unblocked
        cleanDataStoreOperQueue();
    }
 
-   private void cleanDataStoreOperQueue() {
+   private synchronized void cleanDataStoreOperQueue() {
        // Drain all events, making sure any blocked threads are unblocked
        while (! dataStoreOperQueue.isEmpty()) {
            dataStoreOperQueue.poll();
@@ -240,9 +230,6 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
            final Throwable cause) {
        LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
-       txChain.close();
-       txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
-       cleanDataStoreOperQueue();
    }
 
    @Override
@@ -294,6 +281,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
 
    @Override
    public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
+       flowListeningCommiter.cleanForDisconnect(nodeIdent);
        for (final StatPermCollector collector : statCollectors) {
            if (collector.disconnectedNodeUnregistration(nodeIdent)) {
                if ( ! collector.hasActiveNodes()) {