X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fbulk-o-matic%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Fbulk%2Fo%2Fmatic%2FSalBulkFlowServiceImpl.java;h=ea323eedf446f947ad61a569da2c1bb95fe20cff;hb=777c94332871b8c34f56f7f2010de1536cb759ba;hp=68de810198ca8317968b1095321442d191792388;hpb=af8ead7873bc02797d6beb46054b5f7d3f2aa29a;p=openflowplugin.git diff --git a/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java b/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java index 68de810198..ea323eedf4 100644 --- a/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java +++ b/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java @@ -1,20 +1,20 @@ /* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.applications.bulk.o.matic; -import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; +import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElse; + +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.lang.management.ManagementFactory; import java.util.ArrayList; @@ -22,27 +22,39 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.Future; import javax.management.InstanceAlreadyExistsException; import javax.management.MBeanRegistrationException; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.infrautils.utils.concurrent.LoggingFutures; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; @@ -59,203 +71,208 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalF import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Simple implementation providing bulk flows operations. */ public class SalBulkFlowServiceImpl implements SalBulkFlowService { + private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class); + private final SalFlowService flowService; private final DataBroker dataBroker; private final FlowCounter flowCounterBeanImpl = new FlowCounter(); private final ExecutorService fjService = new ForkJoinPool(); - public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) { - this.flowService = Preconditions.checkNotNull(flowService); - this.dataBroker = Preconditions.checkNotNull(dataBroker); - register(); + + public SalBulkFlowServiceImpl(final SalFlowService flowService, final DataBroker dataBroker) { + this.flowService = requireNonNull(flowService); + this.dataBroker = requireNonNull(dataBroker); + + LoggingFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register"); } @Override - public Future> addFlowsDs(AddFlowsDsInput input) { + public ListenableFuture> addFlowsDs(final AddFlowsDsInput input) { WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); - boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE); + boolean createParentsNextTime = requireNonNullElse(input.getAlwaysCreateParents(), Boolean.FALSE); boolean createParents = true; for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) { FlowBuilder flowBuilder = new FlowBuilder(bulkFlow); flowBuilder.setTableId(bulkFlow.getTableId()); flowBuilder.setId(new FlowId(bulkFlow.getFlowId())); - writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow), - flowBuilder.build(), createParents); + if (createParents) { + writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION, + getFlowInstanceIdentifier(bulkFlow), + flowBuilder.build()); + } else { + writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow), + flowBuilder.build()); + } createParents = createParentsNextTime; } - CheckedFuture submitFuture = writeTransaction.submit(); - return handleResultFuture(submitFuture); + FluentFuture submitFuture = writeTransaction.commit(); + return Futures.transform(handleResultFuture(Futures.allAsList(submitFuture)), voidRpcResult -> { + if (voidRpcResult.isSuccessful()) { + return RpcResultBuilder.success().build(); + } else { + return RpcResultBuilder.failed().build(); + } + },MoreExecutors.directExecutor()); } - private InstanceIdentifier getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) { + private static InstanceIdentifier getFlowInstanceIdentifier(final BulkFlowDsItem bulkFlow) { final NodeRef nodeRef = bulkFlow.getNode(); - return ((InstanceIdentifier) nodeRef.getValue()) - .augmentation(FlowCapableNode.class) + return ((InstanceIdentifier) nodeRef.getValue()).augmentation(FlowCapableNode.class) .child(Table.class, new TableKey(bulkFlow.getTableId())) - .child(Flow.class, - new FlowKey(new FlowId(bulkFlow.getFlowId()))); + .child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId()))); } @Override - public Future> removeFlowsDs(RemoveFlowsDsInput input) { + public ListenableFuture> removeFlowsDs(final RemoveFlowsDsInput input) { WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) { writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow)); } - CheckedFuture submitFuture = writeTransaction.submit(); - return handleResultFuture(submitFuture); - } - - private ListenableFuture> handleResultFuture(CheckedFuture submitFuture) { - final SettableFuture> rpcResult = SettableFuture.create(); - Futures.addCallback(submitFuture, new FutureCallback() { - @Override - public void onSuccess(Void result) { - rpcResult.set(RpcResultBuilder.success(result).build()); - } - - @Override - public void onFailure(Throwable t) { - RpcResultBuilder rpcResultBld = RpcResultBuilder.failed() - .withRpcErrors(Collections.singleton( - RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage()) - )); - rpcResult.set(rpcResultBld.build()); + return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.commit())), voidRpcResult -> { + if (voidRpcResult.isSuccessful()) { + return RpcResultBuilder.success().build(); + } else { + return RpcResultBuilder.failed().build(); } - }); - return rpcResult; + }, MoreExecutors.directExecutor()); } - private ListenableFuture> handleResultFuture(ListenableFuture> submitFuture) { + private static ListenableFuture> handleResultFuture( + final ListenableFuture> submitFuture) { final SettableFuture> rpcResult = SettableFuture.create(); Futures.addCallback(submitFuture, new FutureCallback>() { @Override - public void onSuccess(List result) { + public void onSuccess(final List result) { rpcResult.set(RpcResultBuilder.success((Void) null).build()); } @Override - public void onFailure(Throwable t) { + public void onFailure(final Throwable throwable) { RpcResultBuilder rpcResultBld = RpcResultBuilder.failed() - .withRpcErrors(Collections.singleton( - RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage()) - )); + .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(ErrorType.APPLICATION, + null, throwable.getMessage()))); rpcResult.set(rpcResultBld.build()); } - }); + }, MoreExecutors.directExecutor()); return rpcResult; } @Override - public Future> addFlowsRpc(AddFlowsRpcInput input) { + public ListenableFuture> addFlowsRpc(final AddFlowsRpcInput input) { List>> bulkResults = new ArrayList<>(); for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) { - AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder((org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow); + AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder( + (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow); final NodeRef nodeRef = bulkFlow.getNode(); flowInputBuilder.setNode(nodeRef); flowInputBuilder.setTableId(bulkFlow.getTableId()); - Future> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build()); - bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult)); + bulkResults.add(flowService.addFlow(flowInputBuilder.build())); } - return handleResultFuture(Futures.allAsList(bulkResults)); + return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> { + if (voidRpcResult.isSuccessful()) { + return RpcResultBuilder.success().build(); + } else { + return RpcResultBuilder.failed().build(); + } + },MoreExecutors.directExecutor()); } @Override - public Future> readFlowTest(ReadFlowTestInput input) { - FlowReader flowReader = FlowReader.getNewInstance(dataBroker, - input.getDpnCount().intValue(), - input.getFlowsPerDpn().intValue(), input.isVerbose(), - input.isIsConfigDs(),input.getStartTableId().shortValue(), - input.getEndTableId().shortValue()); + public ListenableFuture> readFlowTest(final ReadFlowTestInput input) { + FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(), + input.getFlowsPerDpn().intValue(), input.getVerbose(), input.getIsConfigDs(), + input.getStartTableId().shortValue(), input.getEndTableId().shortValue()); flowCounterBeanImpl.setReader(flowReader); fjService.execute(flowReader); - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } @Override - public Future> flowRpcAddTest(FlowRpcAddTestInput input) { + public ListenableFuture> flowRpcAddTest(final FlowRpcAddTestInput input) { FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService); - flowAddRpcTestImpl.rpcFlowAdd( - input.getDpnId(), - input.getFlowCount().intValue(), + flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(), input.getRpcBatchSize().intValue()); - - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } @Override - public Future> register() { - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + public ListenableFuture> register(final RegisterInput input) { + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); try { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - String pathToMBean = String.format("%s:type=%s", - FlowCounter.class.getPackage().getName(), - FlowCounter.class.getSimpleName()); - ObjectName name = null; - - name = new ObjectName(pathToMBean); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String pathToMBean = String.format("%s:type=%s", FlowCounter.class.getPackage().getName(), + FlowCounter.class.getSimpleName()); + ObjectName name = new ObjectName(pathToMBean); mbs.registerMBean(flowCounterBeanImpl, name); - } catch (MalformedObjectNameException | InstanceAlreadyExistsException - | MBeanRegistrationException | NotCompliantMBeanException e) { + } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException + | NotCompliantMBeanException e) { rpcResultBuilder = RpcResultBuilder.failed(); - e.printStackTrace(); + LOG.warn("Exception occurred", e); } return Futures.immediateFuture(rpcResultBuilder.build()); } @Override - public Future> removeFlowsRpc(RemoveFlowsRpcInput input) { + public ListenableFuture> removeFlowsRpc(final RemoveFlowsRpcInput input) { List>> bulkResults = new ArrayList<>(); for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) { - RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder((org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow); + RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder( + (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow); final NodeRef nodeRef = bulkFlow.getNode(); flowInputBuilder.setNode(nodeRef); flowInputBuilder.setTableId(bulkFlow.getTableId()); - Future> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build()); - bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult)); + bulkResults.add(flowService.removeFlow(flowInputBuilder.build())); } - return handleResultFuture(Futures.allAsList(bulkResults)); + return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> { + if (voidRpcResult.isSuccessful()) { + return RpcResultBuilder.success().build(); + } else { + return RpcResultBuilder.failed().build(); + } + }, MoreExecutors.directExecutor()); } @Override - public Future> flowTest(FlowTestInput input) { - if (input.isTxChain()) { + public ListenableFuture> flowTest(final FlowTestInput input) { + if (input.getTxChain()) { FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService); flowCounterBeanImpl.setWriter(flowTester); - if (input.isIsAdd()){ + if (input.getIsAdd()) { flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getSleepFor().intValue(), input.getSleepAfter().intValue(), input.getStartTableId().shortValue(), - input.getEndTableId().shortValue()); + input.getEndTableId().shortValue(), input.getCreateParents()); } else { flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getStartTableId().shortValue(), input.getEndTableId().shortValue()); } - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } - if (input.isSeq()) { + if (input.getSeq()) { FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService); flowCounterBeanImpl.setWriter(flowTester); - if (input.isIsAdd()){ + if (input.getIsAdd()) { flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getSleepFor().intValue(), - input.getStartTableId().shortValue(), input.getEndTableId().shortValue()); + input.getStartTableId().shortValue(), input.getEndTableId().shortValue(), + input.getCreateParents()); } else { flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getStartTableId().shortValue(), @@ -264,26 +281,48 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { } else { FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService); flowCounterBeanImpl.setWriter(flowTester); - if (input.isIsAdd()){ + if (input.getIsAdd()) { flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getSleepFor().intValue(), input.getSleepAfter().intValue(), input.getStartTableId().shortValue(), - input.getEndTableId().shortValue()); + input.getEndTableId().shortValue(), input.getCreateParents()); } else { flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getStartTableId().shortValue(), input.getEndTableId().shortValue()); } } - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + return Futures.immediateFuture(rpcResultBuilder.build()); + } + + @Override + public ListenableFuture> tableTest(final TableTestInput input) { + final TableWriter writer = new TableWriter(dataBroker, fjService); + flowCounterBeanImpl.setWriter(writer); + switch (input.getOperation()) { + case Add: + writer.addTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(), + input.getEndTableId().shortValue()); + break; + case Delete: + writer.deleteTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(), + input.getEndTableId().shortValue()); + break; + default: + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.failed(); + return Futures.immediateFuture(rpcResultBuilder.build()); + } + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } @Override - public Future> flowRpcAddMultiple(FlowRpcAddMultipleInput input) { + public ListenableFuture> flowRpcAddMultiple( + final FlowRpcAddMultipleInput input) { FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService); flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue()); - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } }