import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
*
* @author avishnoi@in.ibm.com
*/
-public class NodeStatisticsHandler implements AutoCloseable {
+public final class NodeStatisticsHandler implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(NodeStatisticsHandler.class);
private static final int NUMBER_OF_WAIT_CYCLES = 2;
private final Map<FlowEntry,Long> flowStatsUpdate = new HashMap<>();
private final Map<QueueEntry,Long> queuesStatsUpdate = new HashMap<>();
private final InstanceIdentifier<Node> targetNodeIdentifier;
- private final StatisticsProvider statisticsProvider;
+ private final DataProviderService dps;
+ private final NodeRef targetNodeRef;
private final NodeKey targetNodeKey;
private Collection<TableKey> knownTables = Collections.emptySet();
private int unaccountedFlowsCounter = 1;
- public NodeStatisticsHandler(StatisticsProvider statisticsProvider, NodeKey nodeKey){
- this.statisticsProvider = Preconditions.checkNotNull(statisticsProvider);
+ public NodeStatisticsHandler(final DataProviderService dps, final NodeKey nodeKey) {
+ this.dps = Preconditions.checkNotNull(dps);
this.targetNodeKey = Preconditions.checkNotNull(nodeKey);
this.targetNodeIdentifier = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey).build();
+ this.targetNodeRef = new NodeRef(targetNodeIdentifier);
}
private static class FlowEntry {
return knownTables;
}
+ public InstanceIdentifier<Node> getTargetNodeIdentifier() {
+ return targetNodeIdentifier;
+ }
+
+ public NodeRef getTargetNodeRef() {
+ return targetNodeRef;
+ }
+
public synchronized void updateGroupDescStats(List<GroupDescStats> list){
final Long expiryTime = getExpiryTime();
- final DataModificationTransaction trans = statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
for (GroupDescStats groupDescStats : list) {
GroupBuilder groupBuilder = new GroupBuilder();
}
public synchronized void updateGroupStats(List<GroupStats> list) {
- final DataModificationTransaction trans = statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
for(GroupStats groupStats : list) {
GroupBuilder groupBuilder = new GroupBuilder();
public synchronized void updateMeterConfigStats(List<MeterConfigStats> list) {
final Long expiryTime = getExpiryTime();
- final DataModificationTransaction trans = statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
for(MeterConfigStats meterConfigStats : list) {
MeterBuilder meterBuilder = new MeterBuilder();
public synchronized void updateMeterStats(List<MeterStats> list) {
- final DataModificationTransaction trans = statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
for(MeterStats meterStats : list) {
MeterBuilder meterBuilder = new MeterBuilder();
public synchronized void updateQueueStats(List<QueueIdAndStatisticsMap> list) {
final Long expiryTime = getExpiryTime();
- final DataModificationTransaction trans = statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
for (QueueIdAndStatisticsMap swQueueStats : list) {
}
public synchronized void updateFlowTableStats(List<FlowTableAndStatisticsMap> list) {
- final DataModificationTransaction trans = statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
final Set<TableKey> knownTables = new HashSet<>(list.size());
for (FlowTableAndStatisticsMap ftStats : list) {
}
public synchronized void updateNodeConnectorStats(List<NodeConnectorStatisticsAndPortNumberMap> list) {
- final DataModificationTransaction trans = statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
for(NodeConnectorStatisticsAndPortNumberMap portStats : list) {
public synchronized void updateAggregateFlowStats(Short tableId, AggregateFlowStatistics flowStats) {
if (tableId != null) {
- final DataModificationTransaction trans = statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNodeKey)
}
public synchronized void updateGroupFeatures(GroupFeatures notification) {
- final DataModificationTransaction trans = statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
final NodeBuilder nodeData = new NodeBuilder();
nodeData.setKey(targetNodeKey);
}
public synchronized void updateMeterFeatures(MeterFeatures features) {
- final DataModificationTransaction trans = statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
final NodeBuilder nodeData = new NodeBuilder();
nodeData.setKey(targetNodeKey);
public synchronized void updateFlowStats(List<FlowAndStatisticsMapList> list) {
final Long expiryTime = getExpiryTime();
- final DataModificationTransaction trans = statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
for(FlowAndStatisticsMapList map : list) {
short tableId = map.getTableId();
}
public synchronized void cleanStaleStatistics(){
- final DataModificationTransaction trans = this.statisticsProvider.startChange();
+ final DataModificationTransaction trans = dps.beginTransaction();
final long now = System.nanoTime();
//Clean stale statistics related to group
private static final Logger spLogger = LoggerFactory.getLogger(StatisticsProvider.class);
+ private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
private final MultipartMessageManager multipartMessageManager = new MultipartMessageManager();
private final DataProviderService dps;
- //Local caching of stats
- private final ConcurrentMap<NodeId, NodeStatisticsHandler> handlers = new ConcurrentHashMap<>();
-
private OpendaylightGroupStatisticsService groupStatsService;
private OpendaylightMeterStatisticsService meterStatsService;
// Register for switch connect/disconnect notifications
final InstanceIdentifier<FlowCapableNode> fcnId = InstanceIdentifier.builder(Nodes.class)
.child(Node.class).augmentation(FlowCapableNode.class).build();
+ spLogger.debug("Registering FlowCapable tracker to {}", fcnId);
this.flowCapableTrackerRegistration = dbs.registerDataChangeListener(fcnId,
new FlowCapableTracker(this, fcnId));
}
}
- private void sendStatisticsRequestsToNode(NodeStatisticsHandler h) {
+ private void sendStatisticsRequestsToNode(final NodeStatisticsHandler h) {
NodeKey targetNode = h.getTargetNodeKey();
spLogger.debug("Send requests for statistics collection to node : {}", targetNode.getId());
- InstanceIdentifier<Node> targetInstanceId = InstanceIdentifier.builder(Nodes.class).child(Node.class, targetNode).build();
-
- NodeRef targetNodeRef = new NodeRef(targetInstanceId);
-
try{
if(flowTableStatsService != null){
- sendAllFlowTablesStatisticsRequest(targetNodeRef);
+ sendAllFlowTablesStatisticsRequest(h);
}
if(flowStatsService != null){
// FIXME: it does not make sense to trigger this before sendAllFlowTablesStatisticsRequest()
// comes back -- we do not have any tables anyway.
sendAggregateFlowsStatsFromAllTablesRequest(h);
- sendAllFlowsStatsFromAllTablesRequest(targetNodeRef);
+ sendAllFlowsStatsFromAllTablesRequest(h);
}
if(portStatsService != null){
- sendAllNodeConnectorsStatisticsRequest(targetNodeRef);
+ sendAllNodeConnectorsStatisticsRequest(h);
}
if(groupStatsService != null){
- sendAllGroupStatisticsRequest(targetNodeRef);
- sendGroupDescriptionRequest(targetNodeRef);
+ sendAllGroupStatisticsRequest(h);
+ sendGroupDescriptionRequest(h.getTargetNodeRef());
}
if(meterStatsService != null){
- sendAllMeterStatisticsRequest(targetNodeRef);
- sendMeterConfigStatisticsRequest(targetNodeRef);
+ sendAllMeterStatisticsRequest(h);
+ sendMeterConfigStatisticsRequest(h.getTargetNodeRef());
}
if(queueStatsService != null){
- sendAllQueueStatsFromAllNodeConnector(targetNodeRef);
+ sendAllQueueStatsFromAllNodeConnector(h);
}
}catch(Exception e){
spLogger.error("Exception occured while sending statistics requests : {}", e);
}
- private void sendAllFlowTablesStatisticsRequest(NodeRef targetNodeRef) throws InterruptedException, ExecutionException {
+ private void sendAllFlowTablesStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException {
final GetFlowTablesStatisticsInputBuilder input =
new GetFlowTablesStatisticsInputBuilder();
- input.setNode(targetNodeRef);
+ input.setNode(h.getTargetNodeRef());
Future<RpcResult<GetFlowTablesStatisticsOutput>> response =
flowTableStatsService.getFlowTablesStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNodeRef),response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(),response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW_TABLE);
}
- private void sendAllFlowsStatsFromAllTablesRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllFlowsStatsFromAllTablesRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
final GetAllFlowsStatisticsFromAllFlowTablesInputBuilder input =
new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder();
- input.setNode(targetNode);
+ input.setNode(h.getTargetNodeRef());
Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> response =
flowStatsService.getAllFlowsStatisticsFromAllFlowTables(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_FLOW);
}
, StatsRequestType.AGGR_FLOW);
}
- private void sendAllNodeConnectorsStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllNodeConnectorsStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
final GetAllNodeConnectorsStatisticsInputBuilder input = new GetAllNodeConnectorsStatisticsInputBuilder();
- input.setNode(targetNode);
+ input.setNode(h.getTargetNodeRef());
Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> response =
portStatsService.getAllNodeConnectorsStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_PORT);
}
- private void sendAllGroupStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllGroupStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
final GetAllGroupStatisticsInputBuilder input = new GetAllGroupStatisticsInputBuilder();
- input.setNode(targetNode);
+ input.setNode(h.getTargetNodeRef());
Future<RpcResult<GetAllGroupStatisticsOutput>> response =
groupStatsService.getAllGroupStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_GROUP);
}
}
- private void sendAllMeterStatisticsRequest(NodeRef targetNode) throws InterruptedException, ExecutionException{
+ private void sendAllMeterStatisticsRequest(NodeStatisticsHandler h) throws InterruptedException, ExecutionException{
GetAllMeterStatisticsInputBuilder input = new GetAllMeterStatisticsInputBuilder();
- input.setNode(targetNode);
+ input.setNode(h.getTargetNodeRef());
Future<RpcResult<GetAllMeterStatisticsOutput>> response =
meterStatsService.getAllMeterStatistics(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_METER);;
}
}
- private void sendAllQueueStatsFromAllNodeConnector(NodeRef targetNode) throws InterruptedException, ExecutionException {
+ private void sendAllQueueStatsFromAllNodeConnector(NodeStatisticsHandler h) throws InterruptedException, ExecutionException {
GetAllQueuesStatisticsFromAllPortsInputBuilder input = new GetAllQueuesStatisticsFromAllPortsInputBuilder();
- input.setNode(targetNode);
+ input.setNode(h.getTargetNodeRef());
Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> response =
queueStatsService.getAllQueuesStatisticsFromAllPorts(input.build());
- this.multipartMessageManager.addTxIdToRequestTypeEntry(getNodeId(targetNode), response.get().getResult().getTransactionId()
+ this.multipartMessageManager.addTxIdToRequestTypeEntry(h.getTargetNodeKey().getId(), response.get().getResult().getTransactionId()
, StatsRequestType.ALL_QUEUE_STATS);;
}
}
}
- synchronized void startNodeHandlers(final Collection<NodeKey> addedNodes) {
+ void startNodeHandlers(final Collection<NodeKey> addedNodes) {
for (NodeKey key : addedNodes) {
if (handlers.containsKey(key.getId())) {
spLogger.warn("Attempted to start already-existing handler for {}, very strange", key.getId());
continue;
}
- final NodeStatisticsHandler h = new NodeStatisticsHandler(this, key);
- handlers.put(key.getId(), h);
- spLogger.debug("Started node handler for {}", key.getId());
+ final NodeStatisticsHandler h = new NodeStatisticsHandler(dps, key);
+ final NodeStatisticsHandler old = handlers.putIfAbsent(key.getId(), h);
+ if (old == null) {
+ spLogger.debug("Started node handler for {}", key.getId());
- // FIXME: this should be in the NodeStatisticsHandler itself
- sendStatisticsRequestsToNode(h);
+ // FIXME: this should be in the NodeStatisticsHandler itself
+ sendStatisticsRequestsToNode(h);
+ } else {
+ spLogger.debug("Prevented race on handler for {}", key.getId());
+ }
}
}
- synchronized void stopNodeHandlers(final Collection<NodeKey> removedNodes) {
+ void stopNodeHandlers(final Collection<NodeKey> removedNodes) {
for (NodeKey key : removedNodes) {
final NodeStatisticsHandler s = handlers.remove(key.getId());
if (s != null) {