package org.opendaylight.controller.md.statistics.manager.impl;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
private final static Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
- private final static long STAT_COLLECT_TIME_OUT = 30000L;
+ private final static long STAT_COLLECT_TIME_OUT = 3000L;
private final ExecutorService statNetCollectorServ;
private final StatisticsManager manager;
private final Object statCollectorLock = new Object();
private final Object statNodeHolderLock = new Object();
+ private final Object transNotifyLock = new Object();
private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
private volatile boolean wakeMe = false;
private volatile boolean finishing = false;
+ private TransactionId actualTransactionId;
public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
final int maxNodeForCollectors) {
public void close() {
statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
finishing = true;
- collectNextStatistics();
+ collectNextStatistics(actualTransactionId);
statNetCollectorServ.shutdown();
}
+ @Override
+ public boolean hasActiveNodes() {
+ return ( ! statNodeHolder.isEmpty());
+ }
+
@Override
public boolean isProvidedFlowNodeActive(
final InstanceIdentifier<Node> flowNode) {
@Override
public boolean connectedNodeRegistration(final InstanceIdentifier<Node> ident,
final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
- if (ident.isWildcarded()) {
- LOG.warn("FlowCapableNode IstanceIdentifier {} registration can not be wildcarded!", ident);
- } else {
+ if (isNodeIdentValidForUse(ident)) {
if ( ! statNodeHolder.containsKey(ident)) {
synchronized (statNodeHolderLock) {
final boolean startStatCollecting = statNodeHolder.size() == 0;
@Override
public boolean disconnectedNodeUnregistration(final InstanceIdentifier<Node> ident) {
- if (ident.isWildcarded()) {
- LOG.warn("FlowCapableNode IstanceIdentifier {} unregistration can not be wildcarded!", ident);
- } else {
+ if (isNodeIdentValidForUse(ident)) {
if (statNodeHolder.containsKey(ident)) {
synchronized (statNodeHolderLock) {
if (statNodeHolder.containsKey(ident)) {
}
if (statNodeHolder.isEmpty()) {
finishing = true;
- collectNextStatistics();
+ collectNextStatistics(actualTransactionId);
statNetCollectorServ.shutdown();
}
return true;
}
@Override
- public void collectNextStatistics() {
- if (wakeMe) {
- synchronized (statCollectorLock) {
- if (wakeMe) {
- LOG.trace("STAT-COLLECTOR is notified to conntinue");
- statCollectorLock.notify();
+ public boolean registerAdditionalNodeFeature(final InstanceIdentifier<Node> ident,
+ final StatCapabTypes statCapab) {
+ if (isNodeIdentValidForUse(ident)) {
+ if ( ! statNodeHolder.containsKey(ident)) {
+ return false;
+ }
+ final StatNodeInfoHolder statNode = statNodeHolder.get(ident);
+ if ( ! statNode.getStatMarkers().contains(statCapab)) {
+ synchronized (statNodeHolderLock) {
+ if ( ! statNode.getStatMarkers().contains(statCapab)) {
+ final List<StatCapabTypes> statCapabForEdit = new ArrayList<>(statNode.getStatMarkers());
+ statCapabForEdit.add(statCapab);
+ final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(statNode.getNodeRef(),
+ Collections.unmodifiableList(statCapabForEdit), statNode.getMaxTables());
+
+ final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodes =
+ new HashMap<>(statNodeHolder);
+ statNodes.put(ident, nodeInfoHolder);
+ statNodeHolder = Collections.unmodifiableMap(statNodes);
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void collectNextStatistics(final TransactionId xid) {
+ if (checkTransactionId(xid)) {
+ if (wakeMe) {
+ synchronized (statCollectorLock) {
+ if (wakeMe) {
+ LOG.trace("STAT-COLLECTOR is notified to conntinue");
+ statCollectorLock.notify();
+ }
}
}
}
@Override
public void run() {
try {
+ // sleep 5 second before collecting all statistics cycles is important
+ // for loading all Nodes to Operational/DS
Thread.sleep(5000);
}
catch (final InterruptedException e1) {
} catch (final InterruptedException e) {
LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
} finally {
+ setActualTransactionId(null);
wakeMe = false;
}
}
if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
break;
}
- switch (statMarker) {
- case PORT_STATS:
- LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllPortsStat(actualNodeRef);
- waitingForNotification();
- break;
- case QUEUE_STATS:
- LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllQueueStat(actualNodeRef);
- waitingForNotification();
- break;
- case TABLE_STATS:
- LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllTablesStat(actualNodeRef);
- waitingForNotification();
- break;
- case GROUP_STATS:
- LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getGroupFeaturesStat(actualNodeRef);
- waitingForNotification();
- manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef);
- waitingForNotification();
- manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef);
- waitingForNotification();
- break;
- case METER_STATS:
- LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getMeterFeaturesStat(actualNodeRef);
- waitingForNotification();
- manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef);
- waitingForNotification();
- manager.getRpcMsgManager().getAllMetersStat(actualNodeRef);
- waitingForNotification();
- break;
- case FLOW_STATS:
- LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef);
- waitingForNotification();
- LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
- for (short i = 0; i < maxTables; i++) {
- final TableId tableId = new TableId(i);
- manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+ try {
+ switch (statMarker) {
+ case PORT_STATS:
+ LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case QUEUE_STATS:
+ LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case TABLE_STATS:
+ LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case GROUP_STATS:
+ LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
+ waitingForNotification();
+ setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case METER_STATS:
+ LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
+ waitingForNotification();
+ setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case FLOW_STATS:
+ LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
+ waitingForNotification();
+ LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
+ for (short i = 0; i < maxTables; i++) {
+ final TableId tableId = new TableId(i);
+ manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+ }
+ break;
+ default:
+ /* Exception for programmers in implementation cycle */
+ throw new IllegalStateException("Not implemented ASK for " + statMarker);
}
- break;
- default:
- /* Exception for programmers in implementation cycle */
- throw new IllegalStateException("Not implemented ASK for " + statMarker);
+ } catch (InterruptedException | ExecutionException ex) {
+ LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
+ continue;
}
}
}
}
}
- @Override
- public boolean hasActiveNodes() {
- return ( ! statNodeHolder.isEmpty());
+ private boolean isNodeIdentValidForUse(final InstanceIdentifier<Node> ident) {
+ if (ident == null) {
+ LOG.warn("FlowCapableNode InstanceIdentifier {} can not be null!");
+ return false;
+ }
+ if (ident.isWildcarded()) {
+ LOG.warn("FlowCapableNode InstanceIdentifier {} can not be wildcarded!", ident);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean checkTransactionId(final TransactionId xid) {
+ synchronized (transNotifyLock) {
+ return actualTransactionId != null && actualTransactionId.equals(xid);
+ }
+ }
+
+ private void setActualTransactionId(final TransactionId transactionId) {
+ synchronized (transNotifyLock) {
+ actualTransactionId = transactionId;
+ }
}
}