2 * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.lang.management.ManagementFactory;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.ForkJoinPool;
25 import java.util.concurrent.Future;
26 import javax.management.InstanceAlreadyExistsException;
27 import javax.management.MBeanRegistrationException;
28 import javax.management.MBeanServer;
29 import javax.management.MalformedObjectNameException;
30 import javax.management.NotCompliantMBeanException;
31 import javax.management.ObjectName;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.RpcError;
64 import org.opendaylight.yangtools.yang.common.RpcResult;
65 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
70 * Simple implementation providing bulk flow operations.
72 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
74 private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class);
// Flow RPC service: called directly by addFlowsRpc/removeFlowsRpc and handed to FlowWriterDirectOFRpc.
76 private final SalFlowService flowService;
// Data broker: source of the write-only transactions for the datastore RPCs; also handed to the readers/writers.
77 private final DataBroker dataBroker;
// Counter bean registered with the platform MBean server in register(); receives the most recent reader/writer.
78 private final FlowCounter flowCounterBeanImpl = new FlowCounter();
// Executor shared by the test RPCs: runs the FlowReader and is passed to every writer this class creates.
79 private final ExecutorService fjService = new ForkJoinPool();
/**
 * Creates the bulk flow service and registers the flow counter MBean.
 *
 * @param flowService flow RPC service used by the RPC-based operations; must not be null
 * @param dataBroker data broker used by the datastore-based operations; must not be null
 * @throws NullPointerException if either argument is null
 */
public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
    // Name the offending dependency so a wiring mistake is diagnosable from the exception alone.
    this.flowService = Preconditions.checkNotNull(flowService, "flowService must not be null");
    this.dataBroker = Preconditions.checkNotNull(dataBroker, "dataBroker must not be null");

    // Attach error logging to the future returned by register().
    JdkFutures.addErrorLogging(register(), LOG, "register");
}
89 public Future<RpcResult<Void>> addFlowsDs(AddFlowsDsInput input) {
90 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
91 boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
92 boolean createParents = true;
93 for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
94 FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
95 flowBuilder.setTableId(bulkFlow.getTableId());
96 flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
97 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
98 flowBuilder.build(), createParents);
99 createParents = createParentsNextTime;
101 ListenableFuture<Void> submitFuture = writeTransaction.submit();
102 return handleResultFuture(Futures.allAsList(submitFuture));
105 private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
106 final NodeRef nodeRef = bulkFlow.getNode();
107 return ((InstanceIdentifier<Node>) nodeRef.getValue()).augmentation(FlowCapableNode.class)
108 .child(Table.class, new TableKey(bulkFlow.getTableId()))
109 .child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId())));
113 public Future<RpcResult<Void>> removeFlowsDs(RemoveFlowsDsInput input) {
114 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
115 for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
116 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
118 return handleResultFuture(Futures.allAsList(writeTransaction.submit()));
121 private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
122 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
123 Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
125 public void onSuccess(List<T> result) {
126 rpcResult.set(RpcResultBuilder.success((Void) null).build());
130 public void onFailure(Throwable throwable) {
131 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
132 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
133 null, throwable.getMessage())));
134 rpcResult.set(rpcResultBld.build());
136 }, MoreExecutors.directExecutor());
141 public Future<RpcResult<Void>> addFlowsRpc(AddFlowsRpcInput input) {
142 List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
144 for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
145 AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder(
146 (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
147 final NodeRef nodeRef = bulkFlow.getNode();
148 flowInputBuilder.setNode(nodeRef);
149 flowInputBuilder.setTableId(bulkFlow.getTableId());
150 Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
151 bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
153 return handleResultFuture(Futures.allAsList(bulkResults));
157 public Future<RpcResult<Void>> readFlowTest(ReadFlowTestInput input) {
158 FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(),
159 input.getFlowsPerDpn().intValue(), input.isVerbose(), input.isIsConfigDs(),
160 input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
161 flowCounterBeanImpl.setReader(flowReader);
162 fjService.execute(flowReader);
163 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
164 return Futures.immediateFuture(rpcResultBuilder.build());
168 public Future<RpcResult<Void>> flowRpcAddTest(FlowRpcAddTestInput input) {
169 FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
170 flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(),
171 input.getRpcBatchSize().intValue());
173 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
174 return Futures.immediateFuture(rpcResultBuilder.build());
178 public Future<RpcResult<Void>> register() {
179 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
181 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
182 String pathToMBean = String.format("%s:type=%s", FlowCounter.class.getPackage().getName(),
183 FlowCounter.class.getSimpleName());
184 ObjectName name = new ObjectName(pathToMBean);
185 mbs.registerMBean(flowCounterBeanImpl, name);
186 } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException
187 | NotCompliantMBeanException e) {
188 rpcResultBuilder = RpcResultBuilder.failed();
189 LOG.warn("Exception occurred: {} ", e.getMessage(), e);
191 return Futures.immediateFuture(rpcResultBuilder.build());
195 public Future<RpcResult<Void>> removeFlowsRpc(RemoveFlowsRpcInput input) {
196 List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
198 for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
199 RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder(
200 (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
201 final NodeRef nodeRef = bulkFlow.getNode();
202 flowInputBuilder.setNode(nodeRef);
203 flowInputBuilder.setTableId(bulkFlow.getTableId());
204 Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
205 bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
207 return handleResultFuture(Futures.allAsList(bulkResults));
211 public Future<RpcResult<Void>> flowTest(FlowTestInput input) {
212 if (input.isTxChain()) {
213 FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
214 flowCounterBeanImpl.setWriter(flowTester);
215 if (input.isIsAdd()) {
216 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
217 input.getBatchSize().intValue(), input.getSleepFor().intValue(),
218 input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
219 input.getEndTableId().shortValue(), input.isCreateParents());
221 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
222 input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
223 input.getEndTableId().shortValue());
225 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
226 return Futures.immediateFuture(rpcResultBuilder.build());
229 FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
230 flowCounterBeanImpl.setWriter(flowTester);
231 if (input.isIsAdd()) {
232 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
233 input.getBatchSize().intValue(), input.getSleepFor().intValue(),
234 input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
235 input.isCreateParents());
237 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
238 input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
239 input.getEndTableId().shortValue());
242 FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
243 flowCounterBeanImpl.setWriter(flowTester);
244 if (input.isIsAdd()) {
245 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
246 input.getBatchSize().intValue(), input.getSleepFor().intValue(),
247 input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
248 input.getEndTableId().shortValue(), input.isCreateParents());
250 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
251 input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
252 input.getEndTableId().shortValue());
255 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
256 return Futures.immediateFuture(rpcResultBuilder.build());
260 public Future<RpcResult<Void>> tableTest(final TableTestInput input) {
261 final TableWriter writer = new TableWriter(dataBroker, fjService);
262 flowCounterBeanImpl.setWriter(writer);
263 switch (input.getOperation()) {
265 writer.addTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
266 input.getEndTableId().shortValue());
269 writer.deleteTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
270 input.getEndTableId().shortValue());
273 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.failed();
274 return Futures.immediateFuture(rpcResultBuilder.build());
276 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
277 return Futures.immediateFuture(rpcResultBuilder.build());
281 public Future<RpcResult<Void>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
282 FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
283 flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
284 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
285 return Futures.immediateFuture(rpcResultBuilder.build());