/*
 * Copyright (c) 2016 Ericsson Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.MoreExecutors;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.mdsal.binding.api.WriteTransaction;
17 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 public class FlowWriterConcurrent implements FlowCounterMBean {
25 private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
26 public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER =
27 "Using Concurrent implementation of Flow Writer.";
28 private final DataBroker dataBroker;
29 private final ExecutorService flowPusher;
30 private long startTime;
31 private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
32 private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
33 private final AtomicLong taskCompletionTime = new AtomicLong();
35 public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
36 this.dataBroker = dataBroker;
37 this.flowPusher = flowPusher;
38 LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
41 public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
42 short startTableId, short endTableId, boolean isCreateParents) {
43 LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
44 countDpnWriteCompletion.set(dpnCount);
45 startTime = System.nanoTime();
46 for (int i = 1; i <= dpnCount; i++) {
47 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
48 sleepAfter, startTableId, endTableId, isCreateParents);
49 flowPusher.execute(task);
53 public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
55 LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
56 countDpnWriteCompletion.set(dpnCount);
57 for (int i = 1; i <= dpnCount; i++) {
58 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
59 startTableId, endTableId, false);
60 flowPusher.execute(task);
65 public int getWriteOpStatus() {
66 return writeOpStatus.get();
70 public long getTaskCompletionTime() {
71 return taskCompletionTime.get();
74 private class FlowHandlerTask implements Runnable {
75 private final String dpId;
76 private final boolean add;
77 private final int flowsPerDpn;
78 private final int batchSize;
79 private final int sleepAfter;
80 private final int sleepMillis;
81 private final short startTableId;
82 private final short endTableId;
83 private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
84 private final boolean isCreateParents;
86 FlowHandlerTask(final String dpId, final int flowsPerDpn,
87 final boolean add, final int batchSize,
88 final int sleepMillis, final int sleepAfter,
89 final short startTableId, final short endTableId,
90 final boolean isCreateParents) {
91 this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
93 this.flowsPerDpn = flowsPerDpn;
94 this.batchSize = batchSize;
95 this.sleepMillis = sleepMillis;
96 this.sleepAfter = sleepAfter;
97 this.startTableId = startTableId;
98 this.endTableId = endTableId;
99 this.isCreateParents = isCreateParents;
100 remainingTxReturn.set(flowsPerDpn / batchSize);
105 LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
106 flowsPerDpn / batchSize);
107 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
108 short tableId = startTableId;
109 int numSubmits = flowsPerDpn / batchSize;
111 int newBatchSize = batchSize;
113 for (int i = 1; i <= numSubmits; i++) {
114 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
115 short calculatedTableId = tableId;
116 for (; sourceIp <= newBatchSize; sourceIp++) {
117 String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
120 Match match = BulkOMaticUtils.getMatch(sourceIp);
121 flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
124 addFlowToTx(writeTransaction, flowId,
125 BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow, sourceIp,
128 if (sourceIp < newBatchSize) {
130 short numberB = (short) (endTableId - startTableId + 1);
131 calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
134 writeTransaction.commit().addCallback(
135 new DsCallBack(dpId, tableId, calculatedTableId, sourceIp), MoreExecutors.directExecutor());
137 tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
138 newBatchSize += batchSize;
139 if (i % sleepAfter == 0 && sleepMillis > 0) {
141 Thread.sleep(sleepMillis);
142 } catch (InterruptedException e) {
143 LOG.error("Writer Thread Interrupted: {}", e.getMessage());
149 private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
150 Flow flow, Integer sourceIp, Short tableId) {
152 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
153 if (isCreateParents) {
154 writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
156 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow);
159 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
160 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
164 private class DsCallBack implements FutureCallback<Object> {
165 private final String dpId;
166 private final int sourceIp;
167 private final short endTableId;
168 private final short beginTableId;
170 DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
172 this.sourceIp = sourceIp;
173 this.endTableId = endTableId;
174 this.beginTableId = beginTableId;
178 public void onSuccess(Object object) {
179 if (remainingTxReturn.decrementAndGet() <= 0) {
180 long dur = System.nanoTime() - startTime;
181 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
182 if (0 == countDpnWriteCompletion.decrementAndGet()
183 && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
184 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
185 taskCompletionTime.set(dur);
191 public void onFailure(Throwable error) {
192 if (remainingTxReturn.decrementAndGet() <= 0) {
193 long dur = System.nanoTime() - startTime;
194 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
196 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
197 + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
198 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());