Switch to MD-SAL APIs
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / FlowWriterConcurrent.java
1 /*
2  * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.MoreExecutors;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.mdsal.binding.api.WriteTransaction;
17 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
20 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 public class FlowWriterConcurrent implements FlowCounterMBean {
25     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterConcurrent.class);
26     public static final String USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER =
27             "Using Concurrent implementation of Flow Writer.";
28     private final DataBroker dataBroker;
29     private final ExecutorService flowPusher;
30     private long startTime;
31     private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
32     private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
33     private final AtomicLong taskCompletionTime = new AtomicLong();
34
35     public FlowWriterConcurrent(final DataBroker dataBroker, ExecutorService flowPusher) {
36         this.dataBroker = dataBroker;
37         this.flowPusher = flowPusher;
38         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
39     }
40
41     public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis, int sleepAfter,
42             short startTableId, short endTableId, boolean isCreateParents) {
43         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
44         countDpnWriteCompletion.set(dpnCount);
45         startTime = System.nanoTime();
46         for (int i = 1; i <= dpnCount; i++) {
47             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize, sleepMillis,
48                     sleepAfter, startTableId, endTableId, isCreateParents);
49             flowPusher.execute(task);
50         }
51     }
52
53     public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
54             short endTableId) {
55         LOG.info(USING_CONCURRENT_IMPLEMENTATION_OF_FLOW_WRITER);
56         countDpnWriteCompletion.set(dpnCount);
57         for (int i = 1; i <= dpnCount; i++) {
58             FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0, 1,
59                     startTableId, endTableId, false);
60             flowPusher.execute(task);
61         }
62     }
63
64     @Override
65     public int getWriteOpStatus() {
66         return writeOpStatus.get();
67     }
68
69     @Override
70     public long getTaskCompletionTime() {
71         return taskCompletionTime.get();
72     }
73
74     private class FlowHandlerTask implements Runnable {
75         private final String dpId;
76         private final boolean add;
77         private final int flowsPerDpn;
78         private final int batchSize;
79         private final int sleepAfter;
80         private final int sleepMillis;
81         private final short startTableId;
82         private final short endTableId;
83         private final AtomicInteger remainingTxReturn = new AtomicInteger(0);
84         private final boolean isCreateParents;
85
86         FlowHandlerTask(final String dpId, final int flowsPerDpn,
87                 final boolean add, final int batchSize,
88                 final int sleepMillis, final int sleepAfter,
89                 final short startTableId, final short endTableId,
90                 final boolean isCreateParents) {
91             this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
92             this.add = add;
93             this.flowsPerDpn = flowsPerDpn;
94             this.batchSize = batchSize;
95             this.sleepMillis = sleepMillis;
96             this.sleepAfter = sleepAfter;
97             this.startTableId = startTableId;
98             this.endTableId = endTableId;
99             this.isCreateParents = isCreateParents;
100             remainingTxReturn.set(flowsPerDpn / batchSize);
101         }
102
103         @Override
104         public void run() {
105             LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}", dpId,
106                     flowsPerDpn / batchSize);
107             writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
108             short tableId = startTableId;
109             int numSubmits = flowsPerDpn / batchSize;
110             int sourceIp = 1;
111             int newBatchSize = batchSize;
112
113             for (int i = 1; i <= numSubmits; i++) {
114                 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
115                 short calculatedTableId = tableId;
116                 for (; sourceIp <= newBatchSize; sourceIp++) {
117                     String flowId = "Flow-" + dpId + "." + calculatedTableId + "." + sourceIp;
118                     Flow flow = null;
119                     if (add) {
120                         Match match = BulkOMaticUtils.getMatch(sourceIp);
121                         flow = BulkOMaticUtils.buildFlow(calculatedTableId, flowId, match);
122                     }
123
124                     addFlowToTx(writeTransaction, flowId,
125                             BulkOMaticUtils.getFlowInstanceIdentifier(calculatedTableId, flowId, dpId), flow, sourceIp,
126                             calculatedTableId);
127
128                     if (sourceIp < newBatchSize) {
129                         short numberA = 1;
130                         short numberB = (short) (endTableId - startTableId + 1);
131                         calculatedTableId = (short) ((calculatedTableId + numberA) % numberB + startTableId);
132                     }
133                 }
134                 writeTransaction.commit().addCallback(
135                         new DsCallBack(dpId, tableId, calculatedTableId, sourceIp), MoreExecutors.directExecutor());
136                 // Wrap around
137                 tableId = (short) ((calculatedTableId + 1) % (short) (endTableId - startTableId + 1) + startTableId);
138                 newBatchSize += batchSize;
139                 if (i % sleepAfter == 0 && sleepMillis > 0) {
140                     try {
141                         Thread.sleep(sleepMillis);
142                     } catch (InterruptedException e) {
143                         LOG.error("Writer Thread Interrupted: {}", e.getMessage());
144                     }
145                 }
146             }
147         }
148
149         private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
150                 Flow flow, Integer sourceIp, Short tableId) {
151             if (add) {
152                 LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
153                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
154             } else {
155                 LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
156                 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
157             }
158         }
159
160         private class DsCallBack implements FutureCallback<Object> {
161             private final String dpId;
162             private final int sourceIp;
163             private final short endTableId;
164             private final short beginTableId;
165
166             DsCallBack(String dpId, Short beginTableId, Short endTableId, Integer sourceIp) {
167                 this.dpId = dpId;
168                 this.sourceIp = sourceIp;
169                 this.endTableId = endTableId;
170                 this.beginTableId = beginTableId;
171             }
172
173             @Override
174             public void onSuccess(Object object) {
175                 if (remainingTxReturn.decrementAndGet() <= 0) {
176                     long dur = System.nanoTime() - startTime;
177                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
178                     if (0 == countDpnWriteCompletion.decrementAndGet()
179                             && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
180                         writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
181                         taskCompletionTime.set(dur);
182                     }
183                 }
184             }
185
186             @Override
187             public void onFailure(Throwable error) {
188                 if (remainingTxReturn.decrementAndGet() <= 0) {
189                     long dur = System.nanoTime() - startTime;
190                     LOG.info("Completed all flows installation for: dpid: {} in {}ns", dpId, dur);
191                 }
192                 LOG.error("Error: {} in Datastore write operation: dpid: {}, begin tableId: {}, "
193                         + "end tableId: {}, sourceIp: {} ", error, dpId, beginTableId, endTableId, sourceIp);
194                 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
195             }
196         }
197     }
198 }