Merge "Make neutron a simple osgi app"
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatPermCollectorImpl.java
1 package org.opendaylight.controller.md.statistics.manager.impl;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.Map.Entry;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.ThreadFactory;
13
14 import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
15 import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 import com.google.common.base.Preconditions;
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26
27 /**
28  * statistics-manager
29  * org.opendaylight.controller.md.statistics.manager.impl
30  *
31  * StatPermCollectorImpl
32  * Thread base statistic collector. Class holds internal map for all registered
33  * (means connected) nodes with List of Switch capabilities;
34  * Statistics collecting process get cross whole Network Device by device
35  * and statistic by statistic (follow Switch capabilities to prevent unnecessary
36  * ask) Next statistic start collecting by notification or by timeout.
37  *
38  * @author @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
39  *
40  */
41 public class StatPermCollectorImpl implements StatPermCollector {
42
43     private final static Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
44
45     private final static long STAT_COLLECT_TIME_OUT = 3000L;
46
47     private final ExecutorService statNetCollectorServ;
48     private final StatisticsManager manager;
49
50     private final int maxNodeForCollector;
51     private final long minReqNetInterval;
52     private final String name;
53
54     private final Object statCollectorLock = new Object();
55     private final Object statNodeHolderLock = new Object();
56     private final Object transNotifyLock = new Object();
57
58     private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
59             Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
60
61     private volatile boolean wakeMe = false;
62     private volatile boolean finishing = false;
63     private TransactionId actualTransactionId;
64
65     public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
66             final int maxNodeForCollectors) {
67         this.manager = Preconditions.checkNotNull(manager, "StatisticsManager can not be null!");
68         name = "odl-stat-collector-" + nr;
69         minReqNetInterval = minReqNetInterv;
70         final ThreadFactory threadFact = new ThreadFactoryBuilder()
71             .setNameFormat(name + "-thread-%d").build();
72         statNetCollectorServ = Executors.newSingleThreadExecutor(threadFact);
73         maxNodeForCollector = maxNodeForCollectors;
74         LOG.trace("StatCollector {} start successfull!", name);
75     }
76
77     /**
78      * finish collecting statistics
79      */
80     @Override
81     public void close() {
82         statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
83         finishing = true;
84         collectNextStatistics(actualTransactionId);
85         statNetCollectorServ.shutdown();
86     }
87
88     @Override
89     public boolean hasActiveNodes() {
90         return ( ! statNodeHolder.isEmpty());
91     }
92
93     @Override
94     public boolean isProvidedFlowNodeActive(
95             final InstanceIdentifier<Node> flowNode) {
96         return statNodeHolder.containsKey(flowNode);
97     }
98
99     @Override
100     public boolean connectedNodeRegistration(final InstanceIdentifier<Node> ident,
101             final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
102         if (isNodeIdentValidForUse(ident)) {
103             if ( ! statNodeHolder.containsKey(ident)) {
104                 synchronized (statNodeHolderLock) {
105                     final boolean startStatCollecting = statNodeHolder.size() == 0;
106                     if ( ! statNodeHolder.containsKey(ident)) {
107                         if (statNodeHolder.size() >= maxNodeForCollector) {
108                             return false;
109                         }
110                         final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
111                                 new HashMap<>(statNodeHolder);
112                         final NodeRef nodeRef = new NodeRef(ident);
113                         final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(nodeRef,
114                                 statTypes, nrOfSwitchTables);
115                         statNode.put(ident, nodeInfoHolder);
116                         statNodeHolder = Collections.unmodifiableMap(statNode);
117                     }
118                     if (startStatCollecting) {
119                         finishing = false;
120                         statNetCollectorServ.execute(this);
121                     }
122                 }
123             }
124         }
125         return true;
126     }
127
128     @Override
129     public boolean disconnectedNodeUnregistration(final InstanceIdentifier<Node> ident) {
130         if (isNodeIdentValidForUse(ident)) {
131             if (statNodeHolder.containsKey(ident)) {
132                 synchronized (statNodeHolderLock) {
133                     if (statNodeHolder.containsKey(ident)) {
134                         final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
135                                 new HashMap<>(statNodeHolder);
136                         statNode.remove(ident);
137                         statNodeHolder = Collections.unmodifiableMap(statNode);
138                     }
139                     if (statNodeHolder.isEmpty()) {
140                         finishing = true;
141                         collectNextStatistics(actualTransactionId);
142                         statNetCollectorServ.shutdown();
143                     }
144                     return true;
145                 }
146             }
147         }
148         return false;
149     }
150
151     @Override
152     public boolean registerAdditionalNodeFeature(final InstanceIdentifier<Node> ident,
153             final StatCapabTypes statCapab) {
154         if (isNodeIdentValidForUse(ident)) {
155             if ( ! statNodeHolder.containsKey(ident)) {
156                 return false;
157             }
158             final StatNodeInfoHolder statNode = statNodeHolder.get(ident);
159             if ( ! statNode.getStatMarkers().contains(statCapab)) {
160                 synchronized (statNodeHolderLock) {
161                     if ( ! statNode.getStatMarkers().contains(statCapab)) {
162                         final List<StatCapabTypes> statCapabForEdit = new ArrayList<>(statNode.getStatMarkers());
163                         statCapabForEdit.add(statCapab);
164                         final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(statNode.getNodeRef(),
165                                 Collections.unmodifiableList(statCapabForEdit), statNode.getMaxTables());
166
167                         final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodes =
168                                 new HashMap<>(statNodeHolder);
169                         statNodes.put(ident, nodeInfoHolder);
170                         statNodeHolder = Collections.unmodifiableMap(statNodes);
171                     }
172                 }
173             }
174         }
175         return true;
176     }
177
178     @Override
179     public void collectNextStatistics(final TransactionId xid) {
180         if (checkTransactionId(xid)) {
181             if (wakeMe) {
182                 synchronized (statCollectorLock) {
183                     if (wakeMe) {
184                         LOG.trace("STAT-COLLECTOR is notified to conntinue");
185                         statCollectorLock.notify();
186                     }
187                 }
188             }
189         }
190     }
191
192     @Override
193     public void run() {
194         try {
195             // sleep 5 second before collecting all statistics cycles is important
196             // for loading all Nodes to Operational/DS
197             Thread.sleep(5000);
198         }
199         catch (final InterruptedException e1) {
200             // NOOP
201         }
202         LOG.debug("StatCollector {} Start collecting!", name);
203          /* Neverending cyle - wait for finishing */
204          while ( ! finishing) {
205             boolean collecting = false;
206             final long startTime = System.currentTimeMillis();
207
208             if ( ! statNodeHolder.isEmpty()) {
209                 collecting = true;
210                 collectStatCrossNetwork();
211                 collecting = false;
212             }
213
214             if ( ! collecting) {
215                 final long statFinalTime = System.currentTimeMillis() - startTime;
216                 LOG.debug("STAT-MANAGER {}: last all NET statistics collection cost {} ms", name, statFinalTime);
217                 if (statFinalTime < minReqNetInterval) {
218                     LOG.trace("statCollector is about to make a collecting sleep");
219                     synchronized (statCollectorLock) {
220                         wakeMe = true;
221                         try {
222                             final long waitTime = minReqNetInterval - statFinalTime;
223                             statCollectorLock.wait(waitTime);
224                             LOG.trace("STAT-MANAGER : statCollector {} is waking up from a collecting sleep for {} ms", name, waitTime);
225                         } catch (final InterruptedException e) {
226                             LOG.warn("statCollector has been interrupted during collecting sleep", e);
227                         } finally {
228                             wakeMe = false;
229                         }
230                     }
231                 }
232             }
233         }
234     }
235
236     private void waitingForNotification() {
237         synchronized (statCollectorLock) {
238             wakeMe = true;
239             try {
240                 statCollectorLock.wait(STAT_COLLECT_TIME_OUT);
241                 LOG.trace("statCollector is waking up from a wait stat Response sleep");
242             } catch (final InterruptedException e) {
243                 LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
244             } finally {
245                 setActualTransactionId(null);
246                 wakeMe = false;
247             }
248         }
249     }
250
251
252     private void collectStatCrossNetwork() {
253         for (final Entry<InstanceIdentifier<Node>, StatNodeInfoHolder> nodeEntity : statNodeHolder.entrySet()) {
254             final List<StatCapabTypes> listNeededStat = nodeEntity.getValue().getStatMarkers();
255             final NodeRef actualNodeRef = nodeEntity.getValue().getNodeRef();
256             final Short maxTables = nodeEntity.getValue().getMaxTables();
257             for (final StatCapabTypes statMarker : listNeededStat) {
258                 if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
259                     break;
260                 }
261                 try {
262                     switch (statMarker) {
263                     case PORT_STATS:
264                         LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
265                         setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
266                         waitingForNotification();
267                         break;
268                     case QUEUE_STATS:
269                         LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
270                         setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
271                         waitingForNotification();
272                         break;
273                     case TABLE_STATS:
274                         LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
275                         setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
276                         waitingForNotification();
277                         break;
278                     case GROUP_STATS:
279                         LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
280                         setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
281                         waitingForNotification();
282                         setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
283                         waitingForNotification();
284                         break;
285                     case METER_STATS:
286                         LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
287                         setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
288                         waitingForNotification();
289                         setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
290                         waitingForNotification();
291                         break;
292                     case FLOW_STATS:
293                         LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
294                         setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
295                         waitingForNotification();
296                         LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
297                         for (short i = 0; i < maxTables; i++) {
298                             final TableId tableId = new TableId(i);
299                             manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
300                         }
301                         break;
302                     default:
303                         /* Exception for programmers in implementation cycle */
304                         throw new IllegalStateException("Not implemented ASK for " + statMarker);
305                     }
306                 } catch (InterruptedException | ExecutionException ex) {
307                     LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
308                     continue;
309                 }
310             }
311         }
312     }
313
314     private class StatNodeInfoHolder {
315         private final NodeRef nodeRef;
316         private final List<StatCapabTypes> statMarkers;
317         private final Short maxTables;
318
319         public StatNodeInfoHolder(final NodeRef nodeRef,
320                 final List<StatCapabTypes> statMarkers, final Short maxTables) {
321             this.nodeRef = nodeRef;
322             this.maxTables = maxTables;
323             this.statMarkers = statMarkers;
324         }
325
326         public final NodeRef getNodeRef() {
327             return nodeRef;
328         }
329
330         public final List<StatCapabTypes> getStatMarkers() {
331             return statMarkers;
332         }
333
334         public final Short getMaxTables() {
335             return maxTables;
336         }
337     }
338
339     private boolean isNodeIdentValidForUse(final InstanceIdentifier<Node> ident) {
340         if (ident == null) {
341             LOG.warn("FlowCapableNode InstanceIdentifier {} can not be null!");
342             return false;
343         }
344         if (ident.isWildcarded()) {
345             LOG.warn("FlowCapableNode InstanceIdentifier {} can not be wildcarded!", ident);
346             return false;
347         }
348         return true;
349     }
350
351     private boolean checkTransactionId(final TransactionId xid) {
352         synchronized (transNotifyLock) {
353             return actualTransactionId != null && actualTransactionId.equals(xid);
354         }
355     }
356
357     private void setActualTransactionId(final TransactionId transactionId) {
358         synchronized (transNotifyLock) {
359             actualTransactionId = transactionId;
360         }
361     }
362 }
363