X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fbulk-o-matic%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Fbulk%2Fo%2Fmatic%2FSalBulkFlowServiceImpl.java;h=ea323eedf446f947ad61a569da2c1bb95fe20cff;hb=777c94332871b8c34f56f7f2010de1536cb759ba;hp=4e017a5b07dea545ea360a911e345dd40596cd87;hpb=24769609bb11cf949e9f92d6e255cc0bf296173b;p=openflowplugin.git diff --git a/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java b/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java index 4e017a5b07..ea323eedf4 100644 --- a/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java +++ b/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java @@ -5,14 +5,14 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.applications.bulk.o.matic; -import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; +import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElse; + +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; @@ -22,27 +22,39 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.Future; import javax.management.InstanceAlreadyExistsException; import javax.management.MBeanRegistrationException; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.infrautils.utils.concurrent.LoggingFutures; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; @@ -59,7 +71,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalF import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; @@ -77,30 +89,43 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { private final FlowCounter flowCounterBeanImpl = new FlowCounter(); private final ExecutorService fjService = new ForkJoinPool(); - public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) { - this.flowService = Preconditions.checkNotNull(flowService); - this.dataBroker = Preconditions.checkNotNull(dataBroker); - register(); + public SalBulkFlowServiceImpl(final SalFlowService flowService, final DataBroker dataBroker) { + this.flowService = requireNonNull(flowService); + this.dataBroker = requireNonNull(dataBroker); + + LoggingFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register"); } @Override - public Future> addFlowsDs(AddFlowsDsInput input) { + public ListenableFuture> addFlowsDs(final AddFlowsDsInput input) { WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); - boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE); + boolean createParentsNextTime = requireNonNullElse(input.getAlwaysCreateParents(), Boolean.FALSE); boolean createParents = true; for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) { FlowBuilder flowBuilder = new FlowBuilder(bulkFlow); flowBuilder.setTableId(bulkFlow.getTableId()); flowBuilder.setId(new FlowId(bulkFlow.getFlowId())); - writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow), - flowBuilder.build(), createParents); + if (createParents) { + writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, + getFlowInstanceIdentifier(bulkFlow), + flowBuilder.build()); + } else { + writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow), + flowBuilder.build()); + } createParents = createParentsNextTime; } - ListenableFuture submitFuture = writeTransaction.submit(); - return handleResultFuture(Futures.allAsList(submitFuture)); + FluentFuture submitFuture = writeTransaction.commit(); + return Futures.transform(handleResultFuture(Futures.allAsList(submitFuture)), voidRpcResult -> { + if (voidRpcResult.isSuccessful()) { + return RpcResultBuilder.success().build(); + } else { + return RpcResultBuilder.failed().build(); + } + },MoreExecutors.directExecutor()); } - private InstanceIdentifier getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) { + private static InstanceIdentifier getFlowInstanceIdentifier(final BulkFlowDsItem bulkFlow) { final NodeRef nodeRef = bulkFlow.getNode(); return ((InstanceIdentifier) nodeRef.getValue()).augmentation(FlowCapableNode.class) .child(Table.class, new TableKey(bulkFlow.getTableId())) @@ -108,26 +133,33 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { } @Override - public Future> removeFlowsDs(RemoveFlowsDsInput input) { + public ListenableFuture> removeFlowsDs(final RemoveFlowsDsInput input) { WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) { writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow)); } - return handleResultFuture(Futures.allAsList(writeTransaction.submit())); + return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.commit())), voidRpcResult -> { + if (voidRpcResult.isSuccessful()) { + return RpcResultBuilder.success().build(); + } else { + return RpcResultBuilder.failed().build(); + } + }, MoreExecutors.directExecutor()); } - private ListenableFuture> handleResultFuture(ListenableFuture> submitFuture) { + private static ListenableFuture> handleResultFuture( + final ListenableFuture> submitFuture) { final SettableFuture> rpcResult = SettableFuture.create(); Futures.addCallback(submitFuture, new FutureCallback>() { @Override - public void onSuccess(List result) { + public void onSuccess(final List result) { rpcResult.set(RpcResultBuilder.success((Void) null).build()); } @Override - public void onFailure(Throwable throwable) { + public void onFailure(final Throwable throwable) { RpcResultBuilder rpcResultBld = RpcResultBuilder.failed() - .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, + .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(ErrorType.APPLICATION, null, throwable.getMessage()))); rpcResult.set(rpcResultBld.build()); } @@ -136,7 +168,7 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { } @Override - public Future> addFlowsRpc(AddFlowsRpcInput input) { + public ListenableFuture> addFlowsRpc(final AddFlowsRpcInput input) { List>> bulkResults = new ArrayList<>(); for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) { @@ -145,36 +177,41 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { final NodeRef nodeRef = bulkFlow.getNode(); flowInputBuilder.setNode(nodeRef); flowInputBuilder.setTableId(bulkFlow.getTableId()); - Future> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build()); - bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult)); + bulkResults.add(flowService.addFlow(flowInputBuilder.build())); } - return handleResultFuture(Futures.allAsList(bulkResults)); + return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> { + if (voidRpcResult.isSuccessful()) { + return RpcResultBuilder.success().build(); + } else { + return RpcResultBuilder.failed().build(); + } + },MoreExecutors.directExecutor()); } @Override - public Future> readFlowTest(ReadFlowTestInput input) { + public ListenableFuture> readFlowTest(final ReadFlowTestInput input) { FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(), - input.getFlowsPerDpn().intValue(), input.isVerbose(), input.isIsConfigDs(), + input.getFlowsPerDpn().intValue(), input.getVerbose(), input.getIsConfigDs(), input.getStartTableId().shortValue(), input.getEndTableId().shortValue()); flowCounterBeanImpl.setReader(flowReader); fjService.execute(flowReader); - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } @Override - public Future> flowRpcAddTest(FlowRpcAddTestInput input) { + public ListenableFuture> flowRpcAddTest(final FlowRpcAddTestInput input) { FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService); flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(), input.getRpcBatchSize().intValue()); - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } @Override - public Future> register() { - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + public ListenableFuture> register(final RegisterInput input) { + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); try { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); String pathToMBean = String.format("%s:type=%s", FlowCounter.class.getPackage().getName(), @@ -184,13 +221,13 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { rpcResultBuilder = RpcResultBuilder.failed(); - LOG.warn("Exception occurred: {} ", e.getMessage(), e); + LOG.warn("Exception occurred", e); } return Futures.immediateFuture(rpcResultBuilder.build()); } @Override - public Future> removeFlowsRpc(RemoveFlowsRpcInput input) { + public ListenableFuture> removeFlowsRpc(final RemoveFlowsRpcInput input) { List>> bulkResults = new ArrayList<>(); for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) { @@ -199,38 +236,43 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { final NodeRef nodeRef = bulkFlow.getNode(); flowInputBuilder.setNode(nodeRef); flowInputBuilder.setTableId(bulkFlow.getTableId()); - Future> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build()); - bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult)); + bulkResults.add(flowService.removeFlow(flowInputBuilder.build())); } - return handleResultFuture(Futures.allAsList(bulkResults)); + return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> { + if (voidRpcResult.isSuccessful()) { + return RpcResultBuilder.success().build(); + } else { + return RpcResultBuilder.failed().build(); + } + }, MoreExecutors.directExecutor()); } @Override - public Future> flowTest(FlowTestInput input) { - if (input.isTxChain()) { + public ListenableFuture> flowTest(final FlowTestInput input) { + if (input.getTxChain()) { FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService); flowCounterBeanImpl.setWriter(flowTester); - if (input.isIsAdd()) { + if (input.getIsAdd()) { flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getSleepFor().intValue(), input.getSleepAfter().intValue(), input.getStartTableId().shortValue(), - input.getEndTableId().shortValue(), input.isCreateParents()); + input.getEndTableId().shortValue(), input.getCreateParents()); } else { flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getStartTableId().shortValue(), input.getEndTableId().shortValue()); } - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } - if (input.isSeq()) { + if (input.getSeq()) { FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService); flowCounterBeanImpl.setWriter(flowTester); - if (input.isIsAdd()) { + if (input.getIsAdd()) { flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getSleepFor().intValue(), input.getStartTableId().shortValue(), input.getEndTableId().shortValue(), - input.isCreateParents()); + input.getCreateParents()); } else { flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getStartTableId().shortValue(), @@ -239,23 +281,23 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { } else { FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService); flowCounterBeanImpl.setWriter(flowTester); - if (input.isIsAdd()) { + if (input.getIsAdd()) { flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getSleepFor().intValue(), input.getSleepAfter().intValue(), input.getStartTableId().shortValue(), - input.getEndTableId().shortValue(), input.isCreateParents()); + input.getEndTableId().shortValue(), input.getCreateParents()); } else { flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getStartTableId().shortValue(), input.getEndTableId().shortValue()); } } - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } @Override - public Future> tableTest(final TableTestInput input) { + public ListenableFuture> tableTest(final TableTestInput input) { final TableWriter writer = new TableWriter(dataBroker, fjService); flowCounterBeanImpl.setWriter(writer); switch (input.getOperation()) { @@ -268,18 +310,19 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { input.getEndTableId().shortValue()); break; default: - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.failed(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.failed(); return Futures.immediateFuture(rpcResultBuilder.build()); } - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } @Override - public Future> flowRpcAddMultiple(FlowRpcAddMultipleInput input) { + public ListenableFuture> flowRpcAddMultiple( + final FlowRpcAddMultipleInput input) { FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService); flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue()); - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } }