2 * Copyright (c) 2016 Ericsson Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
13 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
14 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
15 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
21 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicLong;
29 public class FlowWriterTxChain implements FlowCounterMBean {
30 private static final Logger LOG = LoggerFactory.getLogger(FlowWriterTxChain.class);
31 private final DataBroker dataBroker;
32 private final ExecutorService flowPusher;
33 private long startTime;
34 private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
35 private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
36 private AtomicLong taskCompletionTime = new AtomicLong(0);
37 private final String UNITS = "ns";
39 public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher){
40 this.dataBroker = dataBroker;
41 this.flowPusher = flowPusher;
42 LOG.info("Using Ping Pong Flow Tester Impl");
45 public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
46 int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
47 LOG.info("Using Transaction Chain Flow Writer Impl");
48 countDpnWriteCompletion.set(dpnCount);
49 startTime = System.nanoTime();
50 for (int i = 1; i <= dpnCount; i++) {
51 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
52 flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
53 flowPusher.execute(task);
57 public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
58 short startTableId, short endTableId) {
59 LOG.info("Using Transaction Chain Flow Writer Impl");
60 countDpnWriteCompletion.set(dpnCount);
61 for (int i = 1; i <= dpnCount; i++) {
62 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
63 0, 1, startTableId, endTableId);
64 flowPusher.execute(task);
69 public long getFlowCount() {
70 return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
74 public int getReadOpStatus() {
75 return BulkOMaticUtils.DEFUALT_STATUS;
79 public int getWriteOpStatus() {
80 return writeOpStatus.get();
84 public long getTaskCompletionTime() {
85 return taskCompletionTime.get();
89 public String getUnits() {
93 private class FlowHandlerTask implements Runnable, TransactionChainListener {
94 private final String dpId;
95 private final boolean add;
96 private final int flowsPerDpn;
97 private final int batchSize;
98 private final int sleepAfter;
99 private final int sleepMillis;
100 private final short startTableId;
101 private final short endTableId;
102 private AtomicInteger remainingTxReturn = new AtomicInteger(0);
104 BindingTransactionChain txChain;
106 public FlowHandlerTask(final String dpId,
107 final int flowsPerDpn,
110 final int sleepMillis,
111 final int sleepAfter,
112 final short startTableId,
113 final short endTableId){
114 this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
116 this.flowsPerDpn = flowsPerDpn;
117 this.batchSize = batchSize;
118 this.sleepMillis = sleepMillis;
119 this.sleepAfter = sleepAfter;
120 this.startTableId = startTableId;
121 this.endTableId = endTableId;
122 remainingTxReturn.set(flowsPerDpn/batchSize);
127 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
128 short tableId = startTableId;
129 int numSubmits = flowsPerDpn/batchSize;
131 int newBatchSize = batchSize;
132 LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
134 txChain = dataBroker.createTransactionChain(this);
135 LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
137 for (int i = 1; i <= numSubmits; i++) {
138 WriteTransaction writeTransaction;
140 writeTransaction = txChain.newWriteOnlyTransaction();
141 } catch (Exception e) {
142 LOG.error("Transaction creation failed in txChain: {}, due to: {}", txChain, e);
146 for (; sourceIp <= newBatchSize; sourceIp++) {
147 String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
150 Match match = BulkOMaticUtils.getMatch(sourceIp);
151 flow = BulkOMaticUtils.buildFlow(k, flowId, match);
154 writeTxToDs(writeTransaction, flowId,
155 BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
157 if (sourceIp < newBatchSize) {
159 short b = (short) (endTableId - startTableId + 1);
160 k = (short) (((k + a) % b) + startTableId);
163 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId, k, sourceIp - 1);
164 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
166 tableId = (short) (((k + 1) % ((short) (endTableId - startTableId + 1))) + startTableId);
167 newBatchSize += batchSize;
168 if (((i % sleepAfter) == 0) && (sleepMillis > 0)) {
170 Thread.sleep(sleepMillis);
171 } catch (InterruptedException e) {
172 LOG.error("Writer Thread Interrupted: {}", e.getMessage());
176 LOG.info("Completed FlowHandlerTask thread for dpid: {}", dpId);
180 public void onTransactionChainFailed(TransactionChain<?, ?> transactionChain, AsyncTransaction<?, ?> asyncTransaction, Throwable throwable) {
181 LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: {}", transactionChain,
182 asyncTransaction.getIdentifier(), throwable);
183 transactionChain.close();
187 public void onTransactionChainSuccessful(TransactionChain<?, ?> transactionChain) {
188 LOG.info("Transaction chain: {} closed successfully.", transactionChain);
191 private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
193 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
194 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
196 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
197 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
201 private class DsCallBack implements FutureCallback {
203 private int sourceIp;
204 private short endTableId;
205 private short beginTableId;
207 public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
209 this.sourceIp = sourceIp;
210 this.endTableId = endTableId;
211 this.beginTableId = beginTableId;
215 public void onSuccess(Object o) {
216 if (remainingTxReturn.decrementAndGet() <= 0) {
217 long dur = System.nanoTime() - startTime;
218 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
220 if(0 == countDpnWriteCompletion.decrementAndGet() &&
221 writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
222 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
223 taskCompletionTime.set(dur);
229 public void onFailure(Throwable error) {
230 if (remainingTxReturn.decrementAndGet() <= 0) {
231 long dur = System.nanoTime() - startTime;
232 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
235 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
236 "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
237 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());