1 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashMap;
8 import java.util.Map.Entry;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.ThreadFactory;
14 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector;
15 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 import com.google.common.base.Preconditions;
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
29 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
31 * StatPermCollectorImpl
32 * Thread-based statistics collector. The class holds an internal map of all registered
33 * (i.e. connected) nodes, each with its list of switch capabilities.
34 * The collecting process walks the whole network device by device
35 * and statistic by statistic (following the switch capabilities to avoid unnecessary
36 * requests). Collecting of the next statistic starts on notification or on timeout.
38 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
41 public class StatPermCollectorImpl implements StatPermCollector {
43 private static final Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
45 private static final long STAT_COLLECT_TIME_OUT = 3000L;
48 sleep 5 second before collecting all statistics cycles is important
49 for loading all Nodes to Operational/DS
51 private static final long WAIT_BEFORE_COLLECTING_STATS = 5000;
53 private final ExecutorService statNetCollectorServ;
54 private final StatisticsManager manager;
56 private final int maxNodeForCollector;
57 private final long minReqNetInterval;
58 private final String name;
60 private final Object statCollectorLock = new Object();
61 private final Object statNodeHolderLock = new Object();
62 private final Object transNotifyLock = new Object();
64 private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
65 Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
67 private volatile boolean wakeMe = false;
68 private volatile boolean finishing = false;
69 private TransactionId actualTransactionId;
/**
 * Creates a collector together with its own single-thread executor.
 *
 * @param manager statistics manager, must not be null
 * @param minReqNetInterv minimal duration of one collecting cycle in milliseconds
 * @param nr collector number, used as suffix of the collector name
 * @param maxNodeForCollectors maximal number of nodes this collector accepts
 */
public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
        final int maxNodeForCollectors) {
    this.manager = Preconditions.checkNotNull(manager, "StatisticsManager can not be null!");
    this.minReqNetInterval = minReqNetInterv;
    this.maxNodeForCollector = maxNodeForCollectors;
    this.name = "odl-stat-collector-" + nr;

    // Worker threads are named after the collector.
    final ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
    factoryBuilder.setNameFormat(this.name + "-thread-%d");
    this.statNetCollectorServ = Executors.newSingleThreadExecutor(factoryBuilder.build());

    LOG.trace("StatCollector {} start successfull!", this.name);
}
84 * Finishes collecting statistics and shuts down the collector executor.
// NOTE(review): the declaration line of this method is not visible in this view; the
// statements below look like the body of the collector's shutdown routine - confirm.
// Replace the node map with an empty one, so no node stays registered.
88 statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
// Offer the current transaction id to collectNextStatistics, which notifies the collector
// thread when the id matches and wakeMe is set. The field is read here without transNotifyLock.
90 collectNextStatistics(actualTransactionId);
// Orderly shutdown of the executor: no further task is accepted afterwards.
91 statNetCollectorServ.shutdown();
95 public boolean hasActiveNodes() {
96 return ( ! statNodeHolder.isEmpty());
100 public boolean isProvidedFlowNodeActive(
101 final InstanceIdentifier<Node> flowNode) {
102 return statNodeHolder.containsKey(flowNode);
/**
 * Registers a connected node together with its statistic capabilities and table count.
 * The node map is not modified in place: a copy is extended and published as a new
 * unmodifiable map while statNodeHolderLock is held.
 *
 * NOTE(review): several short lines of this method (closing braces, return statements)
 * are not visible in this view, so the returned values are not described here.
 *
 * @param ident identifier of the node; checked by isNodeIdentValidForUse
 * @param statTypes statistic capabilities stored in the node's info holder
 * @param nrOfSwitchTables table count stored in the node's info holder
 */
106 public boolean connectedNodeRegistration(final InstanceIdentifier<Node> ident,
107 final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
// Unlocked pre-check; the containsKey test is repeated below under the lock.
108 if (isNodeIdentValidForUse(ident) && ! statNodeHolder.containsKey(ident)) {
109 synchronized (statNodeHolderLock) {
// True when no node was registered before this call.
110 final boolean startStatCollecting = statNodeHolder.size() == 0;
111 if ( ! statNodeHolder.containsKey(ident)) {
// Capacity check against maxNodeForCollector; the branch body is not visible here.
112 if (statNodeHolder.size() >= maxNodeForCollector) {
// Copy the current map, add the new holder, publish the copy as unmodifiable map.
115 final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
116 new HashMap<>(statNodeHolder);
117 final NodeRef nodeRef = new NodeRef(ident);
118 final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(nodeRef,
119 statTypes, nrOfSwitchTables);
120 statNode.put(ident, nodeInfoHolder);
121 statNodeHolder = Collections.unmodifiableMap(statNode);
123 if (startStatCollecting) {
// Submit this collector to its own executor.
// NOTE(review): startStatCollecting was computed before the capacity check - confirm
// that this point cannot be reached when the registration was refused.
125 statNetCollectorServ.execute(this);
/**
 * Removes a disconnected node from this collector. The node map is not modified in
 * place: a copy without the node is published as a new unmodifiable map while
 * statNodeHolderLock is held.
 *
 * NOTE(review): several short lines of this method (closing braces, return statements)
 * are not visible in this view, so the returned values are not described here.
 *
 * @param ident identifier of the node; checked by isNodeIdentValidForUse
 */
133 public boolean disconnectedNodeUnregistration(final InstanceIdentifier<Node> ident) {
// Unlocked pre-check; the containsKey test is repeated below under the lock.
134 if (isNodeIdentValidForUse(ident) && statNodeHolder.containsKey(ident)) {
135 synchronized (statNodeHolderLock) {
136 if (statNodeHolder.containsKey(ident)) {
137 final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
138 new HashMap<>(statNodeHolder);
139 statNode.remove(ident);
140 statNodeHolder = Collections.unmodifiableMap(statNode);
// The last node has been removed.
142 if (statNodeHolder.isEmpty()) {
// Offer the current transaction id so that a waiting collector thread gets notified.
144 collectNextStatistics(actualTransactionId);
// NOTE(review): the executor is shut down here, while connectedNodeRegistration calls
// execute(this) on the same executor when the first node is registered. An executor
// rejects tasks submitted after shutdown - confirm that registering a node after the
// last one was removed is handled.
145 statNetCollectorServ.shutdown();
/**
 * Adds one statistic capability to an already registered node. Neither the holder nor
 * the node map is modified in place: a new holder carrying the extended capability list
 * replaces the old one in a copy of the map, which is then published as unmodifiable map.
 *
 * NOTE(review): several short lines of this method (closing braces, return statements)
 * are not visible in this view, so the returned values are not described here.
 *
 * @param ident identifier of the node; checked by isNodeIdentValidForUse
 * @param statCapab capability to add when the node does not list it yet
 */
154 public boolean registerAdditionalNodeFeature(final InstanceIdentifier<Node> ident,
155 final StatCapabTypes statCapab) {
156 if (isNodeIdentValidForUse(ident)) {
// Unknown node; the branch body is not visible here.
157 if ( ! statNodeHolder.containsKey(ident)) {
// NOTE(review): the map is read a second time here without a lock; if the node is
// removed between containsKey and get, statNode is null - confirm.
160 final StatNodeInfoHolder statNode = statNodeHolder.get(ident);
161 if ( ! statNode.getStatMarkers().contains(statCapab)) {
162 synchronized (statNodeHolderLock) {
// NOTE(review): this re-check inspects the holder fetched before the lock was taken,
// not the one currently stored in the map, so a holder replaced in the meantime would
// be overwritten below - confirm.
163 if ( ! statNode.getStatMarkers().contains(statCapab)) {
// New capability list = old list + statCapab, wrapped as unmodifiable list.
164 final List<StatCapabTypes> statCapabForEdit = new ArrayList<>(statNode.getStatMarkers());
165 statCapabForEdit.add(statCapab);
166 final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(statNode.getNodeRef(),
167 Collections.unmodifiableList(statCapabForEdit), statNode.getMaxTables());
// Copy the map, replace the holder, publish the copy.
169 final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodes =
170 new HashMap<>(statNodeHolder);
171 statNodes.put(ident, nodeInfoHolder);
172 statNodeHolder = Collections.unmodifiableMap(statNodes);
/**
 * Notifies the collector thread waiting on statCollectorLock, provided the given
 * transaction id equals the one currently waited for and wakeMe is set.
 *
 * NOTE(review): one line inside the synchronized block is not visible in this view.
 *
 * @param xid transaction id to compare with the current one
 */
181 public void collectNextStatistics(final TransactionId xid) {
// checkTransactionId is false when no transaction is currently waited for.
182 if (checkTransactionId(xid) && wakeMe) {
183 synchronized (statCollectorLock) {
185 LOG.trace("STAT-COLLECTOR is notified to conntinue");
// Wakes one thread waiting on statCollectorLock.
186 statCollectorLock.notify();
// NOTE(review): the declaration line of this method is not visible in this view. The
// instance is handed to an executor via execute(this), so this is presumably the body
// of run() - confirm.
// Initial delay before the first collecting cycle.
195 Thread.sleep(WAIT_BEFORE_COLLECTING_STATS);
// NOTE(review): the handler body is not visible here - confirm the interruption is not
// silently dropped.
197 catch (final InterruptedException e1) {
200 LOG.debug("StatCollector {} Start collecting!", name);
201 /* Never-ending cycle - loops until finishing is set */
202 while ( ! finishing) {
203 boolean collecting = false;
// Start of the cycle; used below to compute how long the cycle took.
204 final long startTime = System.currentTimeMillis();
// Collect only when at least one node is registered.
206 if ( ! statNodeHolder.isEmpty()) {
208 collectStatCrossNetwork();
213 final long statFinalTime = System.currentTimeMillis() - startTime;
214 LOG.debug("STAT-MANAGER {}: last all NET statistics collection cost {} ms", name, statFinalTime);
// The cycle was shorter than minReqNetInterval: wait for the remaining time.
215 if (statFinalTime < minReqNetInterval) {
216 LOG.trace("statCollector is about to make a collecting sleep");
217 synchronized (statCollectorLock) {
220 final long waitTime = minReqNetInterval - statFinalTime;
// A notify on statCollectorLock ends this wait before waitTime has elapsed.
221 statCollectorLock.wait(waitTime);
222 LOG.trace("STAT-MANAGER : statCollector {} is waking up from a collecting sleep for {} ms", name, waitTime);
223 } catch (final InterruptedException e) {
// NOTE(review): the visible lines only log the interruption; the thread's interrupt
// status is not restored - confirm.
224 LOG.warn("statCollector has been interrupted during collecting sleep", e);
/**
 * Blocks the calling thread on statCollectorLock until it is notified or
 * STAT_COLLECT_TIME_OUT milliseconds have passed, and clears the current transaction id.
 *
 * NOTE(review): some short lines of this method are not visible in this view
 * (presumably the try / finally structure and the wakeMe handling) - confirm.
 */
234 private void waitingForNotification() {
235 synchronized (statCollectorLock) {
// Timed wait; collectNextStatistics notifies this monitor.
238 statCollectorLock.wait(STAT_COLLECT_TIME_OUT);
239 LOG.trace("statCollector is waking up from a wait stat Response sleep");
240 } catch (final InterruptedException e) {
// NOTE(review): the visible lines only log the interruption; the thread's interrupt
// status is not restored - confirm.
241 LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
// From here on checkTransactionId returns false for every id.
243 setActualTransactionId(null);
/**
 * Walks all registered nodes and, for every statistic capability of a node, requests the
 * matching statistics through the RPC message manager. After each request the returned
 * transaction id is stored and the thread waits in waitingForNotification before the
 * next request is sent.
 *
 * NOTE(review): the case labels, break statements and closing braces of the switch are
 * not visible in this view; the mapping of capability to request is described from the
 * log messages only.
 */
250 private void collectStatCrossNetwork() {
251 for (final Entry<InstanceIdentifier<Node>, StatNodeInfoHolder> nodeEntity : statNodeHolder.entrySet()) {
252 final List<StatCapabTypes> listNeededStat = nodeEntity.getValue().getStatMarkers();
253 final NodeRef actualNodeRef = nodeEntity.getValue().getNodeRef();
254 final Short maxTables = nodeEntity.getValue().getMaxTables();
255 for (final StatCapabTypes statMarker : listNeededStat) {
// The node is no longer registered; the branch body is not visible here.
256 if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
260 switch (statMarker) {
// Each request below blocks in get() until the RPC future yields the transaction id.
262 LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
263 setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
264 waitingForNotification();
267 LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
268 setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
269 waitingForNotification();
272 LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
273 setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
274 waitingForNotification();
// Groups: two requests, each followed by its own wait.
277 LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
278 setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
279 waitingForNotification();
280 setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
281 waitingForNotification();
// Meters: two requests, each followed by its own wait.
284 LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
285 setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
286 waitingForNotification();
287 setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
288 waitingForNotification();
291 LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
292 setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
293 waitingForNotification();
294 LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
// One aggregate request per table id 0 .. maxTables - 1. The result of the call is not
// used and no wait follows it.
// NOTE(review): maxTables is a Short that is unboxed in the loop condition; a null
// value would throw NullPointerException - confirm it is never null.
295 for (short i = 0; i < maxTables; i++) {
296 final TableId tableId = new TableId(i);
297 manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
301 /* Exception for programmers in implementation cycle */
302 throw new IllegalStateException("Not implemented ASK for " + statMarker);
// Failures of the RPC futures are logged only.
// NOTE(review): an InterruptedException ends up here too and the thread's interrupt
// status is not restored in the visible lines - confirm.
304 } catch (InterruptedException | ExecutionException ex) {
305 LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
312 private class StatNodeInfoHolder {
313 private final NodeRef nodeRef;
314 private final List<StatCapabTypes> statMarkers;
315 private final Short maxTables;
317 public StatNodeInfoHolder(final NodeRef nodeRef,
318 final List<StatCapabTypes> statMarkers, final Short maxTables) {
319 this.nodeRef = nodeRef;
320 this.maxTables = maxTables;
321 this.statMarkers = statMarkers;
324 public final NodeRef getNodeRef() {
328 public final List<StatCapabTypes> getStatMarkers() {
332 public final Short getMaxTables() {
337 private boolean isNodeIdentValidForUse(final InstanceIdentifier<Node> ident) {
339 LOG.warn("FlowCapableNode InstanceIdentifier {} can not be null!");
342 if (ident.isWildcarded()) {
343 LOG.warn("FlowCapableNode InstanceIdentifier {} can not be wildcarded!", ident);
349 private boolean checkTransactionId(final TransactionId xid) {
350 synchronized (transNotifyLock) {
351 return actualTransactionId != null && actualTransactionId.equals(xid);
/**
 * Stores the transaction id currently waited for, under transNotifyLock.
 *
 * NOTE(review): the end of this method lies beyond the last visible line.
 *
 * @param transactionId new current transaction id; the collector passes null after a wait
 */
355 private void setActualTransactionId(final TransactionId transactionId) {
356 synchronized (transNotifyLock) {
357 actualTransactionId = transactionId;