X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fstatistics-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fstatistics%2Fmanager%2Fimpl%2FStatisticsManagerImpl.java;h=437c92f6a09e27d1646a893c644bf688c0ad4af7;hp=8430549be177fce7b84522d723f2ff6d7ddf0a14;hb=a5899521495f3a3d089f42ed026bd5a6fea62411;hpb=b495d88f63ba0b5b3959d7742ae683c869cf3ccc diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java index 8430549be1..437c92f6a0 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java @@ -22,7 +22,6 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter; import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration; import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter; @@ -30,12 +29,14 @@ import org.opendaylight.controller.md.statistics.manager.StatPermCollector; import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes; import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager; import org.opendaylight.controller.md.statistics.manager.StatisticsManager; +import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; @@ -68,14 +69,12 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class); - private static final int QUEUE_DEPTH = 500; - private static final int MAX_BATCH = 1; + private static final int QUEUE_DEPTH = 5000; + private static final int MAX_BATCH = 100; private final BlockingQueue dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH); private final DataBroker dataBroker; - private final int maxNodesForCollectors; - private long minReqNetMonitInt; private final ExecutorService statRpcMsgManagerExecutor; private final ExecutorService statDataStoreOperationServ; private StatRpcMsgManager rpcMsgManager; @@ -92,23 +91,24 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { private StatNotifyCommiter tableNotifCommiter; private StatNotifyCommiter portNotifyCommiter; - public StatisticsManagerImpl (final DataBroker dataBroker, final int maxNodesForCollector) { + private final StatisticsManagerConfig statManagerConfig; + + public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) { + statManagerConfig = Preconditions.checkNotNull(statManagerconfig); this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!"); ThreadFactory threadFact; threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build(); statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact); threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build(); statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact); - maxNodesForCollectors = maxNodesForCollector; txChain = dataBroker.createTransactionChain(this); } @Override public void start(final NotificationProviderService notifService, - final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) { + final RpcConsumerRegistry rpcRegistry) { Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !"); - this.minReqNetMonitInt = minReqNetMonitInt; - rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, minReqNetMonitInt); + rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector()); statCollectors = Collections.emptyList(); nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService); flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService); @@ -125,6 +125,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { @Override public void close() throws Exception { + LOG.info("StatisticsManager close called"); finishing = true; if (nodeRegistrator != null) { nodeRegistrator.close(); @@ -205,34 +206,36 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier()); - try { tx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { - LOG.warn("Stat DataStoreOperation unexpected State!", e); - txChain.close(); - txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this); - cleanDataStoreOperQueue(); - } - } - catch (final IllegalStateException e) { - LOG.warn("Stat DataStoreOperation unexpected State!", e); - } - catch (final InterruptedException e) { + } catch (final InterruptedException e) { LOG.warn("Stat Manager DS Operation thread interupted!", e); finishing = true; - } - catch (final Exception e) { - LOG.warn("Stat DataStore Operation executor fail!", e); + } catch (final Exception e) { + LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e); + txChain.close(); + txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this); + cleanDataStoreOperQueue(); } } // Drain all events, making sure any blocked threads are unblocked cleanDataStoreOperQueue(); } - private void cleanDataStoreOperQueue() { + private synchronized void cleanDataStoreOperQueue() { // Drain all events, making sure any blocked threads are unblocked while (! dataStoreOperQueue.isEmpty()) { - dataStoreOperQueue.poll(); + StatDataStoreOperation op = dataStoreOperQueue.poll(); + + // Execute the node removal clean up operation if queued in the + // operational queue. + if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) { + try { + LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId()); + op.applyOperation(null); + } catch (final Exception ex) { + LOG.warn("Unhandled exception while cleaning up internal data of node [{}]",op.getNodeId()); + } + } } } @@ -240,9 +243,6 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, final Throwable cause) { LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause); - txChain.close(); - txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this); - cleanDataStoreOperQueue(); } @Override @@ -261,10 +261,10 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { } @Override - public void collectNextStatistics(final InstanceIdentifier nodeIdent) { + public void collectNextStatistics(final InstanceIdentifier nodeIdent, final TransactionId xid) { for (final StatPermCollector collector : statCollectors) { if (collector.isProvidedFlowNodeActive(nodeIdent)) { - collector.collectNextStatistics(); + collector.collectNextStatistics(xid); } } } @@ -284,7 +284,8 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { } } final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this, - minReqNetMonitInt, statCollectors.size() + 1, maxNodesForCollectors); + statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1, + statManagerConfig.getMaxNodesForCollector()); final List statCollectorsNew = new ArrayList<>(statCollectors); newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables); statCollectorsNew.add(newCollector); @@ -294,6 +295,8 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { @Override public void disconnectedNodeUnregistration(final InstanceIdentifier nodeIdent) { + flowListeningCommiter.cleanForDisconnect(nodeIdent); + for (final StatPermCollector collector : statCollectors) { if (collector.disconnectedNodeUnregistration(nodeIdent)) { if ( ! collector.hasActiveNodes()) { @@ -310,7 +313,18 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { return; } } - LOG.debug("Node {} has not removed.", nodeIdent); + LOG.debug("Node {} has not been removed.", nodeIdent); + } + + @Override + public void registerAdditionalNodeFeature(final InstanceIdentifier nodeIdent, + final StatCapabTypes statCapab) { + for (final StatPermCollector collector : statCollectors) { + if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) { + return; + } + } + LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab); } /* Getter internal Statistic Manager Job Classes */ @@ -354,5 +368,10 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable { public StatNotifyCommiter getPortNotifyCommit() { return portNotifyCommiter; } + + @Override + public StatisticsManagerConfig getConfiguration() { + return statManagerConfig; + } }