Bump MRI upstreams
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / SalBulkFlowServiceImpl.java
1 /*
2  * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
9
10 import static java.util.Objects.requireNonNull;
11 import static java.util.Objects.requireNonNullElse;
12
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.lang.management.ManagementFactory;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.ForkJoinPool;
25 import javax.management.InstanceAlreadyExistsException;
26 import javax.management.MBeanRegistrationException;
27 import javax.management.MBeanServer;
28 import javax.management.MalformedObjectNameException;
29 import javax.management.NotCompliantMBeanException;
30 import javax.management.ObjectName;
31 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
32 import org.opendaylight.mdsal.binding.api.DataBroker;
33 import org.opendaylight.mdsal.binding.api.WriteTransaction;
34 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInputBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterOutput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsOutput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
73 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
74 import org.opendaylight.yangtools.yang.common.ErrorType;
75 import org.opendaylight.yangtools.yang.common.RpcResult;
76 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79
80 /**
81  * Simple implementation providing bulk flows operations.
82  */
83 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
84
85     private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class);
86
87     private final SalFlowService flowService;
88     private final DataBroker dataBroker;
89     private final FlowCounter flowCounterBeanImpl = new FlowCounter();
90     private final ExecutorService fjService = new ForkJoinPool();
91
92     public SalBulkFlowServiceImpl(final SalFlowService flowService, final DataBroker dataBroker) {
93         this.flowService = requireNonNull(flowService);
94         this.dataBroker = requireNonNull(dataBroker);
95
96         LoggingFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register");
97     }
98
99     @Override
100     public ListenableFuture<RpcResult<AddFlowsDsOutput>> addFlowsDs(final AddFlowsDsInput input) {
101         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
102         boolean createParentsNextTime = requireNonNullElse(input.getAlwaysCreateParents(), Boolean.FALSE);
103         boolean createParents = true;
104         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
105             FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
106             flowBuilder.setTableId(bulkFlow.getTableId());
107             flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
108             if (createParents) {
109                 writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION,
110                         getFlowInstanceIdentifier(bulkFlow),
111                         flowBuilder.build());
112             } else {
113                 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
114                         flowBuilder.build());
115             }
116             createParents = createParentsNextTime;
117         }
118         FluentFuture<?> submitFuture = writeTransaction.commit();
119         return Futures.transform(handleResultFuture(Futures.allAsList(submitFuture)), voidRpcResult -> {
120             if (voidRpcResult.isSuccessful()) {
121                 return RpcResultBuilder.<AddFlowsDsOutput>success().build();
122             } else {
123                 return RpcResultBuilder.<AddFlowsDsOutput>failed().build();
124             }
125         },MoreExecutors.directExecutor());
126     }
127
128     private static InstanceIdentifier<Flow> getFlowInstanceIdentifier(final BulkFlowDsItem bulkFlow) {
129         final NodeRef nodeRef = bulkFlow.getNode();
130         return ((InstanceIdentifier<Node>) nodeRef.getValue()).augmentation(FlowCapableNode.class)
131                 .child(Table.class, new TableKey(bulkFlow.getTableId()))
132                 .child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId())));
133     }
134
135     @Override
136     public ListenableFuture<RpcResult<RemoveFlowsDsOutput>> removeFlowsDs(final RemoveFlowsDsInput input) {
137         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
138         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
139             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
140         }
141         return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.commit())), voidRpcResult -> {
142             if (voidRpcResult.isSuccessful()) {
143                 return RpcResultBuilder.<RemoveFlowsDsOutput>success().build();
144             } else {
145                 return RpcResultBuilder.<RemoveFlowsDsOutput>failed().build();
146             }
147         }, MoreExecutors.directExecutor());
148     }
149
150     private static <T> ListenableFuture<RpcResult<Void>> handleResultFuture(
151             final ListenableFuture<List<T>> submitFuture) {
152         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
153         Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
154             @Override
155             public void onSuccess(final List<T> result) {
156                 rpcResult.set(RpcResultBuilder.success((Void) null).build());
157             }
158
159             @Override
160             public void onFailure(final Throwable throwable) {
161                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
162                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(ErrorType.APPLICATION,
163                                 null, throwable.getMessage())));
164                 rpcResult.set(rpcResultBld.build());
165             }
166         }, MoreExecutors.directExecutor());
167         return rpcResult;
168     }
169
170     @Override
171     public ListenableFuture<RpcResult<AddFlowsRpcOutput>> addFlowsRpc(final AddFlowsRpcInput input) {
172         List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
173
174         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
175             AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder(
176                     (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
177             final NodeRef nodeRef = bulkFlow.getNode();
178             flowInputBuilder.setNode(nodeRef);
179             flowInputBuilder.setTableId(bulkFlow.getTableId());
180             bulkResults.add(flowService.addFlow(flowInputBuilder.build()));
181         }
182         return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
183             if (voidRpcResult.isSuccessful()) {
184                 return RpcResultBuilder.<AddFlowsRpcOutput>success().build();
185             } else {
186                 return RpcResultBuilder.<AddFlowsRpcOutput>failed().build();
187             }
188         },MoreExecutors.directExecutor());
189     }
190
191     @Override
192     public ListenableFuture<RpcResult<ReadFlowTestOutput>> readFlowTest(final ReadFlowTestInput input) {
193         FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(),
194                 input.getFlowsPerDpn().intValue(), input.getVerbose(), input.getIsConfigDs(),
195                 input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
196         flowCounterBeanImpl.setReader(flowReader);
197         fjService.execute(flowReader);
198         RpcResultBuilder<ReadFlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
199         return Futures.immediateFuture(rpcResultBuilder.build());
200     }
201
202     @Override
203     public ListenableFuture<RpcResult<FlowRpcAddTestOutput>> flowRpcAddTest(final FlowRpcAddTestInput input) {
204         FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
205         flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(),
206                 input.getRpcBatchSize().intValue());
207
208         RpcResultBuilder<FlowRpcAddTestOutput> rpcResultBuilder = RpcResultBuilder.success();
209         return Futures.immediateFuture(rpcResultBuilder.build());
210     }
211
212     @Override
213     public ListenableFuture<RpcResult<RegisterOutput>> register(final RegisterInput input) {
214         RpcResultBuilder<RegisterOutput> rpcResultBuilder = RpcResultBuilder.success();
215         try {
216             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
217             String pathToMBean = String.format("%s:type=%s", FlowCounter.class.getPackage().getName(),
218                     FlowCounter.class.getSimpleName());
219             ObjectName name = new ObjectName(pathToMBean);
220             mbs.registerMBean(flowCounterBeanImpl, name);
221         } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException
222                 | NotCompliantMBeanException e) {
223             rpcResultBuilder = RpcResultBuilder.failed();
224             LOG.warn("Exception occurred", e);
225         }
226         return Futures.immediateFuture(rpcResultBuilder.build());
227     }
228
229     @Override
230     public ListenableFuture<RpcResult<RemoveFlowsRpcOutput>> removeFlowsRpc(final RemoveFlowsRpcInput input) {
231         List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
232
233         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
234             RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder(
235                     (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
236             final NodeRef nodeRef = bulkFlow.getNode();
237             flowInputBuilder.setNode(nodeRef);
238             flowInputBuilder.setTableId(bulkFlow.getTableId());
239             bulkResults.add(flowService.removeFlow(flowInputBuilder.build()));
240         }
241         return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
242             if (voidRpcResult.isSuccessful()) {
243                 return RpcResultBuilder.<RemoveFlowsRpcOutput>success().build();
244             } else {
245                 return RpcResultBuilder.<RemoveFlowsRpcOutput>failed().build();
246             }
247         }, MoreExecutors.directExecutor());
248     }
249
250     @Override
251     public ListenableFuture<RpcResult<FlowTestOutput>> flowTest(final FlowTestInput input) {
252         if (input.getTxChain()) {
253             FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
254             flowCounterBeanImpl.setWriter(flowTester);
255             if (input.getIsAdd()) {
256                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
257                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
258                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
259                         input.getEndTableId().shortValue(), input.getCreateParents());
260             } else {
261                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
262                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
263                         input.getEndTableId().shortValue());
264             }
265             RpcResultBuilder<FlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
266             return Futures.immediateFuture(rpcResultBuilder.build());
267         }
268         if (input.getSeq()) {
269             FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
270             flowCounterBeanImpl.setWriter(flowTester);
271             if (input.getIsAdd()) {
272                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
273                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
274                         input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
275                         input.getCreateParents());
276             } else {
277                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
278                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
279                         input.getEndTableId().shortValue());
280             }
281         } else {
282             FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
283             flowCounterBeanImpl.setWriter(flowTester);
284             if (input.getIsAdd()) {
285                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
286                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
287                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
288                         input.getEndTableId().shortValue(), input.getCreateParents());
289             } else {
290                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
291                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
292                         input.getEndTableId().shortValue());
293             }
294         }
295         RpcResultBuilder<FlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
296         return Futures.immediateFuture(rpcResultBuilder.build());
297     }
298
299     @Override
300     public ListenableFuture<RpcResult<TableTestOutput>> tableTest(final TableTestInput input) {
301         final TableWriter writer = new TableWriter(dataBroker, fjService);
302         flowCounterBeanImpl.setWriter(writer);
303         switch (input.getOperation()) {
304             case Add:
305                 writer.addTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
306                         input.getEndTableId().shortValue());
307                 break;
308             case Delete:
309                 writer.deleteTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
310                         input.getEndTableId().shortValue());
311                 break;
312             default:
313                 RpcResultBuilder<TableTestOutput> rpcResultBuilder = RpcResultBuilder.failed();
314                 return Futures.immediateFuture(rpcResultBuilder.build());
315         }
316         RpcResultBuilder<TableTestOutput> rpcResultBuilder = RpcResultBuilder.success();
317         return Futures.immediateFuture(rpcResultBuilder.build());
318     }
319
320     @Override
321     public ListenableFuture<RpcResult<FlowRpcAddMultipleOutput>> flowRpcAddMultiple(
322             final FlowRpcAddMultipleInput input) {
323         FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
324         flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
325         RpcResultBuilder<FlowRpcAddMultipleOutput> rpcResultBuilder = RpcResultBuilder.success();
326         return Futures.immediateFuture(rpcResultBuilder.build());
327     }
328 }