Update MRI projects for Aluminium
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterTxChain.java
index f45202f4b3a0a93f709a07b672629831a32f5527..9f7a7024ab8cfe4b3f303c51b07719c1a6cee64d 100644 (file)
@@ -8,17 +8,16 @@
 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -85,7 +84,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         private final boolean isCreateParents;
         private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
 
-        BindingTransactionChain txChain;
+        private TransactionChain txChain;
 
         FlowHandlerTask(final String dpId,
                         final int flowsPerDpn,
@@ -117,7 +116,7 @@ public class FlowWriterTxChain implements FlowCounterMBean {
             int newBatchSize = batchSize;
             LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
 
-            txChain = dataBroker.createTransactionChain(this);
+            txChain = dataBroker.createMergingTransactionChain(this);
             LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
 
             for (int i = 1; i <= numSubmits; i++) {
@@ -143,8 +142,9 @@ public class FlowWriterTxChain implements FlowCounterMBean {
                 }
                 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId,
                         tableId, calculatedTableId, sourceIp - 1);
-                Futures.addCallback(writeTransaction.submit(),
-                        new DsCallBack(dpId, tableId, calculatedTableId, sourceIp));
+                writeTransaction.commit().addCallback(
+                        new DsCallBack(dpId, tableId, calculatedTableId, sourceIp, txChain),
+                        MoreExecutors.directExecutor());
                 // Wrap around
                 tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
                 newBatchSize += batchSize;
@@ -160,15 +160,15 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         }
 
         @Override
-        public void onTransactionChainFailed(TransactionChain<?, ?> transactionChain,
-                AsyncTransaction<?, ?> asyncTransaction, Throwable throwable) {
-            LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: {}", transactionChain,
+        public void onTransactionChainFailed(TransactionChain transactionChain,
+                Transaction asyncTransaction, Throwable throwable) {
+            LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: ", transactionChain,
                     asyncTransaction.getIdentifier(), throwable);
             transactionChain.close();
         }
 
         @Override
-        public void onTransactionChainSuccessful(TransactionChain<?, ?> transactionChain) {
+        public void onTransactionChainSuccessful(TransactionChain transactionChain) {
             LOG.info("Transaction chain: {} closed successfully.", transactionChain);
         }
 
@@ -176,28 +176,35 @@ public class FlowWriterTxChain implements FlowCounterMBean {
                 Flow flow, Integer sourceIp, Short tableId) {
             if (add) {
                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
-                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
+                if (isCreateParents) {
+                    writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
+                } else {
+                    writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
+                }
             } else {
                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
             }
         }
 
-        private class DsCallBack implements FutureCallback {
+        private class DsCallBack implements FutureCallback<Object> {
             private final String dpId;
             private final int sourceIp;
             private final short endTableId;
             private final short beginTableId;
+            private final TransactionChain txChain;
 
-            DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
+            DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp,
+                    TransactionChain txChain) {
                 this.dpId = dpId;
                 this.sourceIp = sourceIp;
                 this.endTableId = endTableId;
                 this.beginTableId = beginTableId;
+                this.txChain = txChain;
             }
 
             @Override
-            public void onSuccess(Object object) {
+            public void onSuccess(Object notUsed) {
                 if (remainingTxReturn.decrementAndGet() <= 0) {
                     long dur = System.nanoTime() - startTime;
                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);