External api proposal
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatPermCollectorImpl.java
1 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.Map.Entry;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.ThreadFactory;
13
14 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector;
15 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 import com.google.common.base.Preconditions;
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26
27 /**
28  * statistics-manager
29  * org.opendaylight.openflowplugin.applications.statistics.manager.impl
30  *
31  * StatPermCollectorImpl
32  * Thread base statistic collector. Class holds internal map for all registered
33  * (means connected) nodes with List of Switch capabilities;
34  * Statistics collecting process get cross whole Network Device by device
35  * and statistic by statistic (follow Switch capabilities to prevent unnecessary
36  * ask) Next statistic start collecting by notification or by timeout.
37  *
38  * @author @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
39  *
40  */
41 public class StatPermCollectorImpl implements StatPermCollector {
42
43     private static final Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
44
45     private static final long STAT_COLLECT_TIME_OUT = 3000L;
46
47     /**
48         sleep 5 second before collecting all statistics cycles is important
49         for loading all Nodes to Operational/DS
50      */
51     private static final long WAIT_BEFORE_COLLECTING_STATS = 5000;
52
53     private final ExecutorService statNetCollectorServ;
54     private final StatisticsManager manager;
55
56     private final int maxNodeForCollector;
57     private final long minReqNetInterval;
58     private final String name;
59
60     private final Object statCollectorLock = new Object();
61     private final Object statNodeHolderLock = new Object();
62     private final Object transNotifyLock = new Object();
63
64     private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
65             Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
66
67     private volatile boolean wakeMe = false;
68     private volatile boolean finishing = false;
69     private TransactionId actualTransactionId;
70
71     public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
72             final int maxNodeForCollectors) {
73         this.manager = Preconditions.checkNotNull(manager, "StatisticsManager can not be null!");
74         name = "odl-stat-collector-" + nr;
75         minReqNetInterval = minReqNetInterv;
76         final ThreadFactory threadFact = new ThreadFactoryBuilder()
77             .setNameFormat(name + "-thread-%d").build();
78         statNetCollectorServ = Executors.newSingleThreadExecutor(threadFact);
79         maxNodeForCollector = maxNodeForCollectors;
80         LOG.trace("StatCollector {} start successfull!", name);
81     }
82
83     /**
84      * finish collecting statistics
85      */
86     @Override
87     public void close() {
88         statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
89         finishing = true;
90         collectNextStatistics(actualTransactionId);
91         statNetCollectorServ.shutdown();
92     }
93
94     @Override
95     public boolean hasActiveNodes() {
96         return ( ! statNodeHolder.isEmpty());
97     }
98
99     @Override
100     public boolean isProvidedFlowNodeActive(
101             final InstanceIdentifier<Node> flowNode) {
102         return statNodeHolder.containsKey(flowNode);
103     }
104
105     @Override
106     public boolean connectedNodeRegistration(final InstanceIdentifier<Node> ident,
107             final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
108         if (isNodeIdentValidForUse(ident) && ! statNodeHolder.containsKey(ident)) {
109             synchronized (statNodeHolderLock) {
110                 final boolean startStatCollecting = statNodeHolder.size() == 0;
111                 if ( ! statNodeHolder.containsKey(ident)) {
112                     if (statNodeHolder.size() >= maxNodeForCollector) {
113                         return false;
114                     }
115                     final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
116                             new HashMap<>(statNodeHolder);
117                     final NodeRef nodeRef = new NodeRef(ident);
118                     final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(nodeRef,
119                             statTypes, nrOfSwitchTables);
120                     statNode.put(ident, nodeInfoHolder);
121                     statNodeHolder = Collections.unmodifiableMap(statNode);
122                 }
123                 if (startStatCollecting) {
124                     finishing = false;
125                     statNetCollectorServ.execute(this);
126                 }
127             }
128         }
129         return true;
130     }
131
132     @Override
133     public boolean disconnectedNodeUnregistration(final InstanceIdentifier<Node> ident) {
134         if (isNodeIdentValidForUse(ident) && statNodeHolder.containsKey(ident)) {
135             synchronized (statNodeHolderLock) {
136                 if (statNodeHolder.containsKey(ident)) {
137                     final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
138                             new HashMap<>(statNodeHolder);
139                     statNode.remove(ident);
140                     statNodeHolder = Collections.unmodifiableMap(statNode);
141                 }
142                 if (statNodeHolder.isEmpty()) {
143                     finishing = true;
144                     collectNextStatistics(actualTransactionId);
145                     statNetCollectorServ.shutdown();
146                 }
147                 return true;
148             }
149         }
150         return false;
151     }
152
153     @Override
154     public boolean registerAdditionalNodeFeature(final InstanceIdentifier<Node> ident,
155             final StatCapabTypes statCapab) {
156         if (isNodeIdentValidForUse(ident)) {
157             if ( ! statNodeHolder.containsKey(ident)) {
158                 return false;
159             }
160             final StatNodeInfoHolder statNode = statNodeHolder.get(ident);
161             if ( ! statNode.getStatMarkers().contains(statCapab)) {
162                 synchronized (statNodeHolderLock) {
163                     if ( ! statNode.getStatMarkers().contains(statCapab)) {
164                         final List<StatCapabTypes> statCapabForEdit = new ArrayList<>(statNode.getStatMarkers());
165                         statCapabForEdit.add(statCapab);
166                         final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(statNode.getNodeRef(),
167                                 Collections.unmodifiableList(statCapabForEdit), statNode.getMaxTables());
168
169                         final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodes =
170                                 new HashMap<>(statNodeHolder);
171                         statNodes.put(ident, nodeInfoHolder);
172                         statNodeHolder = Collections.unmodifiableMap(statNodes);
173                     }
174                 }
175             }
176         }
177         return true;
178     }
179
180     @Override
181     public void collectNextStatistics(final TransactionId xid) {
182         if (checkTransactionId(xid) && wakeMe) {
183             synchronized (statCollectorLock) {
184                 if (wakeMe) {
185                     LOG.trace("STAT-COLLECTOR is notified to conntinue");
186                     statCollectorLock.notify();
187                 }
188             }
189         }
190     }
191
192     @Override
193     public void run() {
194         try {
195             Thread.sleep(WAIT_BEFORE_COLLECTING_STATS);
196         }
197         catch (final InterruptedException e1) {
198             // NOOP
199         }
200         LOG.debug("StatCollector {} Start collecting!", name);
201          /* Neverending cyle - wait for finishing */
202          while ( ! finishing) {
203             boolean collecting = false;
204             final long startTime = System.currentTimeMillis();
205
206             if ( ! statNodeHolder.isEmpty()) {
207                 collecting = true;
208                 collectStatCrossNetwork();
209                 collecting = false;
210             }
211
212             if ( ! collecting) {
213                 final long statFinalTime = System.currentTimeMillis() - startTime;
214                 LOG.debug("STAT-MANAGER {}: last all NET statistics collection cost {} ms", name, statFinalTime);
215                 if (statFinalTime < minReqNetInterval) {
216                     LOG.trace("statCollector is about to make a collecting sleep");
217                     synchronized (statCollectorLock) {
218                         wakeMe = true;
219                         try {
220                             final long waitTime = minReqNetInterval - statFinalTime;
221                             statCollectorLock.wait(waitTime);
222                             LOG.trace("STAT-MANAGER : statCollector {} is waking up from a collecting sleep for {} ms", name, waitTime);
223                         } catch (final InterruptedException e) {
224                             LOG.warn("statCollector has been interrupted during collecting sleep", e);
225                         } finally {
226                             wakeMe = false;
227                         }
228                     }
229                 }
230             }
231         }
232     }
233
234     private void waitingForNotification() {
235         synchronized (statCollectorLock) {
236             wakeMe = true;
237             try {
238                 statCollectorLock.wait(STAT_COLLECT_TIME_OUT);
239                 LOG.trace("statCollector is waking up from a wait stat Response sleep");
240             } catch (final InterruptedException e) {
241                 LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
242             } finally {
243                 setActualTransactionId(null);
244                 wakeMe = false;
245             }
246         }
247     }
248
249
250     private void collectStatCrossNetwork() {
251         for (final Entry<InstanceIdentifier<Node>, StatNodeInfoHolder> nodeEntity : statNodeHolder.entrySet()) {
252             final List<StatCapabTypes> listNeededStat = nodeEntity.getValue().getStatMarkers();
253             final NodeRef actualNodeRef = nodeEntity.getValue().getNodeRef();
254             final Short maxTables = nodeEntity.getValue().getMaxTables();
255             for (final StatCapabTypes statMarker : listNeededStat) {
256                 if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
257                     break;
258                 }
259                 try {
260                     switch (statMarker) {
261                     case PORT_STATS:
262                         LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
263                         setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
264                         waitingForNotification();
265                         break;
266                     case QUEUE_STATS:
267                         LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
268                         setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
269                         waitingForNotification();
270                         break;
271                     case TABLE_STATS:
272                         LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
273                         setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
274                         waitingForNotification();
275                         break;
276                     case GROUP_STATS:
277                         LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
278                         setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
279                         waitingForNotification();
280                         setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
281                         waitingForNotification();
282                         break;
283                     case METER_STATS:
284                         LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
285                         setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
286                         waitingForNotification();
287                         setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
288                         waitingForNotification();
289                         break;
290                     case FLOW_STATS:
291                         LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
292                         setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
293                         waitingForNotification();
294                         LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
295                         for (short i = 0; i < maxTables; i++) {
296                             final TableId tableId = new TableId(i);
297                             manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
298                         }
299                         break;
300                     default:
301                         /* Exception for programmers in implementation cycle */
302                         throw new IllegalStateException("Not implemented ASK for " + statMarker);
303                     }
304                 } catch (InterruptedException | ExecutionException ex) {
305                     LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
306                     continue;
307                 }
308             }
309         }
310     }
311
312     private class StatNodeInfoHolder {
313         private final NodeRef nodeRef;
314         private final List<StatCapabTypes> statMarkers;
315         private final Short maxTables;
316
317         public StatNodeInfoHolder(final NodeRef nodeRef,
318                 final List<StatCapabTypes> statMarkers, final Short maxTables) {
319             this.nodeRef = nodeRef;
320             this.maxTables = maxTables;
321             this.statMarkers = statMarkers;
322         }
323
324         public final NodeRef getNodeRef() {
325             return nodeRef;
326         }
327
328         public final List<StatCapabTypes> getStatMarkers() {
329             return statMarkers;
330         }
331
332         public final Short getMaxTables() {
333             return maxTables;
334         }
335     }
336
337     private boolean isNodeIdentValidForUse(final InstanceIdentifier<Node> ident) {
338         if (ident == null) {
339             LOG.warn("FlowCapableNode InstanceIdentifier {} can not be null!");
340             return false;
341         }
342         if (ident.isWildcarded()) {
343             LOG.warn("FlowCapableNode InstanceIdentifier {} can not be wildcarded!", ident);
344             return false;
345         }
346         return true;
347     }
348
349     private boolean checkTransactionId(final TransactionId xid) {
350         synchronized (transNotifyLock) {
351             return actualTransactionId != null && actualTransactionId.equals(xid);
352         }
353     }
354
355     private void setActualTransactionId(final TransactionId transactionId) {
356         synchronized (transNotifyLock) {
357             actualTransactionId = transactionId;
358         }
359     }
360 }
361