Merge "Add Nicira extension support for matching IPv6 Src/Dst"
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / SalBulkFlowServiceImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.JdkFutureAdapters;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.lang.management.ManagementFactory;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.ForkJoinPool;
25 import java.util.concurrent.Future;
26 import javax.management.InstanceAlreadyExistsException;
27 import javax.management.MBeanRegistrationException;
28 import javax.management.MBeanServer;
29 import javax.management.MalformedObjectNameException;
30 import javax.management.NotCompliantMBeanException;
31 import javax.management.ObjectName;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput.Operation;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.common.RpcError;
65 import org.opendaylight.yangtools.yang.common.RpcResult;
66 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 /**
71  * Simple implementation providing bulk flows operations.
72  */
73 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
74
75     private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class);
76
77     private final SalFlowService flowService;
78     private final DataBroker dataBroker;
79     private final FlowCounter flowCounterBeanImpl = new FlowCounter();
80     private final ExecutorService fjService = new ForkJoinPool();
81     public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
82         this.flowService = Preconditions.checkNotNull(flowService);
83         this.dataBroker = Preconditions.checkNotNull(dataBroker);
84         register();
85     }
86
87     @Override
88     public Future<RpcResult<Void>> addFlowsDs(AddFlowsDsInput input) {
89         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
90         boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
91         boolean createParents = true;
92         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
93             FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
94             flowBuilder.setTableId(bulkFlow.getTableId());
95             flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
96             writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
97                     flowBuilder.build(), createParents);
98             createParents = createParentsNextTime;
99         }
100         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
101         return handleResultFuture(submitFuture);
102     }
103
104     private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
105         final NodeRef nodeRef = bulkFlow.getNode();
106         return ((InstanceIdentifier<Node>) nodeRef.getValue())
107                 .augmentation(FlowCapableNode.class)
108                 .child(Table.class, new TableKey(bulkFlow.getTableId()))
109                 .child(Flow.class,
110                         new FlowKey(new FlowId(bulkFlow.getFlowId())));
111     }
112
113     @Override
114     public Future<RpcResult<Void>> removeFlowsDs(RemoveFlowsDsInput input) {
115         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
116         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
117             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
118         }
119         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
120         return handleResultFuture(submitFuture);
121     }
122
123     private ListenableFuture<RpcResult<Void>> handleResultFuture(CheckedFuture<Void,
124             TransactionCommitFailedException> submitFuture) {
125         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
126         Futures.addCallback(submitFuture, new FutureCallback<Void>() {
127             @Override
128             public void onSuccess(Void result) {
129                 rpcResult.set(RpcResultBuilder.success(result).build());
130             }
131
132             @Override
133             public void onFailure(Throwable t) {
134                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
135                         .withRpcErrors(Collections.singleton(
136                                 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
137                         ));
138                 rpcResult.set(rpcResultBld.build());
139             }
140         });
141         return rpcResult;
142     }
143
144     private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
145         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
146         Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
147             @Override
148             public void onSuccess(List<T> result) {
149                 rpcResult.set(RpcResultBuilder.success((Void) null).build());
150             }
151
152             @Override
153             public void onFailure(Throwable t) {
154                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
155                         .withRpcErrors(Collections.singleton(
156                                 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
157                         ));
158                 rpcResult.set(rpcResultBld.build());
159             }
160         });
161         return rpcResult;
162     }
163
164     @Override
165     public Future<RpcResult<Void>> addFlowsRpc(AddFlowsRpcInput input) {
166         List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
167
168         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
169             AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder((org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
170             final NodeRef nodeRef = bulkFlow.getNode();
171             flowInputBuilder.setNode(nodeRef);
172             flowInputBuilder.setTableId(bulkFlow.getTableId());
173             Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
174             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
175         }
176         return handleResultFuture(Futures.allAsList(bulkResults));
177     }
178
179     @Override
180     public Future<RpcResult<Void>> readFlowTest(ReadFlowTestInput input) {
181         FlowReader flowReader = FlowReader.getNewInstance(dataBroker,
182                 input.getDpnCount().intValue(),
183                 input.getFlowsPerDpn().intValue(), input.isVerbose(),
184                 input.isIsConfigDs(),input.getStartTableId().shortValue(),
185                 input.getEndTableId().shortValue());
186         flowCounterBeanImpl.setReader(flowReader);
187         fjService.execute(flowReader);
188         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
189         return Futures.immediateFuture(rpcResultBuilder.build());
190     }
191
192     @Override
193     public Future<RpcResult<Void>> flowRpcAddTest(FlowRpcAddTestInput input) {
194         FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
195         flowAddRpcTestImpl.rpcFlowAdd(
196                 input.getDpnId(),
197                 input.getFlowCount().intValue(),
198                 input.getRpcBatchSize().intValue());
199
200
201         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
202         return Futures.immediateFuture(rpcResultBuilder.build());
203     }
204
205     @Override
206     public Future<RpcResult<Void>> register() {
207         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
208         try {
209         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
210         String pathToMBean = String.format("%s:type=%s",
211                 FlowCounter.class.getPackage().getName(),
212                 FlowCounter.class.getSimpleName());
213         ObjectName name = null;
214
215             name = new ObjectName(pathToMBean);
216             mbs.registerMBean(flowCounterBeanImpl, name);
217         } catch (MalformedObjectNameException | InstanceAlreadyExistsException
218                 | MBeanRegistrationException | NotCompliantMBeanException e) {
219             rpcResultBuilder = RpcResultBuilder.failed();
220             LOG.warn("Exception occurred: {} ", e.getMessage(), e);
221         }
222         return Futures.immediateFuture(rpcResultBuilder.build());
223     }
224
225     @Override
226     public Future<RpcResult<Void>> removeFlowsRpc(RemoveFlowsRpcInput input) {
227         List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
228
229         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
230             RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder((org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
231             final NodeRef nodeRef = bulkFlow.getNode();
232             flowInputBuilder.setNode(nodeRef);
233             flowInputBuilder.setTableId(bulkFlow.getTableId());
234             Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
235             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
236         }
237         return handleResultFuture(Futures.allAsList(bulkResults));
238     }
239
240     @Override
241     public Future<RpcResult<Void>> flowTest(FlowTestInput input) {
242         if (input.isTxChain()) {
243             FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
244             flowCounterBeanImpl.setWriter(flowTester);
245             if (input.isIsAdd()){
246                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
247                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
248                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
249                         input.getEndTableId().shortValue(), input.isCreateParents());
250             } else {
251                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
252                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
253                         input.getEndTableId().shortValue());
254             }
255             RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
256             return Futures.immediateFuture(rpcResultBuilder.build());
257         }
258         if (input.isSeq()) {
259             FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
260             flowCounterBeanImpl.setWriter(flowTester);
261             if (input.isIsAdd()){
262                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
263                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
264                         input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
265                         input.isCreateParents());
266             } else {
267                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
268                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
269                         input.getEndTableId().shortValue());
270             }
271         } else {
272             FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
273             flowCounterBeanImpl.setWriter(flowTester);
274             if (input.isIsAdd()){
275                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
276                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
277                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
278                         input.getEndTableId().shortValue(), input.isCreateParents());
279             } else {
280                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
281                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
282                         input.getEndTableId().shortValue());
283             }
284         }
285         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
286         return Futures.immediateFuture(rpcResultBuilder.build());
287     }
288
289     @Override
290     public Future<RpcResult<Void>> tableTest(final TableTestInput input) {
291         final TableWriter writer = new TableWriter(dataBroker, fjService);
292         flowCounterBeanImpl.setWriter(writer);
293         switch (input.getOperation()) {
294             case Add:
295                 writer.addTables(input.getDpnCount().intValue(),
296                     input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
297                 break;
298             case Delete:
299                 writer.deleteTables(input.getDpnCount().intValue(),
300                     input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
301                 break;
302             default:
303                 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.failed();
304                 return Futures.immediateFuture(rpcResultBuilder.build());
305         }
306         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
307         return Futures.immediateFuture(rpcResultBuilder.build());
308     }
309
310     @Override
311     public Future<RpcResult<Void>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
312         FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
313         flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
314         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
315         return Futures.immediateFuture(rpcResultBuilder.build());
316     }
317 }