Migrate to LoggingFutures
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / SalBulkFlowServiceImpl.java
1 /*
2  * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.lang.management.ManagementFactory;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.ForkJoinPool;
24 import javax.management.InstanceAlreadyExistsException;
25 import javax.management.MBeanRegistrationException;
26 import javax.management.MBeanServer;
27 import javax.management.MalformedObjectNameException;
28 import javax.management.NotCompliantMBeanException;
29 import javax.management.ObjectName;
30 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
31 import org.opendaylight.mdsal.binding.api.DataBroker;
32 import org.opendaylight.mdsal.binding.api.WriteTransaction;
33 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInputBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
72 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
73 import org.opendaylight.yangtools.yang.common.RpcError;
74 import org.opendaylight.yangtools.yang.common.RpcResult;
75 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79 /**
80  * Simple implementation providing bulk flows operations.
81  */
82 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
83
84     private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class);
85
86     private final SalFlowService flowService;
87     private final DataBroker dataBroker;
88     private final FlowCounter flowCounterBeanImpl = new FlowCounter();
89     private final ExecutorService fjService = new ForkJoinPool();
90
91     public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
92         this.flowService = Preconditions.checkNotNull(flowService);
93         this.dataBroker = Preconditions.checkNotNull(dataBroker);
94
95         LoggingFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register");
96     }
97
98     @Override
99     public ListenableFuture<RpcResult<AddFlowsDsOutput>> addFlowsDs(AddFlowsDsInput input) {
100         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
101         boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
102         boolean createParents = true;
103         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
104             FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
105             flowBuilder.setTableId(bulkFlow.getTableId());
106             flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
107             writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
108                     flowBuilder.build(), createParents);
109             createParents = createParentsNextTime;
110         }
111         ListenableFuture<?> submitFuture = writeTransaction.commit();
112         return Futures.transform(handleResultFuture(Futures.allAsList(submitFuture)), voidRpcResult -> {
113             if (voidRpcResult.isSuccessful()) {
114                 return RpcResultBuilder.<AddFlowsDsOutput>success().build();
115             } else {
116                 return RpcResultBuilder.<AddFlowsDsOutput>failed().build();
117             }
118         },MoreExecutors.directExecutor());
119     }
120
121     private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
122         final NodeRef nodeRef = bulkFlow.getNode();
123         return ((InstanceIdentifier<Node>) nodeRef.getValue()).augmentation(FlowCapableNode.class)
124                 .child(Table.class, new TableKey(bulkFlow.getTableId()))
125                 .child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId())));
126     }
127
128     @Override
129     public ListenableFuture<RpcResult<RemoveFlowsDsOutput>> removeFlowsDs(RemoveFlowsDsInput input) {
130         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
131         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
132             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
133         }
134         return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.commit())), voidRpcResult -> {
135             if (voidRpcResult.isSuccessful()) {
136                 return RpcResultBuilder.<RemoveFlowsDsOutput>success().build();
137             } else {
138                 return RpcResultBuilder.<RemoveFlowsDsOutput>failed().build();
139             }
140         }, MoreExecutors.directExecutor());
141     }
142
143     private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
144         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
145         Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
146             @Override
147             public void onSuccess(List<T> result) {
148                 rpcResult.set(RpcResultBuilder.success((Void) null).build());
149             }
150
151             @Override
152             public void onFailure(Throwable throwable) {
153                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
154                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
155                                 null, throwable.getMessage())));
156                 rpcResult.set(rpcResultBld.build());
157             }
158         }, MoreExecutors.directExecutor());
159         return rpcResult;
160     }
161
162     @Override
163     public ListenableFuture<RpcResult<AddFlowsRpcOutput>> addFlowsRpc(AddFlowsRpcInput input) {
164         List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
165
166         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
167             AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder(
168                     (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
169             final NodeRef nodeRef = bulkFlow.getNode();
170             flowInputBuilder.setNode(nodeRef);
171             flowInputBuilder.setTableId(bulkFlow.getTableId());
172             bulkResults.add(flowService.addFlow(flowInputBuilder.build()));
173         }
174         return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
175             if (voidRpcResult.isSuccessful()) {
176                 return RpcResultBuilder.<AddFlowsRpcOutput>success().build();
177             } else {
178                 return RpcResultBuilder.<AddFlowsRpcOutput>failed().build();
179             }
180         },MoreExecutors.directExecutor());
181     }
182
183     @Override
184     public ListenableFuture<RpcResult<ReadFlowTestOutput>> readFlowTest(ReadFlowTestInput input) {
185         FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(),
186                 input.getFlowsPerDpn().intValue(), input.isVerbose(), input.isIsConfigDs(),
187                 input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
188         flowCounterBeanImpl.setReader(flowReader);
189         fjService.execute(flowReader);
190         RpcResultBuilder<ReadFlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
191         return Futures.immediateFuture(rpcResultBuilder.build());
192     }
193
194     @Override
195     public ListenableFuture<RpcResult<FlowRpcAddTestOutput>> flowRpcAddTest(FlowRpcAddTestInput input) {
196         FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
197         flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(),
198                 input.getRpcBatchSize().intValue());
199
200         RpcResultBuilder<FlowRpcAddTestOutput> rpcResultBuilder = RpcResultBuilder.success();
201         return Futures.immediateFuture(rpcResultBuilder.build());
202     }
203
204     @Override
205     public ListenableFuture<RpcResult<RegisterOutput>> register(RegisterInput input) {
206         RpcResultBuilder<RegisterOutput> rpcResultBuilder = RpcResultBuilder.success();
207         try {
208             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
209             String pathToMBean = String.format("%s:type=%s", FlowCounter.class.getPackage().getName(),
210                     FlowCounter.class.getSimpleName());
211             ObjectName name = new ObjectName(pathToMBean);
212             mbs.registerMBean(flowCounterBeanImpl, name);
213         } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException
214                 | NotCompliantMBeanException e) {
215             rpcResultBuilder = RpcResultBuilder.failed();
216             LOG.warn("Exception occurred", e);
217         }
218         return Futures.immediateFuture(rpcResultBuilder.build());
219     }
220
221     @Override
222     public ListenableFuture<RpcResult<RemoveFlowsRpcOutput>> removeFlowsRpc(RemoveFlowsRpcInput input) {
223         List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
224
225         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
226             RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder(
227                     (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
228             final NodeRef nodeRef = bulkFlow.getNode();
229             flowInputBuilder.setNode(nodeRef);
230             flowInputBuilder.setTableId(bulkFlow.getTableId());
231             bulkResults.add(flowService.removeFlow(flowInputBuilder.build()));
232         }
233         return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
234             if (voidRpcResult.isSuccessful()) {
235                 return RpcResultBuilder.<RemoveFlowsRpcOutput>success().build();
236             } else {
237                 return RpcResultBuilder.<RemoveFlowsRpcOutput>failed().build();
238             }
239         }, MoreExecutors.directExecutor());
240     }
241
242     @Override
243     public ListenableFuture<RpcResult<FlowTestOutput>> flowTest(FlowTestInput input) {
244         if (input.isTxChain()) {
245             FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
246             flowCounterBeanImpl.setWriter(flowTester);
247             if (input.isIsAdd()) {
248                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
249                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
250                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
251                         input.getEndTableId().shortValue(), input.isCreateParents());
252             } else {
253                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
254                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
255                         input.getEndTableId().shortValue());
256             }
257             RpcResultBuilder<FlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
258             return Futures.immediateFuture(rpcResultBuilder.build());
259         }
260         if (input.isSeq()) {
261             FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
262             flowCounterBeanImpl.setWriter(flowTester);
263             if (input.isIsAdd()) {
264                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
265                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
266                         input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
267                         input.isCreateParents());
268             } else {
269                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
270                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
271                         input.getEndTableId().shortValue());
272             }
273         } else {
274             FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
275             flowCounterBeanImpl.setWriter(flowTester);
276             if (input.isIsAdd()) {
277                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
278                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
279                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
280                         input.getEndTableId().shortValue(), input.isCreateParents());
281             } else {
282                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
283                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
284                         input.getEndTableId().shortValue());
285             }
286         }
287         RpcResultBuilder<FlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
288         return Futures.immediateFuture(rpcResultBuilder.build());
289     }
290
291     @Override
292     public ListenableFuture<RpcResult<TableTestOutput>> tableTest(final TableTestInput input) {
293         final TableWriter writer = new TableWriter(dataBroker, fjService);
294         flowCounterBeanImpl.setWriter(writer);
295         switch (input.getOperation()) {
296             case Add:
297                 writer.addTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
298                         input.getEndTableId().shortValue());
299                 break;
300             case Delete:
301                 writer.deleteTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
302                         input.getEndTableId().shortValue());
303                 break;
304             default:
305                 RpcResultBuilder<TableTestOutput> rpcResultBuilder = RpcResultBuilder.failed();
306                 return Futures.immediateFuture(rpcResultBuilder.build());
307         }
308         RpcResultBuilder<TableTestOutput> rpcResultBuilder = RpcResultBuilder.success();
309         return Futures.immediateFuture(rpcResultBuilder.build());
310     }
311
312     @Override
313     public ListenableFuture<RpcResult<FlowRpcAddMultipleOutput>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
314         FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
315         flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
316         RpcResultBuilder<FlowRpcAddMultipleOutput> rpcResultBuilder = RpcResultBuilder.success();
317         return Futures.immediateFuture(rpcResultBuilder.build());
318     }
319 }