Merge "Bug 8293: Add table writer to bulk-o-matic"
[openflowplugin.git] / applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterSequential.java
/*
 * Copyright (c) 2016 Ericsson Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.openflowplugin.applications.bulk.o.matic;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

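/**
 * Writes or deletes OpenFlow flows in the CONFIG datastore sequentially: each DPN gets its own
 * task, and each task submits one batched transaction at a time, chaining the next batch from
 * the previous transaction's commit callback. Progress and timing are exposed through the
 * {@link FlowCounterMBean} interface.
 */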
public class FlowWriterSequential implements FlowCounterMBean {
    private static final Logger LOG = LoggerFactory.getLogger(FlowWriterSequential.class);
    private final DataBroker dataBroker;
    private final ExecutorService flowPusher;
    protected int dpnCount;
    private long startTime;
    private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
    private final AtomicInteger countDpnWriteCompletion = new AtomicInteger();
    private final AtomicLong taskCompletionTime = new AtomicLong();

    public FlowWriterSequential(final DataBroker dataBroker, final ExecutorService flowPusher) {
        this.dataBroker = dataBroker;
        this.flowPusher = flowPusher;
        LOG.info("Using Sequential implementation of Flow Writer.");
    }

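    /**
     * Schedules one {@link FlowHandlerTask} per DPN on the supplied executor; each task writes
     * {@code flowsPerDPN} flows in batches of {@code batchSize}, cycling table ids within the
     * {@code startTableId} to {@code endTableId} range.
     */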
    public void addFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, int sleepMillis,
                         short startTableId, short endTableId, boolean isCreateParents) {
        LOG.info("Using Sequential implementation of Flow Writer.");
        this.dpnCount = dpnCount;
        countDpnWriteCompletion.set(dpnCount);
        startTime = System.nanoTime();
        for (int i = 1; i <= dpnCount; i++) {
            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, true, batchSize,
                    sleepMillis, startTableId, endTableId, isCreateParents);
            flowPusher.execute(task);
        }
    }

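    /**
     * Schedules one {@link FlowHandlerTask} per DPN that removes the previously written flows,
     * using the same batching and table-id cycling as {@link #addFlows}.
     */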
    public void deleteFlows(Integer dpnCount, Integer flowsPerDPN, int batchSize, short startTableId,
                            short endTableId) {
        LOG.info("Using Sequential implementation of Flow Writer.");
        countDpnWriteCompletion.set(dpnCount);
        for (int i = 1; i <= dpnCount; i++) {
            FlowHandlerTask task = new FlowHandlerTask(Integer.toString(i), flowsPerDPN, false, batchSize, 0,
                    startTableId, endTableId, false);
            flowPusher.execute(task);
        }
    }

    @Override
    public int getWriteOpStatus() {
        return writeOpStatus.get();
    }

    @Override
    public long getTaskCompletionTime() {
        return taskCompletionTime.get();
    }

    private class FlowHandlerTask implements Runnable {
        private final String dpId;
        private final int flowsPerDpn;
        private final boolean add;
        private final int batchSize;
        private final int sleepMillis;
        private final short startTableId;
        private final short endTableId;
        private final boolean isCreateParents;

        public FlowHandlerTask(final String dpId,
                               final int flowsPerDpn,
                               final boolean add,
                               final int batchSize,
                               final int sleepMillis,
                               final short startTableId,
                               final short endTableId,
                               final boolean isCreateParents) {
            this.dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + dpId;
            this.add = add;
            this.flowsPerDpn = flowsPerDpn;
            this.batchSize = batchSize;
            this.sleepMillis = sleepMillis;
            this.startTableId = startTableId;
            this.endTableId = endTableId;
            this.isCreateParents = isCreateParents;
        }

        @Override
        public void run() {
            LOG.info("Starting flow writer task for dpid: {}. Number of transactions: {}",
                    dpId, flowsPerDpn / batchSize);
            writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());

            Short tableId = startTableId;
            Integer sourceIp = 1;

            WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
            short k = tableId;

            for (; sourceIp <= batchSize; sourceIp++) {
                String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
                LOG.debug("Adding flow with id: {}", flowId);
                Flow flow = null;
                if (add) {
                    Match match = BulkOMaticUtils.getMatch(sourceIp);
                    flow = BulkOMaticUtils.buildFlow(k, flowId, match);
                }
                addFlowToTx(writeTransaction, flowId,
                        BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow);

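                // Pick the table id for the next flow in this batch, wrapping the value back
                // into the [startTableId, endTableId] range via the modulo arithmetic below.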
                if (sourceIp < batchSize) {
                    short a = 1;
                    short b = (short) (endTableId - startTableId + 1);
                    k = (short) (((k + a) % b) + startTableId);
                }
            }

            LOG.debug("Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
                    dpId, tableId, k, sourceIp);

            Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, k));
        }

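        /**
         * Puts the flow at {@code flowIid} into the CONFIG datastore when this task is adding
         * flows, or deletes that path when it is removing them.
         */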
        private void addFlowToTx(WriteTransaction writeTransaction, String flowId, InstanceIdentifier<Flow> flowIid,
                                 Flow flow) {
            if (add) {
                LOG.trace("Adding flow for flowId: {}, flowIid: {}", flowId, flowIid);
                writeTransaction.put(LogicalDatastoreType.CONFIGURATION, flowIid, flow, isCreateParents);
            } else {
                LOG.trace("Deleting flow for flowId: {}, flowIid: {}", flowId, flowIid);
                writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, flowIid);
            }
        }

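        /**
         * Commit callback that chains the batches: on a successful submit it either records
         * completion (once {@code sourceIp} has passed {@code flowsPerDpn}) or builds and submits
         * the next batch in a fresh transaction with a new {@code DsCallBack} registered on it.
         */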
        private class DsCallBack implements FutureCallback<Void> {
            private final String dpId;
            private Integer sourceIp;
            private final Short tableId;

            public DsCallBack(String dpId, Integer sourceIp, Short tableId) {
                this.dpId = dpId;
                this.sourceIp = sourceIp;
                short a = 1;
                short b = (short) (endTableId - startTableId + 1);
                this.tableId = (short) (((tableId + a) % b) + startTableId);
            }

            @Override
            public void onSuccess(Void notUsed) {
                if (sourceIp > flowsPerDpn) {
                    long dur = System.nanoTime() - startTime;
                    LOG.info("Completed all flow installations for dpid: {}, tableId: {}, sourceIp: {} in {} ns",
                            dpId, tableId, sourceIp, dur);
                    if (0 == countDpnWriteCompletion.decrementAndGet()
                            && writeOpStatus.get() != FlowCounter.OperationStatus.FAILURE.status()) {
                        writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
                        taskCompletionTime.set(dur);
                    }
                    return;
                }
                try {
                    if (sleepMillis > 0) {
                        Thread.sleep(sleepMillis);
                    }
                } catch (InterruptedException e) {
                    LOG.error("Writer thread interrupted while sleeping", e);
                    Thread.currentThread().interrupt();
                }

                WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
                int newBatchSize = sourceIp + batchSize - 1;
                short k = tableId;
                for (; sourceIp <= newBatchSize; sourceIp++) {
                    String flowId = "Flow-" + dpId + "." + k + "." + sourceIp;
                    Flow flow = null;
                    if (add) {
                        Match match = BulkOMaticUtils.getMatch(sourceIp);
                        flow = BulkOMaticUtils.buildFlow(k, flowId, match);
                    }
                    LOG.debug("Adding flow with id: {}", flowId);
                    addFlowToTx(writeTransaction, flowId,
                            BulkOMaticUtils.getFlowInstanceIdentifier(k, flowId, dpId), flow);

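                    // Same table-id rotation as in run(): wrap the next value back into the
                    // [startTableId, endTableId] range.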
                    if (sourceIp < newBatchSize) {
                        short a = 1;
                        short b = (short) (endTableId - startTableId + 1);
                        k = (short) (((k + a) % b) + startTableId);
                    }
                }
                LOG.debug("OnSuccess: Submitting Txn for dpId: {}, begin tableId: {}, end tableId: {}, sourceIp: {}",
                        dpId, tableId, k, sourceIp);
                Futures.addCallback(writeTransaction.submit(), new DsCallBack(dpId, sourceIp, k));
            }

            @Override
            public void onFailure(Throwable error) {
                LOG.error("Error in datastore write operation: dpid: {}, tableId: {}, sourceIp: {}",
                        dpId, tableId, sourceIp, error);
                writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
            }
        }
    }
}