afca1d2ed5eeabe95a92a5bb8289146d8560a8a7
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatisticsManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ThreadFactoryBuilder;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.UUID;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.LinkedBlockingDeque;
23 import java.util.concurrent.ThreadFactory;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
26 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
31 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
32 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
33 import org.opendaylight.openflowplugin.applications.statistics.manager.StatListeningCommiter;
34 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
35 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNotifyCommiter;
36 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector;
37 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
38 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
39 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57 * statistics-manager
58 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
59 *
60 * StatisticsManagerImpl
61 * It represent a central point for whole module. Implementation
62 * {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
63 * Config/DS {@link StatListeningCommiter}, as well as {@link StatPermCollector}
64 * for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
65 * In next, StatisticsManager provides all DS contact Transaction services.
66 *
67 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
68 *
69 */
70 public class StatisticsManagerImpl implements StatisticsManager, Runnable {
71
72    private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
73
74    private static final int QUEUE_DEPTH = 5000;
75    private static final int MAX_BATCH = 100;
76
77    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
78    private final Map<InstanceIdentifier<Node>, Pair<StatPermCollector, UUID>> nodeCollectorMap = new ConcurrentHashMap<>();
79    private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
80
81
82     private final DataBroker dataBroker;
83    private final ExecutorService statRpcMsgManagerExecutor;
84    private final ExecutorService statDataStoreOperationServ;
85    private EntityOwnershipService ownershipService;
86    private StatRpcMsgManager rpcMsgManager;
87    private List<StatPermCollector> statCollectors;
88    private final Object statCollectorLock = new Object();
89    private BindingTransactionChain txChain;
90    private volatile boolean finishing = false;
91
92    private StatNodeRegistration nodeRegistrator;
93    private StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> flowListeningCommiter;
94    private StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> meterListeningCommiter;
95    private StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> groupListeningCommiter;
96    private StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> queueNotifyCommiter;
97    private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
98    private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
99
100    private final StatisticsManagerConfig statManagerConfig;
101
102    public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) {
103        statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
104        this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
105        ThreadFactory threadFact;
106        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
107        statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
108        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
109        statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
110        txChain =  dataBroker.createTransactionChain(this);
111    }
112
113    @Override
114    public void start(final NotificationProviderService notifService,
115            final RpcConsumerRegistry rpcRegistry) {
116        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
117        rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
118        statCollectors = Collections.emptyList();
119        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
120        flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService, nodeRegistrator);
121        meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService, nodeRegistrator);
122        groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService, nodeRegistrator);
123        tableNotifCommiter = new StatNotifyCommitTable(this, notifService, nodeRegistrator);
124        portNotifyCommiter = new StatNotifyCommitPort(this, notifService, nodeRegistrator);
125        queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService, nodeRegistrator);
126
127        statRpcMsgManagerExecutor.execute(rpcMsgManager);
128        statDataStoreOperationServ.execute(this);
129        LOG.info("Statistics Manager started successfully!");
130    }
131
132    private <T extends AutoCloseable> T close(final T closeable) throws Exception {
133        if (closeable != null) {
134            closeable.close();
135        }
136        return null;
137    }
138
139    @Override
140    public void close() throws Exception {
141        LOG.info("StatisticsManager close called");
142        finishing = true;
143        nodeRegistrator = close(nodeRegistrator);
144        flowListeningCommiter = close(flowListeningCommiter);
145        meterListeningCommiter = close(meterListeningCommiter);
146        groupListeningCommiter = close(groupListeningCommiter);
147        tableNotifCommiter = close(tableNotifCommiter);
148        portNotifyCommiter = close(portNotifyCommiter);
149        queueNotifyCommiter = close(queueNotifyCommiter);
150        if (statCollectors != null) {
151            for (StatPermCollector collector : statCollectors) {
152                collector = close(collector);
153            }
154            statCollectors = null;
155        }
156        rpcMsgManager = close(rpcMsgManager);
157        statRpcMsgManagerExecutor.shutdown();
158        statDataStoreOperationServ.shutdown();
159        txChain = close(txChain);
160    }
161
162    @Override
163    public void enqueue(final StatDataStoreOperation op) {
164        // we don't need to block anything - next statistics come soon
165        final boolean success = dataStoreOperQueue.offer(op);
166        if ( ! success) {
167            LOG.debug("Stat DS/Operational submitter Queue is full!");
168        }
169    }
170
171    @Override
172    public void run() {
173        /* Neverending cyle - wait for finishing */
174        while ( ! finishing) {
175            StatDataStoreOperation op = null;
176            try {
177                op = dataStoreOperQueue.take();
178                final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
179                LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
180
181                int ops = 0;
182                do {
183                    Pair<StatPermCollector, UUID> statPermCollectorUUIDPair = nodeCollectorMap.get(op.getNodeIdentifier());
184                    if (statPermCollectorUUIDPair != null && statPermCollectorUUIDPair.getRight().equals(op.getNodeUUID())) {
185                        // dont apply operations for nodes which have been disconnected or if there uuids do not match
186                        // this can happen if operations are queued and node is removed.
187                        // if the uuids dont match, it means that the stat operation are stale and belong to the same node
188                        // which got disconnected and connected again.
189                        op.applyOperation(tx);
190                        ops++;
191                    } else {
192                        LOG.debug("{} not found or UUID mismatch for statistics datastore operation", op.getNodeIdentifier());
193                    }
194
195                    if (ops < MAX_BATCH) {
196                        op = dataStoreOperQueue.poll();
197                    } else {
198                        op = null;
199                    }
200                } while (op != null);
201
202                LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
203
204                tx.submit().checkedGet();
205            } catch (final InterruptedException e) {
206                LOG.warn("Stat Manager DS Operation thread interrupted, while " +
207                        "waiting for StatDataStore Operation task!", e);
208                finishing = true;
209            } catch (final Exception e) {
210                LOG.warn("Unhandled exception during processing statistics for {}. " +
211                        "Restarting transaction chain.",op != null?op.getNodeId().getValue():"",e);
212                txChain.close();
213                txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
214                cleanDataStoreOperQueue();
215            }
216        }
217        // Drain all events, making sure any blocked threads are unblocked
218        cleanDataStoreOperQueue();
219    }
220
221    private synchronized void cleanDataStoreOperQueue() {
222        // Drain all events, making sure any blocked threads are unblocked
223        while (! dataStoreOperQueue.isEmpty()) {
224            dataStoreOperQueue.poll();
225        }
226    }
227
228    @Override
229    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
230            final Throwable cause) {
231        LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
232    }
233
234    @Override
235    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
236        // NOOP
237    }
238
239    @Override
240    public boolean isProvidedFlowNodeActive(final InstanceIdentifier<Node> nodeIdent) {
241        for (final StatPermCollector collector : statCollectors) {
242            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
243                return true;
244            }
245        }
246        return false;
247    }
248
249    @Override
250    public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
251        for (final StatPermCollector collector : statCollectors) {
252            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
253                collector.collectNextStatistics(xid);
254            }
255        }
256    }
257
258     @Override
259     public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
260             final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
261
262
263         Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
264         if (collectorUUIDPair == null) {
265             // no collector contains this node,
266             // check if one of the collectors can accommodate it
267             // if no then add a new collector
268
269             synchronized(statCollectorLock) {
270                 for (int i = statCollectors.size() - 1; i >= 0; i--) {
271                     // start from back of the list as most likely previous ones might be full
272                     final StatPermCollector aCollector = statCollectors.get(i);
273                     if (aCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
274                         // if the collector returns true after adding node, then return
275                         nodeCollectorMap.put(nodeIdent, new Pair(aCollector, UUID.randomUUID()));
276                         LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}",
277                                 numNodesBeingCollected.incrementAndGet());
278                         return;
279                     }
280                 }
281                 // no collector was able to add this node
282                 LOG.info("No existing collector found for new node. Creating a new collector for {}", nodeIdent);
283                 final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
284                         statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
285                         statManagerConfig.getMaxNodesForCollector());
286
287                 final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
288                 statCollectorsNew.add(newCollector);
289                 statCollectors = Collections.unmodifiableList(statCollectorsNew);
290                 nodeCollectorMap.put(nodeIdent, new Pair(newCollector, UUID.randomUUID()));
291                 LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.incrementAndGet());
292
293                 newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
294             }
295
296
297         } else {
298             // add to the collector, even if it rejects it.
299             collectorUUIDPair.getLeft().connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
300         }
301     }
302
303
304     @Override
305     public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
306         flowListeningCommiter.cleanForDisconnect(nodeIdent);
307
308         Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
309         if (collectorUUIDPair != null) {
310             StatPermCollector collector = collectorUUIDPair.getLeft();
311             if (collector != null) {
312                 nodeCollectorMap.remove(nodeIdent);
313                 LOG.debug("NodeRemoved: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.decrementAndGet());
314
315                 if (collector.disconnectedNodeUnregistration(nodeIdent)) {
316                     if (!collector.hasActiveNodes()) {
317                         synchronized (statCollectorLock) {
318                             if (collector.hasActiveNodes()) {
319                                 return;
320                             }
321                             final List<StatPermCollector> newStatColl = new ArrayList<>(statCollectors);
322                             newStatColl.remove(collector);
323                             statCollectors = Collections.unmodifiableList(newStatColl);
324                         }
325                     }
326                     LOG.info("Node:{} successfully removed by StatisticsManager ", nodeIdent);
327                 } else {
328                     LOG.error("Collector not disconnecting for node, no operations will be committed for this node:{}", nodeIdent);
329                 }
330             } else {
331                 LOG.error("Unexpected error, collector not found in collectorUUIDPair for node:{}, UUID:{}", nodeIdent, collectorUUIDPair.getRight());
332             }
333
334         } else {
335             LOG.error("Received node removed for {}, but unable to find it in nodeCollectorMap", nodeIdent);
336         }
337     }
338
339    @Override
340    public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
341            final StatCapabTypes statCapab) {
342        for (final StatPermCollector collector : statCollectors) {
343            if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
344                return;
345            }
346        }
347        LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
348    }
349
350     @Override
351     public void unregisterNodeStats(final InstanceIdentifier<Node> nodeIdent,
352                                               final StatCapabTypes statCapab) {
353         for (final StatPermCollector collector : statCollectors) {
354             if (collector.unregisterNodeStats(nodeIdent, statCapab)) {
355                 return;
356             }
357         }
358         LOG.debug("Stats type {} is not removed from the node {}!", statCapab,nodeIdent );
359     }
360
361    /* Getter internal Statistic Manager Job Classes */
362    @Override
363    public StatRpcMsgManager getRpcMsgManager() {
364        return rpcMsgManager;
365    }
366
367    @Override
368    public StatNodeRegistration getNodeRegistrator() {
369        return nodeRegistrator;
370    }
371
372    @Override
373    public StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> getFlowListenComit() {
374        return flowListeningCommiter;
375    }
376
377    @Override
378    public StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> getMeterListenCommit() {
379        return meterListeningCommiter;
380    }
381
382    @Override
383    public StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> getGroupListenCommit() {
384        return groupListeningCommiter;
385    }
386
387    @Override
388    public StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> getQueueNotifyCommit() {
389        return queueNotifyCommiter;
390    }
391
392
393    @Override
394    public StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> getTableNotifCommit() {
395        return tableNotifCommiter;
396    }
397
398    @Override
399    public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
400        return portNotifyCommiter;
401    }
402
403     @Override
404     public StatisticsManagerConfig getConfiguration() {
405         return statManagerConfig;
406     }
407
408     @Override
409     public UUID getGeneratedUUIDForNode(InstanceIdentifier<Node> nodeInstanceIdentifier) {
410         Pair<StatPermCollector, UUID> permCollectorUUIDPair = nodeCollectorMap.get(nodeInstanceIdentifier);
411         if (permCollectorUUIDPair != null) {
412             return permCollectorUUIDPair.getRight();
413         }
414         // we dont want to mark operations with null uuid and get NPEs later. So mark them with invalid ones
415         return UUID.fromString("invalid-uuid");
416     }
417
418     @Override
419     public void setOwnershipService(EntityOwnershipService ownershipService) {
420         this.ownershipService = ownershipService;
421     }
422
423     @Override
424     public EntityOwnershipService getOwnershipService() {
425         return this.ownershipService;
426     }
427
428 }
429