2 * Copyright (c) 2016, 2017 Ericsson Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.concurrent.atomic.AtomicLong;
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
22 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 public class FlowWriterSequential implements FlowCounterMBean {
27 private static final Logger LOG = LoggerFactory.getLogger(FlowWriterSequential.class);
28 private final DataBroker dataBroker;
29 private final ExecutorService flowPusher;
30 protected int dpnCount;
31 private long startTime;
32 private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
33 private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
34 private final AtomicLong taskCompletionTime = new AtomicLong();
/**
 * Creates a sequential flow writer.
 *
 * @param dataBroker broker used to open the write-only transactions for the flows
 * @param flowPusher executor that runs the per-datapath writer tasks
 */
public FlowWriterSequential(final DataBroker dataBroker, ExecutorService flowPusher) {
    LOG.info("Using Sequential implementation of Flow Writer.");
    this.flowPusher = flowPusher;
    this.dataBroker = dataBroker;
}
42 public void addFlows(Integer count, Integer flowsPerDPN, int batchSize, int sleepMillis, short startTableId,
43 short endTableId, boolean isCreateParents) {
44 LOG.info("Using Sequential implementation of Flow Writer.");
45 this.dpnCount = count;
46 countDpnWriteCompletion.set(count);
47 startTime = System.nanoTime();
48 for (int i = 1; i <= count; i++) {
49 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
50 startTableId, endTableId, isCreateParents);
51 flowPusher.execute(task);
55 public void deleteFlows(Integer count, Integer flowsPerDPN, int batchSize, short startTableId,
57 LOG.info("Using Sequential implementation of Flow Writer.");
58 countDpnWriteCompletion.set(count);
59 for (int i = 1; i <= count; i++) {
60 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
61 startTableId, endTableId, false);
62 flowPusher.execute(task);
67 public int getWriteOpStatus() {
68 return writeOpStatus.get();
72 public long getTaskCompletionTime() {
73 return taskCompletionTime.get();
76 private class FlowHandlerTask implements Runnable {
77 private final String dpId;
78 private final int flowsPerDpn;
79 private final boolean add;
80 private final int batchSize;
81 private final int sleepMillis;
82 private final short startTableId;
83 private final short endTableId;
84 private final boolean isCreateParents;
/**
 * Creates the writer task for one datapath.
 *
 * @param dpId            datapath number as text; stored with the device-type prefix prepended
 * @param flowsPerDpn     number of flows requested for the datapath
 * @param add             {@code true} to install flows, {@code false} to remove them
 * @param batchSize       number of flows per transaction
 * @param sleepMillis     pause in milliseconds before each follow-up batch
 * @param startTableId    first table id of the rotation range
 * @param endTableId      last table id of the rotation range
 * @param isCreateParents passed as the last argument of {@code WriteTransaction.put}
 */
FlowHandlerTask(final String dpId, final int flowsPerDpn, final boolean add, final int batchSize,
        final int sleepMillis, final short startTableId, final short endTableId,
        final boolean isCreateParents) {
    // Target and mode.
    this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
    this.add = add;
    this.isCreateParents = isCreateParents;

    // Volume and pacing.
    this.flowsPerDpn = flowsPerDpn;
    this.batchSize = batchSize;
    this.sleepMillis = sleepMillis;

    // Table range.
    this.startTableId = startTableId;
    this.endTableId = endTableId;
}
106 LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
107 flowsPerDpn / batchSize);
108 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
110 Short tableId = startTableId;
111 Integer sourceIp = 1;
113 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
114 short calculatedTableId = tableId;
116 for (; sourceIp <= batchSize; sourceIp++) {
117 String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
118 LOG.debug("Adding flow with id: {}", flowId);
121 Match match = BulkOMaticUtils.getMatch(sourceIp);
122 flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
124 addFlowToTx(writeTransaction, flowId,
125 BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow);
127 if (sourceIp < batchSize) {
129 short numberB = (short) (endTableId - startTableId + 1);
130 calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
134 LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}", dpId, tableId,
135 calculatedTableId, sourceIp);
137 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId),
138 MoreExecutors.directExecutor());
141 private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
144 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
145 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
147 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
148 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
152 private class DsCallBack implements FutureCallback {
153 private final String dpId;
154 private Integer sourceIp;
155 private final Short tableId;
157 DsCallBack(String dpId, Integer sourceIp, Short tableId) {
159 this.sourceIp = sourceIp;
161 short numberB = (short) (endTableId - startTableId + 1);
162 this.tableId = (short) ((tableId + numberA) % numberB + startTableId);
166 public void onSuccess(Object object) {
167 if (sourceIp > flowsPerDpn) {
168 long dur = System.nanoTime() - startTime;
169 LOG.info("Completed all flows installation for: dpid: {}, tableId: {}, sourceIp: {} in {}ns", dpId,
170 tableId, sourceIp, dur);
171 if (0 == countDpnWriteCompletion.decrementAndGet()
172 && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
173 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
174 taskCompletionTime.set(dur);
179 if (sleepMillis > 0) {
180 Thread.sleep(sleepMillis);
182 } catch (InterruptedException e) {
183 LOG.error("Writer Thread Interrupted while sleeping: {}", e.getMessage());
186 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
187 int newBatchSize = sourceIp + batchSize - 1;
188 short calculatedTableId = tableId;
189 for (; sourceIp <= newBatchSize; sourceIp++) {
190 String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
193 Match match = BulkOMaticUtils.getMatch(sourceIp);
194 flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
196 LOG.debug("Adding flow with id: {}", flowId);
197 addFlowToTx(writeTransaction, flowId,
198 BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId),
201 if (sourceIp < newBatchSize) {
203 short numberB = (short) (endTableId - startTableId + 1);
204 calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
207 LOG.debug("OnSuccess: Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
208 dpId, tableId, calculatedTableId, sourceIp);
209 Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, calculatedTableId),
210 MoreExecutors.directExecutor());
214 public void onFailure(Throwable error) {
215 LOG.error("Error: {} in Datastore write operation: dpid: {}, tableId: {}, sourceIp: {}", error, dpId,
217 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());