82e46be007ab37271728a7c2eb6bed12515bbbd4
[openflowplugin.git] / applications / statistics-manager / src / main / java / org / opendaylight / openflowplugin / applications / statistics / manager / impl / StatisticsManagerImpl.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ThreadFactoryBuilder;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.UUID;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.LinkedBlockingDeque;
23 import java.util.concurrent.ThreadFactory;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
30 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
31 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
32 import org.opendaylight.openflowplugin.applications.statistics.manager.StatListeningCommiter;
33 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
34 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNotifyCommiter;
35 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector;
36 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
37 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager;
38 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56 * statistics-manager
57 * org.opendaylight.openflowplugin.applications.statistics.manager.impl
58 *
59 * StatisticsManagerImpl
60 * It represent a central point for whole module. Implementation
61 * {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
62 * Config/DS {@link StatListeningCommiter}, as well as {@link StatPermCollector}
63 * for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
64 * In next, StatisticsManager provides all DS contact Transaction services.
65 *
66 * @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
67 *
68 */
69 public class StatisticsManagerImpl implements StatisticsManager, Runnable {
70
71    private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
72
73    private static final int QUEUE_DEPTH = 5000;
74    private static final int MAX_BATCH = 100;
75
76    private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
77    private final Map<InstanceIdentifier<Node>, Pair<StatPermCollector, UUID>> nodeCollectorMap = new ConcurrentHashMap<>();
78    private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
79
80
81    private final DataBroker dataBroker;
82    private final ExecutorService statRpcMsgManagerExecutor;
83    private final ExecutorService statDataStoreOperationServ;
84    private StatRpcMsgManager rpcMsgManager;
85    private List<StatPermCollector> statCollectors;
86    private final Object statCollectorLock = new Object();
87    private BindingTransactionChain txChain;
88    private volatile boolean finishing = false;
89
90    private StatNodeRegistration nodeRegistrator;
91    private StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> flowListeningCommiter;
92    private StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> meterListeningCommiter;
93    private StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> groupListeningCommiter;
94    private StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> queueNotifyCommiter;
95    private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
96    private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
97
98    private final StatisticsManagerConfig statManagerConfig;
99
100    public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) {
101        statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
102        this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
103        ThreadFactory threadFact;
104        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
105        statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
106        threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
107        statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
108        txChain =  dataBroker.createTransactionChain(this);
109    }
110
111    @Override
112    public void start(final NotificationProviderService notifService,
113            final RpcConsumerRegistry rpcRegistry) {
114        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
115        rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
116        statCollectors = Collections.emptyList();
117        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
118        flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
119        meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
120        groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
121        tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
122        portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
123        queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
124
125        statRpcMsgManagerExecutor.execute(rpcMsgManager);
126        statDataStoreOperationServ.execute(this);
127        LOG.info("Statistics Manager started successfully!");
128    }
129
130    private <T extends AutoCloseable> T close(final T closeable) throws Exception {
131        if (closeable != null) {
132            closeable.close();
133        }
134        return null;
135    }
136
137    @Override
138    public void close() throws Exception {
139        LOG.info("StatisticsManager close called");
140        finishing = true;
141        nodeRegistrator = close(nodeRegistrator);
142        flowListeningCommiter = close(flowListeningCommiter);
143        meterListeningCommiter = close(meterListeningCommiter);
144        groupListeningCommiter = close(groupListeningCommiter);
145        tableNotifCommiter = close(tableNotifCommiter);
146        portNotifyCommiter = close(portNotifyCommiter);
147        queueNotifyCommiter = close(queueNotifyCommiter);
148        if (statCollectors != null) {
149            for (StatPermCollector collector : statCollectors) {
150                collector = close(collector);
151            }
152            statCollectors = null;
153        }
154        rpcMsgManager = close(rpcMsgManager);
155        statRpcMsgManagerExecutor.shutdown();
156        statDataStoreOperationServ.shutdown();
157        txChain = close(txChain);
158    }
159
160    @Override
161    public void enqueue(final StatDataStoreOperation op) {
162        // we don't need to block anything - next statistics come soon
163        final boolean success = dataStoreOperQueue.offer(op);
164        if ( ! success) {
165            LOG.debug("Stat DS/Operational submiter Queue is full!");
166        }
167    }
168
169    @Override
170    public void run() {
171        /* Neverending cyle - wait for finishing */
172        while ( ! finishing) {
173            try {
174                StatDataStoreOperation op = dataStoreOperQueue.take();
175                final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
176                LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
177
178                int ops = 0;
179                do {
180                    Pair<StatPermCollector, UUID> statPermCollectorUUIDPair = nodeCollectorMap.get(op.getNodeIdentifier());
181                    if (statPermCollectorUUIDPair != null && statPermCollectorUUIDPair.getRight().equals(op.getNodeUUID())) {
182                        // dont apply operations for nodes which have been disconnected or if there uuids do not match
183                        // this can happen if operations are queued and node is removed.
184                        // if the uuids dont match, it means that the stat operation are stale and belong to the same node
185                        // which got disconnected and connected again.
186                        op.applyOperation(tx);
187                        ops++;
188                    } else {
189                        LOG.debug("{} not found or UUID mismatch for statistics datastore operation", op.getNodeIdentifier());
190                    }
191
192                    if (ops < MAX_BATCH) {
193                        op = dataStoreOperQueue.poll();
194                    } else {
195                        op = null;
196                    }
197                } while (op != null);
198
199                LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
200
201                tx.submit().checkedGet();
202            } catch (final InterruptedException e) {
203                LOG.warn("Stat Manager DS Operation thread interupted!", e);
204                finishing = true;
205            } catch (final Exception e) {
206                LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
207                txChain.close();
208                txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
209                cleanDataStoreOperQueue();
210            }
211        }
212        // Drain all events, making sure any blocked threads are unblocked
213        cleanDataStoreOperQueue();
214    }
215
216    private synchronized void cleanDataStoreOperQueue() {
217        // Drain all events, making sure any blocked threads are unblocked
218        while (! dataStoreOperQueue.isEmpty()) {
219            dataStoreOperQueue.poll();
220        }
221    }
222
223    @Override
224    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
225            final Throwable cause) {
226        LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
227    }
228
229    @Override
230    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
231        // NOOP
232    }
233
234    @Override
235    public boolean isProvidedFlowNodeActive(final InstanceIdentifier<Node> nodeIdent) {
236        for (final StatPermCollector collector : statCollectors) {
237            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
238                return true;
239            }
240        }
241        return false;
242    }
243
244    @Override
245    public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
246        for (final StatPermCollector collector : statCollectors) {
247            if (collector.isProvidedFlowNodeActive(nodeIdent)) {
248                collector.collectNextStatistics(xid);
249            }
250        }
251    }
252
253     @Override
254     public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
255             final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
256
257
258         Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
259         if (collectorUUIDPair == null) {
260             // no collector contains this node,
261             // check if one of the collectors can accommodate it
262             // if no then add a new collector
263
264             synchronized(statCollectorLock) {
265                 for (int i = statCollectors.size() - 1; i >= 0; i--) {
266                     // start from back of the list as most likely previous ones might be full
267                     final StatPermCollector aCollector = statCollectors.get(i);
268                     if (aCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
269                         // if the collector returns true after adding node, then return
270                         nodeCollectorMap.put(nodeIdent, new Pair(aCollector, UUID.randomUUID()));
271                         LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}",
272                                 numNodesBeingCollected.incrementAndGet());
273                         return;
274                     }
275                 }
276                 // no collector was able to add this node
277                 LOG.info("No existing collector found for new node. Creating a new collector for {}", nodeIdent);
278                 final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
279                         statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
280                         statManagerConfig.getMaxNodesForCollector());
281
282                 final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
283                 statCollectorsNew.add(newCollector);
284                 statCollectors = Collections.unmodifiableList(statCollectorsNew);
285                 nodeCollectorMap.put(nodeIdent, new Pair(newCollector, UUID.randomUUID()));
286                 LOG.debug("NodeAdded: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.incrementAndGet());
287
288                 newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
289             }
290
291
292         } else {
293             // add to the collector, even if it rejects it.
294             collectorUUIDPair.getLeft().connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
295         }
296     }
297
298
299     @Override
300     public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
301         flowListeningCommiter.cleanForDisconnect(nodeIdent);
302
303         Pair<StatPermCollector, UUID> collectorUUIDPair = nodeCollectorMap.get(nodeIdent);
304         if (collectorUUIDPair != null) {
305             StatPermCollector collector = collectorUUIDPair.getLeft();
306             if (collector != null) {
307                 nodeCollectorMap.remove(nodeIdent);
308                 LOG.debug("NodeRemoved: Num Nodes Registered with StatisticsManager:{}", numNodesBeingCollected.decrementAndGet());
309
310                 if (collector.disconnectedNodeUnregistration(nodeIdent)) {
311                     if (!collector.hasActiveNodes()) {
312                         synchronized (statCollectorLock) {
313                             if (collector.hasActiveNodes()) {
314                                 return;
315                             }
316                             final List<StatPermCollector> newStatColl = new ArrayList<>(statCollectors);
317                             newStatColl.remove(collector);
318                             statCollectors = Collections.unmodifiableList(newStatColl);
319                         }
320                     }
321                     LOG.info("Node:{} successfully removed by StatisticsManager ", nodeIdent);
322                 } else {
323                     LOG.error("Collector not disconnecting for node, no operations will be committed for this node:{}", nodeIdent);
324                 }
325             } else {
326                 LOG.error("Unexpected error, collector not found in collectorUUIDPair for node:{}, UUID:{}", nodeIdent, collectorUUIDPair.getRight());
327             }
328
329         } else {
330             LOG.error("Received node removed for {}, but unable to find it in nodeCollectorMap", nodeIdent);
331         }
332     }
333
334    @Override
335    public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
336            final StatCapabTypes statCapab) {
337        for (final StatPermCollector collector : statCollectors) {
338            if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
339                return;
340            }
341        }
342        LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
343    }
344
345    /* Getter internal Statistic Manager Job Classes */
346    @Override
347    public StatRpcMsgManager getRpcMsgManager() {
348        return rpcMsgManager;
349    }
350
351    @Override
352    public StatNodeRegistration getNodeRegistrator() {
353        return nodeRegistrator;
354    }
355
356    @Override
357    public StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> getFlowListenComit() {
358        return flowListeningCommiter;
359    }
360
361    @Override
362    public StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> getMeterListenCommit() {
363        return meterListeningCommiter;
364    }
365
366    @Override
367    public StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> getGroupListenCommit() {
368        return groupListeningCommiter;
369    }
370
371    @Override
372    public StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> getQueueNotifyCommit() {
373        return queueNotifyCommiter;
374    }
375
376
377    @Override
378    public StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> getTableNotifCommit() {
379        return tableNotifCommiter;
380    }
381
382    @Override
383    public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
384        return portNotifyCommiter;
385    }
386
387     @Override
388     public StatisticsManagerConfig getConfiguration() {
389         return statManagerConfig;
390     }
391
392     @Override
393     public UUID getGeneratedUUIDForNode(InstanceIdentifier<Node> nodeInstanceIdentifier) {
394         Pair<StatPermCollector, UUID> permCollectorUUIDPair = nodeCollectorMap.get(nodeInstanceIdentifier);
395         if (permCollectorUUIDPair != null) {
396             return permCollectorUUIDPair.getRight();
397         }
398         // we dont want to mark operations with null uuid and get NPEs later. So mark them with invalid ones
399         return UUID.fromString("invalid-uuid");
400     }
401 }
402