2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.*;
14 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
15 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.*;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.*;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
30 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
31 import org.opendaylight.yangtools.yang.common.RpcError;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
35 import javax.management.*;
36 import java.lang.management.ManagementFactory;
37 import java.util.ArrayList;
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.ForkJoinPool;
42 import java.util.concurrent.Future;
45 * Simple implementation providing bulk flow operations.
47 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
49 private final SalFlowService flowService;
50 private final DataBroker dataBroker;
51 private FlowCounter flowCounterBeanImpl = new FlowCounter();
52 private final ExecutorService fjService = new ForkJoinPool();
/**
 * Creates the bulk flow service.
 *
 * @param flowService flow RPC service, must not be {@code null}
 * @param dataBroker data broker, must not be {@code null}
 * @throws NullPointerException if either argument is {@code null}
 */
public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
    // Validate both collaborators first (same order as before), then store them.
    Preconditions.checkNotNull(flowService);
    Preconditions.checkNotNull(dataBroker);
    this.flowService = flowService;
    this.dataBroker = dataBroker;
}
59 public Future<RpcResult<Void>> addFlowsDs(AddFlowsDsInput input) {
60 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
61 boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
62 boolean createParents = true;
63 for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
64 FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
65 flowBuilder.setTableId(bulkFlow.getTableId());
66 flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
67 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
68 flowBuilder.build(), createParents);
69 createParents = createParentsNextTime;
71 CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
72 return handleResultFuture(submitFuture);
75 private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
76 final NodeRef nodeRef = bulkFlow.getNode();
77 return ((InstanceIdentifier<Node>) nodeRef.getValue())
78 .augmentation(FlowCapableNode.class)
79 .child(Table.class, new TableKey(bulkFlow.getTableId()))
80 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow.class,
81 new FlowKey(new FlowId(bulkFlow.getFlowId())));
85 public Future<RpcResult<Void>> removeFlowsDs(RemoveFlowsDsInput input) {
86 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
87 for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
88 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
90 CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
91 return handleResultFuture(submitFuture);
94 private ListenableFuture<RpcResult<Void>> handleResultFuture(CheckedFuture<Void,
95 TransactionCommitFailedException> submitFuture) {
96 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
97 Futures.addCallback(submitFuture, new FutureCallback<Void>() {
99 public void onSuccess(Void result) {
100 rpcResult.set(RpcResultBuilder.success(result).build());
104 public void onFailure(Throwable t) {
105 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
106 .withRpcErrors(Collections.singleton(
107 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
109 rpcResult.set(rpcResultBld.build());
115 private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
116 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
117 Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
119 public void onSuccess(List<T> result) {
120 rpcResult.set(RpcResultBuilder.success((Void) null).build());
124 public void onFailure(Throwable t) {
125 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
126 .withRpcErrors(Collections.singleton(
127 RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
129 rpcResult.set(rpcResultBld.build());
136 public Future<RpcResult<Void>> addFlowsRpc(AddFlowsRpcInput input) {
137 List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
139 for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
140 AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder((Flow) bulkFlow);
141 final NodeRef nodeRef = bulkFlow.getNode();
142 flowInputBuilder.setNode(nodeRef);
143 flowInputBuilder.setTableId(bulkFlow.getTableId());
144 Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
145 bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
147 return handleResultFuture(Futures.allAsList(bulkResults));
151 public Future<RpcResult<Void>> readFlowTest(ReadFlowTestInput input) {
152 FlowReader flowReader = FlowReader.getNewInstance(dataBroker,
153 input.getDpnCount().intValue(),
154 input.getFlowsPerDpn().intValue(), input.isVerbose(),
155 input.isIsConfigDs(),input.getStartTableId().shortValue(),
156 input.getEndTableId().shortValue());
157 flowCounterBeanImpl.setReader(flowReader);
158 fjService.execute(flowReader);
159 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
160 return Futures.immediateFuture(rpcResultBuilder.build());
// Starts an RPC-based flow add test: creates a FlowWriterDirectOFRpc over the data broker, the
// flow service and the shared executor, calls rpcFlowAdd on it, and returns an
// already-completed successful result.
// NOTE(review): the embedded source numbering jumps from 166 to 168, so the rpcFlowAdd argument
// list shown here may be missing its first argument — confirm against the full file.
164 public Future<RpcResult<Void>> flowRpcAddTest(FlowRpcAddTestInput input) {
165 FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
166 flowAddRpcTestImpl.rpcFlowAdd(
168 input.getFlowCount().intValue(),
169 input.getRpcBatchSize().intValue());
// The result is built as a success unconditionally; nothing returned by rpcFlowAdd is consulted.
172 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
173 return Futures.immediateFuture(rpcResultBuilder.build());
177 public Future<RpcResult<Void>> register() {
178 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
180 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
181 String pathToMBean = String.format("%s:type=%s",
182 FlowCounter.class.getPackage().getName(),
183 FlowCounter.class.getSimpleName());
184 ObjectName name = null;
186 name = new ObjectName(pathToMBean);
187 mbs.registerMBean(flowCounterBeanImpl, name);
188 } catch (MalformedObjectNameException | InstanceAlreadyExistsException
189 | MBeanRegistrationException | NotCompliantMBeanException e) {
190 rpcResultBuilder = RpcResultBuilder.failed();
193 return Futures.immediateFuture(rpcResultBuilder.build());
197 public Future<RpcResult<Void>> removeFlowsRpc(RemoveFlowsRpcInput input) {
198 List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
200 for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
201 RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder((Flow) bulkFlow);
202 final NodeRef nodeRef = bulkFlow.getNode();
203 flowInputBuilder.setNode(nodeRef);
204 flowInputBuilder.setTableId(bulkFlow.getTableId());
205 Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
206 bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
208 return handleResultFuture(Futures.allAsList(bulkResults));
// Starts a datastore flow write test and returns an already-completed successful result.
// One of three writers is created over the data broker and the shared executor:
// FlowWriterTxChain, FlowWriterSequential or FlowWriterConcurrent. Each is published to the
// flow counter bean through setWriter before it is invoked.
// Within each writer, input.isIsAdd() guards the addFlows call; a deleteFlows call follows it.
// NOTE(review): this listing contains no closing-brace or else lines, and the condition that
// chooses between the sequential and the concurrent writer is not visible — confirm the exact
// branch structure against the full file.
212 public Future<RpcResult<Void>> flowTest(FlowTestInput input) {
// Transaction-chain writer, used when input.isTxChain() is true.
213 if (input.isTxChain()) {
214 FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
215 flowCounterBeanImpl.setWriter(flowTester);
216 if (input.isIsAdd()){
217 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
218 input.getBatchSize().intValue(), input.getSleepFor().intValue(),
219 input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
220 input.getEndTableId().shortValue());
222 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
223 input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
224 input.getEndTableId().shortValue());
// This writer returns its success result here rather than at the end of the method.
226 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
227 return Futures.immediateFuture(rpcResultBuilder.build());
// Sequential writer. Unlike the other two writers, its addFlows call takes no sleepAfter argument.
230 FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
231 flowCounterBeanImpl.setWriter(flowTester);
232 if (input.isIsAdd()){
233 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
234 input.getBatchSize().intValue(), input.getSleepFor().intValue(),
235 input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
237 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
238 input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
239 input.getEndTableId().shortValue());
// Concurrent writer; takes the same addFlows/deleteFlows arguments as the transaction-chain writer.
242 FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
243 flowCounterBeanImpl.setWriter(flowTester);
244 if (input.isIsAdd()){
245 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
246 input.getBatchSize().intValue(), input.getSleepFor().intValue(),
247 input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
248 input.getEndTableId().shortValue());
250 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
251 input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
252 input.getEndTableId().shortValue());
// The result is built as a success unconditionally; no writer return value is consulted.
255 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
256 return Futures.immediateFuture(rpcResultBuilder.build());
260 public Future<RpcResult<Void>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
261 FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
262 flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
263 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
264 return Futures.immediateFuture(rpcResultBuilder.build());