Merge "Bug 4957 Role lifecycle support for TxChainManager in DeviceContext"
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatPermCollectorImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.HashMap;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Map.Entry;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ThreadFactory;
21
22 import com.google.common.base.Optional;
23 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
24 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
25 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector;
26 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
33 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.base.Preconditions;
38 import com.google.common.util.concurrent.ThreadFactoryBuilder;
39
40 /**
41  * statistics-manager
42  * org.opendaylight.openflowplugin.applications.statistics.manager.impl
43  *
44  * StatPermCollectorImpl
45  * Thread base statistic collector. Class holds internal map for all registered
46  * (means connected) nodes with List of Switch capabilities;
47  * Statistics collecting process get cross whole Network Device by device
48  * and statistic by statistic (follow Switch capabilities to prevent unnecessary
49  * ask) Next statistic start collecting by notification or by timeout.
50  *
51  * @author @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
52  *
53  */
54 public class StatPermCollectorImpl implements StatPermCollector {
55
56     private static final Logger LOG = LoggerFactory.getLogger(StatPermCollectorImpl.class);
57
58     private static final long STAT_COLLECT_TIME_OUT = 3000L;
59
60     /**
61         sleep 5 second before collecting all statistics cycles is important
62         for loading all Nodes to Operational/DS
63      */
64     private static final long WAIT_BEFORE_COLLECTING_STATS = 5000;
65
66     private final ExecutorService statNetCollectorServ;
67     private final StatisticsManager manager;
68
69     private final int maxNodeForCollector;
70     private final long minReqNetInterval;
71     private final String name;
72
73     private final Object statCollectorLock = new Object();
74     private final Object statNodeHolderLock = new Object();
75     private final Object transNotifyLock = new Object();
76
77     private Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodeHolder =
78             Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
79
80     private volatile boolean wakeMe = false;
81     private volatile boolean finishing = false;
82     private TransactionId actualTransactionId;
83
84     public StatPermCollectorImpl(final StatisticsManager manager, final long minReqNetInterv, final int nr,
85             final int maxNodeForCollectors) {
86         this.manager = Preconditions.checkNotNull(manager, "StatisticsManager can not be null!");
87         name = "odl-stat-collector-" + nr;
88         minReqNetInterval = minReqNetInterv;
89         final ThreadFactory threadFact = new ThreadFactoryBuilder()
90             .setNameFormat(name + "-thread-%d").build();
91         statNetCollectorServ = Executors.newSingleThreadExecutor(threadFact);
92         maxNodeForCollector = maxNodeForCollectors;
93         LOG.trace("StatCollector {} start successful!", name);
94     }
95
96     /**
97      * finish collecting statistics
98      */
99     @Override
100     public void close() {
101         statNodeHolder = Collections.<InstanceIdentifier<Node>, StatNodeInfoHolder> emptyMap();
102         finishing = true;
103         collectNextStatistics(actualTransactionId);
104         statNetCollectorServ.shutdown();
105     }
106
107     @Override
108     public boolean hasActiveNodes() {
109         return ( ! statNodeHolder.isEmpty());
110     }
111
112     @Override
113     public boolean isProvidedFlowNodeActive(
114             final InstanceIdentifier<Node> flowNode) {
115         return statNodeHolder.containsKey(flowNode);
116     }
117
118     @Override
119     public boolean connectedNodeRegistration(final InstanceIdentifier<Node> ident,
120             final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
121         if (isNodeIdentValidForUse(ident) && ! statNodeHolder.containsKey(ident)) {
122             synchronized (statNodeHolderLock) {
123                 final boolean startStatCollecting = statNodeHolder.size() == 0;
124                 if ( ! statNodeHolder.containsKey(ident)) {
125                     if (statNodeHolder.size() >= maxNodeForCollector) {
126                         return false;
127                     }
128                     final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
129                             new HashMap<>(statNodeHolder);
130                     final NodeRef nodeRef = new NodeRef(ident);
131                     final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(nodeRef,
132                             statTypes, nrOfSwitchTables);
133                     statNode.put(ident, nodeInfoHolder);
134                     statNodeHolder = Collections.unmodifiableMap(statNode);
135                 }
136                 if (startStatCollecting) {
137                     finishing = false;
138                     statNetCollectorServ.execute(this);
139                 }
140             }
141         }
142         return true;
143     }
144
145     @Override
146     public boolean disconnectedNodeUnregistration(final InstanceIdentifier<Node> ident) {
147         if (isNodeIdentValidForUse(ident) && statNodeHolder.containsKey(ident)) {
148             synchronized (statNodeHolderLock) {
149                 if (statNodeHolder.containsKey(ident)) {
150                     final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNode =
151                             new HashMap<>(statNodeHolder);
152                     statNode.remove(ident);
153                     statNodeHolder = Collections.unmodifiableMap(statNode);
154                 }
155                 if (statNodeHolder.isEmpty()) {
156                     finishing = true;
157                     collectNextStatistics(actualTransactionId);
158                     statNetCollectorServ.shutdown();
159                 }
160                 return true;
161             }
162         }
163         return false;
164     }
165
166     @Override
167     public boolean registerAdditionalNodeFeature(final InstanceIdentifier<Node> ident,
168             final StatCapabTypes statCapab) {
169         if (isNodeIdentValidForUse(ident)) {
170             if ( ! statNodeHolder.containsKey(ident)) {
171                 return false;
172             }
173             final StatNodeInfoHolder statNode = statNodeHolder.get(ident);
174             if ( ! statNode.getStatMarkers().contains(statCapab)) {
175                 synchronized (statNodeHolderLock) {
176                     if ( ! statNode.getStatMarkers().contains(statCapab)) {
177                         final List<StatCapabTypes> statCapabForEdit = new ArrayList<>(statNode.getStatMarkers());
178                         statCapabForEdit.add(statCapab);
179                         final StatNodeInfoHolder nodeInfoHolder = new StatNodeInfoHolder(statNode.getNodeRef(),
180                                 Collections.unmodifiableList(statCapabForEdit), statNode.getMaxTables());
181
182                         final Map<InstanceIdentifier<Node>, StatNodeInfoHolder> statNodes =
183                                 new HashMap<>(statNodeHolder);
184                         statNodes.put(ident, nodeInfoHolder);
185                         statNodeHolder = Collections.unmodifiableMap(statNodes);
186                     }
187                 }
188             }
189         }
190         return true;
191     }
192
193     @Override
194     public void collectNextStatistics(final TransactionId xid) {
195         if (checkTransactionId(xid) && wakeMe) {
196             synchronized (statCollectorLock) {
197                 if (wakeMe) {
198                     LOG.trace("STAT-COLLECTOR is notified to conntinue");
199                     statCollectorLock.notify();
200                 }
201             }
202         }
203     }
204
205     @Override
206     public void run() {
207         try {
208             Thread.sleep(WAIT_BEFORE_COLLECTING_STATS);
209         }
210         catch (final InterruptedException e1) {
211             // NOOP
212         }
213         LOG.debug("StatCollector {} Start collecting!", name);
214          /* Neverending cyle - wait for finishing */
215          while ( ! finishing) {
216             boolean collecting = false;
217             final long startTime = System.currentTimeMillis();
218
219             if ( ! statNodeHolder.isEmpty()) {
220                 collecting = true;
221                 collectStatCrossNetwork();
222                 collecting = false;
223             }
224
225             if ( ! collecting) {
226                 final long statFinalTime = System.currentTimeMillis() - startTime;
227                 LOG.debug("STAT-MANAGER {}: last all NET statistics collection cost {} ms", name, statFinalTime);
228                 if (statFinalTime < minReqNetInterval) {
229                     LOG.trace("statCollector is about to make a collecting sleep");
230                     synchronized (statCollectorLock) {
231                         wakeMe = true;
232                         try {
233                             final long waitTime = minReqNetInterval - statFinalTime;
234                             statCollectorLock.wait(waitTime);
235                             LOG.trace("STAT-MANAGER : statCollector {} is waking up from a collecting sleep for {} ms", name, waitTime);
236                         } catch (final InterruptedException e) {
237                             LOG.warn("statCollector has been interrupted during collecting sleep", e);
238                         } finally {
239                             wakeMe = false;
240                         }
241                     }
242                 }
243             }
244         }
245     }
246
247     private void waitingForNotification() {
248         synchronized (statCollectorLock) {
249             wakeMe = true;
250             try {
251                 statCollectorLock.wait(STAT_COLLECT_TIME_OUT);
252                 LOG.trace("statCollector is waking up from a wait stat Response sleep");
253             } catch (final InterruptedException e) {
254                 LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
255             } finally {
256                 setActualTransactionId(null);
257                 wakeMe = false;
258             }
259         }
260     }
261
262
263     private void collectStatCrossNetwork() {
264         for (final Entry<InstanceIdentifier<Node>, StatNodeInfoHolder> nodeEntity : statNodeHolder.entrySet()) {
265             final NodeKey nodeKey = nodeEntity.getKey().firstKeyOf(Node.class);
266             if (!this.isThisInstanceNodeOwner(nodeKey.getId())) {
267                 continue;
268             }
269             LOG.trace("collectStatCrossNetwork: Controller is owner of the " +
270                     "node {}, so collecting the statistics.",nodeKey);
271
272             final List<StatCapabTypes> listNeededStat = nodeEntity.getValue().getStatMarkers();
273             final NodeRef actualNodeRef = nodeEntity.getValue().getNodeRef();
274             final Short maxTables = nodeEntity.getValue().getMaxTables();
275             for (final StatCapabTypes statMarker : listNeededStat) {
276                 if ( ! isProvidedFlowNodeActive(nodeEntity.getKey())) {
277                     break;
278                 }
279                 try {
280                     switch (statMarker) {
281                     case PORT_STATS:
282                         LOG.trace("STAT-MANAGER-collecting PORT-STATS for NodeRef {}", actualNodeRef);
283                         setActualTransactionId(manager.getRpcMsgManager().getAllPortsStat(actualNodeRef).get());
284                         waitingForNotification();
285                         break;
286                     case QUEUE_STATS:
287                         LOG.trace("STAT-MANAGER-collecting QUEUE-STATS for NodeRef {}", actualNodeRef);
288                         setActualTransactionId(manager.getRpcMsgManager().getAllQueueStat(actualNodeRef).get());
289                         waitingForNotification();
290                         break;
291                     case TABLE_STATS:
292                         LOG.trace("STAT-MANAGER-collecting TABLE-STATS for NodeRef {}", actualNodeRef);
293                         setActualTransactionId(manager.getRpcMsgManager().getAllTablesStat(actualNodeRef).get());
294                         waitingForNotification();
295                         break;
296                     case GROUP_STATS:
297                         LOG.trace("STAT-MANAGER-collecting GROUP-STATS for NodeRef {}", actualNodeRef);
298                         setActualTransactionId(manager.getRpcMsgManager().getAllGroupsConfStats(actualNodeRef).get());
299                         waitingForNotification();
300                         setActualTransactionId(manager.getRpcMsgManager().getAllGroupsStat(actualNodeRef).get());
301                         waitingForNotification();
302                         break;
303                     case METER_STATS:
304                         LOG.trace("STAT-MANAGER-collecting METER-STATS for NodeRef {}", actualNodeRef);
305                         setActualTransactionId(manager.getRpcMsgManager().getAllMeterConfigStat(actualNodeRef).get());
306                         waitingForNotification();
307                         setActualTransactionId(manager.getRpcMsgManager().getAllMetersStat(actualNodeRef).get());
308                         waitingForNotification();
309                         break;
310                     case FLOW_STATS:
311                         LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
312                         setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
313                         waitingForNotification();
314                         /*LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
315                         for (short i = 0; i < maxTables; i++) {
316                             final TableId tableId = new TableId(i);
317                             manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
318                         }*/
319                         break;
320                     default:
321                         /* Exception for programmers in implementation cycle */
322                         throw new IllegalStateException("Not implemented ASK for " + statMarker);
323                     }
324                 } catch (InterruptedException | ExecutionException ex) {
325                     LOG.warn("Unexpected RPC exception by call RPC Future!", ex);
326                     continue;
327                 }
328             }
329         }
330     }
331
332     private boolean isThisInstanceNodeOwner(NodeId nodeId) {
333         return manager.getNodeRegistrator().isFlowCapableNodeOwner(nodeId);
334     }
335
336     private class StatNodeInfoHolder {
337         private final NodeRef nodeRef;
338         private final List<StatCapabTypes> statMarkers;
339         private final Short maxTables;
340
341         public StatNodeInfoHolder(final NodeRef nodeRef,
342                 final List<StatCapabTypes> statMarkers, final Short maxTables) {
343             this.nodeRef = nodeRef;
344             this.maxTables = maxTables;
345             this.statMarkers = statMarkers;
346         }
347
348         public final NodeRef getNodeRef() {
349             return nodeRef;
350         }
351
352         public final List<StatCapabTypes> getStatMarkers() {
353             return statMarkers;
354         }
355
356         public final Short getMaxTables() {
357             return maxTables;
358         }
359     }
360
361     private boolean isNodeIdentValidForUse(final InstanceIdentifier<Node> ident) {
362         if (ident == null) {
363             LOG.warn("FlowCapableNode InstanceIdentifier {} can not be null!");
364             return false;
365         }
366         if (ident.isWildcarded()) {
367             LOG.warn("FlowCapableNode InstanceIdentifier {} can not be wildcarded!", ident);
368             return false;
369         }
370         return true;
371     }
372
373     private boolean checkTransactionId(final TransactionId xid) {
374         synchronized (transNotifyLock) {
375             return actualTransactionId != null && actualTransactionId.equals(xid);
376         }
377     }
378
379     private void setActualTransactionId(final TransactionId transactionId) {
380         synchronized (transNotifyLock) {
381             actualTransactionId = transactionId;
382         }
383     }
384 }
385