BUG-2637: migration final - cleanup
[controller.git] / opendaylight / md-sal / statistics-manager / src / main / java / org / opendaylight / controller / md / statistics / manager / impl / StatisticsManagerImpl.java
diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/impl/StatisticsManagerImpl.java
deleted file mode 100644 (file)
index 437c92f..0000000
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.md.statistics.manager.impl;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadFactory;
-
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
-import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
-import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
-import org.opendaylight.controller.md.statistics.manager.StatPermCollector;
-import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
-import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
-import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.OpendaylightFlowTableStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.OpendaylightQueueStatisticsListener;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
-* statistics-manager
-* org.opendaylight.controller.md.statistics.manager.impl
-*
-* StatisticsManagerImpl
-* It represent a central point for whole module. Implementation
-* {@link StatisticsManager} registers all Operation/DS {@link StatNotifyCommiter} and
-* Config/DS {@StatListeningCommiter}, as well as {@link StatPermCollector}
-* for statistic collecting and {@link StatRpcMsgManager} as Device RPCs provider.
-* In next, StatisticsManager provides all DS contact Transaction services.
-*
-* @author avishnoi@in.ibm.com <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
-*
-*/
-public class StatisticsManagerImpl implements StatisticsManager, Runnable {
-
-   private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
-
-   private static final int QUEUE_DEPTH = 5000;
-   private static final int MAX_BATCH = 100;
-
-   private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
-
-   private final DataBroker dataBroker;
-   private final ExecutorService statRpcMsgManagerExecutor;
-   private final ExecutorService statDataStoreOperationServ;
-   private StatRpcMsgManager rpcMsgManager;
-   private List<StatPermCollector> statCollectors;
-   private final Object statCollectorLock = new Object();
-   private BindingTransactionChain txChain;
-   private volatile boolean finishing = false;
-
-   private StatNodeRegistration nodeRegistrator;
-   private StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> flowListeningCommiter;
-   private StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> meterListeningCommiter;
-   private StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> groupListeningCommiter;
-   private StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> queueNotifyCommiter;
-   private StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> tableNotifCommiter;
-   private StatNotifyCommiter<OpendaylightPortStatisticsListener> portNotifyCommiter;
-
-   private final StatisticsManagerConfig statManagerConfig;
-
-   public StatisticsManagerImpl (final DataBroker dataBroker, final StatisticsManagerConfig statManagerconfig) {
-       statManagerConfig = Preconditions.checkNotNull(statManagerconfig);
-       this.dataBroker = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
-       ThreadFactory threadFact;
-       threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-rpc-oper-thread-%d").build();
-       statRpcMsgManagerExecutor = Executors.newSingleThreadExecutor(threadFact);
-       threadFact = new ThreadFactoryBuilder().setNameFormat("odl-stat-ds-oper-thread-%d").build();
-       statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
-       txChain =  dataBroker.createTransactionChain(this);
-   }
-
-   @Override
-   public void start(final NotificationProviderService notifService,
-           final RpcConsumerRegistry rpcRegistry) {
-       Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
-       rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
-       statCollectors = Collections.emptyList();
-       nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
-       flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
-       meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
-       groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
-       tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
-       portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
-       queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
-
-       statRpcMsgManagerExecutor.execute(rpcMsgManager);
-       statDataStoreOperationServ.execute(this);
-       LOG.info("Statistics Manager started successfully!");
-   }
-
-   @Override
-   public void close() throws Exception {
-       LOG.info("StatisticsManager close called");
-       finishing = true;
-       if (nodeRegistrator != null) {
-           nodeRegistrator.close();
-           nodeRegistrator = null;
-       }
-       if (flowListeningCommiter != null) {
-           flowListeningCommiter.close();
-           flowListeningCommiter = null;
-       }
-       if (meterListeningCommiter != null) {
-           meterListeningCommiter.close();
-           meterListeningCommiter = null;
-       }
-       if (groupListeningCommiter != null) {
-           groupListeningCommiter.close();
-           groupListeningCommiter = null;
-       }
-       if (tableNotifCommiter != null) {
-           tableNotifCommiter.close();
-           tableNotifCommiter = null;
-       }
-       if (portNotifyCommiter != null) {
-           portNotifyCommiter.close();
-           portNotifyCommiter = null;
-       }
-       if (queueNotifyCommiter != null) {
-           queueNotifyCommiter.close();
-           queueNotifyCommiter = null;
-       }
-       if (statCollectors != null) {
-           for (StatPermCollector collector : statCollectors) {
-               collector.close();
-               collector = null;
-           }
-           statCollectors = null;
-       }
-       if (rpcMsgManager != null) {
-           rpcMsgManager.close();
-           rpcMsgManager = null;
-       }
-       statRpcMsgManagerExecutor.shutdown();
-       statDataStoreOperationServ.shutdown();
-       if (txChain != null) {
-           txChain.close();
-           txChain = null;
-       }
-   }
-
-   @Override
-   public void enqueue(final StatDataStoreOperation op) {
-       // we don't need to block anything - next statistics come soon
-       final boolean success = dataStoreOperQueue.offer(op);
-       if ( ! success) {
-           LOG.debug("Stat DS/Operational submiter Queue is full!");
-       }
-   }
-
-   @Override
-   public void run() {
-       /* Neverending cyle - wait for finishing */
-       while ( ! finishing) {
-           try {
-               StatDataStoreOperation op = dataStoreOperQueue.take();
-               final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
-               LOG.trace("New operations available, starting transaction {}", tx.getIdentifier());
-
-               int ops = 0;
-               do {
-                   op.applyOperation(tx);
-
-                   ops++;
-                   if (ops < MAX_BATCH) {
-                       op = dataStoreOperQueue.poll();
-                   } else {
-                       op = null;
-                   }
-               } while (op != null);
-
-               LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
-
-                   tx.submit().checkedGet();
-           } catch (final InterruptedException e) {
-               LOG.warn("Stat Manager DS Operation thread interupted!", e);
-               finishing = true;
-           } catch (final Exception e) {
-               LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
-               txChain.close();
-               txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
-               cleanDataStoreOperQueue();
-           }
-       }
-       // Drain all events, making sure any blocked threads are unblocked
-       cleanDataStoreOperQueue();
-   }
-
-   private synchronized void cleanDataStoreOperQueue() {
-       // Drain all events, making sure any blocked threads are unblocked
-       while (! dataStoreOperQueue.isEmpty()) {
-           StatDataStoreOperation op = dataStoreOperQueue.poll();
-
-           // Execute the node removal clean up operation if queued in the
-           // operational queue.
-           if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
-               try {
-                   LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
-                   op.applyOperation(null);
-               } catch (final Exception ex) {
-                   LOG.warn("Unhandled exception while cleaning up internal data of node [{}]",op.getNodeId());
-               }
-           }
-       }
-   }
-
-   @Override
-   public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
-           final Throwable cause) {
-       LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
-   }
-
-   @Override
-   public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
-       // NOOP
-   }
-
-   @Override
-   public boolean isProvidedFlowNodeActive(final InstanceIdentifier<Node> nodeIdent) {
-       for (final StatPermCollector collector : statCollectors) {
-           if (collector.isProvidedFlowNodeActive(nodeIdent)) {
-               return true;
-           }
-       }
-       return false;
-   }
-
-   @Override
-   public void collectNextStatistics(final InstanceIdentifier<Node> nodeIdent, final TransactionId xid) {
-       for (final StatPermCollector collector : statCollectors) {
-           if (collector.isProvidedFlowNodeActive(nodeIdent)) {
-               collector.collectNextStatistics(xid);
-           }
-       }
-   }
-
-   @Override
-   public void connectedNodeRegistration(final InstanceIdentifier<Node> nodeIdent,
-           final List<StatCapabTypes> statTypes, final Short nrOfSwitchTables) {
-       for (final StatPermCollector collector : statCollectors) {
-           if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
-               return;
-           }
-       }
-       synchronized (statCollectorLock) {
-           for (final StatPermCollector collector : statCollectors) {
-               if (collector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables)) {
-                   return;
-               }
-           }
-           final StatPermCollectorImpl newCollector = new StatPermCollectorImpl(this,
-                   statManagerConfig.getMinRequestNetMonitorInterval(), statCollectors.size() + 1,
-                   statManagerConfig.getMaxNodesForCollector());
-           final List<StatPermCollector> statCollectorsNew = new ArrayList<>(statCollectors);
-           newCollector.connectedNodeRegistration(nodeIdent, statTypes, nrOfSwitchTables);
-           statCollectorsNew.add(newCollector);
-           statCollectors = Collections.unmodifiableList(statCollectorsNew);
-       }
-   }
-
-   @Override
-   public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
-       flowListeningCommiter.cleanForDisconnect(nodeIdent);
-
-       for (final StatPermCollector collector : statCollectors) {
-           if (collector.disconnectedNodeUnregistration(nodeIdent)) {
-               if ( ! collector.hasActiveNodes()) {
-                   synchronized (statCollectorLock) {
-                       if (collector.hasActiveNodes()) {
-                           return;
-                       }
-                       final List<StatPermCollector> newStatColl =
-                               new ArrayList<>(statCollectors);
-                       newStatColl.remove(collector);
-                       statCollectors = Collections.unmodifiableList(newStatColl);
-                   }
-               }
-               return;
-           }
-       }
-       LOG.debug("Node {} has not been removed.", nodeIdent);
-   }
-
-   @Override
-   public void registerAdditionalNodeFeature(final InstanceIdentifier<Node> nodeIdent,
-           final StatCapabTypes statCapab) {
-       for (final StatPermCollector collector : statCollectors) {
-           if (collector.registerAdditionalNodeFeature(nodeIdent, statCapab)) {
-               return;
-           }
-       }
-       LOG.debug("Node {} has not been extended for feature {}!", nodeIdent, statCapab);
-   }
-
-   /* Getter internal Statistic Manager Job Classes */
-   @Override
-   public StatRpcMsgManager getRpcMsgManager() {
-       return rpcMsgManager;
-   }
-
-   @Override
-   public StatNodeRegistration getNodeRegistrator() {
-       return nodeRegistrator;
-   }
-
-   @Override
-   public StatListeningCommiter<Flow, OpendaylightFlowStatisticsListener> getFlowListenComit() {
-       return flowListeningCommiter;
-   }
-
-   @Override
-   public StatListeningCommiter<Meter, OpendaylightMeterStatisticsListener> getMeterListenCommit() {
-       return meterListeningCommiter;
-   }
-
-   @Override
-   public StatListeningCommiter<Group, OpendaylightGroupStatisticsListener> getGroupListenCommit() {
-       return groupListeningCommiter;
-   }
-
-   @Override
-   public StatListeningCommiter<Queue, OpendaylightQueueStatisticsListener> getQueueNotifyCommit() {
-       return queueNotifyCommiter;
-   }
-
-
-   @Override
-   public StatNotifyCommiter<OpendaylightFlowTableStatisticsListener> getTableNotifCommit() {
-       return tableNotifCommiter;
-   }
-
-   @Override
-   public StatNotifyCommiter<OpendaylightPortStatisticsListener> getPortNotifyCommit() {
-       return portNotifyCommiter;
-   }
-
-    @Override
-    public StatisticsManagerConfig getConfiguration() {
-        return statManagerConfig;
-    }
-}
-