X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fbulk-o-matic%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Fbulk%2Fo%2Fmatic%2FFlowWriterTxChain.java;h=9f7a7024ab8cfe4b3f303c51b07719c1a6cee64d;hb=05f8db12159673d0e0a95642fe86e62c14b7dc7b;hp=660b8d0797c9a7a1f2b27bb22d02c10557d9f225;hpb=aca9e862e3313c24461ed2eb255684221d95e6fd;p=openflowplugin.git diff --git a/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterTxChain.java b/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterTxChain.java index 660b8d0797..9f7a7024ab 100644 --- a/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterTxChain.java +++ b/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterTxChain.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Ericsson Systems, Inc. and others. All rights reserved. + * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -8,17 +8,16 @@ package org.opendaylight.openflowplugin.applications.bulk.o.matic; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.Transaction; +import org.opendaylight.mdsal.binding.api.TransactionChain; +import org.opendaylight.mdsal.binding.api.TransactionChainListener; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; @@ -30,36 +29,35 @@ public class FlowWriterTxChain implements FlowCounterMBean { private final DataBroker dataBroker; private final ExecutorService flowPusher; private long startTime; - private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status()); - private AtomicInteger countDpnWriteCompletion = new AtomicInteger(); - private AtomicLong taskCompletionTime = new AtomicLong(); + private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status()); + private final AtomicInteger countDpnWriteCompletion = new AtomicInteger(); + private final AtomicLong taskCompletionTime = new AtomicLong(); - public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher){ + public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher) { this.dataBroker = dataBroker; this.flowPusher = flowPusher; LOG.info("Using Ping Pong Flow Tester Impl"); } - public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, - int sleepMillis, int sleepAfter, short startTableId, short endTableId, - boolean isCreateParents) { + public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter, + short startTableId, short endTableId, boolean isCreateParents) { LOG.info("Using Transaction Chain Flow Writer Impl"); countDpnWriteCompletion.set(dpnCount); startTime = System.nanoTime(); for (int i = 1; i <= dpnCount; i++) { - FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), - flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId, isCreateParents); + FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis, + sleepAfter, startTableId, endTableId, isCreateParents); flowPusher.execute(task); } } - public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, - short startTableId, short endTableId) { + public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId, + short endTableId) { LOG.info("Using Transaction Chain Flow Writer Impl"); countDpnWriteCompletion.set(dpnCount); for (int i = 1; i <= dpnCount; i++) { - FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, - 0, 1, startTableId, endTableId, false); + FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1, + startTableId, endTableId, false); flowPusher.execute(task); } } @@ -84,19 +82,19 @@ public class FlowWriterTxChain implements FlowCounterMBean { private final short startTableId; private final short endTableId; private final boolean isCreateParents; - private AtomicInteger remainingTxReturn = new AtomicInteger(0); - - BindingTransactionChain txChain; - - public FlowHandlerTask(final String dpId, - final int flowsPerDpn, - final boolean add, - final int batchSize, - final int sleepMillis, - final int sleepAfter, - final short startTableId, - final short endTableId, - final boolean isCreateParents){ + private final AtomicInteger remainingTxReturn = new AtomicInteger(0); + + private TransactionChain txChain; + + FlowHandlerTask(final String dpId, + final int flowsPerDpn, + final boolean add, + final int batchSize, + final int sleepMillis, + final int sleepAfter, + final short startTableId, + final short endTableId, + final boolean isCreateParents) { this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId; this.add = add; this.flowsPerDpn = flowsPerDpn; @@ -106,53 +104,51 @@ public class FlowWriterTxChain implements FlowCounterMBean { this.startTableId = startTableId; this.endTableId = endTableId; this.isCreateParents = isCreateParents; - remainingTxReturn.set(flowsPerDpn/batchSize); + remainingTxReturn.set(flowsPerDpn / batchSize); } @Override public void run() { writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status()); short tableId = startTableId; - int numSubmits = flowsPerDpn/batchSize; + int numSubmits = flowsPerDpn / batchSize; int sourceIp = 1; int newBatchSize = batchSize; LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits); - txChain = dataBroker.createTransactionChain(this); + txChain = dataBroker.createMergingTransactionChain(this); LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId); for (int i = 1; i <= numSubmits; i++) { - WriteTransaction writeTransaction; - try { - writeTransaction = txChain.newWriteOnlyTransaction(); - } catch (Exception e) { - LOG.error("Transaction creation failed in txChain: {}, due to: {}", txChain, e); - break; - } - short k = tableId; + WriteTransaction writeTransaction = txChain.newWriteOnlyTransaction(); + short calculatedTableId = tableId; for (; sourceIp <= newBatchSize; sourceIp++) { - String flowId = "Flow-" + dpId + "." + k + "." + sourceIp; + String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp; Flow flow = null; if (add) { Match match = BulkOMaticUtils.getMatch(sourceIp); - flow = BulkOMaticUtils.buildFlow(k, flowId, match); + flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match); } writeTxToDs(writeTransaction, flowId, - BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k); + BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), + flow, sourceIp, calculatedTableId); if (sourceIp < newBatchSize) { - short a = 1; - short b = (short) (endTableId - startTableId + 1); - k = (short) (((k + a) % b) + startTableId); + short numberA = 1; + short numberB = (short) (endTableId - startTableId + 1); + calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId); } } - LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId, k, sourceIp - 1); - Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp)); + LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, + tableId, calculatedTableId, sourceIp - 1); + writeTransaction.commit().addCallback( + new DsCallBack(dpId, tableId, calculatedTableId, sourceIp, txChain), + MoreExecutors.directExecutor()); // Wrap around - tableId = (short) (((k + 1) % ((short) (endTableId - startTableId + 1))) + startTableId); + tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId); newBatchSize += batchSize; - if (((i % sleepAfter) == 0) && (sleepMillis > 0)) { + if (i % sleepAfter == 0 && sleepMillis > 0) { try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { @@ -164,48 +160,56 @@ public class FlowWriterTxChain implements FlowCounterMBean { } @Override - public void onTransactionChainFailed(TransactionChain transactionChain, AsyncTransaction asyncTransaction, Throwable throwable) { - LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: {}", transactionChain, + public void onTransactionChainFailed(TransactionChain transactionChain, + Transaction asyncTransaction, Throwable throwable) { + LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: ", transactionChain, asyncTransaction.getIdentifier(), throwable); transactionChain.close(); } @Override - public void onTransactionChainSuccessful(TransactionChain transactionChain) { + public void onTransactionChainSuccessful(TransactionChain transactionChain) { LOG.info("Transaction chain: {} closed successfully.", transactionChain); } - private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier flowIid, Flow flow, Integer sourceIp, Short tableId){ + private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier flowIid, + Flow flow, Integer sourceIp, Short tableId) { if (add) { LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid); - writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents); + if (isCreateParents) { + writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, flowIid, flow); + } else { + writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow); + } } else { LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid); writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid); } } - private class DsCallBack implements FutureCallback { - private String dpId; - private int sourceIp; - private short endTableId; - private short beginTableId; + private class DsCallBack implements FutureCallback { + private final String dpId; + private final int sourceIp; + private final short endTableId; + private final short beginTableId; + private final TransactionChain txChain; - public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) { + DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp, + TransactionChain txChain) { this.dpId = dpId; this.sourceIp = sourceIp; this.endTableId = endTableId; this.beginTableId = beginTableId; + this.txChain = txChain; } @Override - public void onSuccess(Object o) { + public void onSuccess(Object notUsed) { if (remainingTxReturn.decrementAndGet() <= 0) { long dur = System.nanoTime() - startTime; - LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, - dur); - if(0 == countDpnWriteCompletion.decrementAndGet() && - writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) { + LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur); + if (0 == countDpnWriteCompletion.decrementAndGet() + && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) { writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status()); taskCompletionTime.set(dur); } @@ -213,16 +217,16 @@ public class FlowWriterTxChain implements FlowCounterMBean { } } + @Override public void onFailure(Throwable error) { if (remainingTxReturn.decrementAndGet() <= 0) { long dur = System.nanoTime() - startTime; - LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, - dur); + LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur); } - LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " + - "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp); + LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " + + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp); writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status()); } } } -} \ No newline at end of file +}