import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
- private final DataBroker dataBroker;
- private final ExecutorService statRpcMsgManagerExecutor;
+ private final DataBroker dataBroker;
private final ExecutorService statDataStoreOperationServ;
+ private EntityOwnershipService ownershipService;
private StatRpcMsgManager rpcMsgManager;
private List<StatPermCollector> statCollectors;
private final Object statCollectorLock = new Object();
this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
ThreadFactory threadFact;
threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
- statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
txChain = dataBroker.createTransactionChain(this);
rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
statCollectors = Collections.emptyList();
nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
- flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
- meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
- groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
- tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
- portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
- queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
-
- statRpcMsgManagerExecutor.execute(rpcMsgManager);
+ flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService, nodeRegistrator);
+ meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService, nodeRegistrator);
+ groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService, nodeRegistrator);
+ tableNotifCommiter = new StatNotifyCommitTable(this, notifService, nodeRegistrator);
+ portNotifyCommiter = new StatNotifyCommitPort(this, notifService, nodeRegistrator);
+ queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService, nodeRegistrator);
+
statDataStoreOperationServ.execute(this);
LOG.info("Statistics Manager started successfully!");
}
}
statCollectors = null;
}
- rpcMsgManager = close(rpcMsgManager);
- statRpcMsgManagerExecutor.shutdown();
+ rpcMsgManager = null;
statDataStoreOperationServ.shutdown();
txChain = close(txChain);
}
// we don't need to block anything - next statistics come soon
final boolean success = dataStoreOperQueue.offer(op);
if ( ! success) {
- LOG.debug("Stat DS/Operational submiter Queue is full!");
+ LOG.debug("Stat DS/Operational submitter Queue is full!");
}
}
public void run() {
/* Neverending cyle - wait for finishing */
while ( ! finishing) {
+ StatDataStoreOperation op = null;
try {
- StatDataStoreOperation op = dataStoreOperQueue.take();
+ op = dataStoreOperQueue.take();
final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
tx.submit().checkedGet();
} catch (final InterruptedException e) {
- LOG.warn("Stat Manager DS Operation thread interupted!", e);
+ LOG.warn("Stat Manager DS Operation thread interrupted, while " +
+ "waiting for StatDataStore Operation task!", e);
finishing = true;
} catch (final Exception e) {
- LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
+ LOG.warn("Unhandled exception during processing statistics for {}. " +
+ "Restarting transaction chain.",op != null?op.getNodeId().getValue():"",e);
txChain.close();
txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
cleanDataStoreOperQueue();
LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
}
+ /**
+  * Unregisters the given statistics capability type for a node from the
+  * collector that currently tracks it. Iterates the active collectors and
+  * stops at the first one whose {@code unregisterNodeStats} reports success;
+  * if none held the (node, capability) pair, only a debug message is logged.
+  *
+  * NOTE(review): statCollectors is iterated without holding statCollectorLock
+  * here — presumably the list reference is swapped atomically; confirm.
+  */
+ @Override
+ public void unregisterNodeStats(final InstanceIdentifier<Node> nodeIdent,
+ final StatCapabTypes statCapab) {
+ // At most one collector owns a node's stats type, so stop on first success.
+ for (final StatPermCollector collector : statCollectors) {
+ if (collector.unregisterNodeStats(nodeIdent, statCapab)) {
+ return;
+ }
+ }
+ LOG.debug("Stats type {} is not removed from the node {}!", statCapab,nodeIdent );
+ }
+
/* Getter internal Statistic Manager Job Classes */
@Override
public StatRpcMsgManager getRpcMsgManager() {
// we dont want to mark operations with null uuid and get NPEs later. So mark them with invalid ones
return UUID.fromString("invalid-uuid");
}
+
+ /**
+  * Injects the clustering {@link EntityOwnershipService} used by this manager.
+  *
+  * NOTE(review): the ownershipService field is written without synchronization
+  * or volatile — assumes this setter runs once during single-threaded
+  * initialization before readers start; confirm against the wiring code.
+  */
+ @Override
+ public void setOwnershipService(EntityOwnershipService ownershipService) {
+ this.ownershipService = ownershipService;
+ }
+
+ /** Returns the previously injected {@link EntityOwnershipService}, or null if never set. */
+ @Override
+ public EntityOwnershipService getOwnershipService() {
+ return this.ownershipService;
+ }
+
}