2 * Copyright (c) 2016 Ericsson Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
16 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
17 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
/**
 * Bulk flow writer that pushes flows to the CONFIGURATION datastore one batch at a time.
 *
 * One FlowHandlerTask per DPN is handed to the executor; each task submits a first
 * transaction and every following transaction is only created from the success callback
 * of the previous one, so the batches of a single DPN are written sequentially.
 */
24 public class FlowWriterSequential implements FlowCounterMBean {
25 private static final Logger LOG = LoggerFactory.getLogger(FlowWriterSequential.class);
// Source of the write-only transactions created for every batch.
26 private final DataBroker dataBroker;
// Executor that runs the per-DPN FlowHandlerTask instances.
27 private final ExecutorService flowPusher;
// Number of DPNs of the most recent addFlows() call.
28 protected int dpnCount;
// System.nanoTime() captured by addFlows(); the completion callback subtracts it to get a duration.
29 private long startTime;
// Current operation status code; starts at INIT and is moved by the tasks and their callbacks.
30 private AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
// Countdown of DPNs that have not yet finished; decremented once per DPN by the completion callback.
31 private AtomicInteger countDpnWriteCompletion = new AtomicInteger();
// Duration in nanoseconds stored when the last DPN completes without a recorded failure.
32 private AtomicLong taskCompletionTime = new AtomicLong();
/**
 * Creates the writer and logs which implementation is in use.
 *
 * @param dataBroker broker used to open write-only transactions
 * @param flowPusher executor on which the per-DPN tasks are executed
 */
34 public FlowWriterSequential(final DataBroker dataBroker, ExecutorService flowPusher) {
35 this.dataBroker = dataBroker;
36 this.flowPusher = flowPusher;
37 LOG.info("Using Sequential implementation of Flow Writer.");
/**
 * Starts one flow-adding task per DPN on the executor.
 *
 * @param dpnCount        number of DPNs; tasks are created for ids "1" .. "dpnCount"
 * @param flowsPerDPN     flow count per DPN; the callback treats sourceIp greater than this as completion
 * @param batchSize       number of flows put into each transaction
 * @param sleepMillis     pause applied by the callback before it builds each follow-up batch
 * @param startTableId    first table id of the range used when deriving the table id of a flow
 * @param endTableId      last table id of that range
 * @param isCreateParents forwarded to WriteTransaction.put() for every added flow
 */
40 public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis,
41 short startTableId, short endTableId, boolean isCreateParents) {
42 LOG.info("Using Sequential implementation of Flow Writer.");
43 this.dpnCount = dpnCount;
// Arm the countdown that the completion callbacks decrement, one step per finished DPN.
44 countDpnWriteCompletion.set(dpnCount);
// Reference point for the duration reported when a DPN completes.
45 startTime = System.nanoTime();
// DPN ids are 1-based; the literal 'true' is presumably the task's add flag (parameter list only partly visible).
46 for (int i = 1; i <= dpnCount; i++) {
47 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize,
48 sleepMillis, startTableId, endTableId, isCreateParents);
49 flowPusher.execute(task);
/**
 * Starts one flow-deleting task per DPN on the executor.
 *
 * Tasks are created with the literal flag 'false', a sleep of 0 and isCreateParents 'false'.
 *
 * NOTE(review): none of the visible lines refreshes startTime or this.dpnCount, so the
 * duration computed by the completion callback appears to be measured from the last
 * addFlows() call (or from 0 if there was none) - confirm whether that is intended.
 *
 * @param dpnCount     number of DPNs; tasks are created for ids "1" .. "dpnCount"
 * @param flowsPerDPN  flow count per DPN handed to each task
 * @param batchSize    number of flows per transaction
 * @param startTableId first table id of the range handed to each task
 */
53 public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
55 LOG.info("Using Sequential implementation of Flow Writer.");
// Arm the countdown that the completion callbacks decrement, one step per finished DPN.
56 countDpnWriteCompletion.set(dpnCount);
57 for (int i = 1; i <= dpnCount; i++) {
58 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
59 startTableId, endTableId, false);
60 flowPusher.execute(task);
/**
 * Returns the current operation status code.
 *
 * @return the value currently held by writeOpStatus (an OperationStatus status() code)
 */
65 public int getWriteOpStatus() {
66 return writeOpStatus.get();
/**
 * Returns the last recorded completion duration.
 *
 * @return the value held by taskCompletionTime; the completion callback stores a
 *         System.nanoTime() difference there, so the unit is nanoseconds
 */
70 public long getTaskCompletionTime() {
71 return taskCompletionTime.get();
/**
 * Per-DPN unit of work: writes the first batch of flows itself and lets its callback
 * continue with the remaining batches.
 */
74 private class FlowHandlerTask implements Runnable {
// Device id; the constructor prepends BulkOMaticUtils.DEVICE_TYPE_PREFIX.
75 private final String dpId;
// Upper bound on sourceIp after which the callback reports the DPN as complete.
76 private final int flowsPerDpn;
// Presumably selects add (true) versus delete (false); the line that branches on it is not visible here.
77 private final boolean add;
// Number of flows placed in one transaction.
78 private final int batchSize;
// Delay in milliseconds applied before each follow-up batch when greater than 0.
79 private final int sleepMillis;
// Table id range used when deriving the table id for each flow.
80 private final short startTableId;
81 private final short endTableId;
// Passed as the last argument of WriteTransaction.put().
82 private final boolean isCreateParents;
/**
 * Stores the task configuration.
 *
 * Only part of the parameter list is visible in this view; the visible assignments
 * copy each argument into the field of the same name.
 *
 * @param dpId            numeric DPN id as a string; stored with the device-type prefix prepended
 * @param flowsPerDpn     flow count for this DPN
 * @param startTableId    first table id of the range
 * @param endTableId      last table id of the range
 * @param isCreateParents forwarded to WriteTransaction.put()
 */
84 public FlowHandlerTask(final String dpId,
85 final int flowsPerDpn,
89 final short startTableId,
90 final short endTableId,
91 final boolean isCreateParents){
// The stored id is the prefixed form, which is what flow ids and instance identifiers are built from.
92 this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
94 this.flowsPerDpn = flowsPerDpn;
95 this.batchSize = batchSize;
96 this.sleepMillis = sleepMillis;
97 this.startTableId = startTableId;
98 this.endTableId = endTableId;
99 this.isCreateParents = isCreateParents;
// Body of the task (the method header is not visible in this view): builds and submits the
// first transaction for this DPN, then hands continuation to DsCallBack.
// NOTE(review): flowsPerDpn/batchSize is an int division and throws ArithmeticException when
// batchSize is 0 - confirm callers never pass 0.
104 LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId, flowsPerDpn/batchSize);
105 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
107 Short tableId = startTableId;
// sourceIp doubles as the running flow counter; it starts at 1.
108 Integer sourceIp = 1;
110 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
// First batch covers sourceIp 1..batchSize; the bound does not take flowsPerDpn into account.
// 'k' is declared on a line not visible here - presumably the current table id.
113 for (; sourceIp <= batchSize; sourceIp++) {
// Flow id format: "Flow-<dpId>.<k>.<sourceIp>".
114 String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
115 LOG.debug("Adding flow with id: {}", flowId);
// 'flow' is declared on a line not visible here.
118 Match match = BulkOMaticUtils.getMatch(sourceIp);
119 flow = BulkOMaticUtils.buildFlow(k, flowId, match);
121 addFlowToTx(writeTransaction, flowId,
122 BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow);
// k is only advanced between flows of the batch, not after the last one.
124 if (sourceIp < batchSize) {
// b is the width of the table range; 'a' is declared on a line not visible here.
126 short b = (short)(endTableId - startTableId + 1);
127 k = (short) (((k + a) % b) + startTableId);
131 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId, k, sourceIp);
// The callback receives the next sourceIp and the last used k, and submits the following batch on success.
133 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, k));
/**
 * Queues one flow operation on the given transaction.
 *
 * The visible lines contain a put into and a delete from the CONFIGURATION datastore;
 * the condition choosing between them is not visible in this view (presumably the
 * task's 'add' flag - confirm).
 *
 * @param writeTransaction transaction the operation is added to
 * @param flowId           flow id, used for trace logging only in the visible lines
 * @param flowIid          instance identifier of the flow to put or delete
 */
136 private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
139 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
140 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
142 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
143 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
/**
 * Callback attached to each submitted transaction; on success it either reports the DPN
 * as complete or builds and submits the next batch.
 *
 * NOTE(review): FutureCallback is implemented as a raw type, so onSuccess receives Object.
 */
147 private class DsCallBack implements FutureCallback {
// Next sourceIp to be written; advanced by onSuccess while it builds a batch.
149 private Integer sourceIp;
// Table id the next batch starts from; derived in the constructor.
150 private Short tableId;
/**
 * Captures where the next batch has to continue.
 *
 * @param dpId     DPN id of the owning task
 * @param sourceIp first sourceIp of the next batch
 * @param tableId  table id used by the last flow of the previous batch
 */
152 public DsCallBack(String dpId, Integer sourceIp, Short tableId) {
154 this.sourceIp = sourceIp;
// Same table-id step as inside the batch loops; 'a' is declared on a line not visible here.
156 short b = (short)(endTableId - startTableId + 1);
157 this.tableId = (short) (((tableId + a) % b) + startTableId);
/**
 * Invoked when the previous transaction was committed.
 *
 * When sourceIp has passed flowsPerDpn the DPN is finished and the completion
 * bookkeeping runs; the remaining visible lines build and submit the next batch.
 * Several lines of this method (block closers, and the declarations of 'k', 'flow'
 * and 'a') are not visible in this view.
 *
 * @param o commit result; not used by the visible lines
 */
161 public void onSuccess(Object o) {
// Completion branch: every flow of this DPN has been submitted and committed.
162 if (sourceIp > flowsPerDpn) {
163 long dur = System.nanoTime() - startTime;
164 LOG.info("Completed all flows installation for: dpid: {}, tableId: {}, sourceIp: {} in {}ns", dpId,
165 tableId, sourceIp, dur);
// Only the last DPN to finish flips the status to SUCCESS, and only if no failure was recorded.
166 if(0 == countDpnWriteCompletion.decrementAndGet() &&
167 writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
168 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
169 taskCompletionTime.set(dur);
// Optional throttle between batches; this sleeps on whichever thread runs the callback.
174 if (sleepMillis > 0) {
175 Thread.sleep(sleepMillis);
// NOTE(review): only the message is logged; no visible line restores the interrupt flag
// via Thread.currentThread().interrupt() - confirm.
177 } catch (InterruptedException e) {
178 LOG.error("Writer Thread Interrupted while sleeping: {}", e.getMessage());
181 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
// Despite its name this is the last sourceIp of the batch, not a size.
// NOTE(review): the bound is not clamped to flowsPerDpn in the visible lines, so a final
// batch can run past flowsPerDpn when it is not a multiple of batchSize - confirm.
182 int newBatchSize = sourceIp + batchSize - 1;
184 for (; sourceIp <= newBatchSize; sourceIp++) {
// Flow id format: "Flow-<dpId>.<k>.<sourceIp>".
185 String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
188 Match match = BulkOMaticUtils.getMatch(sourceIp);
189 flow = BulkOMaticUtils.buildFlow(k, flowId, match);
191 LOG.debug("Adding flow with id: {}", flowId);
192 addFlowToTx(writeTransaction, flowId,
193 BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow);
// k is only advanced between flows of the batch, not after the last one.
195 if (sourceIp < newBatchSize) {
197 short b = (short)(endTableId - startTableId + 1);
198 k = (short) (((k + a) % b) + startTableId);
201 LOG.debug("OnSuccess: Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
202 dpId, tableId, k, sourceIp);
// Chain the next link: a fresh callback continues from the updated sourceIp and k.
203 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, k));
/**
 * Invoked when the transaction failed: logs the error and marks the whole operation as FAILURE.
 *
 * The visible lines neither submit a further batch nor decrement the DPN countdown, so the
 * write chain for this DPN appears to stop here - confirm against the lines not shown.
 *
 * @param error cause of the failed datastore write
 */
206 public void onFailure(Throwable error) {
207 LOG.error("Error: {} in Datastore write operation: dpid: {}, tableId: {}, sourceIp: {}",
208 error, dpId, tableId, sourceIp);
209 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());