X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2Fimpl%2FStatisticsManagerImpl.java;h=0f8030f6204f18242d4b269549f805d11c5ab2d2;hb=1b8f7c7beaed83797320686bebddd536637aed9a;hp=8430549be177fce7b84522d723f2ff6d7ddf0a14;hpb=d45e906a590d8ee44ea7957179aa378515d6ba47;p=controller.git diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java index 8430549be1..0f8030f620 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java @@ -22,7 +22,6 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter; import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration; import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter; @@ -68,8 +67,8 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class); - private static final int QUEUE_DEPTH = 500; - private static final int MAX_BATCH = 1; + private static final int QUEUE_DEPTH = 5000; + private static final int MAX_BATCH = 100; private final BlockingQueue dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH); @@ -205,31 +204,22 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier()); - try { tx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { - LOG.warn("Stat DataStoreOperation unexpected State!", e); - txChain.close(); - txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this); - cleanDataStoreOperQueue(); - } - } - catch (final IllegalStateException e) { - LOG.warn("Stat DataStoreOperation unexpected State!", e); - } - catch (final InterruptedException e) { + } catch (final InterruptedException e) { LOG.warn("Stat Manager DS Operation thread interupted!", e); finishing = true; - } - catch (final Exception e) { - LOG.warn("Stat DataStore Operation executor fail!", e); + } catch (final Exception e) { + LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e); + txChain.close(); + txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this); + cleanDataStoreOperQueue(); } } // Drain all events, making sure any blocked threads are unblocked cleanDataStoreOperQueue(); } - private void cleanDataStoreOperQueue() { + private synchronized void cleanDataStoreOperQueue() { // Drain all events, making sure any blocked threads are unblocked while (! dataStoreOperQueue.isEmpty()) { dataStoreOperQueue.poll(); @@ -240,9 +230,6 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, final Throwable cause) { LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause); - txChain.close(); - txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this); - cleanDataStoreOperQueue(); } @Override @@ -294,6 +281,8 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { @Override public void disconnectedNodeUnregistration(final InstanceIdentifier nodeIdent) { + flowListeningCommiter.cleanForDisconnect(nodeIdent); + for (final StatPermCollector collector : statCollectors) { if (collector.disconnectedNodeUnregistration(nodeIdent)) { if ( ! collector.hasActiveNodes()) {