70c259a399b3c5c93f2a291c6aa6d50aaac87acd
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / SalBulkFlowServiceImpl.java
1 /*
2  * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.lang.management.ManagementFactory;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.ForkJoinPool;
25 import java.util.concurrent.Future;
26 import javax.management.InstanceAlreadyExistsException;
27 import javax.management.MBeanRegistrationException;
28 import javax.management.MBeanServer;
29 import javax.management.MalformedObjectNameException;
30 import javax.management.NotCompliantMBeanException;
31 import javax.management.ObjectName;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
74 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
75 import org.opendaylight.yangtools.yang.common.RpcError;
76 import org.opendaylight.yangtools.yang.common.RpcResult;
77 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80
81 /**
82  * Simple implementation providing bulk flows operations.
83  */
84 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
85
86     private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class);
87
88     private final SalFlowService flowService;
89     private final DataBroker dataBroker;
90     private final FlowCounter flowCounterBeanImpl = new FlowCounter();
91     private final ExecutorService fjService = new ForkJoinPool();
92
93     public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
94         this.flowService = Preconditions.checkNotNull(flowService);
95         this.dataBroker = Preconditions.checkNotNull(dataBroker);
96
97         JdkFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register");
98     }
99
100     @Override
101     public ListenableFuture<RpcResult<AddFlowsDsOutput>> addFlowsDs(AddFlowsDsInput input) {
102         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
103         boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
104         boolean createParents = true;
105         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
106             FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
107             flowBuilder.setTableId(bulkFlow.getTableId());
108             flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
109             writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
110                     flowBuilder.build(), createParents);
111             createParents = createParentsNextTime;
112         }
113         ListenableFuture<Void> submitFuture = writeTransaction.submit();
114         return Futures.transform(handleResultFuture(Futures.allAsList(submitFuture)), voidRpcResult -> {
115             if (voidRpcResult.isSuccessful()) {
116                 return RpcResultBuilder.<AddFlowsDsOutput>success().build();
117             } else {
118                 return RpcResultBuilder.<AddFlowsDsOutput>failed().build();
119             }
120         },MoreExecutors.directExecutor());
121     }
122
123     private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
124         final NodeRef nodeRef = bulkFlow.getNode();
125         return ((InstanceIdentifier<Node>) nodeRef.getValue()).augmentation(FlowCapableNode.class)
126                 .child(Table.class, new TableKey(bulkFlow.getTableId()))
127                 .child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId())));
128     }
129
130     @Override
131     public ListenableFuture<RpcResult<RemoveFlowsDsOutput>> removeFlowsDs(RemoveFlowsDsInput input) {
132         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
133         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
134             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
135         }
136         return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.submit())), voidRpcResult -> {
137             if (voidRpcResult.isSuccessful()) {
138                 return RpcResultBuilder.<RemoveFlowsDsOutput>success().build();
139             } else {
140                 return RpcResultBuilder.<RemoveFlowsDsOutput>failed().build();
141             }
142         }, MoreExecutors.directExecutor());
143     }
144
145     private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
146         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
147         Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
148             @Override
149             public void onSuccess(List<T> result) {
150                 rpcResult.set(RpcResultBuilder.success((Void) null).build());
151             }
152
153             @Override
154             public void onFailure(Throwable throwable) {
155                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
156                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
157                                 null, throwable.getMessage())));
158                 rpcResult.set(rpcResultBld.build());
159             }
160         }, MoreExecutors.directExecutor());
161         return rpcResult;
162     }
163
164     @Override
165     public ListenableFuture<RpcResult<AddFlowsRpcOutput>> addFlowsRpc(AddFlowsRpcInput input) {
166         List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
167
168         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
169             AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder(
170                     (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
171             final NodeRef nodeRef = bulkFlow.getNode();
172             flowInputBuilder.setNode(nodeRef);
173             flowInputBuilder.setTableId(bulkFlow.getTableId());
174             Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
175             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
176         }
177         return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
178             if (voidRpcResult.isSuccessful()) {
179                 return RpcResultBuilder.<AddFlowsRpcOutput>success().build();
180             } else {
181                 return RpcResultBuilder.<AddFlowsRpcOutput>failed().build();
182             }
183         },MoreExecutors.directExecutor());
184     }
185
186     @Override
187     public ListenableFuture<RpcResult<ReadFlowTestOutput>> readFlowTest(ReadFlowTestInput input) {
188         FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(),
189                 input.getFlowsPerDpn().intValue(), input.isVerbose(), input.isIsConfigDs(),
190                 input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
191         flowCounterBeanImpl.setReader(flowReader);
192         fjService.execute(flowReader);
193         RpcResultBuilder<ReadFlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
194         return Futures.immediateFuture(rpcResultBuilder.build());
195     }
196
197     @Override
198     public ListenableFuture<RpcResult<FlowRpcAddTestOutput>> flowRpcAddTest(FlowRpcAddTestInput input) {
199         FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
200         flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(),
201                 input.getRpcBatchSize().intValue());
202
203         RpcResultBuilder<FlowRpcAddTestOutput> rpcResultBuilder = RpcResultBuilder.success();
204         return Futures.immediateFuture(rpcResultBuilder.build());
205     }
206
207     @Override
208     public ListenableFuture<RpcResult<RegisterOutput>> register(RegisterInput input) {
209         RpcResultBuilder<RegisterOutput> rpcResultBuilder = RpcResultBuilder.success();
210         try {
211             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
212             String pathToMBean = String.format("%s:type=%s", FlowCounter.class.getPackage().getName(),
213                     FlowCounter.class.getSimpleName());
214             ObjectName name = new ObjectName(pathToMBean);
215             mbs.registerMBean(flowCounterBeanImpl, name);
216         } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException
217                 | NotCompliantMBeanException e) {
218             rpcResultBuilder = RpcResultBuilder.failed();
219             LOG.warn("Exception occurred: {} ", e);
220         }
221         return Futures.immediateFuture(rpcResultBuilder.build());
222     }
223
224     @Override
225     public ListenableFuture<RpcResult<RemoveFlowsRpcOutput>> removeFlowsRpc(RemoveFlowsRpcInput input) {
226         List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
227
228         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
229             RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder(
230                     (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
231             final NodeRef nodeRef = bulkFlow.getNode();
232             flowInputBuilder.setNode(nodeRef);
233             flowInputBuilder.setTableId(bulkFlow.getTableId());
234             Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
235             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
236         }
237         return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
238             if (voidRpcResult.isSuccessful()) {
239                 return RpcResultBuilder.<RemoveFlowsRpcOutput>success().build();
240             } else {
241                 return RpcResultBuilder.<RemoveFlowsRpcOutput>failed().build();
242             }
243         }, MoreExecutors.directExecutor());
244     }
245
246     @Override
247     public ListenableFuture<RpcResult<FlowTestOutput>> flowTest(FlowTestInput input) {
248         if (input.isTxChain()) {
249             FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
250             flowCounterBeanImpl.setWriter(flowTester);
251             if (input.isIsAdd()) {
252                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
253                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
254                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
255                         input.getEndTableId().shortValue(), input.isCreateParents());
256             } else {
257                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
258                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
259                         input.getEndTableId().shortValue());
260             }
261             RpcResultBuilder<FlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
262             return Futures.immediateFuture(rpcResultBuilder.build());
263         }
264         if (input.isSeq()) {
265             FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
266             flowCounterBeanImpl.setWriter(flowTester);
267             if (input.isIsAdd()) {
268                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
269                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
270                         input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
271                         input.isCreateParents());
272             } else {
273                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
274                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
275                         input.getEndTableId().shortValue());
276             }
277         } else {
278             FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
279             flowCounterBeanImpl.setWriter(flowTester);
280             if (input.isIsAdd()) {
281                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
282                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
283                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
284                         input.getEndTableId().shortValue(), input.isCreateParents());
285             } else {
286                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
287                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
288                         input.getEndTableId().shortValue());
289             }
290         }
291         RpcResultBuilder<FlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
292         return Futures.immediateFuture(rpcResultBuilder.build());
293     }
294
295     @Override
296     public ListenableFuture<RpcResult<TableTestOutput>> tableTest(final TableTestInput input) {
297         final TableWriter writer = new TableWriter(dataBroker, fjService);
298         flowCounterBeanImpl.setWriter(writer);
299         switch (input.getOperation()) {
300             case Add:
301                 writer.addTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
302                         input.getEndTableId().shortValue());
303                 break;
304             case Delete:
305                 writer.deleteTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
306                         input.getEndTableId().shortValue());
307                 break;
308             default:
309                 RpcResultBuilder<TableTestOutput> rpcResultBuilder = RpcResultBuilder.failed();
310                 return Futures.immediateFuture(rpcResultBuilder.build());
311         }
312         RpcResultBuilder<TableTestOutput> rpcResultBuilder = RpcResultBuilder.success();
313         return Futures.immediateFuture(rpcResultBuilder.build());
314     }
315
316     @Override
317     public ListenableFuture<RpcResult<FlowRpcAddMultipleOutput>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
318         FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
319         flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
320         RpcResultBuilder<FlowRpcAddMultipleOutput> rpcResultBuilder = RpcResultBuilder.success();
321         return Futures.immediateFuture(rpcResultBuilder.build());
322     }
323 }