import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
- private static final int QUEUE_DEPTH = 500;
- private static final int MAX_BATCH = 1;
+ private static final int QUEUE_DEPTH = 5000;
+ private static final int MAX_BATCH = 100;
private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
- try {
tx.submit().checkedGet();
- } catch (final TransactionCommitFailedException e) {
- LOG.warn("Stat DataStoreOperation unexpected State!", e);
- txChain.close();
- txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
- cleanDataStoreOperQueue();
- }
- }
- catch (final IllegalStateException e) {
- LOG.warn("Stat DataStoreOperation unexpected State!", e);
- }
- catch (final InterruptedException e) {
+ } catch (final InterruptedException e) {
LOG.warn("Stat Manager DS Operation thread interupted!", e);
finishing = true;
- }
- catch (final Exception e) {
- LOG.warn("Stat DataStore Operation executor fail!", e);
+ } catch (final Exception e) {
+ LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
+ txChain.close();
+ txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
+ cleanDataStoreOperQueue();
}
}
// Drain all events, making sure any blocked threads are unblocked
cleanDataStoreOperQueue();
}
- private void cleanDataStoreOperQueue() {
+ private synchronized void cleanDataStoreOperQueue() {
// Drain all events, making sure any blocked threads are unblocked
while (! dataStoreOperQueue.isEmpty()) {
dataStoreOperQueue.poll();
public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
final Throwable cause) {
LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
- txChain.close();
- txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
- cleanDataStoreOperQueue();
}
@Override
@Override
public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
+ flowListeningCommiter.cleanForDisconnect(nodeIdent);
+
for (final StatPermCollector collector : statCollectors) {
if (collector.disconnectedNodeUnregistration(nodeIdent)) {
if ( ! collector.hasActiveNodes()) {