/*
 * Copyright (c) 2016 Ericsson Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
20 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
24 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
28 public class FlowWriterTxChain implements FlowCounterMBean {
29 private static final Logger LOG = LoggerFactory.getLogger(FlowWriterTxChain.class);
30 private final DataBroker dataBroker;
31 private final ExecutorService flowPusher;
32 private long startTime;
33 private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
34 private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
35 private AtomicLong taskCompletionTime = new AtomicLong();
/**
 * Builds a flow writer around the supplied collaborators.
 *
 * @param dataBroker broker the per-DPN tasks use to create transaction chains
 * @param flowPusher executor that runs the per-DPN tasks
 */
public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher) {
    this.flowPusher = flowPusher;
    this.dataBroker = dataBroker;
    LOG.info("Using Ping Pong Flow Tester Impl");
}
43 public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
44 int sleepMillis, int sleepAfter, short startTableId, short endTableId,
45 boolean isCreateParents) {
46 LOG.info("Using Transaction Chain Flow Writer Impl");
47 countDpnWriteCompletion.set(dpnCount);
48 startTime = System.nanoTime();
49 for (int i = 1; i <= dpnCount; i++) {
50 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
51 flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId, isCreateParents);
52 flowPusher.execute(task);
56 public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
57 short startTableId, short endTableId) {
58 LOG.info("Using Transaction Chain Flow Writer Impl");
59 countDpnWriteCompletion.set(dpnCount);
60 for (int i = 1; i <= dpnCount; i++) {
61 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
62 0, 1, startTableId, endTableId, false);
63 flowPusher.execute(task);
68 public int getWriteOpStatus() {
69 return writeOpStatus.get();
73 public long getTaskCompletionTime() {
74 return taskCompletionTime.get();
77 private class FlowHandlerTask implements Runnable, TransactionChainListener {
78 private final String dpId;
79 private final boolean add;
80 private final int flowsPerDpn;
81 private final int batchSize;
82 private final int sleepAfter;
83 private final int sleepMillis;
84 private final short startTableId;
85 private final short endTableId;
86 private final boolean isCreateParents;
87 private AtomicInteger remainingTxReturn = new AtomicInteger(0);
89 BindingTransactionChain txChain;
/**
 * Captures the parameters of one per-DPN write run.
 *
 * <p>The parameter order matches the two call sites in {@code addFlows} and
 * {@code deleteFlows}. The outstanding-transaction counter is preset to
 * {@code flowsPerDpn / batchSize} (integer division).
 *
 * @param dpId            numeric DPN id; stored with {@code BulkOMaticUtils.DEVICE_TYPE_PREFIX} prepended
 * @param flowsPerDpn     number of flows requested for this DPN
 * @param add             {@code true} to add flows, {@code false} to delete them
 * @param batchSize       flows per transaction
 * @param sleepMillis     pause length in milliseconds
 * @param sleepAfter      number of submits between pauses
 * @param startTableId    first table id of the range
 * @param endTableId      last table id of the range
 * @param isCreateParents forwarded to the datastore put
 * @throws ArithmeticException if {@code batchSize} is zero (integer division)
 */
public FlowHandlerTask(final String dpId,
                       final int flowsPerDpn,
                       final boolean add,
                       final int batchSize,
                       final int sleepMillis,
                       final int sleepAfter,
                       final short startTableId,
                       final short endTableId,
                       final boolean isCreateParents) {
    this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
    this.add = add;
    this.flowsPerDpn = flowsPerDpn;
    this.batchSize = batchSize;
    this.sleepMillis = sleepMillis;
    this.sleepAfter = sleepAfter;
    this.startTableId = startTableId;
    this.endTableId = endTableId;
    this.isCreateParents = isCreateParents;
    remainingTxReturn.set(flowsPerDpn / batchSize);
}
// Task body: flags the shared operation status as IN_PROGRESS, opens one transaction chain
// for this DPN and pushes its flows in transactions of batchSize flows each.
// NOTE(review): the enclosing method header and the declarations of sourceIp, k, a and flow
// are not visible in this excerpt - confirm their initial values against the full file.
114 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
115 short tableId = startTableId;
// Integer division: flows beyond the last whole batch get no transaction.
116 int numSubmits = flowsPerDpn/batchSize;
// Upper bound on sourceIp for the current transaction; grows by batchSize after every submit.
118 int newBatchSize = batchSize;
119 LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
// This task is passed as the chain's listener (it implements TransactionChainListener).
121 txChain = dataBroker.createTransactionChain(this);
122 LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
// One write-only transaction per batch.
124 for (int i = 1; i <= numSubmits; i++) {
125 WriteTransaction writeTransaction;
127 writeTransaction = txChain.newWriteOnlyTransaction();
128 } catch (Exception e) {
// NOTE(review): what happens after this log (break/return/continue) is not visible here.
129 LOG.error("Transaction creation failed in txChain: {}, due to: {}", txChain, e);
// Fill the transaction until sourceIp passes the current batch bound; sourceIp is not reset
// between batches in the visible lines, so it keeps counting across transactions.
133 for (; sourceIp <= newBatchSize; sourceIp++) {
// The flow id is built from the DPN id, the current table id and the sourceIp counter.
134 String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
137 Match match = BulkOMaticUtils.getMatch(sourceIp);
138 flow = BulkOMaticUtils.buildFlow(k, flowId, match);
141 writeTxToDs(writeTransaction, flowId,
142 BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
// For every flow except the last one of the batch, move k to the next table id.
144 if (sourceIp < newBatchSize) {
// b is the number of table ids in the range startTableId..endTableId.
146 short b = (short) (endTableId - startTableId + 1);
// New table id: (k + a) modulo the table count, offset by startTableId.
147 k = (short) (((k + a) % b) + startTableId);
// sourceIp - 1 is logged because the inner loop leaves sourceIp one past the last flow written.
150 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId, k, sourceIp - 1);
// Submit and register DsCallBack on the returned future to receive the outcome.
151 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
// Starting table for the next batch: (k + 1) modulo the table count, offset by startTableId.
153 tableId = (short) (((k + 1) % ((short) (endTableId - startTableId + 1))) + startTableId);
154 newBatchSize += batchSize;
// Throttle: pause for sleepMillis after every sleepAfter-th submit, only when sleepMillis is positive.
// NOTE(review): a sleepAfter of 0 would make the modulo throw ArithmeticException.
155 if (((i % sleepAfter) == 0) && (sleepMillis > 0)) {
157 Thread.sleep(sleepMillis);
158 } catch (InterruptedException e) {
// Only the exception message is logged; NOTE(review): no re-interrupt of the thread is visible in this excerpt.
159 LOG.error("Writer Thread Interrupted: {}", e.getMessage());
163 LOG.info("Completed FlowHandlerTask thread for dpid: {}", dpId);
167 public void onTransactionChainFailed(TransactionChain<?, ?> transactionChain, AsyncTransaction<?, ?> asyncTransaction, Throwable throwable) {
168 LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: {}", transactionChain,
169 asyncTransaction.getIdentifier(), throwable);
170 transactionChain.close();
174 public void onTransactionChainSuccessful(TransactionChain<?, ?> transactionChain) {
175 LOG.info("Transaction chain: {} closed successfully.", transactionChain);
178 private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
180 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
181 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
183 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
184 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
188 private class DsCallBack implements FutureCallback {
190 private int sourceIp;
191 private short endTableId;
192 private short beginTableId;
/**
 * Records which batch this callback belongs to.
 *
 * @param dpId         DPN id of the submitting task; NOTE(review): no assignment of it is
 *                     visible in this excerpt - confirm against the full file
 * @param beginTableId table id the batch started on
 * @param endTableId   table id the batch ended on
 * @param sourceIp     sourceIp counter value at submit time
 */
public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
    this.beginTableId = beginTableId.shortValue();
    this.endTableId = endTableId.shortValue();
    this.sourceIp = sourceIp.intValue();
}
202 public void onSuccess(Object o) {
203 if (remainingTxReturn.decrementAndGet() <= 0) {
204 long dur = System.nanoTime() - startTime;
205 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
207 if(0 == countDpnWriteCompletion.decrementAndGet() &&
208 writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
209 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
210 taskCompletionTime.set(dur);
216 public void onFailure(Throwable error) {
217 if (remainingTxReturn.decrementAndGet() <= 0) {
218 long dur = System.nanoTime() - startTime;
219 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
222 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
223 "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
224 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());