2 * Copyright (c) 2016 Ericsson Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
13 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
14 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
17 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.atomic.AtomicLong;
/**
 * Flow writer that pushes flows to the CONFIGURATION datastore one transaction at a
 * time per DPN: each follow-up batch is submitted from the success callback of the
 * previous transaction. Operation status and completion time are exposed through
 * the {@code FlowCounterMBean} accessors.
 */
25 public class FlowWriterSequential implements FlowCounterMBean {
26 private static final Logger LOG = LoggerFactory.getLogger(FlowWriterSequential.class);
// Source of the write-only transactions used for every batch.
27 private final DataBroker dataBroker;
// Executor on which one FlowHandlerTask per DPN is run.
28 private final ExecutorService flowPusher;
// DPN count of the most recent addFlows call.
29 protected int dpnCount;
// System.nanoTime() captured by addFlows; the completion callback measures elapsed time from it.
30 private long startTime;
// Write status code: starts at INIT, then IN_PROGRESS / SUCCESS / FAILURE as tasks and callbacks run.
31 private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
// Number of DPNs still outstanding; set when an operation starts, decremented when a DPN completes.
32 private AtomicInteger countDpnWriteCompletion = new AtomicInteger(0);
// Elapsed nanoseconds recorded when the last outstanding DPN completes without a recorded failure.
33 private AtomicLong taskCompletionTime = new AtomicLong(0);
// Unit label for taskCompletionTime (nanoseconds).
34 private static final String UNITS = "ns";
/**
 * Creates the writer and logs which implementation is in use.
 *
 * @param dataBroker broker kept for creating write-only transactions
 * @param flowPusher executor kept for running the per-DPN tasks
 */
36 public FlowWriterSequential(final DataBroker dataBroker, ExecutorService flowPusher) {
37 this.dataBroker = dataBroker;
38 this.flowPusher = flowPusher;
39 LOG.info("Using Sequential implementation of Flow Writer.");
/**
 * Starts an add operation: records the DPN count and the start time, then hands one
 * {@code FlowHandlerTask} per DPN (ids "1" .. dpnCount) to the executor.
 *
 * @param dpnCount     number of DPNs; one task is created for each
 * @param flowsPerDPN  number of flows to write per DPN
 * @param batchSize    number of flows placed in each transaction
 * @param sleepMillis  delay applied before each follow-up batch when greater than zero
 * @param startTableId first table id of the range the flows rotate over
 * @param endTableId   last table id of that range
 */
42 public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis,
43 short startTableId, short endTableId) {
44 LOG.info("Using Sequential implementation of Flow Writer.");
// Integer arguments are unboxed here and below; a null dpnCount or flowsPerDPN throws NullPointerException.
45 this.dpnCount = dpnCount;
46 countDpnWriteCompletion.set(dpnCount);
47 startTime = System.nanoTime();
// NOTE(review): with dpnCount == 0 no task is created, so the completion counter is never
// decremented and the status is never moved to SUCCESS by this call.
48 for (int i = 1; i <= dpnCount; i++) {
// The literal true is the third constructor argument; presumably the add/delete selector - confirm.
49 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize,
50 sleepMillis, startTableId, endTableId);
51 flowPusher.execute(task);
/**
 * Starts a delete operation: sets the outstanding-DPN counter and hands one
 * {@code FlowHandlerTask} per DPN (ids "1" .. dpnCount) to the executor, passing
 * {@code false} as the third constructor argument and 0 as the sleep interval.
 * startTableId and endTableId are forwarded to every task.
 *
 * NOTE(review): unlike addFlows, this method does not assign startTime or
 * this.dpnCount, so the elapsed time computed by the completion callback is measured
 * from whatever startTime held before this call - confirm whether that is intended.
 *
 * @param dpnCount    number of DPNs; one task is created for each
 * @param flowsPerDPN number of flows to process per DPN
 * @param batchSize   number of flows placed in each transaction
 */
55 public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
57 LOG.info("Using Sequential implementation of Flow Writer.");
58 countDpnWriteCompletion.set(dpnCount);
59 for (int i = 1; i <= dpnCount; i++) {
60 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
61 startTableId, endTableId);
62 flowPusher.execute(task);
/**
 * Returns the fixed value {@code BulkOMaticUtils.DEFAULT_FLOW_COUNT}; no per-instance
 * flow count is consulted.
 */
67 public long getFlowCount() {
68 return BulkOMaticUtils.DEFAULT_FLOW_COUNT;
/**
 * Returns the fixed value {@code BulkOMaticUtils.DEFUALT_STATUS}; no read status is
 * tracked by this writer.
 */
72 public int getReadOpStatus() {
73 return BulkOMaticUtils.DEFUALT_STATUS;
/**
 * Returns the current write status code held in {@code writeOpStatus}.
 */
77 public int getWriteOpStatus() {
78 return writeOpStatus.get();
/**
 * Returns the last recorded task completion time; the value stored is a difference of
 * {@code System.nanoTime()} readings, i.e. nanoseconds, and is 0 until a completion is recorded.
 */
82 public long getTaskCompletionTime() {
83 return taskCompletionTime.get();
/**
 * Returns the unit label for the reported completion time.
 * NOTE(review): presumably the UNITS constant ("ns") - confirm against the method body.
 */
87 public String getUnits() {
/**
 * Per-DPN task run on the executor. It builds and submits the first transaction for
 * its DPN; later batches are submitted from the transaction's success callback.
 */
91 private class FlowHandlerTask implements Runnable {
// Device id: BulkOMaticUtils.DEVICE_TYPE_PREFIX followed by the id handed to the constructor.
92 private final String dpId;
// Flow count per DPN; the callback treats the DPN as complete once sourceIp exceeds it.
93 private final int flowsPerDpn;
// NOTE(review): presumably selects put (true) versus delete (false) for each flow - confirm.
94 private final boolean add;
// Number of flows placed in each transaction.
95 private final int batchSize;
// Delay in milliseconds applied before each follow-up batch when greater than zero.
96 private final int sleepMillis;
// Inclusive table-id range that the per-flow table id rotates over.
97 private final short startTableId;
98 private final short endTableId;
/**
 * Stores the task settings. The device id is stored with
 * {@code BulkOMaticUtils.DEVICE_TYPE_PREFIX} prepended to the supplied id.
 *
 * @param dpId         DPN id without the device-type prefix
 * @param flowsPerDpn  number of flows to process for this DPN
 * @param startTableId first table id of the range
 * @param endTableId   last table id of the range
 */
100 public FlowHandlerTask(final String dpId,
101 final int flowsPerDpn,
105 final short startTableId,
106 final short endTableId){
107 this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
109 this.flowsPerDpn = flowsPerDpn;
110 this.batchSize = batchSize;
111 this.sleepMillis = sleepMillis;
112 this.startTableId = startTableId;
113 this.endTableId = endTableId;
// Builds the first transaction for this DPN (source IPs 1 .. batchSize) and submits it;
// the attached DsCallBack continues with the remaining batches.
// NOTE(review): flowsPerDpn/batchSize is int division, so a batchSize of 0 throws
// ArithmeticException on this log line before anything is written.
118 LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId, flowsPerDpn/batchSize);
119 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
121 Short tableId = startTableId;
// sourceIp is both the loop counter and the value used for the flow's match and id.
122 Integer sourceIp = 1;
124 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
127 for (; sourceIp <= batchSize; sourceIp++) {
// k is the table id used for the current flow; it is advanced at the bottom of the loop.
128 String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
129 LOG.debug("Adding flow with id: {}", flowId);
132 Match match = BulkOMaticUtils.getMatch(sourceIp);
133 flow = BulkOMaticUtils.buildFlow(k, flowId, match);
135 addFlowToTx(writeTransaction, flowId,
136 BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow);
// Advance the table id for every flow except the last one of the batch.
138 if (sourceIp < batchSize) {
// b is the number of tables in the inclusive range.
// NOTE(review): k already includes the startTableId offset, but it is not subtracted before
// the modulo; assuming a step of 1, a range such as start 2 / end 4 maps k = 2 back to 2,
// so the table id would never advance - verify the intended rotation.
140 short b = (short)(endTableId - startTableId + 1);
141 k = (short) (((k + a) % b) + startTableId);
145 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId, k, sourceIp);
// After the loop sourceIp is batchSize + 1, i.e. the first source IP of the next batch.
147 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, k));
/**
 * Queues one flow operation on the given transaction against the CONFIGURATION
 * datastore: either a put of the flow at flowIid or a delete of flowIid.
 * NOTE(review): the choice between the two is presumably driven by the task's add
 * flag - confirm.
 *
 * @param writeTransaction transaction the operation is added to
 * @param flowId           flow id, used only for trace logging
 * @param flowIid          datastore path of the flow
 */
150 private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
153 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
// The trailing true is the createMissingParents argument of WriteTransaction.put.
154 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, true);
156 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
157 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
/**
 * Callback attached to each submitted transaction. On success it either records
 * completion for the DPN or builds and submits the next batch; on failure it marks
 * the write operation as failed.
 * NOTE(review): FutureCallback is used as a raw type here (onSuccess receives Object).
 */
161 private class DsCallBack implements FutureCallback {
// First source IP of the batch this callback will write next.
163 private Integer sourceIp;
// Table id the next batch starts from.
164 private Short tableId;
/**
 * Stores the next source IP and derives the starting table id for the next batch by
 * advancing the supplied table id once within the table range.
 * NOTE(review): the rotation adds startTableId after the modulo without subtracting it
 * first; for some ranges this maps a table id onto itself - verify.
 *
 * @param dpId     device id of the DPN being written
 * @param sourceIp first source IP of the next batch
 * @param tableId  table id used for the last flow of the previous batch
 */
166 public DsCallBack(String dpId, Integer sourceIp, Short tableId) {
168 this.sourceIp = sourceIp;
// b is the number of tables in the inclusive range.
170 short b = (short)(endTableId - startTableId + 1);
171 this.tableId = (short) (((tableId + a) % b) + startTableId);
/**
 * Called when the previously submitted transaction succeeded.
 *
 * Once sourceIp has passed flowsPerDpn the DPN is logged as complete and the
 * outstanding-DPN counter is decremented; when it reaches zero and no failure has been
 * recorded, the status becomes SUCCESS and the elapsed time since startTime is stored.
 * The rest of the method sleeps for sleepMillis (when positive), then builds the next
 * batch of batchSize flows and submits it with a new DsCallBack.
 * NOTE(review): presumably the completion branch returns before the next batch is
 * built - confirm.
 *
 * @param o transaction result; not used
 */
175 public void onSuccess(Object o) {
176 if (sourceIp > flowsPerDpn) {
177 long dur = System.nanoTime() - startTime;
178 LOG.info("Completed all flows installation for: dpid: {}, tableId: {}, sourceIp: {} in {}ns", dpId,
179 tableId, sourceIp, dur);
180 if(0 == countDpnWriteCompletion.decrementAndGet() &&
181 writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
182 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
183 taskCompletionTime.set(dur);
// NOTE(review): the sleep runs on whichever thread invokes this callback (no executor is
// passed to Futures.addCallback) - confirm that blocking it is acceptable.
188 if (sleepMillis > 0) {
189 Thread.sleep(sleepMillis);
// NOTE(review): the interruption is only logged (message only, no stack trace) and no
// Thread.currentThread().interrupt() is visible here, so the interrupt status is lost.
191 } catch (InterruptedException e) {
192 LOG.error("Writer Thread Interrupted while sleeping: {}", e.getMessage());
195 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
// Despite its name, newBatchSize is the inclusive last source IP of this batch.
// NOTE(review): the bound ignores flowsPerDpn, so when flowsPerDpn is not a multiple of
// batchSize the final batch writes source IPs beyond flowsPerDpn - confirm intended.
196 int newBatchSize = sourceIp + batchSize - 1;
198 for (; sourceIp <= newBatchSize; sourceIp++) {
// k is the table id used for the current flow; it is advanced at the bottom of the loop.
199 String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
202 Match match = BulkOMaticUtils.getMatch(sourceIp);
203 flow = BulkOMaticUtils.buildFlow(k, flowId, match);
205 LOG.debug("Adding flow with id: {}", flowId);
206 addFlowToTx(writeTransaction, flowId,
207 BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow);
// Advance the table id for every flow except the last one of the batch.
209 if (sourceIp < newBatchSize) {
// b is the number of tables in the inclusive range.
211 short b = (short)(endTableId - startTableId + 1);
212 k = (short) (((k + a) % b) + startTableId);
215 LOG.debug("OnSuccess: Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
216 dpId, tableId, k, sourceIp);
// sourceIp is now newBatchSize + 1, the first source IP of the following batch.
217 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, k));
/**
 * Called when the submitted transaction failed: logs the error and sets the write
 * status to FAILURE. No further batch is submitted for this DPN and the
 * outstanding-DPN counter is not decremented.
 * NOTE(review): the Throwable fills the first {} placeholder, so SLF4J logs only its
 * string form; passing it as the final argument instead would log the stack trace.
 *
 * @param error cause of the failed datastore write
 */
220 public void onFailure(Throwable error) {
221 LOG.error("Error: {} in Datastore write operation: dpid: {}, tableId: {}, sourceIp: {}",
222 error, dpId, tableId, sourceIp);
223 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());