Update MRI projects for Aluminium
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterTxChain.java
index 660b8d0797c9a7a1f2b27bb22d02c10557d9f225..9f7a7024ab8cfe4b3f303c51b07719c1a6cee64d 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -8,17 +8,16 @@
 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -30,36 +29,35 @@ public class FlowWriterTxChain implements FlowCounterMBean {
     private final DataBroker dataBroker;
     private final ExecutorService flowPusher;
     private long startTime;
-    private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
-    private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
-    private AtomicLong taskCompletionTime = new AtomicLong();
+    private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
+    private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
+    private final AtomicLong taskCompletionTime = new AtomicLong();
 
-    public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher){
+    public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher) {
         this.dataBroker = dataBroker;
         this.flowPusher = flowPusher;
         LOG.info("Using Ping Pong Flow Tester Impl");
     }
 
-    public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
-                         int sleepMillis, int sleepAfter, short startTableId, short endTableId,
-                         boolean isCreateParents) {
+    public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
+            short startTableId, short endTableId, boolean isCreateParents) {
         LOG.info("Using Transaction Chain Flow Writer Impl");
         countDpnWriteCompletion.set(dpnCount);
         startTime = System.nanoTime();
         for (int i = 1; i <= dpnCount; i++) {
-            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
-                    flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId, isCreateParents);
+            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
+                    sleepAfter, startTableId, endTableId, isCreateParents);
             flowPusher.execute(task);
         }
     }
 
-    public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
-                            short startTableId, short endTableId) {
+    public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
+            short endTableId) {
         LOG.info("Using Transaction Chain Flow Writer Impl");
         countDpnWriteCompletion.set(dpnCount);
         for (int i = 1; i <= dpnCount; i++) {
-            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
-                    0, 1, startTableId, endTableId, false);
+            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
+                    startTableId, endTableId, false);
             flowPusher.execute(task);
         }
     }
@@ -84,19 +82,19 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         private final short startTableId;
         private final short endTableId;
         private final boolean isCreateParents;
-        private AtomicInteger remainingTxReturn = new AtomicInteger(0);
-
-        BindingTransactionChain txChain;
-
-        public FlowHandlerTask(final String dpId,
-                               final int flowsPerDpn,
-                               final boolean add,
-                               final int batchSize,
-                               final int sleepMillis,
-                               final int sleepAfter,
-                               final short startTableId,
-                               final short endTableId,
-                               final boolean isCreateParents){
+        private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
+
+        private TransactionChain txChain;
+
+        FlowHandlerTask(final String dpId,
+                        final int flowsPerDpn,
+                        final boolean add,
+                        final int batchSize,
+                        final int sleepMillis,
+                        final int sleepAfter,
+                        final short startTableId,
+                        final short endTableId,
+                        final boolean isCreateParents) {
             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
             this.add = add;
             this.flowsPerDpn = flowsPerDpn;
@@ -106,53 +104,51 @@ public class FlowWriterTxChain implements FlowCounterMBean {
             this.startTableId = startTableId;
             this.endTableId = endTableId;
             this.isCreateParents = isCreateParents;
-            remainingTxReturn.set(flowsPerDpn/batchSize);
+            remainingTxReturn.set(flowsPerDpn / batchSize);
         }
 
         @Override
         public void run() {
             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
             short tableId = startTableId;
-            int numSubmits = flowsPerDpn/batchSize;
+            int numSubmits = flowsPerDpn / batchSize;
             int sourceIp = 1;
             int newBatchSize = batchSize;
             LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
 
-            txChain = dataBroker.createTransactionChain(this);
+            txChain = dataBroker.createMergingTransactionChain(this);
             LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
 
             for (int i = 1; i <= numSubmits; i++) {
-                WriteTransaction writeTransaction;
-                try {
-                    writeTransaction = txChain.newWriteOnlyTransaction();
-                } catch (Exception e) {
-                    LOG.error("Transaction creation failed in txChain: {}, due to: {}", txChain, e);
-                    break;
-                }
-                short k = tableId;
+                WriteTransaction writeTransaction = txChain.newWriteOnlyTransaction();
+                short calculatedTableId = tableId;
                 for (; sourceIp <= newBatchSize; sourceIp++) {
-                    String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
+                    String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
                     Flow flow = null;
                     if (add) {
                         Match match = BulkOMaticUtils.getMatch(sourceIp);
-                        flow = BulkOMaticUtils.buildFlow(k, flowId, match);
+                        flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
                     }
 
                     writeTxToDs(writeTransaction, flowId,
-                            BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
+                            BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId),
+                            flow, sourceIp, calculatedTableId);
 
                     if (sourceIp < newBatchSize) {
-                        short a = 1;
-                        short b = (short) (endTableId - startTableId + 1);
-                        k = (short) (((k + a) % b) + startTableId);
+                        short numberA = 1;
+                        short numberB = (short) (endTableId - startTableId + 1);
+                        calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
                     }
                 }
-                LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId, k, sourceIp - 1);
-                Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
+                LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId,
+                        tableId, calculatedTableId, sourceIp - 1);
+                writeTransaction.commit().addCallback(
+                        new DsCallBack(dpId, tableId, calculatedTableId, sourceIp, txChain),
+                        MoreExecutors.directExecutor());
                 // Wrap around
-                tableId = (short) (((k + 1) % ((short) (endTableId - startTableId + 1))) + startTableId);
+                tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
                 newBatchSize += batchSize;
-                if (((i % sleepAfter) == 0) && (sleepMillis > 0)) {
+                if (i % sleepAfter == 0 && sleepMillis > 0) {
                     try {
                         Thread.sleep(sleepMillis);
                     } catch (InterruptedException e) {
@@ -164,48 +160,56 @@ public class FlowWriterTxChain implements FlowCounterMBean {
         }
 
         @Override
-        public void onTransactionChainFailed(TransactionChain<?, ?> transactionChain, AsyncTransaction<?, ?> asyncTransaction, Throwable throwable) {
-            LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: {}", transactionChain,
+        public void onTransactionChainFailed(TransactionChain transactionChain,
+                Transaction asyncTransaction, Throwable throwable) {
+            LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: ", transactionChain,
                     asyncTransaction.getIdentifier(), throwable);
             transactionChain.close();
         }
 
         @Override
-        public void onTransactionChainSuccessful(TransactionChain<?, ?> transactionChain) {
+        public void onTransactionChainSuccessful(TransactionChain transactionChain) {
             LOG.info("Transaction chain: {} closed successfully.", transactionChain);
         }
 
-        private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
+        private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
+                Flow flow, Integer sourceIp, Short tableId) {
             if (add) {
                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
-                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
+                if (isCreateParents) {
+                    writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
+                } else {
+                    writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
+                }
             } else {
                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
             }
         }
 
-        private class DsCallBack implements FutureCallback {
-            private String dpId;
-            private int sourceIp;
-            private short endTableId;
-            private short beginTableId;
+        private class DsCallBack implements FutureCallback<Object> {
+            private final String dpId;
+            private final int sourceIp;
+            private final short endTableId;
+            private final short beginTableId;
+            private final TransactionChain txChain;
 
-            public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
+            DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp,
+                    TransactionChain txChain) {
                 this.dpId = dpId;
                 this.sourceIp = sourceIp;
                 this.endTableId = endTableId;
                 this.beginTableId = beginTableId;
+                this.txChain = txChain;
             }
 
             @Override
-            public void onSuccess(Object o) {
+            public void onSuccess(Object notUsed) {
                 if (remainingTxReturn.decrementAndGet() <= 0) {
                     long dur = System.nanoTime() - startTime;
-                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
-                            dur);
-                    if(0 == countDpnWriteCompletion.decrementAndGet() &&
-                            writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
+                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
+                    if (0 == countDpnWriteCompletion.decrementAndGet()
+                            && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
                         taskCompletionTime.set(dur);
                     }
@@ -213,16 +217,16 @@ public class FlowWriterTxChain implements FlowCounterMBean {
                 }
             }
 
+            @Override
             public void onFailure(Throwable error) {
                 if (remainingTxReturn.decrementAndGet() <= 0) {
                     long dur = System.nanoTime() - startTime;
-                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
-                            dur);
+                    LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
                 }
-                LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
-                        "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
+                LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
+                        "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
             }
         }
     }
-}
\ No newline at end of file
+}