import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
private final static Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
- private final static long STAT_COLLECT_TIME_OUT = 30000L;
+ private final static long STAT_COLLECT_TIME_OUT = 3000L;
private final ExecutorService statNetCollectorServ;
private final StatisticsManager manager;
private final Object statCollectorLock = new Object();
private final Object statNodeHolderLock = new Object();
+ private final Object transNotifyLock = new Object();
private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
private volatile boolean wakeMe = false;
private volatile boolean finishing = false;
+ private TransactionId actualTransactionId;
public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
final int maxNodeForCollectors) {
public void close() {
statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
finishing = true;
- collectNextStatistics();
+ collectNextStatistics(actualTransactionId);
statNetCollectorServ.shutdown();
}
}
if (statNodeHolder.isEmpty()) {
finishing = true;
- collectNextStatistics();
+ collectNextStatistics(actualTransactionId);
statNetCollectorServ.shutdown();
}
return true;
}
@Override
- public void collectNextStatistics() {
- if (wakeMe) {
- synchronized (statCollectorLock) {
- if (wakeMe) {
- LOG.trace("STAT-COLLECTOR is notified to conntinue");
- statCollectorLock.notify();
+ public void collectNextStatistics(final TransactionId xid) {
+ if (checkTransactionId(xid)) {
+ if (wakeMe) {
+ synchronized (statCollectorLock) {
+ if (wakeMe) {
+ LOG.trace("STAT-COLLECTOR is notified to conntinue");
+ statCollectorLock.notify();
+ }
}
}
}
@Override
public void run() {
try {
+ // sleep 5 second before collecting all statistics cycles is important
+ // for loading all Nodes to Operational/DS
Thread.sleep(5000);
}
catch (final InterruptedException e1) {
} catch (final InterruptedException e) {
LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
} finally {
+ setActualTransactionId(null);
wakeMe = false;
}
}
if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
break;
}
- switch (statMarker) {
- case PORT_STATS:
- LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllPortsStat(actualNodeRef);
- waitingForNotification();
- break;
- case QUEUE_STATS:
- LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllQueueStat(actualNodeRef);
- waitingForNotification();
- break;
- case TABLE_STATS:
- LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllTablesStat(actualNodeRef);
- waitingForNotification();
- break;
- case GROUP_STATS:
- LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef);
- waitingForNotification();
- manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef);
- waitingForNotification();
- break;
- case METER_STATS:
- LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef);
- waitingForNotification();
- manager.getRpcMsgManager().getAllMetersStat(actualNodeRef);
- waitingForNotification();
- break;
- case FLOW_STATS:
- LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
- manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef);
- waitingForNotification();
- LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
- for (short i = 0; i < maxTables; i++) {
- final TableId tableId = new TableId(i);
- manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+ try {
+ switch (statMarker) {
+ case PORT_STATS:
+ LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case QUEUE_STATS:
+ LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case TABLE_STATS:
+ LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case GROUP_STATS:
+ LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
+ waitingForNotification();
+ setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case METER_STATS:
+ LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
+ waitingForNotification();
+ setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
+ waitingForNotification();
+ break;
+ case FLOW_STATS:
+ LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
+ setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
+ waitingForNotification();
+ LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
+ for (short i = 0; i < maxTables; i++) {
+ final TableId tableId = new TableId(i);
+ manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
+ }
+ break;
+ default:
+ /* Exception for programmers in implementation cycle */
+ throw new IllegalStateException("Not implemented ASK for " + statMarker);
}
- break;
- default:
- /* Exception for programmers in implementation cycle */
- throw new IllegalStateException("Not implemented ASK for " + statMarker);
+ } catch (InterruptedException | ExecutionException ex) {
+ LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
+ continue;
}
}
}
}
return true;
}
+
+ private boolean checkTransactionId(final TransactionId xid) {
+ synchronized (transNotifyLock) {
+ return actualTransactionId != null && actualTransactionId.equals(xid);
+ }
+ }
+
+ private void setActualTransactionId(final TransactionId transactionId) {
+ synchronized (transNotifyLock) {
+ actualTransactionId = transactionId;
+ }
+ }
}