Bug 6161 - PrintStackTrace replaced with logger
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / SalBulkFlowServiceImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.JdkFutureAdapters;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.lang.management.ManagementFactory;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.ForkJoinPool;
25 import java.util.concurrent.Future;
26 import javax.management.InstanceAlreadyExistsException;
27 import javax.management.MBeanRegistrationException;
28 import javax.management.MBeanServer;
29 import javax.management.MalformedObjectNameException;
30 import javax.management.NotCompliantMBeanException;
31 import javax.management.ObjectName;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
62 import org.opendaylight.yangtools.yang.common.RpcError;
63 import org.opendaylight.yangtools.yang.common.RpcResult;
64 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 /**
69  * Simple implementation providing bulk flows operations.
70  */
71 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
72
73     private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class);
74
75     private final SalFlowService flowService;
76     private final DataBroker dataBroker;
77     private final FlowCounter flowCounterBeanImpl = new FlowCounter();
78     private final ExecutorService fjService = new ForkJoinPool();
79     public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
80         this.flowService = Preconditions.checkNotNull(flowService);
81         this.dataBroker = Preconditions.checkNotNull(dataBroker);
82         register();
83     }
84
85     @Override
86     public Future<RpcResult<Void>> addFlowsDs(AddFlowsDsInput input) {
87         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
88         boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
89         boolean createParents = true;
90         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
91             FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
92             flowBuilder.setTableId(bulkFlow.getTableId());
93             flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
94             writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
95                     flowBuilder.build(), createParents);
96             createParents = createParentsNextTime;
97         }
98         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
99         return handleResultFuture(submitFuture);
100     }
101
102     private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
103         final NodeRef nodeRef = bulkFlow.getNode();
104         return ((InstanceIdentifier<Node>) nodeRef.getValue())
105                 .augmentation(FlowCapableNode.class)
106                 .child(Table.class, new TableKey(bulkFlow.getTableId()))
107                 .child(Flow.class,
108                         new FlowKey(new FlowId(bulkFlow.getFlowId())));
109     }
110
111     @Override
112     public Future<RpcResult<Void>> removeFlowsDs(RemoveFlowsDsInput input) {
113         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
114         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
115             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
116         }
117         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
118         return handleResultFuture(submitFuture);
119     }
120
121     private ListenableFuture<RpcResult<Void>> handleResultFuture(CheckedFuture<Void,
122             TransactionCommitFailedException> submitFuture) {
123         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
124         Futures.addCallback(submitFuture, new FutureCallback<Void>() {
125             @Override
126             public void onSuccess(Void result) {
127                 rpcResult.set(RpcResultBuilder.success(result).build());
128             }
129
130             @Override
131             public void onFailure(Throwable t) {
132                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
133                         .withRpcErrors(Collections.singleton(
134                                 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
135                         ));
136                 rpcResult.set(rpcResultBld.build());
137             }
138         });
139         return rpcResult;
140     }
141
142     private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
143         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
144         Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
145             @Override
146             public void onSuccess(List<T> result) {
147                 rpcResult.set(RpcResultBuilder.success((Void) null).build());
148             }
149
150             @Override
151             public void onFailure(Throwable t) {
152                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
153                         .withRpcErrors(Collections.singleton(
154                                 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
155                         ));
156                 rpcResult.set(rpcResultBld.build());
157             }
158         });
159         return rpcResult;
160     }
161
162     @Override
163     public Future<RpcResult<Void>> addFlowsRpc(AddFlowsRpcInput input) {
164         List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
165
166         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
167             AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder((org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
168             final NodeRef nodeRef = bulkFlow.getNode();
169             flowInputBuilder.setNode(nodeRef);
170             flowInputBuilder.setTableId(bulkFlow.getTableId());
171             Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
172             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
173         }
174         return handleResultFuture(Futures.allAsList(bulkResults));
175     }
176
177     @Override
178     public Future<RpcResult<Void>> readFlowTest(ReadFlowTestInput input) {
179         FlowReader flowReader = FlowReader.getNewInstance(dataBroker,
180                 input.getDpnCount().intValue(),
181                 input.getFlowsPerDpn().intValue(), input.isVerbose(),
182                 input.isIsConfigDs(),input.getStartTableId().shortValue(),
183                 input.getEndTableId().shortValue());
184         flowCounterBeanImpl.setReader(flowReader);
185         fjService.execute(flowReader);
186         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
187         return Futures.immediateFuture(rpcResultBuilder.build());
188     }
189
190     @Override
191     public Future<RpcResult<Void>> flowRpcAddTest(FlowRpcAddTestInput input) {
192         FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
193         flowAddRpcTestImpl.rpcFlowAdd(
194                 input.getDpnId(),
195                 input.getFlowCount().intValue(),
196                 input.getRpcBatchSize().intValue());
197
198
199         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
200         return Futures.immediateFuture(rpcResultBuilder.build());
201     }
202
203     @Override
204     public Future<RpcResult<Void>> register() {
205         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
206         try {
207         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
208         String pathToMBean = String.format("%s:type=%s",
209                 FlowCounter.class.getPackage().getName(),
210                 FlowCounter.class.getSimpleName());
211         ObjectName name = null;
212
213             name = new ObjectName(pathToMBean);
214             mbs.registerMBean(flowCounterBeanImpl, name);
215         } catch (MalformedObjectNameException | InstanceAlreadyExistsException
216                 | MBeanRegistrationException | NotCompliantMBeanException e) {
217             rpcResultBuilder = RpcResultBuilder.failed();
218             LOG.warn("Exception occurred: {} ", e.getMessage(), e);
219         }
220         return Futures.immediateFuture(rpcResultBuilder.build());
221     }
222
223     @Override
224     public Future<RpcResult<Void>> removeFlowsRpc(RemoveFlowsRpcInput input) {
225         List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
226
227         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
228             RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder((org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
229             final NodeRef nodeRef = bulkFlow.getNode();
230             flowInputBuilder.setNode(nodeRef);
231             flowInputBuilder.setTableId(bulkFlow.getTableId());
232             Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
233             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
234         }
235         return handleResultFuture(Futures.allAsList(bulkResults));
236     }
237
238     @Override
239     public Future<RpcResult<Void>> flowTest(FlowTestInput input) {
240         if (input.isTxChain()) {
241             FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
242             flowCounterBeanImpl.setWriter(flowTester);
243             if (input.isIsAdd()){
244                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
245                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
246                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
247                         input.getEndTableId().shortValue());
248             } else {
249                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
250                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
251                         input.getEndTableId().shortValue());
252             }
253             RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
254             return Futures.immediateFuture(rpcResultBuilder.build());
255         }
256         if (input.isSeq()) {
257             FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
258             flowCounterBeanImpl.setWriter(flowTester);
259             if (input.isIsAdd()){
260                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
261                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
262                         input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
263             } else {
264                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
265                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
266                         input.getEndTableId().shortValue());
267             }
268         } else {
269             FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
270             flowCounterBeanImpl.setWriter(flowTester);
271             if (input.isIsAdd()){
272                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
273                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
274                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
275                         input.getEndTableId().shortValue());
276             } else {
277                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
278                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
279                         input.getEndTableId().shortValue());
280             }
281         }
282         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
283         return Futures.immediateFuture(rpcResultBuilder.build());
284     }
285
286     @Override
287     public Future<RpcResult<Void>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
288         FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
289         flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
290         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
291         return Futures.immediateFuture(rpcResultBuilder.build());
292     }
293 }