2 * Copyright (c) 2016 Ericsson Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.MoreExecutors;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.mdsal.binding.api.WriteTransaction;
17 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
/**
 * Flow writer that handles several switches concurrently: for every switch one {@code FlowHandlerTask}
 * is handed to the supplied executor. Progress can be polled through {@code getWriteOpStatus()} and
 * {@code getTaskCompletionTime()}.
 */
24 public class FlowWriterConcurrent implements FlowCounterMBean {
25 private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
// Message logged by the constructor and whenever a bulk add/delete operation is started.
26 public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER =
27 "Using Concurrent implementation of Flow Writer.";
// Source of the write-only transactions created by the handler tasks.
28 private final DataBroker dataBroker;
// Executor on which the per-switch handler tasks are run.
29 private final ExecutorService flowPusher;
// System.nanoTime() reference point; the commit callbacks subtract it from the current
// nanoTime to report the elapsed nanoseconds.
30 private long startTime;
// Status code of the current write operation; starts as OperationStatus.INIT.
31 private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
// Set to the switch count when an operation starts and decremented by the success callback
// each time a switch has received all of its transaction results.
32 private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
// Elapsed nanoseconds stored by the success callback when the last switch completes.
33 private final AtomicLong taskCompletionTime = new AtomicLong();
/**
 * Creates the concurrent flow writer and logs which implementation is in use.
 *
 * @param dataBroker data broker kept for creating write transactions later on
 * @param flowPusher executor on which the per-switch writer tasks are executed
 */
public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
    LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
    this.flowPusher = flowPusher;
    this.dataBroker = dataBroker;
}
41 public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
42 short startTableId, short endTableId, boolean isCreateParents) {
43 LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
44 countDpnWriteCompletion.set(dpnCount);
45 startTime = System.nanoTime();
46 for (int i = 1; i <= dpnCount; i++) {
47 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
48 sleepAfter, startTableId, endTableId, isCreateParents);
49 flowPusher.execute(task);
53 public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
55 LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
56 countDpnWriteCompletion.set(dpnCount);
57 for (int i = 1; i <= dpnCount; i++) {
58 FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
59 startTableId, endTableId, false);
60 flowPusher.execute(task);
65 public int getWriteOpStatus() {
66 return writeOpStatus.get();
70 public long getTaskCompletionTime() {
71 return taskCompletionTime.get();
/**
 * Task executed once per switch: it writes the switch's flows in batches, one write-only
 * transaction per batch, and registers a callback on every commit.
 */
74 private class FlowHandlerTask implements Runnable {
// Switch id: BulkOMaticUtils.DEVICE_TYPE_PREFIX followed by the id given to the constructor.
75 private final String dpId;
// true for tasks created by addFlows, false for tasks created by deleteFlows.
76 private final boolean add;
// Number of flows handled for this switch.
77 private final int flowsPerDpn;
// Number of flows per transaction; flowsPerDpn / batchSize transactions are committed.
78 private final int batchSize;
// Throttling: the sleep is considered after every sleepAfter-th transaction ...
79 private final int sleepAfter;
// ... and lasts sleepMillis milliseconds (only when sleepMillis > 0).
80 private final int sleepMillis;
// Inclusive bounds of the table-id range used when rotating the table id between flows.
81 private final short startTableId;
82 private final short endTableId;
// Number of commit results still outstanding; set to flowsPerDpn / batchSize by the
// constructor and decremented by each commit callback.
83 private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
// Passed through to WriteTransaction.put when a flow is added.
84 private final boolean isCreateParents;
86 FlowHandlerTask(final String dpId, final int flowsPerDpn,
87 final boolean add, final int batchSize,
88 final int sleepMillis, final int sleepAfter,
89 final short startTableId, final short endTableId,
90 final boolean isCreateParents) {
91 this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
93 this.flowsPerDpn = flowsPerDpn;
94 this.batchSize = batchSize;
95 this.sleepMillis = sleepMillis;
96 this.sleepAfter = sleepAfter;
97 this.startTableId = startTableId;
98 this.endTableId = endTableId;
99 this.isCreateParents = isCreateParents;
100 remainingTxReturn.set(flowsPerDpn / batchSize);
105 LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
106 flowsPerDpn / batchSize);
107 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
108 short tableId = startTableId;
109 int numSubmits = flowsPerDpn / batchSize;
111 int newBatchSize = batchSize;
113 for (int i = 1; i <= numSubmits; i++) {
114 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
115 short calculatedTableId = tableId;
116 for (; sourceIp <= newBatchSize; sourceIp++) {
117 String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
120 Match match = BulkOMaticUtils.getMatch(sourceIp);
121 flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
124 addFlowToTx(writeTransaction, flowId,
125 BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow, sourceIp,
128 if (sourceIp < newBatchSize) {
130 short numberB = (short) (endTableId - startTableId + 1);
131 calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
134 writeTransaction.commit().addCallback(
135 new DsCallBack(dpId, tableId, calculatedTableId, sourceIp), MoreExecutors.directExecutor());
137 tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
138 newBatchSize += batchSize;
139 if (i % sleepAfter == 0 && sleepMillis > 0) {
141 Thread.sleep(sleepMillis);
142 } catch (InterruptedException e) {
143 LOG.error("Writer Thread Interrupted: {}", e.getMessage());
149 private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
150 Flow flow, Integer sourceIp, Short tableId) {
152 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
153 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
155 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
156 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
/**
 * Callback attached to the commit of one batch transaction. The fields below describe that
 * batch and are only used for logging.
 */
160 private class DsCallBack implements FutureCallback<Object> {
// Switch id the batch was written for.
161 private final String dpId;
// Value of the task's sourceIp counter when the transaction was committed.
162 private final int sourceIp;
// Table id used for the last flow of the batch.
163 private final short endTableId;
// Table id used for the first flow of the batch.
164 private final short beginTableId;
/**
 * Creates the callback for one committed batch.
 *
 * @param dpId         switch id the batch belongs to
 * @param beginTableId table id of the first flow in the batch
 * @param endTableId   table id of the last flow in the batch
 * @param sourceIp     value of the sourceIp counter at commit time
 */
DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
    this.dpId = dpId;
    this.sourceIp = sourceIp;
    this.endTableId = endTableId;
    this.beginTableId = beginTableId;
}
174 public void onSuccess(Object object) {
175 if (remainingTxReturn.decrementAndGet() <= 0) {
176 long dur = System.nanoTime() - startTime;
177 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
178 if (0 == countDpnWriteCompletion.decrementAndGet()
179 && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
180 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
181 taskCompletionTime.set(dur);
/**
 * Called when the commit of a batch transaction failed: logs the error together with the
 * batch details and sets the write status to FAILURE.
 *
 * @param error cause of the failed commit
 */
187 public void onFailure(Throwable error) {
// Count this transaction as returned; when it was the last outstanding one for the switch,
// log the elapsed time since startTime.
188 if (remainingTxReturn.decrementAndGet() <= 0) {
189 long dur = System.nanoTime() - startTime;
190 LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
// NOTE(review): countDpnWriteCompletion is not decremented in the lines visible here,
// unlike in onSuccess - confirm this is intended.
192 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
193 + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
// onSuccess checks for this status and does not replace it with SUCCESS.
194 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());