Merge "Move empty match constant into OFConstants"
[openflowplugin.git] / applications / topology-manager / src / main / java / org / opendaylight / openflowplugin / applications / topology / manager / OperationProcessor.java
index 87fb7354debb21ac8a458f3a7c3c7cb3bd6de118..bbfae52b84910b179fe182d9b3fa345a0ef90934 100644 (file)
@@ -20,19 +20,24 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
+public final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
     private static final int MAX_TRANSACTION_OPERATIONS = 100;
     private static final int OPERATION_QUEUE_DEPTH = 500;
 
     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
     private final DataBroker dataBroker;
+    private final Thread thread;
     private BindingTransactionChain transactionChain;
     private volatile boolean finishing = false;
 
-    OperationProcessor(final DataBroker dataBroker) {
+    public OperationProcessor(final DataBroker dataBroker) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
         transactionChain = this.dataBroker.createTransactionChain(this);
+
+        thread = new Thread(this);
+        thread.setDaemon(true);
+        thread.setName("FlowCapableTopologyExporter-" + FlowCapableTopologyProvider.TOPOLOGY_ID);
     }
 
     void enqueueOperation(final TopologyOperation task) {
@@ -43,6 +48,10 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
         }
     }
 
+    public void start() {
+        thread.start();
+    }
+
     @Override
     public void run() {
             while (!finishing) {
@@ -68,23 +77,15 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
                     } while (op != null);
 
                     LOG.debug("Processed {} operations, submitting transaction", ops);
-
-                    try {
-                        tx.submit().checkedGet();
-                    } catch (final TransactionCommitFailedException e) {
-                        LOG.warn("Stat DataStoreOperation unexpected State!", e);
-                        transactionChain.close();
-                        transactionChain = dataBroker.createTransactionChain(this);
-                        cleanDataStoreOperQueue();
-                    }
-
+                    submitTransaction(tx);
                 } catch (final IllegalStateException e) {
                     LOG.warn("Stat DataStoreOperation unexpected State!", e);
                     transactionChain.close();
                     transactionChain = dataBroker.createTransactionChain(this);
                     cleanDataStoreOperQueue();
                 } catch (final InterruptedException e) {
-                    LOG.warn("Stat Manager DS Operation thread interupted!", e);
+                    // This should mean we're shutting down.
+                    LOG.debug("Stat Manager DS Operation thread interrupted!", e);
                     finishing = true;
                 } catch (final Exception e) {
                     LOG.warn("Stat DataStore Operation executor fail!", e);
@@ -94,6 +95,17 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
         cleanDataStoreOperQueue();
     }
 
+    private void submitTransaction(ReadWriteTransaction tx) {
+        try {
+            tx.submit().checkedGet();
+        } catch (final TransactionCommitFailedException e) {
+            LOG.warn("Stat DataStoreOperation unexpected State!", e);
+            transactionChain.close();
+            transactionChain = dataBroker.createTransactionChain(this);
+            cleanDataStoreOperQueue();
+        }
+    }
+
     private void cleanDataStoreOperQueue() {
         while (!queue.isEmpty()) {
             queue.poll();
@@ -115,10 +127,18 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
+        thread.interrupt();
+        try {
+            thread.join();
+        } catch(InterruptedException e) {
+            LOG.debug("Join of thread {} was interrupted", thread.getName(), e);
+        }
+
         if (transactionChain != null) {
             transactionChain.close();
         }
 
+        LOG.debug("OperationProcessor closed");
     }
 }