Merge "BUG-5573: Altering the HandshakePool to have a LinkedBlockingQueue instead...
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / SalBulkFlowServiceImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.*;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.*;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.*;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
30 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
31 import org.opendaylight.yangtools.yang.common.RpcError;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
34
35 import javax.management.*;
36 import java.lang.management.ManagementFactory;
37 import java.util.ArrayList;
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.ForkJoinPool;
42 import java.util.concurrent.Future;
43
44 /**
45  * Simple implementation providing bulk flows operations.
46  */
47 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
48
49     private final SalFlowService flowService;
50     private final DataBroker dataBroker;
51     private FlowCounter flowCounterBeanImpl = new FlowCounter();
52     private final ExecutorService fjService = new ForkJoinPool();
53     public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
54         this.flowService = Preconditions.checkNotNull(flowService);
55         this.dataBroker = Preconditions.checkNotNull(dataBroker);
56     }
57
58     @Override
59     public Future<RpcResult<Void>> addFlowsDs(AddFlowsDsInput input) {
60         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
61         boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
62         boolean createParents = true;
63         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
64             FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
65             flowBuilder.setTableId(bulkFlow.getTableId());
66             flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
67             writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
68                     flowBuilder.build(), createParents);
69             createParents = createParentsNextTime;
70         }
71         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
72         return handleResultFuture(submitFuture);
73     }
74
75     private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
76         final NodeRef nodeRef = bulkFlow.getNode();
77         return ((InstanceIdentifier<Node>) nodeRef.getValue())
78                 .augmentation(FlowCapableNode.class)
79                 .child(Table.class, new TableKey(bulkFlow.getTableId()))
80                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow.class,
81                         new FlowKey(new FlowId(bulkFlow.getFlowId())));
82     }
83
84     @Override
85     public Future<RpcResult<Void>> removeFlowsDs(RemoveFlowsDsInput input) {
86         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
87         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
88             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
89         }
90         CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
91         return handleResultFuture(submitFuture);
92     }
93
94     private ListenableFuture<RpcResult<Void>> handleResultFuture(CheckedFuture<Void,
95             TransactionCommitFailedException> submitFuture) {
96         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
97         Futures.addCallback(submitFuture, new FutureCallback<Void>() {
98             @Override
99             public void onSuccess(Void result) {
100                 rpcResult.set(RpcResultBuilder.success(result).build());
101             }
102
103             @Override
104             public void onFailure(Throwable t) {
105                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
106                         .withRpcErrors(Collections.singleton(
107                                 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
108                         ));
109                 rpcResult.set(rpcResultBld.build());
110             }
111         });
112         return rpcResult;
113     }
114
115     private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
116         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
117         Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
118             @Override
119             public void onSuccess(List<T> result) {
120                 rpcResult.set(RpcResultBuilder.success((Void) null).build());
121             }
122
123             @Override
124             public void onFailure(Throwable t) {
125                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
126                         .withRpcErrors(Collections.singleton(
127                                 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
128                         ));
129                 rpcResult.set(rpcResultBld.build());
130             }
131         });
132         return rpcResult;
133     }
134
135     @Override
136     public Future<RpcResult<Void>> addFlowsRpc(AddFlowsRpcInput input) {
137         List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
138
139         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
140             AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder((Flow) bulkFlow);
141             final NodeRef nodeRef = bulkFlow.getNode();
142             flowInputBuilder.setNode(nodeRef);
143             flowInputBuilder.setTableId(bulkFlow.getTableId());
144             Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
145             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
146         }
147         return handleResultFuture(Futures.allAsList(bulkResults));
148     }
149
150     @Override
151     public Future<RpcResult<Void>> readFlowTest(ReadFlowTestInput input) {
152         FlowReader flowReader = FlowReader.getNewInstance(dataBroker,
153                 input.getDpnCount().intValue(),
154                 input.getFlowsPerDpn().intValue(), input.isVerbose(),
155                 input.isIsConfigDs(),input.getStartTableId().shortValue(),
156                 input.getEndTableId().shortValue());
157         flowCounterBeanImpl.setReader(flowReader);
158         fjService.execute(flowReader);
159         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
160         return Futures.immediateFuture(rpcResultBuilder.build());
161     }
162
163     @Override
164     public Future<RpcResult<Void>> flowRpcAddTest(FlowRpcAddTestInput input) {
165         FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
166         flowAddRpcTestImpl.rpcFlowAdd(
167                 input.getDpnId(),
168                 input.getFlowCount().intValue(),
169                 input.getRpcBatchSize().intValue());
170
171
172         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
173         return Futures.immediateFuture(rpcResultBuilder.build());
174     }
175
176     @Override
177     public Future<RpcResult<Void>> register() {
178         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
179         try {
180         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
181         String pathToMBean = String.format("%s:type=%s",
182                 FlowCounter.class.getPackage().getName(),
183                 FlowCounter.class.getSimpleName());
184         ObjectName name = null;
185
186             name = new ObjectName(pathToMBean);
187             mbs.registerMBean(flowCounterBeanImpl, name);
188         } catch (MalformedObjectNameException | InstanceAlreadyExistsException
189                 | MBeanRegistrationException | NotCompliantMBeanException e) {
190             rpcResultBuilder = RpcResultBuilder.failed();
191             e.printStackTrace();
192         }
193         return Futures.immediateFuture(rpcResultBuilder.build());
194     }
195
196     @Override
197     public Future<RpcResult<Void>> removeFlowsRpc(RemoveFlowsRpcInput input) {
198         List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
199
200         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
201             RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder((Flow) bulkFlow);
202             final NodeRef nodeRef = bulkFlow.getNode();
203             flowInputBuilder.setNode(nodeRef);
204             flowInputBuilder.setTableId(bulkFlow.getTableId());
205             Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
206             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
207         }
208         return handleResultFuture(Futures.allAsList(bulkResults));
209     }
210
211     @Override
212     public Future<RpcResult<Void>> flowTest(FlowTestInput input) {
213         if (input.isTxChain()) {
214             FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
215             flowCounterBeanImpl.setWriter(flowTester);
216             if (input.isIsAdd()){
217                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
218                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
219                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
220                         input.getEndTableId().shortValue());
221             } else {
222                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
223                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
224                         input.getEndTableId().shortValue());
225             }
226             RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
227             return Futures.immediateFuture(rpcResultBuilder.build());
228         }
229         if (input.isSeq()) {
230             FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
231             flowCounterBeanImpl.setWriter(flowTester);
232             if (input.isIsAdd()){
233                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
234                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
235                         input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
236             } else {
237                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
238                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
239                         input.getEndTableId().shortValue());
240             }
241         } else {
242             FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
243             flowCounterBeanImpl.setWriter(flowTester);
244             if (input.isIsAdd()){
245                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
246                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
247                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
248                         input.getEndTableId().shortValue());
249             } else {
250                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
251                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
252                         input.getEndTableId().shortValue());
253             }
254         }
255         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
256         return Futures.immediateFuture(rpcResultBuilder.build());
257     }
258
259     @Override
260     public Future<RpcResult<Void>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
261         FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
262         flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
263         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
264         return Futures.immediateFuture(rpcResultBuilder.build());
265     }
266 }