Fix bug 2413 NPE for group and meters
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatPermCollectorImpl.java
1 package org.opendaylight.controller.md.statistics.manager.impl;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.Map.Entry;
9 import java.util.concurrent.ExecutorService;
10 import java.util.concurrent.Executors;
11 import java.util.concurrent.ThreadFactory;
12
13 import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
14 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
18 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 import com.google.common.base.Preconditions;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24
25 /**
26  * statistics-manager
27  * org.opendaylight.controller.md.statistics.manager.impl
28  *
29  * StatPermCollectorImpl
30  * Thread base statistic collector. Class holds internal map for all registered
31  * (means connected) nodes with List of Switch capabilities;
32  * Statistics collecting process get cross whole Network Device by device
33  * and statistic by statistic (follow Switch capabilities to prevent unnecessary
34  * ask) Next statistic start collecting by notification or by timeout.
35  *
36  * @author @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
37  *
38  */
39 public class StatPermCollectorImpl implements StatPermCollector {
40
41     private final static Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
42
43     private final static long STAT_COLLECT_TIME_OUT = 30000L;
44
45     private final ExecutorService statNetCollectorServ;
46     private final StatisticsManager manager;
47
48     private final int maxNodeForCollector;
49     private final long minReqNetInterval;
50     private final String name;
51
52     private final Object statCollectorLock = new Object();
53     private final Object statNodeHolderLock = new Object();
54
55     private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
56             Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
57
58     private volatile boolean wakeMe = false;
59     private volatile boolean finishing = false;
60
61     public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
62             final int maxNodeForCollectors) {
63         this.manager = Preconditions.checkNotNull(manager, "StatisticsManager can not be null!");
64         name = "odl-stat-collector-" + nr;
65         minReqNetInterval = minReqNetInterv;
66         final ThreadFactory threadFact = new ThreadFactoryBuilder()
67             .setNameFormat(name + "-thread-%d").build();
68         statNetCollectorServ = Executors.newSingleThreadExecutor(threadFact);
69         maxNodeForCollector = maxNodeForCollectors;
70         LOG.trace("StatCollector {} start successfull!", name);
71     }
72
73     /**
74      * finish collecting statistics
75      */
76     @Override
77     public void close() {
78         statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
79         finishing = true;
80         collectNextStatistics();
81         statNetCollectorServ.shutdown();
82     }
83
84     @Override
85     public boolean hasActiveNodes() {
86         return ( ! statNodeHolder.isEmpty());
87     }
88
89     @Override
90     public boolean isProvidedFlowNodeActive(
91             final InstanceIdentifier<Node> flowNode) {
92         return statNodeHolder.containsKey(flowNode);
93     }
94
95     @Override
96     public boolean connectedNodeRegistration(final InstanceIdentifier<Node> ident,
97             final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
98         if (isNodeIdentValidForUse(ident)) {
99             if ( ! statNodeHolder.containsKey(ident)) {
100                 synchronized (statNodeHolderLock) {
101                     final boolean startStatCollecting = statNodeHolder.size() == 0;
102                     if ( ! statNodeHolder.containsKey(ident)) {
103                         if (statNodeHolder.size() >= maxNodeForCollector) {
104                             return false;
105                         }
106                         final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
107                                 new HashMap<>(statNodeHolder);
108                         final NodeRef nodeRef = new NodeRef(ident);
109                         final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(nodeRef,
110                                 statTypes, nrOfSwitchTables);
111                         statNode.put(ident, nodeInfoHolder);
112                         statNodeHolder = Collections.unmodifiableMap(statNode);
113                     }
114                     if (startStatCollecting) {
115                         finishing = false;
116                         statNetCollectorServ.execute(this);
117                     }
118                 }
119             }
120         }
121         return true;
122     }
123
124     @Override
125     public boolean disconnectedNodeUnregistration(final InstanceIdentifier<Node> ident) {
126         if (isNodeIdentValidForUse(ident)) {
127             if (statNodeHolder.containsKey(ident)) {
128                 synchronized (statNodeHolderLock) {
129                     if (statNodeHolder.containsKey(ident)) {
130                         final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
131                                 new HashMap<>(statNodeHolder);
132                         statNode.remove(ident);
133                         statNodeHolder = Collections.unmodifiableMap(statNode);
134                     }
135                     if (statNodeHolder.isEmpty()) {
136                         finishing = true;
137                         collectNextStatistics();
138                         statNetCollectorServ.shutdown();
139                     }
140                     return true;
141                 }
142             }
143         }
144         return false;
145     }
146
147     @Override
148     public boolean registerAdditionalNodeFeature(final InstanceIdentifier<Node> ident,
149             final StatCapabTypes statCapab) {
150         if (isNodeIdentValidForUse(ident)) {
151             if ( ! statNodeHolder.containsKey(ident)) {
152                 return false;
153             }
154             final StatNodeInfoHolder statNode = statNodeHolder.get(ident);
155             if ( ! statNode.getStatMarkers().contains(statCapab)) {
156                 synchronized (statNodeHolderLock) {
157                     if ( ! statNode.getStatMarkers().contains(statCapab)) {
158                         final List<StatCapabTypes> statCapabForEdit = new ArrayList<>(statNode.getStatMarkers());
159                         statCapabForEdit.add(statCapab);
160                         final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(statNode.getNodeRef(),
161                                 Collections.unmodifiableList(statCapabForEdit), statNode.getMaxTables());
162
163                         final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodes =
164                                 new HashMap<>(statNodeHolder);
165                         statNodes.put(ident, nodeInfoHolder);
166                         statNodeHolder = Collections.unmodifiableMap(statNodes);
167                     }
168                 }
169             }
170         }
171         return true;
172     }
173
174     @Override
175     public void collectNextStatistics() {
176         if (wakeMe) {
177             synchronized (statCollectorLock) {
178                 if (wakeMe) {
179                     LOG.trace("STAT-COLLECTOR is notified to conntinue");
180                     statCollectorLock.notify();
181                 }
182             }
183         }
184     }
185
186     @Override
187     public void run() {
188         try {
189             Thread.sleep(5000);
190         }
191         catch (final InterruptedException e1) {
192             // NOOP
193         }
194         LOG.debug("StatCollector {} Start collecting!", name);
195          /* Neverending cyle - wait for finishing */
196          while ( ! finishing) {
197             boolean collecting = false;
198             final long startTime = System.currentTimeMillis();
199
200             if ( ! statNodeHolder.isEmpty()) {
201                 collecting = true;
202                 collectStatCrossNetwork();
203                 collecting = false;
204             }
205
206             if ( ! collecting) {
207                 final long statFinalTime = System.currentTimeMillis() - startTime;
208                 LOG.debug("STAT-MANAGER {}: last all NET statistics collection cost {} ms", name, statFinalTime);
209                 if (statFinalTime < minReqNetInterval) {
210                     LOG.trace("statCollector is about to make a collecting sleep");
211                     synchronized (statCollectorLock) {
212                         wakeMe = true;
213                         try {
214                             final long waitTime = minReqNetInterval - statFinalTime;
215                             statCollectorLock.wait(waitTime);
216                             LOG.trace("STAT-MANAGER : statCollector {} is waking up from a collecting sleep for {} ms", name, waitTime);
217                         } catch (final InterruptedException e) {
218                             LOG.warn("statCollector has been interrupted during collecting sleep", e);
219                         } finally {
220                             wakeMe = false;
221                         }
222                     }
223                 }
224             }
225         }
226     }
227
228     private void waitingForNotification() {
229         synchronized (statCollectorLock) {
230             wakeMe = true;
231             try {
232                 statCollectorLock.wait(STAT_COLLECT_TIME_OUT);
233                 LOG.trace("statCollector is waking up from a wait stat Response sleep");
234             } catch (final InterruptedException e) {
235                 LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
236             } finally {
237                 wakeMe = false;
238             }
239         }
240     }
241
242
243     private void collectStatCrossNetwork() {
244         for (final Entry<InstanceIdentifier<Node>, StatNodeInfoHolder> nodeEntity : statNodeHolder.entrySet()) {
245             final List<StatCapabTypes> listNeededStat = nodeEntity.getValue().getStatMarkers();
246             final NodeRef actualNodeRef = nodeEntity.getValue().getNodeRef();
247             final Short maxTables = nodeEntity.getValue().getMaxTables();
248             for (final StatCapabTypes statMarker : listNeededStat) {
249                 if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
250                     break;
251                 }
252                 switch (statMarker) {
253                 case PORT_STATS:
254                     LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
255                     manager.getRpcMsgManager().getAllPortsStat(actualNodeRef);
256                     waitingForNotification();
257                     break;
258                 case QUEUE_STATS:
259                     LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
260                     manager.getRpcMsgManager().getAllQueueStat(actualNodeRef);
261                     waitingForNotification();
262                     break;
263                 case TABLE_STATS:
264                     LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
265                     manager.getRpcMsgManager().getAllTablesStat(actualNodeRef);
266                     waitingForNotification();
267                     break;
268                 case GROUP_STATS:
269                     LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
270                     manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef);
271                     waitingForNotification();
272                     manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef);
273                     waitingForNotification();
274                     break;
275                 case METER_STATS:
276                     LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
277                     manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef);
278                     waitingForNotification();
279                     manager.getRpcMsgManager().getAllMetersStat(actualNodeRef);
280                     waitingForNotification();
281                     break;
282                 case FLOW_STATS:
283                     LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
284                     manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef);
285                     waitingForNotification();
286                     LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
287                     for (short i = 0; i < maxTables; i++) {
288                         final TableId tableId = new TableId(i);
289                         manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
290                     }
291                     break;
292                 default:
293                     /* Exception for programmers in implementation cycle */
294                     throw new IllegalStateException("Not implemented ASK for " + statMarker);
295                 }
296             }
297         }
298     }
299
300     private class StatNodeInfoHolder {
301         private final NodeRef nodeRef;
302         private final List<StatCapabTypes> statMarkers;
303         private final Short maxTables;
304
305         public StatNodeInfoHolder(final NodeRef nodeRef,
306                 final List<StatCapabTypes> statMarkers, final Short maxTables) {
307             this.nodeRef = nodeRef;
308             this.maxTables = maxTables;
309             this.statMarkers = statMarkers;
310         }
311
312         public final NodeRef getNodeRef() {
313             return nodeRef;
314         }
315
316         public final List<StatCapabTypes> getStatMarkers() {
317             return statMarkers;
318         }
319
320         public final Short getMaxTables() {
321             return maxTables;
322         }
323     }
324
325     private boolean isNodeIdentValidForUse(final InstanceIdentifier<Node> ident) {
326         if (ident == null) {
327             LOG.warn("FlowCapableNode InstanceIdentifier {} can not be null!");
328             return false;
329         }
330         if (ident.isWildcarded()) {
331             LOG.warn("FlowCapableNode InstanceIdentifier {} can not be wildcarded!", ident);
332             return false;
333         }
334         return true;
335     }
336 }
337