import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
- private static final int QUEUE_DEPTH = 1000;
- private static final int MAX_BATCH = 1;
+ private static final int QUEUE_DEPTH = 5000;
+ private static final int MAX_BATCH = 100;
private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
private final DataBroker dataBroker;
- private final int maxNodesForCollectors;
- private long minReqNetMonitInt;
private final ExecutorService statRpcMsgManagerExecutor;
private final ExecutorService statDataStoreOperationServ;
private StatRpcMsgManager rpcMsgManager;
private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
- public StatisticsManagerImpl (final DataBroker dataBroker, final int maxNodesForCollector) {
+ private final StatisticsManagerConfig statManagerConfig;
+
+ public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) {
+ statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
ThreadFactory threadFact;
threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
- maxNodesForCollectors = maxNodesForCollector;
txChain = dataBroker.createTransactionChain(this);
}
@Override
public void start(final NotificationProviderService notifService,
- final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
+ final RpcConsumerRegistry rpcRegistry) {
Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
- this.minReqNetMonitInt = minReqNetMonitInt;
- rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, minReqNetMonitInt);
+ rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
statCollectors = Collections.emptyList();
nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
@Override
public void close() throws Exception {
+ LOG.info("StatisticsManager close called");
finishing = true;
if (nodeRegistrator != null) {
nodeRegistrator.close();
}
} while (op != null);
- LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
+ LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
- tx.submit(); //.checkedGet();
+ tx.submit().checkedGet();
} catch (final InterruptedException e) {
LOG.warn("Stat Manager DS Operation thread interupted!", e);
finishing = true;
}
@Override
- public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent) {
+ public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
for (final StatPermCollector collector : statCollectors) {
if (collector.isProvidedFlowNodeActive(nodeIdent)) {
- collector.collectNextStatistics();
+ collector.collectNextStatistics(xid);
}
}
}
}
}
final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
- minReqNetMonitInt, statCollectors.size() + 1, maxNodesForCollectors);
+ statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
+ statManagerConfig.getMaxNodesForCollector());
final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
statCollectorsNew.add(newCollector);
@Override
public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
flowListeningCommiter.cleanForDisconnect(nodeIdent);
+
for (final StatPermCollector collector : statCollectors) {
if (collector.disconnectedNodeUnregistration(nodeIdent)) {
if ( ! collector.hasActiveNodes()) {
return;
}
}
- LOG.debug("Node {} has not removed.", nodeIdent);
+ LOG.debug("Node {} has not been removed.", nodeIdent);
+ }
+
+ @Override
+ public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
+ final StatCapabTypes statCapab) {
+ for (final StatPermCollector collector : statCollectors) {
+ if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
+ return;
+ }
+ }
+ LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
}
/* Getter internal Statistic Manager Job Classes */
public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
return portNotifyCommiter;
}
+
+ @Override
+ public StatisticsManagerConfig getConfiguration() {
+ return statManagerConfig;
+ }
}