Merge "Remove deprecated"
[openflowplugin.git] / applications / bulk-o-matic / src / main / java / org / opendaylight / openflowplugin / applications / bulk / o / matic / SalBulkFlowServiceImpl.java
1 /*
2  * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.lang.management.ManagementFactory;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.ForkJoinPool;
25 import java.util.concurrent.Future;
26 import javax.management.InstanceAlreadyExistsException;
27 import javax.management.MBeanRegistrationException;
28 import javax.management.MBeanServer;
29 import javax.management.MalformedObjectNameException;
30 import javax.management.NotCompliantMBeanException;
31 import javax.management.ObjectName;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
62 import org.opendaylight.yangtools.yang.common.RpcError;
63 import org.opendaylight.yangtools.yang.common.RpcResult;
64 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 /**
69  * Simple implementation providing bulk flows operations.
70  */
71 public class SalBulkFlowServiceImpl implements SalBulkFlowService {
72
73     private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class);
74
75     private final SalFlowService flowService;
76     private final DataBroker dataBroker;
77     private final FlowCounter flowCounterBeanImpl = new FlowCounter();
78     private final ExecutorService fjService = new ForkJoinPool();
79
80     public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
81         this.flowService = Preconditions.checkNotNull(flowService);
82         this.dataBroker = Preconditions.checkNotNull(dataBroker);
83         register();
84     }
85
86     @Override
87     public Future<RpcResult<Void>> addFlowsDs(AddFlowsDsInput input) {
88         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
89         boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
90         boolean createParents = true;
91         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
92             FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
93             flowBuilder.setTableId(bulkFlow.getTableId());
94             flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
95             writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
96                     flowBuilder.build(), createParents);
97             createParents = createParentsNextTime;
98         }
99         ListenableFuture<Void> submitFuture = writeTransaction.submit();
100         return handleResultFuture(Futures.allAsList(submitFuture));
101     }
102
103     private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
104         final NodeRef nodeRef = bulkFlow.getNode();
105         return ((InstanceIdentifier<Node>) nodeRef.getValue()).augmentation(FlowCapableNode.class)
106                 .child(Table.class, new TableKey(bulkFlow.getTableId()))
107                 .child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId())));
108     }
109
110     @Override
111     public Future<RpcResult<Void>> removeFlowsDs(RemoveFlowsDsInput input) {
112         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
113         for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
114             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
115         }
116         return handleResultFuture(Futures.allAsList(writeTransaction.submit()));
117     }
118
119     private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
120         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
121         Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
122             @Override
123             public void onSuccess(List<T> result) {
124                 rpcResult.set(RpcResultBuilder.success((Void) null).build());
125             }
126
127             @Override
128             public void onFailure(Throwable throwable) {
129                 RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
130                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
131                                 null, throwable.getMessage())));
132                 rpcResult.set(rpcResultBld.build());
133             }
134         }, MoreExecutors.directExecutor());
135         return rpcResult;
136     }
137
138     @Override
139     public Future<RpcResult<Void>> addFlowsRpc(AddFlowsRpcInput input) {
140         List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
141
142         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
143             AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder(
144                     (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
145             final NodeRef nodeRef = bulkFlow.getNode();
146             flowInputBuilder.setNode(nodeRef);
147             flowInputBuilder.setTableId(bulkFlow.getTableId());
148             Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
149             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
150         }
151         return handleResultFuture(Futures.allAsList(bulkResults));
152     }
153
154     @Override
155     public Future<RpcResult<Void>> readFlowTest(ReadFlowTestInput input) {
156         FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(),
157                 input.getFlowsPerDpn().intValue(), input.isVerbose(), input.isIsConfigDs(),
158                 input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
159         flowCounterBeanImpl.setReader(flowReader);
160         fjService.execute(flowReader);
161         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
162         return Futures.immediateFuture(rpcResultBuilder.build());
163     }
164
165     @Override
166     public Future<RpcResult<Void>> flowRpcAddTest(FlowRpcAddTestInput input) {
167         FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
168         flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(),
169                 input.getRpcBatchSize().intValue());
170
171         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
172         return Futures.immediateFuture(rpcResultBuilder.build());
173     }
174
175     @Override
176     public Future<RpcResult<Void>> register() {
177         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
178         try {
179             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
180             String pathToMBean = String.format("%s:type=%s", FlowCounter.class.getPackage().getName(),
181                     FlowCounter.class.getSimpleName());
182             ObjectName name = new ObjectName(pathToMBean);
183             mbs.registerMBean(flowCounterBeanImpl, name);
184         } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException
185                 | NotCompliantMBeanException e) {
186             rpcResultBuilder = RpcResultBuilder.failed();
187             LOG.warn("Exception occurred: {} ", e.getMessage(), e);
188         }
189         return Futures.immediateFuture(rpcResultBuilder.build());
190     }
191
192     @Override
193     public Future<RpcResult<Void>> removeFlowsRpc(RemoveFlowsRpcInput input) {
194         List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
195
196         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
197             RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder(
198                     (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
199             final NodeRef nodeRef = bulkFlow.getNode();
200             flowInputBuilder.setNode(nodeRef);
201             flowInputBuilder.setTableId(bulkFlow.getTableId());
202             Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
203             bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
204         }
205         return handleResultFuture(Futures.allAsList(bulkResults));
206     }
207
208     @Override
209     public Future<RpcResult<Void>> flowTest(FlowTestInput input) {
210         if (input.isTxChain()) {
211             FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
212             flowCounterBeanImpl.setWriter(flowTester);
213             if (input.isIsAdd()) {
214                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
215                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
216                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
217                         input.getEndTableId().shortValue(), input.isCreateParents());
218             } else {
219                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
220                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
221                         input.getEndTableId().shortValue());
222             }
223             RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
224             return Futures.immediateFuture(rpcResultBuilder.build());
225         }
226         if (input.isSeq()) {
227             FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
228             flowCounterBeanImpl.setWriter(flowTester);
229             if (input.isIsAdd()) {
230                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
231                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
232                         input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
233                         input.isCreateParents());
234             } else {
235                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
236                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
237                         input.getEndTableId().shortValue());
238             }
239         } else {
240             FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
241             flowCounterBeanImpl.setWriter(flowTester);
242             if (input.isIsAdd()) {
243                 flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
244                         input.getBatchSize().intValue(), input.getSleepFor().intValue(),
245                         input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
246                         input.getEndTableId().shortValue(), input.isCreateParents());
247             } else {
248                 flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
249                         input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
250                         input.getEndTableId().shortValue());
251             }
252         }
253         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
254         return Futures.immediateFuture(rpcResultBuilder.build());
255     }
256
257     @Override
258     public Future<RpcResult<Void>> tableTest(final TableTestInput input) {
259         final TableWriter writer = new TableWriter(dataBroker, fjService);
260         flowCounterBeanImpl.setWriter(writer);
261         switch (input.getOperation()) {
262             case Add:
263                 writer.addTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
264                         input.getEndTableId().shortValue());
265                 break;
266             case Delete:
267                 writer.deleteTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
268                         input.getEndTableId().shortValue());
269                 break;
270             default:
271                 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.failed();
272                 return Futures.immediateFuture(rpcResultBuilder.build());
273         }
274         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
275         return Futures.immediateFuture(rpcResultBuilder.build());
276     }
277
278     @Override
279     public Future<RpcResult<Void>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
280         FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
281         flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
282         RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
283         return Futures.immediateFuture(rpcResultBuilder.build());
284     }
285 }