2 * Copyright (c) 2016 Ericsson Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
13 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
14 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
17 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.atomic.AtomicLong;
25 public class FlowWriterConcurrent implements FlowCounterMBean {
26 private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
27 public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER = "Using Concurrent implementation of Flow Writer.";
28 private final DataBroker dataBroker;
29 private final ExecutorService flowPusher;
30 private long startTime;
31 private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
32 private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
33 private AtomicLong taskCompletionTime = new AtomicLong(0);
34 private static final String UNITS = "ns";
36 public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
37 this.dataBroker = dataBroker;
38 this.flowPusher = flowPusher;
39 LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
42 public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
43 int sleepMillis, int sleepAfter, short startTableId, short endTableId) {
44 LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
45 countDpnWriteCompletion.set(dpnCount);
46 startTime = System.nanoTime();
47 for (int i = 1; i <= dpnCount; i++) {
48 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i),
49 flowsPerDPN, true, batchSize, sleepMillis, sleepAfter, startTableId, endTableId);
50 flowPusher.execute(task);
54 public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize,
55 short startTableId, short endTableId) {
56 LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
57 countDpnWriteCompletion.set(dpnCount);
58 for (int i = 1; i <= dpnCount; i++) {
59 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize,
60 0, 1, startTableId, endTableId);
61 flowPusher.execute(task);
66 public long getFlowCount() {
67 return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
71 public int getReadOpStatus() {
72 return BulkOMaticUtils.DEFUALT_STATUS;
76 public int getWriteOpStatus() {
77 return writeOpStatus.get();
81 public long getTaskCompletionTime() {
82 return taskCompletionTime.get();
86 public String getUnits() {
90 private class FlowHandlerTask implements Runnable {
91 private final String dpId;
92 private final boolean add;
93 private final int flowsPerDpn;
94 private final int batchSize;
95 private final int sleepAfter;
96 private final int sleepMillis;
97 private final short startTableId;
98 private final short endTableId;
99 private AtomicInteger remainingTxReturn = new AtomicInteger(0);
101 public FlowHandlerTask(final String dpId,
102 final int flowsPerDpn,
105 final int sleepMillis,
106 final int sleepAfter,
107 final short startTableId,
108 final short endTableId){
109 this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
111 this.flowsPerDpn = flowsPerDpn;
112 this.batchSize = batchSize;
113 this.sleepMillis = sleepMillis;
114 this.sleepAfter = sleepAfter;
115 this.startTableId = startTableId;
116 this.endTableId = endTableId;
117 remainingTxReturn.set(flowsPerDpn/batchSize);
122 LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId, flowsPerDpn/batchSize);
123 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
124 short tableId = startTableId;
125 int numSubmits = flowsPerDpn/batchSize;
127 int newBatchSize = batchSize;
129 for (int i = 1; i <= numSubmits; i++) {
130 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
132 for (; sourceIp <= newBatchSize; sourceIp++) {
133 String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
136 Match match = BulkOMaticUtils.getMatch(sourceIp);
137 flow = BulkOMaticUtils.buildFlow(k, flowId, match);
140 addFlowToTx(writeTransaction, flowId,
141 BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow, sourceIp, k);
143 if (sourceIp < newBatchSize) {
145 short b = (short)(endTableId - startTableId + 1);
146 k = (short) (((k + a) % b) + startTableId);
149 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, tableId, k, sourceIp));
151 tableId = (short)(((k + 1)%((short)(endTableId - startTableId + 1))) + startTableId);
152 newBatchSize += batchSize;
153 if (((i%sleepAfter) == 0) && (sleepMillis > 0)) {
155 Thread.sleep(sleepMillis);
156 } catch (InterruptedException e) {
157 LOG.error("Writer Thread Interrupted: {}", e.getMessage());
163 private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid, Flow flow, Integer sourceIp, Short tableId){
165 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
166 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
168 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
169 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
173 private class DsCallBack implements FutureCallback {
175 private int sourceIp;
176 private short endTableId;
177 private short beginTableId;
179 public DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
181 this.sourceIp = sourceIp;
182 this.endTableId = endTableId;
183 this.beginTableId = beginTableId;
187 public void onSuccess(Object o) {
188 if (remainingTxReturn.decrementAndGet() <= 0) {
189 long dur = System.nanoTime() - startTime;
190 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
192 if(0 == countDpnWriteCompletion.decrementAndGet() &&
193 writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
194 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
195 taskCompletionTime.set(dur);
200 public void onFailure(Throwable error) {
201 if (remainingTxReturn.decrementAndGet() <= 0) {
202 long dur = System.nanoTime() - startTime;
203 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId,
206 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, " +
207 "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
208 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());