2 * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.MoreExecutors;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.mdsal.binding.api.Transaction;
17 import org.opendaylight.mdsal.binding.api.TransactionChain;
18 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
19 import org.opendaylight.mdsal.binding.api.WriteTransaction;
20 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
23 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 public class FlowWriterTxChain implements FlowCounterMBean {
28 private static final Logger LOG = LoggerFactory.getLogger(FlowWriterTxChain.class);
29 private final DataBroker dataBroker;
30 private final ExecutorService flowPusher;
31 private long startTime;
32 private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
33 private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
34 private final AtomicLong taskCompletionTime = new AtomicLong();
36 public FlowWriterTxChain(final DataBroker dataBroker, ExecutorService flowPusher) {
37 this.dataBroker = dataBroker;
38 this.flowPusher = flowPusher;
39 LOG.info("Using Ping Pong Flow Tester Impl");
42 public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
43 short startTableId, short endTableId, boolean isCreateParents) {
44 LOG.info("Using Transaction Chain Flow Writer Impl");
45 countDpnWriteCompletion.set(dpnCount);
46 startTime = System.nanoTime();
47 for (int i = 1; i <= dpnCount; i++) {
48 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
49 sleepAfter, startTableId, endTableId, isCreateParents);
50 flowPusher.execute(task);
54 public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
56 LOG.info("Using Transaction Chain Flow Writer Impl");
57 countDpnWriteCompletion.set(dpnCount);
58 for (int i = 1; i <= dpnCount; i++) {
59 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
60 startTableId, endTableId, false);
61 flowPusher.execute(task);
66 public int getWriteOpStatus() {
67 return writeOpStatus.get();
71 public long getTaskCompletionTime() {
72 return taskCompletionTime.get();
75 private class FlowHandlerTask implements Runnable, TransactionChainListener {
76 private final String dpId;
77 private final boolean add;
78 private final int flowsPerDpn;
79 private final int batchSize;
80 private final int sleepAfter;
81 private final int sleepMillis;
82 private final short startTableId;
83 private final short endTableId;
84 private final boolean isCreateParents;
85 private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
87 private TransactionChain txChain;
89 FlowHandlerTask(final String dpId,
90 final int flowsPerDpn,
93 final int sleepMillis,
95 final short startTableId,
96 final short endTableId,
97 final boolean isCreateParents) {
98 this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
100 this.flowsPerDpn = flowsPerDpn;
101 this.batchSize = batchSize;
102 this.sleepMillis = sleepMillis;
103 this.sleepAfter = sleepAfter;
104 this.startTableId = startTableId;
105 this.endTableId = endTableId;
106 this.isCreateParents = isCreateParents;
107 remainingTxReturn.set(flowsPerDpn / batchSize);
112 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
113 short tableId = startTableId;
114 int numSubmits = flowsPerDpn / batchSize;
116 int newBatchSize = batchSize;
117 LOG.info("Number of Txn for dpId: {} is: {}", dpId, numSubmits);
119 txChain = dataBroker.createMergingTransactionChain(this);
120 LOG.info("Creating new txChain: {} for dpid: {}", txChain, dpId);
122 for (int i = 1; i <= numSubmits; i++) {
123 WriteTransaction writeTransaction = txChain.newWriteOnlyTransaction();
124 short calculatedTableId = tableId;
125 for (; sourceIp <= newBatchSize; sourceIp++) {
126 String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
129 Match match = BulkOMaticUtils.getMatch(sourceIp);
130 flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
133 writeTxToDs(writeTransaction, flowId,
134 BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId),
135 flow, sourceIp, calculatedTableId);
137 if (sourceIp < newBatchSize) {
139 short numberB = (short) (endTableId - startTableId + 1);
140 calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
143 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId,
144 tableId, calculatedTableId, sourceIp - 1);
145 writeTransaction.commit().addCallback(
146 new DsCallBack(dpId, tableId, calculatedTableId, sourceIp, txChain),
147 MoreExecutors.directExecutor());
149 tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
150 newBatchSize += batchSize;
151 if (i % sleepAfter == 0 && sleepMillis > 0) {
153 Thread.sleep(sleepMillis);
154 } catch (InterruptedException e) {
155 LOG.error("Writer Thread Interrupted: {}", e.getMessage());
159 LOG.info("Completed FlowHandlerTask thread for dpid: {}", dpId);
163 public void onTransactionChainFailed(TransactionChain transactionChain,
164 Transaction asyncTransaction, Throwable throwable) {
165 LOG.error("Transaction chain: {} FAILED at asyncTransaction: {} due to: ", transactionChain,
166 asyncTransaction.getIdentifier(), throwable);
167 transactionChain.close();
171 public void onTransactionChainSuccessful(TransactionChain transactionChain) {
172 LOG.info("Transaction chain: {} closed successfully.", transactionChain);
175 private void writeTxToDs(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
176 Flow flow, Integer sourceIp, Short tableId) {
178 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
179 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
181 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
182 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
186 private class DsCallBack implements FutureCallback<Object> {
187 private final String dpId;
188 private final int sourceIp;
189 private final short endTableId;
190 private final short beginTableId;
191 private final TransactionChain txChain;
193 DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp,
194 TransactionChain txChain) {
196 this.sourceIp = sourceIp;
197 this.endTableId = endTableId;
198 this.beginTableId = beginTableId;
199 this.txChain = txChain;
203 public void onSuccess(Object notUsed) {
204 if (remainingTxReturn.decrementAndGet() <= 0) {
205 long dur = System.nanoTime() - startTime;
206 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
207 if (0 == countDpnWriteCompletion.decrementAndGet()
208 && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
209 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
210 taskCompletionTime.set(dur);
217 public void onFailure(Throwable error) {
218 if (remainingTxReturn.decrementAndGet() <= 0) {
219 long dur = System.nanoTime() - startTime;
220 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
222 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
223 + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
224 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());