2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.List;
16 import java.util.Map.Entry;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ThreadFactory;
22 import com.google.common.base.Optional;
23 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
24 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
25 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector;
26 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
33 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import com.google.common.base.Preconditions;
38 import com.google.common.util.concurrent.ThreadFactoryBuilder;
42 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
44 * StatPermCollectorImpl
45 * Thread base statistic collector. Class holds internal map for all registered
46 * (means connected) nodes with List of Switch capabilities;
47 * Statistics collecting process get cross whole Network Device by device
48 * and statistic by statistic (follow Switch capabilities to prevent unnecessary
49 * ask) Next statistic start collecting by notification or by timeout.
51 * @author @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
54 public class StatPermCollectorImpl implements StatPermCollector {
56 private static final Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
58 private static final long STAT_COLLECT_TIME_OUT = 3000L;
61 sleep 5 second before collecting all statistics cycles is important
62 for loading all Nodes to Operational/DS
64 private static final long WAIT_BEFORE_COLLECTING_STATS = 5000;
66 private final ExecutorService statNetCollectorServ;
67 private final StatisticsManager manager;
69 private final int maxNodeForCollector;
70 private final long minReqNetInterval;
71 private final String name;
73 private final Object statCollectorLock = new Object();
74 private final Object statNodeHolderLock = new Object();
75 private final Object transNotifyLock = new Object();
77 private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
78 Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
80 private volatile boolean wakeMe = false;
81 private volatile boolean finishing = false;
82 private TransactionId actualTransactionId;
84 public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
85 final int maxNodeForCollectors) {
86 this.manager = Preconditions.checkNotNull(manager, "StatisticsManager can not be null!");
87 name = "odl-stat-collector-" + nr;
88 minReqNetInterval = minReqNetInterv;
89 final ThreadFactory threadFact = new ThreadFactoryBuilder()
90 .setNameFormat(name + "-thread-%d").build();
91 statNetCollectorServ = Executors.newSingleThreadExecutor(threadFact);
92 maxNodeForCollector = maxNodeForCollectors;
93 LOG.trace("StatCollector {} start successful!", name);
97 * finish collecting statistics
100 public void close() {
101 statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
103 collectNextStatistics(actualTransactionId);
104 statNetCollectorServ.shutdown();
108 public boolean hasActiveNodes() {
109 return ( ! statNodeHolder.isEmpty());
113 public boolean isProvidedFlowNodeActive(
114 final InstanceIdentifier<Node> flowNode) {
115 return statNodeHolder.containsKey(flowNode);
119 public boolean connectedNodeRegistration(final InstanceIdentifier<Node> ident,
120 final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
121 if (isNodeIdentValidForUse(ident) && ! statNodeHolder.containsKey(ident)) {
122 synchronized (statNodeHolderLock) {
123 final boolean startStatCollecting = statNodeHolder.size() == 0;
124 if ( ! statNodeHolder.containsKey(ident)) {
125 if (statNodeHolder.size() >= maxNodeForCollector) {
128 final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
129 new HashMap<>(statNodeHolder);
130 final NodeRef nodeRef = new NodeRef(ident);
131 final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(nodeRef,
132 statTypes, nrOfSwitchTables);
133 statNode.put(ident, nodeInfoHolder);
134 statNodeHolder = Collections.unmodifiableMap(statNode);
136 if (startStatCollecting) {
138 statNetCollectorServ.execute(this);
146 public boolean disconnectedNodeUnregistration(final InstanceIdentifier<Node> ident) {
147 if (isNodeIdentValidForUse(ident) && statNodeHolder.containsKey(ident)) {
148 synchronized (statNodeHolderLock) {
149 if (statNodeHolder.containsKey(ident)) {
150 final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
151 new HashMap<>(statNodeHolder);
152 statNode.remove(ident);
153 statNodeHolder = Collections.unmodifiableMap(statNode);
155 if (statNodeHolder.isEmpty()) {
157 collectNextStatistics(actualTransactionId);
158 statNetCollectorServ.shutdown();
167 public boolean registerAdditionalNodeFeature(final InstanceIdentifier<Node> ident,
168 final StatCapabTypes statCapab) {
169 if (isNodeIdentValidForUse(ident)) {
170 if ( ! statNodeHolder.containsKey(ident)) {
173 final StatNodeInfoHolder statNode = statNodeHolder.get(ident);
174 if ( ! statNode.getStatMarkers().contains(statCapab)) {
175 synchronized (statNodeHolderLock) {
176 if ( ! statNode.getStatMarkers().contains(statCapab)) {
177 final List<StatCapabTypes> statCapabForEdit = new ArrayList<>(statNode.getStatMarkers());
178 statCapabForEdit.add(statCapab);
179 final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(statNode.getNodeRef(),
180 Collections.unmodifiableList(statCapabForEdit), statNode.getMaxTables());
182 final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodes =
183 new HashMap<>(statNodeHolder);
184 statNodes.put(ident, nodeInfoHolder);
185 statNodeHolder = Collections.unmodifiableMap(statNodes);
194 public void collectNextStatistics(final TransactionId xid) {
195 if (checkTransactionId(xid) && wakeMe) {
196 synchronized (statCollectorLock) {
198 LOG.trace("STAT-COLLECTOR is notified to conntinue");
199 statCollectorLock.notify();
208 Thread.sleep(WAIT_BEFORE_COLLECTING_STATS);
210 catch (final InterruptedException e1) {
213 LOG.debug("StatCollector {} Start collecting!", name);
214 /* Neverending cyle - wait for finishing */
215 while ( ! finishing) {
216 boolean collecting = false;
217 final long startTime = System.currentTimeMillis();
219 if ( ! statNodeHolder.isEmpty()) {
221 collectStatCrossNetwork();
226 final long statFinalTime = System.currentTimeMillis() - startTime;
227 LOG.debug("STAT-MANAGER {}: last all NET statistics collection cost {} ms", name, statFinalTime);
228 if (statFinalTime < minReqNetInterval) {
229 LOG.trace("statCollector is about to make a collecting sleep");
230 synchronized (statCollectorLock) {
233 final long waitTime = minReqNetInterval - statFinalTime;
234 statCollectorLock.wait(waitTime);
235 LOG.trace("STAT-MANAGER : statCollector {} is waking up from a collecting sleep for {} ms", name, waitTime);
236 } catch (final InterruptedException e) {
237 LOG.warn("statCollector has been interrupted during collecting sleep", e);
247 private void waitingForNotification() {
248 synchronized (statCollectorLock) {
251 statCollectorLock.wait(STAT_COLLECT_TIME_OUT);
252 LOG.trace("statCollector is waking up from a wait stat Response sleep");
253 } catch (final InterruptedException e) {
254 LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
256 setActualTransactionId(null);
263 private void collectStatCrossNetwork() {
264 for (final Entry<InstanceIdentifier<Node>, StatNodeInfoHolder> nodeEntity : statNodeHolder.entrySet()) {
265 final NodeKey nodeKey = nodeEntity.getKey().firstKeyOf(Node.class);
266 if (!this.isThisInstanceNodeOwner(nodeKey.getId())) {
269 LOG.trace("collectStatCrossNetwork: Controller is owner of the " +
270 "node {}, so collecting the statistics.",nodeKey);
272 final List<StatCapabTypes> listNeededStat = nodeEntity.getValue().getStatMarkers();
273 final NodeRef actualNodeRef = nodeEntity.getValue().getNodeRef();
274 final Short maxTables = nodeEntity.getValue().getMaxTables();
275 for (final StatCapabTypes statMarker : listNeededStat) {
276 if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
280 switch (statMarker) {
282 LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
283 setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
284 waitingForNotification();
287 LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
288 setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
289 waitingForNotification();
292 LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
293 setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
294 waitingForNotification();
297 LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
298 setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
299 waitingForNotification();
300 setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
301 waitingForNotification();
304 LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
305 setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
306 waitingForNotification();
307 setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
308 waitingForNotification();
311 LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
312 setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
313 waitingForNotification();
314 /*LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
315 for (short i = 0; i < maxTables; i++) {
316 final TableId tableId = new TableId(i);
317 manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
321 /* Exception for programmers in implementation cycle */
322 throw new IllegalStateException("Not implemented ASK for " + statMarker);
324 } catch (InterruptedException | ExecutionException ex) {
325 LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
332 private boolean isThisInstanceNodeOwner(NodeId nodeId) {
333 return manager.getNodeRegistrator().isFlowCapableNodeOwner(nodeId);
336 private class StatNodeInfoHolder {
337 private final NodeRef nodeRef;
338 private final List<StatCapabTypes> statMarkers;
339 private final Short maxTables;
341 public StatNodeInfoHolder(final NodeRef nodeRef,
342 final List<StatCapabTypes> statMarkers, final Short maxTables) {
343 this.nodeRef = nodeRef;
344 this.maxTables = maxTables;
345 this.statMarkers = statMarkers;
348 public final NodeRef getNodeRef() {
352 public final List<StatCapabTypes> getStatMarkers() {
356 public final Short getMaxTables() {
361 private boolean isNodeIdentValidForUse(final InstanceIdentifier<Node> ident) {
363 LOG.warn("FlowCapableNode InstanceIdentifier {} can not be null!");
366 if (ident.isWildcarded()) {
367 LOG.warn("FlowCapableNode InstanceIdentifier {} can not be wildcarded!", ident);
373 private boolean checkTransactionId(final TransactionId xid) {
374 synchronized (transNotifyLock) {
375 return actualTransactionId != null && actualTransactionId.equals(xid);
379 private void setActualTransactionId(final TransactionId transactionId) {
380 synchronized (transNotifyLock) {
381 actualTransactionId = transactionId;