X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fbulk-o-matic%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Fbulk%2Fo%2Fmatic%2FSalBulkFlowServiceImpl.java;h=4e017a5b07dea545ea360a911e345dd40596cd87;hb=24769609bb11cf949e9f92d6e255cc0bf296173b;hp=9f904348e47371bcfff4377fe24fc64295ec9168;hpb=18dc690ea2106eea6c2111d998581e8ded03231c;p=openflowplugin.git diff --git a/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java b/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java index 9f904348e4..4e017a5b07 100644 --- a/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java +++ b/applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -10,12 +10,39 @@ package org.opendaylight.openflowplugin.applications.bulk.o.matic; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.*; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import javax.management.InstanceAlreadyExistsException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.*; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; @@ -24,35 +51,36 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.*; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; - -import javax.management.*; -import java.lang.management.ManagementFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.Future; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Simple implementation providing bulk flows operations. */ public class SalBulkFlowServiceImpl implements SalBulkFlowService { + private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class); + private final SalFlowService flowService; private final DataBroker dataBroker; - private FlowCounter flowCounterBeanImpl = new FlowCounter(); + private final FlowCounter flowCounterBeanImpl = new FlowCounter(); private final ExecutorService fjService = new ForkJoinPool(); + public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) { this.flowService = Preconditions.checkNotNull(flowService); this.dataBroker = Preconditions.checkNotNull(dataBroker); + register(); } @Override @@ -68,17 +96,15 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { flowBuilder.build(), createParents); createParents = createParentsNextTime; } - CheckedFuture submitFuture = writeTransaction.submit(); - return handleResultFuture(submitFuture); + ListenableFuture submitFuture = writeTransaction.submit(); + return handleResultFuture(Futures.allAsList(submitFuture)); } private InstanceIdentifier getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) { final NodeRef nodeRef = bulkFlow.getNode(); - return ((InstanceIdentifier) nodeRef.getValue()) - .augmentation(FlowCapableNode.class) + return ((InstanceIdentifier) nodeRef.getValue()).augmentation(FlowCapableNode.class) .child(Table.class, new TableKey(bulkFlow.getTableId())) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow.class, - new FlowKey(new FlowId(bulkFlow.getFlowId()))); + .child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId()))); } @Override @@ -87,29 +113,7 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) { writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow)); } - CheckedFuture submitFuture = writeTransaction.submit(); - return handleResultFuture(submitFuture); - } - - private ListenableFuture> handleResultFuture(CheckedFuture submitFuture) { - final SettableFuture> rpcResult = SettableFuture.create(); - Futures.addCallback(submitFuture, new FutureCallback() { - @Override - public void onSuccess(Void result) { - rpcResult.set(RpcResultBuilder.success(result).build()); - } - - @Override - public void onFailure(Throwable t) { - RpcResultBuilder rpcResultBld = RpcResultBuilder.failed() - .withRpcErrors(Collections.singleton( - RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage()) - )); - rpcResult.set(rpcResultBld.build()); - } - }); - return rpcResult; + return handleResultFuture(Futures.allAsList(writeTransaction.submit())); } private ListenableFuture> handleResultFuture(ListenableFuture> submitFuture) { @@ -121,14 +125,13 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { } @Override - public void onFailure(Throwable t) { + public void onFailure(Throwable throwable) { RpcResultBuilder rpcResultBld = RpcResultBuilder.failed() - .withRpcErrors(Collections.singleton( - RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage()) - )); + .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, + null, throwable.getMessage()))); rpcResult.set(rpcResultBld.build()); } - }); + }, MoreExecutors.directExecutor()); return rpcResult; } @@ -137,7 +140,8 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { List>> bulkResults = new ArrayList<>(); for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) { - AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder((Flow) bulkFlow); + AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder( + (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow); final NodeRef nodeRef = bulkFlow.getNode(); flowInputBuilder.setNode(nodeRef); flowInputBuilder.setTableId(bulkFlow.getTableId()); @@ -149,11 +153,9 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { @Override public Future> readFlowTest(ReadFlowTestInput input) { - FlowReader flowReader = FlowReader.getNewInstance(dataBroker, - input.getDpnCount().intValue(), - input.getFlowsPerDpn().intValue(), input.isVerbose(), - input.isIsConfigDs(),input.getStartTableId().shortValue(), - input.getEndTableId().shortValue()); + FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(), + input.getFlowsPerDpn().intValue(), input.isVerbose(), input.isIsConfigDs(), + input.getStartTableId().shortValue(), input.getEndTableId().shortValue()); flowCounterBeanImpl.setReader(flowReader); fjService.execute(flowReader); RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); @@ -163,12 +165,9 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { @Override public Future> flowRpcAddTest(FlowRpcAddTestInput input) { FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService); - flowAddRpcTestImpl.rpcFlowAdd( - input.getDpnId(), - input.getFlowCount().intValue(), + flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(), input.getRpcBatchSize().intValue()); - RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); return Futures.immediateFuture(rpcResultBuilder.build()); } @@ -177,18 +176,15 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { public Future> register() { RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); try { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - String pathToMBean = String.format("%s:type=%s", - FlowCounter.class.getPackage().getName(), - FlowCounter.class.getSimpleName()); - ObjectName name = null; - - name = new ObjectName(pathToMBean); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String pathToMBean = String.format("%s:type=%s", FlowCounter.class.getPackage().getName(), + FlowCounter.class.getSimpleName()); + ObjectName name = new ObjectName(pathToMBean); mbs.registerMBean(flowCounterBeanImpl, name); - } catch (MalformedObjectNameException | InstanceAlreadyExistsException - | MBeanRegistrationException | NotCompliantMBeanException e) { + } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException + | NotCompliantMBeanException e) { rpcResultBuilder = RpcResultBuilder.failed(); - e.printStackTrace(); + LOG.warn("Exception occurred: {} ", e.getMessage(), e); } return Futures.immediateFuture(rpcResultBuilder.build()); } @@ -198,7 +194,8 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { List>> bulkResults = new ArrayList<>(); for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) { - RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder((Flow) bulkFlow); + RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder( + (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow); final NodeRef nodeRef = bulkFlow.getNode(); flowInputBuilder.setNode(nodeRef); flowInputBuilder.setTableId(bulkFlow.getTableId()); @@ -213,11 +210,11 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { if (input.isTxChain()) { FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService); flowCounterBeanImpl.setWriter(flowTester); - if (input.isIsAdd()){ + if (input.isIsAdd()) { flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getSleepFor().intValue(), input.getSleepAfter().intValue(), input.getStartTableId().shortValue(), - input.getEndTableId().shortValue()); + input.getEndTableId().shortValue(), input.isCreateParents()); } else { flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getStartTableId().shortValue(), @@ -229,10 +226,11 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { if (input.isSeq()) { FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService); flowCounterBeanImpl.setWriter(flowTester); - if (input.isIsAdd()){ + if (input.isIsAdd()) { flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getSleepFor().intValue(), - input.getStartTableId().shortValue(), input.getEndTableId().shortValue()); + input.getStartTableId().shortValue(), input.getEndTableId().shortValue(), + input.isCreateParents()); } else { flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getStartTableId().shortValue(), @@ -241,11 +239,11 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { } else { FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService); flowCounterBeanImpl.setWriter(flowTester); - if (input.isIsAdd()){ + if (input.isIsAdd()) { flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getSleepFor().intValue(), input.getSleepAfter().intValue(), input.getStartTableId().shortValue(), - input.getEndTableId().shortValue()); + input.getEndTableId().shortValue(), input.isCreateParents()); } else { flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(), input.getBatchSize().intValue(), input.getStartTableId().shortValue(), @@ -256,6 +254,27 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService { return Futures.immediateFuture(rpcResultBuilder.build()); } + @Override + public Future> tableTest(final TableTestInput input) { + final TableWriter writer = new TableWriter(dataBroker, fjService); + flowCounterBeanImpl.setWriter(writer); + switch (input.getOperation()) { + case Add: + writer.addTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(), + input.getEndTableId().shortValue()); + break; + case Delete: + writer.deleteTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(), + input.getEndTableId().shortValue()); + break; + default: + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.failed(); + return Futures.immediateFuture(rpcResultBuilder.build()); + } + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success(); + return Futures.immediateFuture(rpcResultBuilder.build()); + } + @Override public Future> flowRpcAddMultiple(FlowRpcAddMultipleInput input) { FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);