/**
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.md.statistics.manager.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* statistics-manager
* org.opendaylight.controller.md.statistics.manager.impl
*
* StatisticsManagerImpl
* It represent a central point for whole module. Implementation
* {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
* Config/DS {@StatListeningCommiter}, as well as {@link StatPermCollector}
* for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
* In next, StatisticsManager provides all DS contact Transaction services.
*
* @author avishnoi@in.ibm.com Vaclav Demcak
*
*/
public class StatisticsManagerImpl implements StatisticsManager, Runnable {
private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
private static final int QUEUE_DEPTH = 5000;
private static final int MAX_BATCH = 100;
private final BlockingQueue dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
private final DataBroker dataBroker;
private final int maxNodesForCollectors;
private long minReqNetMonitInt;
private final ExecutorService statRpcMsgManagerExecutor;
private final ExecutorService statDataStoreOperationServ;
private StatRpcMsgManager rpcMsgManager;
private List statCollectors;
private final Object statCollectorLock = new Object();
private BindingTransactionChain txChain;
private volatile boolean finishing = false;
private StatNodeRegistration nodeRegistrator;
private StatListeningCommiter flowListeningCommiter;
private StatListeningCommiter meterListeningCommiter;
private StatListeningCommiter groupListeningCommiter;
private StatListeningCommiter queueNotifyCommiter;
private StatNotifyCommiter tableNotifCommiter;
private StatNotifyCommiter portNotifyCommiter;
public StatisticsManagerImpl (final DataBroker dataBroker, final int maxNodesForCollector) {
this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
ThreadFactory threadFact;
threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
maxNodesForCollectors = maxNodesForCollector;
txChain = dataBroker.createTransactionChain(this);
}
@Override
public void start(final NotificationProviderService notifService,
final RpcConsumerRegistry rpcRegistry, final long minReqNetMonitInt) {
Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
this.minReqNetMonitInt = minReqNetMonitInt;
rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, minReqNetMonitInt);
statCollectors = Collections.emptyList();
nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
statRpcMsgManagerExecutor.execute(rpcMsgManager);
statDataStoreOperationServ.execute(this);
LOG.info("Statistics Manager started successfully!");
}
@Override
public void close() throws Exception {
LOG.info("StatisticsManager close called");
finishing = true;
if (nodeRegistrator != null) {
nodeRegistrator.close();
nodeRegistrator = null;
}
if (flowListeningCommiter != null) {
flowListeningCommiter.close();
flowListeningCommiter = null;
}
if (meterListeningCommiter != null) {
meterListeningCommiter.close();
meterListeningCommiter = null;
}
if (groupListeningCommiter != null) {
groupListeningCommiter.close();
groupListeningCommiter = null;
}
if (tableNotifCommiter != null) {
tableNotifCommiter.close();
tableNotifCommiter = null;
}
if (portNotifyCommiter != null) {
portNotifyCommiter.close();
portNotifyCommiter = null;
}
if (queueNotifyCommiter != null) {
queueNotifyCommiter.close();
queueNotifyCommiter = null;
}
if (statCollectors != null) {
for (StatPermCollector collector : statCollectors) {
collector.close();
collector = null;
}
statCollectors = null;
}
if (rpcMsgManager != null) {
rpcMsgManager.close();
rpcMsgManager = null;
}
statRpcMsgManagerExecutor.shutdown();
statDataStoreOperationServ.shutdown();
if (txChain != null) {
txChain.close();
txChain = null;
}
}
@Override
public void enqueue(final StatDataStoreOperation op) {
// we don't need to block anything - next statistics come soon
final boolean success = dataStoreOperQueue.offer(op);
if ( ! success) {
LOG.debug("Stat DS/Operational submiter Queue is full!");
}
}
@Override
public void run() {
/* Neverending cyle - wait for finishing */
while ( ! finishing) {
try {
StatDataStoreOperation op = dataStoreOperQueue.take();
final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
int ops = 0;
do {
op.applyOperation(tx);
ops++;
if (ops < MAX_BATCH) {
op = dataStoreOperQueue.poll();
} else {
op = null;
}
} while (op != null);
LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
tx.submit().checkedGet();
} catch (final InterruptedException e) {
LOG.warn("Stat Manager DS Operation thread interupted!", e);
finishing = true;
} catch (final Exception e) {
LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
txChain.close();
txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
cleanDataStoreOperQueue();
}
}
// Drain all events, making sure any blocked threads are unblocked
cleanDataStoreOperQueue();
}
private synchronized void cleanDataStoreOperQueue() {
// Drain all events, making sure any blocked threads are unblocked
while (! dataStoreOperQueue.isEmpty()) {
dataStoreOperQueue.poll();
}
}
@Override
public void onTransactionChainFailed(final TransactionChain, ?> chain, final AsyncTransaction, ?> transaction,
final Throwable cause) {
LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
}
@Override
public void onTransactionChainSuccessful(final TransactionChain, ?> chain) {
// NOOP
}
@Override
public boolean isProvidedFlowNodeActive(final InstanceIdentifier nodeIdent) {
for (final StatPermCollector collector : statCollectors) {
if (collector.isProvidedFlowNodeActive(nodeIdent)) {
return true;
}
}
return false;
}
@Override
public void collectNextStatistics(final InstanceIdentifier nodeIdent) {
for (final StatPermCollector collector : statCollectors) {
if (collector.isProvidedFlowNodeActive(nodeIdent)) {
collector.collectNextStatistics();
}
}
}
@Override
public void connectedNodeRegistration(final InstanceIdentifier nodeIdent,
final List statTypes, final Short nrOfSwitchTables) {
for (final StatPermCollector collector : statCollectors) {
if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
return;
}
}
synchronized (statCollectorLock) {
for (final StatPermCollector collector : statCollectors) {
if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
return;
}
}
final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
minReqNetMonitInt, statCollectors.size() + 1, maxNodesForCollectors);
final List statCollectorsNew = new ArrayList<>(statCollectors);
newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
statCollectorsNew.add(newCollector);
statCollectors = Collections.unmodifiableList(statCollectorsNew);
}
}
@Override
public void disconnectedNodeUnregistration(final InstanceIdentifier nodeIdent) {
flowListeningCommiter.cleanForDisconnect(nodeIdent);
for (final StatPermCollector collector : statCollectors) {
if (collector.disconnectedNodeUnregistration(nodeIdent)) {
if ( ! collector.hasActiveNodes()) {
synchronized (statCollectorLock) {
if (collector.hasActiveNodes()) {
return;
}
final List newStatColl =
new ArrayList<>(statCollectors);
newStatColl.remove(collector);
statCollectors = Collections.unmodifiableList(newStatColl);
}
}
return;
}
}
LOG.debug("Node {} has not been removed.", nodeIdent);
}
@Override
public void registerAdditionalNodeFeature(final InstanceIdentifier nodeIdent,
final StatCapabTypes statCapab) {
for (final StatPermCollector collector : statCollectors) {
if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
return;
}
}
LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
}
/* Getter internal Statistic Manager Job Classes */
@Override
public StatRpcMsgManager getRpcMsgManager() {
return rpcMsgManager;
}
@Override
public StatNodeRegistration getNodeRegistrator() {
return nodeRegistrator;
}
@Override
public StatListeningCommiter getFlowListenComit() {
return flowListeningCommiter;
}
@Override
public StatListeningCommiter getMeterListenCommit() {
return meterListeningCommiter;
}
@Override
public StatListeningCommiter getGroupListenCommit() {
return groupListeningCommiter;
}
@Override
public StatListeningCommiter getQueueNotifyCommit() {
return queueNotifyCommiter;
}
@Override
public StatNotifyCommiter getTableNotifCommit() {
return tableNotifCommiter;
}
@Override
public StatNotifyCommiter getPortNotifyCommit() {
return portNotifyCommiter;
}
}