2 * Copyright (c) 2016 Ericsson Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.concurrent.atomic.AtomicLong;
17 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
18 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
19 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
22 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 public class FlowWriterConcurrent implements FlowCounterMBean {
27 private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
28 public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER =
29 "Using Concurrent implementation of Flow Writer.";
30 private final DataBroker dataBroker;
31 private final ExecutorService flowPusher;
32 private long startTime;
33 private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
34 private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
35 private final AtomicLong taskCompletionTime = new AtomicLong();
37 public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
38 this.dataBroker = dataBroker;
39 this.flowPusher = flowPusher;
40 LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
43 public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
44 short startTableId, short endTableId, boolean isCreateParents) {
45 LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
46 countDpnWriteCompletion.set(dpnCount);
47 startTime = System.nanoTime();
48 for (int i = 1; i <= dpnCount; i++) {
49 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
50 sleepAfter, startTableId, endTableId, isCreateParents);
51 flowPusher.execute(task);
55 public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
57 LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
58 countDpnWriteCompletion.set(dpnCount);
59 for (int i = 1; i <= dpnCount; i++) {
60 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
61 startTableId, endTableId, false);
62 flowPusher.execute(task);
67 public int getWriteOpStatus() {
68 return writeOpStatus.get();
72 public long getTaskCompletionTime() {
73 return taskCompletionTime.get();
76 private class FlowHandlerTask implements Runnable {
77 private final String dpId;
78 private final boolean add;
79 private final int flowsPerDpn;
80 private final int batchSize;
81 private final int sleepAfter;
82 private final int sleepMillis;
83 private final short startTableId;
84 private final short endTableId;
85 private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
86 private final boolean isCreateParents;
88 FlowHandlerTask(final String dpId, final int flowsPerDpn,
89 final boolean add, final int batchSize,
90 final int sleepMillis, final int sleepAfter,
91 final short startTableId, final short endTableId,
92 final boolean isCreateParents) {
93 this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
95 this.flowsPerDpn = flowsPerDpn;
96 this.batchSize = batchSize;
97 this.sleepMillis = sleepMillis;
98 this.sleepAfter = sleepAfter;
99 this.startTableId = startTableId;
100 this.endTableId = endTableId;
101 this.isCreateParents = isCreateParents;
102 remainingTxReturn.set(flowsPerDpn / batchSize);
107 LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
108 flowsPerDpn / batchSize);
109 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
110 short tableId = startTableId;
111 int numSubmits = flowsPerDpn / batchSize;
113 int newBatchSize = batchSize;
115 for (int i = 1; i <= numSubmits; i++) {
116 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
117 short calculatedTableId = tableId;
118 for (; sourceIp <= newBatchSize; sourceIp++) {
119 String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
122 Match match = BulkOMaticUtils.getMatch(sourceIp);
123 flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
126 addFlowToTx(writeTransaction, flowId,
127 BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow, sourceIp,
130 if (sourceIp < newBatchSize) {
132 short numberB = (short) (endTableId - startTableId + 1);
133 calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
136 Futures.addCallback(writeTransaction.submit(),
137 new DsCallBack(dpId, tableId, calculatedTableId, sourceIp), MoreExecutors.directExecutor());
139 tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
140 newBatchSize += batchSize;
141 if (i % sleepAfter == 0 && sleepMillis > 0) {
143 Thread.sleep(sleepMillis);
144 } catch (InterruptedException e) {
145 LOG.error("Writer Thread Interrupted: {}", e.getMessage());
151 private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
152 Flow flow, Integer sourceIp, Short tableId) {
154 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
155 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
157 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
158 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
162 private class DsCallBack implements FutureCallback {
163 private final String dpId;
164 private final int sourceIp;
165 private final short endTableId;
166 private final short beginTableId;
168 DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
170 this.sourceIp = sourceIp;
171 this.endTableId = endTableId;
172 this.beginTableId = beginTableId;
176 public void onSuccess(Object object) {
177 if (remainingTxReturn.decrementAndGet() <= 0) {
178 long dur = System.nanoTime() - startTime;
179 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
180 if (0 == countDpnWriteCompletion.decrementAndGet()
181 && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
182 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
183 taskCompletionTime.set(dur);
189 public void onFailure(Throwable error) {
190 if (remainingTxReturn.decrementAndGet() <= 0) {
191 long dur = System.nanoTime() - startTime;
192 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
194 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
195 + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
196 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());