Merge changes from topic 'blueprint'
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / SalBulkFlowServiceImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.JdkFutureAdapters;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.lang.management.ManagementFactory;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.ForkJoinPool;
25 import java.util.concurrent.Future;
26 import javax.management.InstanceAlreadyExistsException;
27 import javax.management.MBeanRegistrationException;
28 import javax.management.MBeanServer;
29 import javax.management.MalformedObjectNameException;
30 import javax.management.NotCompliantMBeanException;
31 import javax.management.ObjectName;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
62 import org.opendaylight.yangtools.yang.common.RpcError;
63 import org.opendaylight.yangtools.yang.common.RpcResult;
64 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
65
66 /**
67  * Simple implementation providing bulk flows operations.
68  */
69 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
70
71     private final SalFlowService flowService;
72     private final DataBroker dataBroker;
73     private final FlowCounter flowCounterBeanImpl = new FlowCounter();
74     private final ExecutorService fjService = new ForkJoinPool();
75     public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
76         this.flowService = Preconditions.checkNotNull(flowService);
77         this.dataBroker = Preconditions.checkNotNull(dataBroker);
78         register();
79     }
80
81     @Override
82     public Future<RpcResult<Void>> addFlowsDs(AddFlowsDsInput input) {
83         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
84         boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
85         boolean createParents = true;
86         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
87             FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
88             flowBuilder.setTableId(bulkFlow.getTableId());
89             flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
90             writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
91                     flowBuilder.build(), createParents);
92             createParents = createParentsNextTime;
93         }
94         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
95         return handleResultFuture(submitFuture);
96     }
97
98     private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
99         final NodeRef nodeRef = bulkFlow.getNode();
100         return ((InstanceIdentifier<Node>) nodeRef.getValue())
101                 .augmentation(FlowCapableNode.class)
102                 .child(Table.class, new TableKey(bulkFlow.getTableId()))
103                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow.class,
104                         new FlowKey(new FlowId(bulkFlow.getFlowId())));
105     }
106
107     @Override
108     public Future<RpcResult<Void>> removeFlowsDs(RemoveFlowsDsInput input) {
109         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
110         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
111             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
112         }
113         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
114         return handleResultFuture(submitFuture);
115     }
116
117     private ListenableFuture<RpcResult<Void>> handleResultFuture(CheckedFuture<Void,
118             TransactionCommitFailedException> submitFuture) {
119         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
120         Futures.addCallback(submitFuture, new FutureCallback<Void>() {
121             @Override
122             public void onSuccess(Void result) {
123                 rpcResult.set(RpcResultBuilder.success(result).build());
124             }
125
126             @Override
127             public void onFailure(Throwable t) {
128                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
129                         .withRpcErrors(Collections.singleton(
130                                 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
131                         ));
132                 rpcResult.set(rpcResultBld.build());
133             }
134         });
135         return rpcResult;
136     }
137
138     private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
139         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
140         Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
141             @Override
142             public void onSuccess(List<T> result) {
143                 rpcResult.set(RpcResultBuilder.success((Void) null).build());
144             }
145
146             @Override
147             public void onFailure(Throwable t) {
148                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
149                         .withRpcErrors(Collections.singleton(
150                                 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
151                         ));
152                 rpcResult.set(rpcResultBld.build());
153             }
154         });
155         return rpcResult;
156     }
157
158     @Override
159     public Future<RpcResult<Void>> addFlowsRpc(AddFlowsRpcInput input) {
160         List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
161
162         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
163             AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder((Flow) bulkFlow);
164             final NodeRef nodeRef = bulkFlow.getNode();
165             flowInputBuilder.setNode(nodeRef);
166             flowInputBuilder.setTableId(bulkFlow.getTableId());
167             Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
168             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
169         }
170         return handleResultFuture(Futures.allAsList(bulkResults));
171     }
172
173     @Override
174     public Future<RpcResult<Void>> readFlowTest(ReadFlowTestInput input) {
175         FlowReader flowReader = FlowReader.getNewInstance(dataBroker,
176                 input.getDpnCount().intValue(),
177                 input.getFlowsPerDpn().intValue(), input.isVerbose(),
178                 input.isIsConfigDs(),input.getStartTableId().shortValue(),
179                 input.getEndTableId().shortValue());
180         flowCounterBeanImpl.setReader(flowReader);
181         fjService.execute(flowReader);
182         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
183         return Futures.immediateFuture(rpcResultBuilder.build());
184     }
185
186     @Override
187     public Future<RpcResult<Void>> flowRpcAddTest(FlowRpcAddTestInput input) {
188         FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
189         flowAddRpcTestImpl.rpcFlowAdd(
190                 input.getDpnId(),
191                 input.getFlowCount().intValue(),
192                 input.getRpcBatchSize().intValue());
193
194
195         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
196         return Futures.immediateFuture(rpcResultBuilder.build());
197     }
198
199     @Override
200     public Future<RpcResult<Void>> register() {
201         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
202         try {
203         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
204         String pathToMBean = String.format("%s:type=%s",
205                 FlowCounter.class.getPackage().getName(),
206                 FlowCounter.class.getSimpleName());
207         ObjectName name = null;
208
209             name = new ObjectName(pathToMBean);
210             mbs.registerMBean(flowCounterBeanImpl, name);
211         } catch (MalformedObjectNameException | InstanceAlreadyExistsException
212                 | MBeanRegistrationException | NotCompliantMBeanException e) {
213             rpcResultBuilder = RpcResultBuilder.failed();
214             e.printStackTrace();
215         }
216         return Futures.immediateFuture(rpcResultBuilder.build());
217     }
218
219     @Override
220     public Future<RpcResult<Void>> removeFlowsRpc(RemoveFlowsRpcInput input) {
221         List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
222
223         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
224             RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder((Flow) bulkFlow);
225             final NodeRef nodeRef = bulkFlow.getNode();
226             flowInputBuilder.setNode(nodeRef);
227             flowInputBuilder.setTableId(bulkFlow.getTableId());
228             Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
229             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
230         }
231         return handleResultFuture(Futures.allAsList(bulkResults));
232     }
233
234     @Override
235     public Future<RpcResult<Void>> flowTest(FlowTestInput input) {
236         if (input.isTxChain()) {
237             FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
238             flowCounterBeanImpl.setWriter(flowTester);
239             if (input.isIsAdd()){
240                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
241                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
242                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
243                         input.getEndTableId().shortValue());
244             } else {
245                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
246                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
247                         input.getEndTableId().shortValue());
248             }
249             RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
250             return Futures.immediateFuture(rpcResultBuilder.build());
251         }
252         if (input.isSeq()) {
253             FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
254             flowCounterBeanImpl.setWriter(flowTester);
255             if (input.isIsAdd()){
256                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
257                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
258                         input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
259             } else {
260                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
261                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
262                         input.getEndTableId().shortValue());
263             }
264         } else {
265             FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
266             flowCounterBeanImpl.setWriter(flowTester);
267             if (input.isIsAdd()){
268                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
269                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
270                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
271                         input.getEndTableId().shortValue());
272             } else {
273                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
274                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
275                         input.getEndTableId().shortValue());
276             }
277         }
278         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
279         return Futures.immediateFuture(rpcResultBuilder.build());
280     }
281
282     @Override
283     public Future<RpcResult<Void>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
284         FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
285         flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
286         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
287         return Futures.immediateFuture(rpcResultBuilder.build());
288     }
289 }