2 * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.lang.management.ManagementFactory;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.ForkJoinPool;
25 import javax.management.InstanceAlreadyExistsException;
26 import javax.management.MBeanRegistrationException;
27 import javax.management.MBeanServer;
28 import javax.management.MalformedObjectNameException;
29 import javax.management.NotCompliantMBeanException;
30 import javax.management.ObjectName;
31 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
32 import org.opendaylight.mdsal.binding.api.DataBroker;
33 import org.opendaylight.mdsal.binding.api.WriteTransaction;
34 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInputBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterOutput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsOutput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
73 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
74 import org.opendaylight.yangtools.yang.common.RpcError;
75 import org.opendaylight.yangtools.yang.common.RpcResult;
76 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
81 * Simple implementation providing bulk flows operations.
83 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
85 private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class);
87 private final SalFlowService flowService;
88 private final DataBroker dataBroker;
89 private final FlowCounter flowCounterBeanImpl = new FlowCounter();
90 private final ExecutorService fjService = new ForkJoinPool();
92 public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
93 this.flowService = Preconditions.checkNotNull(flowService);
94 this.dataBroker = Preconditions.checkNotNull(dataBroker);
96 LoggingFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register");
100 public ListenableFuture<RpcResult<AddFlowsDsOutput>> addFlowsDs(AddFlowsDsInput input) {
101 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
102 boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
103 boolean createParents = true;
104 for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
105 FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
106 flowBuilder.setTableId(bulkFlow.getTableId());
107 flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
109 writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION,
110 getFlowInstanceIdentifier(bulkFlow),
111 flowBuilder.build());
113 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
114 flowBuilder.build());
116 createParents = createParentsNextTime;
118 FluentFuture<?> submitFuture = writeTransaction.commit();
119 return Futures.transform(handleResultFuture(Futures.allAsList(submitFuture)), voidRpcResult -> {
120 if (voidRpcResult.isSuccessful()) {
121 return RpcResultBuilder.<AddFlowsDsOutput>success().build();
123 return RpcResultBuilder.<AddFlowsDsOutput>failed().build();
125 },MoreExecutors.directExecutor());
128 private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
129 final NodeRef nodeRef = bulkFlow.getNode();
130 return ((InstanceIdentifier<Node>) nodeRef.getValue()).augmentation(FlowCapableNode.class)
131 .child(Table.class, new TableKey(bulkFlow.getTableId()))
132 .child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId())));
136 public ListenableFuture<RpcResult<RemoveFlowsDsOutput>> removeFlowsDs(RemoveFlowsDsInput input) {
137 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
138 for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
139 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
141 return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.commit())), voidRpcResult -> {
142 if (voidRpcResult.isSuccessful()) {
143 return RpcResultBuilder.<RemoveFlowsDsOutput>success().build();
145 return RpcResultBuilder.<RemoveFlowsDsOutput>failed().build();
147 }, MoreExecutors.directExecutor());
150 private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
151 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
152 Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
154 public void onSuccess(List<T> result) {
155 rpcResult.set(RpcResultBuilder.success((Void) null).build());
159 public void onFailure(Throwable throwable) {
160 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
161 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
162 null, throwable.getMessage())));
163 rpcResult.set(rpcResultBld.build());
165 }, MoreExecutors.directExecutor());
170 public ListenableFuture<RpcResult<AddFlowsRpcOutput>> addFlowsRpc(AddFlowsRpcInput input) {
171 List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
173 for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
174 AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder(
175 (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
176 final NodeRef nodeRef = bulkFlow.getNode();
177 flowInputBuilder.setNode(nodeRef);
178 flowInputBuilder.setTableId(bulkFlow.getTableId());
179 bulkResults.add(flowService.addFlow(flowInputBuilder.build()));
181 return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
182 if (voidRpcResult.isSuccessful()) {
183 return RpcResultBuilder.<AddFlowsRpcOutput>success().build();
185 return RpcResultBuilder.<AddFlowsRpcOutput>failed().build();
187 },MoreExecutors.directExecutor());
191 public ListenableFuture<RpcResult<ReadFlowTestOutput>> readFlowTest(ReadFlowTestInput input) {
192 FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(),
193 input.getFlowsPerDpn().intValue(), input.isVerbose(), input.isIsConfigDs(),
194 input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
195 flowCounterBeanImpl.setReader(flowReader);
196 fjService.execute(flowReader);
197 RpcResultBuilder<ReadFlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
198 return Futures.immediateFuture(rpcResultBuilder.build());
202 public ListenableFuture<RpcResult<FlowRpcAddTestOutput>> flowRpcAddTest(FlowRpcAddTestInput input) {
203 FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
204 flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(),
205 input.getRpcBatchSize().intValue());
207 RpcResultBuilder<FlowRpcAddTestOutput> rpcResultBuilder = RpcResultBuilder.success();
208 return Futures.immediateFuture(rpcResultBuilder.build());
212 public ListenableFuture<RpcResult<RegisterOutput>> register(RegisterInput input) {
213 RpcResultBuilder<RegisterOutput> rpcResultBuilder = RpcResultBuilder.success();
215 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
216 String pathToMBean = String.format("%s:type=%s", FlowCounter.class.getPackage().getName(),
217 FlowCounter.class.getSimpleName());
218 ObjectName name = new ObjectName(pathToMBean);
219 mbs.registerMBean(flowCounterBeanImpl, name);
220 } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException
221 | NotCompliantMBeanException e) {
222 rpcResultBuilder = RpcResultBuilder.failed();
223 LOG.warn("Exception occurred", e);
225 return Futures.immediateFuture(rpcResultBuilder.build());
229 public ListenableFuture<RpcResult<RemoveFlowsRpcOutput>> removeFlowsRpc(RemoveFlowsRpcInput input) {
230 List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
232 for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
233 RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder(
234 (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
235 final NodeRef nodeRef = bulkFlow.getNode();
236 flowInputBuilder.setNode(nodeRef);
237 flowInputBuilder.setTableId(bulkFlow.getTableId());
238 bulkResults.add(flowService.removeFlow(flowInputBuilder.build()));
240 return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
241 if (voidRpcResult.isSuccessful()) {
242 return RpcResultBuilder.<RemoveFlowsRpcOutput>success().build();
244 return RpcResultBuilder.<RemoveFlowsRpcOutput>failed().build();
246 }, MoreExecutors.directExecutor());
250 public ListenableFuture<RpcResult<FlowTestOutput>> flowTest(FlowTestInput input) {
251 if (input.isTxChain()) {
252 FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
253 flowCounterBeanImpl.setWriter(flowTester);
254 if (input.isIsAdd()) {
255 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
256 input.getBatchSize().intValue(), input.getSleepFor().intValue(),
257 input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
258 input.getEndTableId().shortValue(), input.isCreateParents());
260 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
261 input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
262 input.getEndTableId().shortValue());
264 RpcResultBuilder<FlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
265 return Futures.immediateFuture(rpcResultBuilder.build());
268 FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
269 flowCounterBeanImpl.setWriter(flowTester);
270 if (input.isIsAdd()) {
271 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
272 input.getBatchSize().intValue(), input.getSleepFor().intValue(),
273 input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
274 input.isCreateParents());
276 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
277 input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
278 input.getEndTableId().shortValue());
281 FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
282 flowCounterBeanImpl.setWriter(flowTester);
283 if (input.isIsAdd()) {
284 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
285 input.getBatchSize().intValue(), input.getSleepFor().intValue(),
286 input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
287 input.getEndTableId().shortValue(), input.isCreateParents());
289 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
290 input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
291 input.getEndTableId().shortValue());
294 RpcResultBuilder<FlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
295 return Futures.immediateFuture(rpcResultBuilder.build());
299 public ListenableFuture<RpcResult<TableTestOutput>> tableTest(final TableTestInput input) {
300 final TableWriter writer = new TableWriter(dataBroker, fjService);
301 flowCounterBeanImpl.setWriter(writer);
302 switch (input.getOperation()) {
304 writer.addTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
305 input.getEndTableId().shortValue());
308 writer.deleteTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
309 input.getEndTableId().shortValue());
312 RpcResultBuilder<TableTestOutput> rpcResultBuilder = RpcResultBuilder.failed();
313 return Futures.immediateFuture(rpcResultBuilder.build());
315 RpcResultBuilder<TableTestOutput> rpcResultBuilder = RpcResultBuilder.success();
316 return Futures.immediateFuture(rpcResultBuilder.build());
320 public ListenableFuture<RpcResult<FlowRpcAddMultipleOutput>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
321 FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
322 flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
323 RpcResultBuilder<FlowRpcAddMultipleOutput> rpcResultBuilder = RpcResultBuilder.success();
324 return Futures.immediateFuture(rpcResultBuilder.build());