Bump upstreams
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterTxChain.java
index 91dbcaa471313190fef9e905a68602750579ce6e..067b6d34395556afcf7295f0a7c16af3c568d0a9 100644 (file)
@@ -13,14 +13,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.Transaction;
 import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,14 +32,14 @@ public class FlowWriterTxChain implements FlowCounterMBean {
     private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
     private final AtomicLong taskCompletionTime = new AtomicLong();
 
-    public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher) {
+    public FlowWriterTxChain(final DataBroker dataBroker, final ExecutorService flowPusher) {
         this.dataBroker = dataBroker;
         this.flowPusher = flowPusher;
         LOG.info("Using Ping Pong Flow Tester Impl");
     }
 
-    public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
-            short startTableId, short endTableId, boolean isCreateParents) {
+    public void addFlows(final Integer dpnCount, final Integer flowsPerDPN, final int batchSize, final int sleepMillis,
+            final int sleepAfter, final short startTableId, final short endTableId, final boolean isCreateParents) {
         LOG.info("Using Transaction Chain Flow Writer Impl");
         countDpnWriteCompletion.set(dpnCount);
         startTime = System.nanoTime();
@@ -51,8 +50,8 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         }
     }
 
-    public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
-            short endTableId) {
+    public void deleteFlows(final Integer dpnCount, final Integer flowsPerDPN, final int batchSize,
+            final short startTableId, final short endTableId) {
         LOG.info("Using Transaction Chain Flow Writer Impl");
         countDpnWriteCompletion.set(dpnCount);
         for (int i = 1; i <= dpnCount; i++) {
@@ -72,7 +71,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         return taskCompletionTime.get();
     }
 
-    private class FlowHandlerTask implements Runnable, TransactionChainListener {
+    private class FlowHandlerTask implements Runnable, FutureCallback<Empty> {
         private final String dpId;
         private final boolean add;
         private final int flowsPerDpn;
@@ -84,7 +83,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         private final boolean isCreateParents;
         private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
 
-        private TransactionChain txChain;
+        private TransactionChain txChain = null;
 
         FlowHandlerTask(final String dpId,
                         final int flowsPerDpn,
@@ -116,7 +115,8 @@ public class FlowWriterTxChain implements FlowCounterMBean {
             int newBatchSize = batchSize;
             LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
 
-            txChain = dataBroker.createMergingTransactionChain(this);
+            txChain = dataBroker.createMergingTransactionChain();
+            txChain.addCallback(this);
             LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
 
             for (int i = 1; i <= numSubmits; i++) {
@@ -160,20 +160,18 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         }
 
         @Override
-        public void onTransactionChainFailed(TransactionChain transactionChain,
-                Transaction asyncTransaction, Throwable throwable) {
-            LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: ", transactionChain,
-                    asyncTransaction.getIdentifier(), throwable);
-            transactionChain.close();
+        public void onFailure(final Throwable throwable) {
+            LOG.error("Transaction chain: {} FAILED due to: ", txChain, throwable);
+            txChain.close();
         }
 
         @Override
-        public void onTransactionChainSuccessful(TransactionChain transactionChain) {
-            LOG.info("Transaction chain: {} closed successfully.", transactionChain);
+        public void onSuccess(final Empty result) {
+            LOG.info("Transaction chain: {} closed successfully.", txChain);
         }
 
-        private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
-                Flow flow, Integer sourceIp, Short tableId) {
+        private void writeTxToDs(final WriteTransaction writeTransaction, final String flowId,
+                final InstanceIdentifier<Flow> flowIid, final Flow flow, final Integer sourceIp, final Short tableId) {
             if (add) {
                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
                 if (isCreateParents) {
@@ -194,8 +192,8 @@ public class FlowWriterTxChain implements FlowCounterMBean {
             private final short beginTableId;
             private final TransactionChain txChain;
 
-            DsCallBack(String dpId, short beginTableId, short endTableId, int sourceIp,
-                    TransactionChain txChain) {
+            DsCallBack(final String dpId, final short beginTableId, final short endTableId, final int sourceIp,
+                    final TransactionChain txChain) {
                 this.dpId = dpId;
                 this.sourceIp = sourceIp;
                 this.endTableId = endTableId;
@@ -204,7 +202,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
             }
 
             @Override
-            public void onSuccess(Object notUsed) {
+            public void onSuccess(final Object notUsed) {
                 if (remainingTxReturn.decrementAndGet() <= 0) {
                     long dur = System.nanoTime() - startTime;
                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
@@ -218,7 +216,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
             }
 
             @Override
-            public void onFailure(Throwable error) {
+            public void onFailure(final Throwable error) {
                 if (remainingTxReturn.decrementAndGet() <= 0) {
                     long dur = System.nanoTime() - startTime;
                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);