Merge "Bug 1309 - Cannot publish LinkDiscovered event"
[controller.git] / opendaylight / md-sal / topology-manager / src / main / java / org / opendaylight / md / controller / topology / manager / OperationProcessor.java
index f09da0045930cf7cc843de1a924e64841f2db508..41162d30463d54338d687dfd904ef1f7646dbec6 100644 (file)
@@ -8,13 +8,6 @@
 package org.opendaylight.md.controller.topology.manager;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -25,6 +18,9 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
     private static final int MAX_TRANSACTION_OPERATIONS = 100;
@@ -32,7 +28,7 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
 
     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
     private final DataBroker dataBroker;
-    private final BindingTransactionChain transactionChain;
+    private BindingTransactionChain transactionChain;
 
     OperationProcessor(final DataBroker dataBroker) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
@@ -73,23 +69,32 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
 
                 LOG.debug("Processed {} operations, submitting transaction", ops);
 
-                CheckedFuture<Void, TransactionCommitFailedException> txResultFuture = tx.submit();
-                Futures.addCallback(txResultFuture, new FutureCallback<Void>() {
-                    @Override
-                    public void onSuccess(Void notUsed) {
-                        LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
-                    }
-
-                    @Override
-                    public void onFailure(Throwable throwable) {
-                        LOG.error("Topology export transaction {} failed", tx.getIdentifier(), throwable.getCause());
-                    }
-                });
+                try {
+                    tx.submit().checkedGet();
+                } catch (final TransactionCommitFailedException e) {
+                    LOG.warn("Stat DataStoreOperation unexpected State!", e);
+                    transactionChain.close();
+                    transactionChain = dataBroker.createTransactionChain(this);
+                    cleanDataStoreOperQueue();
+                }
             }
-        } catch (InterruptedException e) {
-            LOG.info("Interrupted processing, terminating", e);
+        } catch (final IllegalStateException e) {
+            LOG.warn("Stat DataStoreOperation unexpected State!", e);
+            transactionChain.close();
+            transactionChain = dataBroker.createTransactionChain(this);
+            cleanDataStoreOperQueue();
+        } catch (final InterruptedException e) {
+            LOG.warn("Stat Manager DS Operation thread interupted!", e);
+        } catch (final Exception e) {
+            LOG.warn("Stat DataStore Operation executor fail!", e);
         }
 
+        // Drain all events, making sure any blocked threads are unblocked
+        cleanDataStoreOperQueue();
+
+    }
+
+    private void cleanDataStoreOperQueue() {
         // Drain all events, making sure any blocked threads are unblocked
         while (!queue.isEmpty()) {
             queue.poll();