private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
- private final DataBroker dataBroker;
+ private final DataBroker dataBroker;
private final ExecutorService statRpcMsgManagerExecutor;
private final ExecutorService statDataStoreOperationServ;
private EntityOwnershipService ownershipService;
rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
statCollectors = Collections.emptyList();
nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
- flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
- meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
- groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
- tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
- portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
- queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
+ flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService, nodeRegistrator);
+ meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService, nodeRegistrator);
+ groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService, nodeRegistrator);
+ tableNotifCommiter = new StatNotifyCommitTable(this, notifService, nodeRegistrator);
+ portNotifyCommiter = new StatNotifyCommitPort(this, notifService, nodeRegistrator);
+ queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService, nodeRegistrator);
statRpcMsgManagerExecutor.execute(rpcMsgManager);
statDataStoreOperationServ.execute(this);
public EntityOwnershipService getOwnershipService() {
return this.ownershipService;
}
+
}