BUG-8607: Fix issues in checkstyle enforcement for module bulk-o-matic
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / SalBulkFlowServiceImpl.java
1 /*
2  * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.JdkFutureAdapters;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.lang.management.ManagementFactory;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.ForkJoinPool;
25 import java.util.concurrent.Future;
26 import javax.management.InstanceAlreadyExistsException;
27 import javax.management.MBeanRegistrationException;
28 import javax.management.MBeanServer;
29 import javax.management.MalformedObjectNameException;
30 import javax.management.NotCompliantMBeanException;
31 import javax.management.ObjectName;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.RpcError;
64 import org.opendaylight.yangtools.yang.common.RpcResult;
65 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69 /**
70  * Simple implementation providing bulk flows operations.
71  */
72 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
73
74     private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class);
75
76     private final SalFlowService flowService;
77     private final DataBroker dataBroker;
78     private final FlowCounter flowCounterBeanImpl = new FlowCounter();
79     private final ExecutorService fjService = new ForkJoinPool();
80
81     public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
82         this.flowService = Preconditions.checkNotNull(flowService);
83         this.dataBroker = Preconditions.checkNotNull(dataBroker);
84         register();
85     }
86
87     @Override
88     public Future<RpcResult<Void>> addFlowsDs(AddFlowsDsInput input) {
89         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
90         boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
91         boolean createParents = true;
92         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
93             FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
94             flowBuilder.setTableId(bulkFlow.getTableId());
95             flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
96             writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
97                     flowBuilder.build(), createParents);
98             createParents = createParentsNextTime;
99         }
100         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
101         return handleResultFuture(submitFuture);
102     }
103
104     private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
105         final NodeRef nodeRef = bulkFlow.getNode();
106         return ((InstanceIdentifier<Node>) nodeRef.getValue()).augmentation(FlowCapableNode.class)
107                 .child(Table.class, new TableKey(bulkFlow.getTableId()))
108                 .child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId())));
109     }
110
111     @Override
112     public Future<RpcResult<Void>> removeFlowsDs(RemoveFlowsDsInput input) {
113         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
114         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
115             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
116         }
117         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
118         return handleResultFuture(submitFuture);
119     }
120
121     private ListenableFuture<RpcResult<Void>> handleResultFuture(
122             CheckedFuture<Void, TransactionCommitFailedException> submitFuture) {
123         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
124         Futures.addCallback(submitFuture, new FutureCallback<Void>() {
125             @Override
126             public void onSuccess(Void result) {
127                 rpcResult.set(RpcResultBuilder.success(result).build());
128             }
129
130             @Override
131             public void onFailure(Throwable throwable) {
132                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
133                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
134                                 null, throwable.getMessage())));
135                 rpcResult.set(rpcResultBld.build());
136             }
137         });
138         return rpcResult;
139     }
140
141     private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
142         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
143         Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
144             @Override
145             public void onSuccess(List<T> result) {
146                 rpcResult.set(RpcResultBuilder.success((Void) null).build());
147             }
148
149             @Override
150             public void onFailure(Throwable throwable) {
151                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
152                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
153                                 null, throwable.getMessage())));
154                 rpcResult.set(rpcResultBld.build());
155             }
156         });
157         return rpcResult;
158     }
159
160     @Override
161     public Future<RpcResult<Void>> addFlowsRpc(AddFlowsRpcInput input) {
162         List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
163
164         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
165             AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder(
166                     (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
167             final NodeRef nodeRef = bulkFlow.getNode();
168             flowInputBuilder.setNode(nodeRef);
169             flowInputBuilder.setTableId(bulkFlow.getTableId());
170             Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
171             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
172         }
173         return handleResultFuture(Futures.allAsList(bulkResults));
174     }
175
176     @Override
177     public Future<RpcResult<Void>> readFlowTest(ReadFlowTestInput input) {
178         FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(),
179                 input.getFlowsPerDpn().intValue(), input.isVerbose(), input.isIsConfigDs(),
180                 input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
181         flowCounterBeanImpl.setReader(flowReader);
182         fjService.execute(flowReader);
183         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
184         return Futures.immediateFuture(rpcResultBuilder.build());
185     }
186
187     @Override
188     public Future<RpcResult<Void>> flowRpcAddTest(FlowRpcAddTestInput input) {
189         FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
190         flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(),
191                 input.getRpcBatchSize().intValue());
192
193         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
194         return Futures.immediateFuture(rpcResultBuilder.build());
195     }
196
197     @Override
198     public Future<RpcResult<Void>> register() {
199         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
200         try {
201             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
202             String pathToMBean = String.format("%s:type=%s", FlowCounter.class.getPackage().getName(),
203                     FlowCounter.class.getSimpleName());
204             ObjectName name = new ObjectName(pathToMBean);
205             mbs.registerMBean(flowCounterBeanImpl, name);
206         } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException
207                 | NotCompliantMBeanException e) {
208             rpcResultBuilder = RpcResultBuilder.failed();
209             LOG.warn("Exception occurred: {} ", e.getMessage(), e);
210         }
211         return Futures.immediateFuture(rpcResultBuilder.build());
212     }
213
214     @Override
215     public Future<RpcResult<Void>> removeFlowsRpc(RemoveFlowsRpcInput input) {
216         List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
217
218         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
219             RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder(
220                     (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
221             final NodeRef nodeRef = bulkFlow.getNode();
222             flowInputBuilder.setNode(nodeRef);
223             flowInputBuilder.setTableId(bulkFlow.getTableId());
224             Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
225             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
226         }
227         return handleResultFuture(Futures.allAsList(bulkResults));
228     }
229
230     @Override
231     public Future<RpcResult<Void>> flowTest(FlowTestInput input) {
232         if (input.isTxChain()) {
233             FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
234             flowCounterBeanImpl.setWriter(flowTester);
235             if (input.isIsAdd()) {
236                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
237                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
238                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
239                         input.getEndTableId().shortValue(), input.isCreateParents());
240             } else {
241                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
242                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
243                         input.getEndTableId().shortValue());
244             }
245             RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
246             return Futures.immediateFuture(rpcResultBuilder.build());
247         }
248         if (input.isSeq()) {
249             FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
250             flowCounterBeanImpl.setWriter(flowTester);
251             if (input.isIsAdd()) {
252                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
253                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
254                         input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
255                         input.isCreateParents());
256             } else {
257                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
258                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
259                         input.getEndTableId().shortValue());
260             }
261         } else {
262             FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
263             flowCounterBeanImpl.setWriter(flowTester);
264             if (input.isIsAdd()) {
265                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
266                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
267                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
268                         input.getEndTableId().shortValue(), input.isCreateParents());
269             } else {
270                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
271                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
272                         input.getEndTableId().shortValue());
273             }
274         }
275         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
276         return Futures.immediateFuture(rpcResultBuilder.build());
277     }
278
279     @Override
280     public Future<RpcResult<Void>> tableTest(final TableTestInput input) {
281         final TableWriter writer = new TableWriter(dataBroker, fjService);
282         flowCounterBeanImpl.setWriter(writer);
283         switch (input.getOperation()) {
284             case Add:
285                 writer.addTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
286                         input.getEndTableId().shortValue());
287                 break;
288             case Delete:
289                 writer.deleteTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
290                         input.getEndTableId().shortValue());
291                 break;
292             default:
293                 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.failed();
294                 return Futures.immediateFuture(rpcResultBuilder.build());
295         }
296         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
297         return Futures.immediateFuture(rpcResultBuilder.build());
298     }
299
300     @Override
301     public Future<RpcResult<Void>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
302         FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
303         flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
304         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
305         return Futures.immediateFuture(rpcResultBuilder.build());
306     }
307 }