Do not use RpcService in bulk-o-matic 00/108100/16
authorlubos-cicut <lubos.cicut@pantheon.tech>
Sun, 1 Oct 2023 18:51:22 +0000 (20:51 +0200)
committerRobert Varga <nite@hq.sk>
Wed, 7 Feb 2024 07:25:09 +0000 (07:25 +0000)
Use single RPCs instead of rolled-up RpcServices. We also register all
RPCs as an atomic entity via the SalBulkFlowRpcs component -- ditching
blueprint in the process of doing so.

JIRA: OPNFLWPLUG-1112
JIRA: OPNFLWPLUG-1125
Change-Id: Id7d1a1c8fa330d4f8d798d707fe06a9d8ca9da0e
Signed-off-by: lubos-cicut <lubos.cicut@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
applications/bulk-o-matic/pom.xml
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterDirectOFRpc.java
applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowRpcs.java [moved from applications/bulk-o-matic/src/main/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImpl.java with 74% similarity]
applications/bulk-o-matic/src/main/resources/OSGI-INF/blueprint/bulk-o-matic.xml [deleted file]
applications/bulk-o-matic/src/test/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/FlowWriterDirectOFRpcTest.java
applications/bulk-o-matic/src/test/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowRpcsTest.java [moved from applications/bulk-o-matic/src/test/java/org/opendaylight/openflowplugin/applications/bulk/o/matic/SalBulkFlowServiceImplTest.java with 91% similarity]

index c7ee4d2d88a77c949fa8fd1c4afd486ca7beaa77..567abedfe4417df7cf23f2e77ccbd01c39913a55 100644 (file)
@@ -1,38 +1,51 @@
 <?xml version="1.0"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.opendaylight.openflowplugin</groupId>
-    <artifactId>openflowplugin-parent</artifactId>
-    <version>0.18.0-SNAPSHOT</version>
-    <relativePath>../../parent</relativePath>
-  </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.opendaylight.openflowplugin</groupId>
+        <artifactId>openflowplugin-parent</artifactId>
+        <version>0.18.0-SNAPSHOT</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
 
-  <groupId>org.opendaylight.openflowplugin.applications</groupId>
-  <artifactId>bulk-o-matic</artifactId>
-  <packaging>bundle</packaging>
+    <groupId>org.opendaylight.openflowplugin.applications</groupId>
+    <artifactId>bulk-o-matic</artifactId>
+    <packaging>bundle</packaging>
 
   <dependencies>
-    <dependency>
-      <groupId>org.opendaylight.mdsal</groupId>
-      <artifactId>mdsal-binding-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.openflowplugin.model</groupId>
-      <artifactId>model-inventory</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.openflowplugin.model</groupId>
-      <artifactId>model-flow-base</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.openflowplugin.model</groupId>
-      <artifactId>model-flow-service</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.opendaylight.infrautils</groupId>
-      <artifactId>infrautils-util</artifactId>
-    </dependency>
+      <dependency>
+          <groupId>com.guicedee.services</groupId>
+          <artifactId>javax.inject</artifactId>
+          <optional>true</optional>
+      </dependency>
+      <dependency>
+          <groupId>jakarta.annotation</groupId>
+          <artifactId>jakarta.annotation-api</artifactId>
+          <optional>true</optional>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.infrautils</groupId>
+          <artifactId>infrautils-util</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.mdsal</groupId>
+          <artifactId>mdsal-binding-api</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.openflowplugin.model</groupId>
+          <artifactId>model-inventory</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.openflowplugin.model</groupId>
+          <artifactId>model-flow-base</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.openflowplugin.model</groupId>
+          <artifactId>model-flow-service</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.osgi</groupId>
+          <artifactId>org.osgi.service.component.annotations</artifactId>
+      </dependency>
   </dependencies>
-
 </project>
index 1ac1db7564058c466822374e3da57050aabaa473..c7436fc8ca7ffe92d291e5bbed59e710d941a29b 100644 (file)
@@ -7,13 +7,15 @@
  */
 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 
+import static java.util.Objects.requireNonNull;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
 import org.opendaylight.mdsal.binding.api.DataBroker;
@@ -21,10 +23,10 @@ import org.opendaylight.mdsal.binding.api.ReadTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
@@ -35,28 +37,27 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class FlowWriterDirectOFRpc {
-
     private static final Logger LOG = LoggerFactory.getLogger(FlowWriterDirectOFRpc.class);
-    private final DataBroker dataBroker;
-    private final SalFlowService flowService;
-    private final ExecutorService flowPusher;
     private static final long PAUSE_BETWEEN_BATCH_MILLIS = 40;
 
-    public FlowWriterDirectOFRpc(final DataBroker dataBroker, final SalFlowService salFlowService,
-            final ExecutorService flowPusher) {
-        this.dataBroker = dataBroker;
-        this.flowService = salFlowService;
-        this.flowPusher = flowPusher;
+    private final DataBroker dataBroker;
+    private final Executor flowPusher;
+    private final AddFlow addFlow;
+
+    public FlowWriterDirectOFRpc(final DataBroker dataBroker, final Executor flowPusher, final AddFlow addFlow) {
+        this.dataBroker = requireNonNull(dataBroker);
+        this.flowPusher = requireNonNull(flowPusher);
+        this.addFlow = requireNonNull(addFlow);
     }
 
-    public void rpcFlowAdd(String dpId, int flowsPerDpn, int batchSize) {
+    public void rpcFlowAdd(final String dpId, final int flowsPerDpn, final int batchSize) {
         if (!getAllNodes().isEmpty() && getAllNodes().contains(dpId)) {
             FlowRPCHandlerTask addFlowRpcTask = new FlowRPCHandlerTask(dpId, flowsPerDpn, batchSize);
             flowPusher.execute(addFlowRpcTask);
         }
     }
 
-    public void rpcFlowAddAll(int flowsPerDpn, int batchSize) {
+    public void rpcFlowAddAll(final int flowsPerDpn, final int batchSize) {
         Set<String> nodeIdSet = getAllNodes();
         if (nodeIdSet.isEmpty()) {
             LOG.warn("No nodes seen on OPERATIONAL DS. Aborting !!!!");
@@ -133,7 +134,7 @@ public class FlowWriterDirectOFRpc {
                 AddFlowInput addFlowInput = builder.build();
 
                 LOG.debug("RPC invocation for adding flow-id {} with input {}", flowId, addFlowInput);
-                LoggingFutures.addErrorLogging(flowService.addFlow(addFlowInput), LOG, "addFlow");
+                LoggingFutures.addErrorLogging(addFlow.invoke(addFlowInput), LOG, "addFlow");
 
                 if (i % batchSize == 0) {
                     try {
@@ -10,6 +10,8 @@ package org.opendaylight.openflowplugin.applications.bulk.o.matic;
 import static java.util.Objects.requireNonNull;
 import static java.util.Objects.requireNonNullElse;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableClassToInstanceMap;
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -22,6 +24,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import javax.management.InstanceAlreadyExistsException;
 import javax.management.MBeanRegistrationException;
 import javax.management.MBeanServer;
@@ -30,29 +36,40 @@ import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
 import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDs;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpc;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultiple;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTest;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTest;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTest;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.Register;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDs;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpc;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTest;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
@@ -63,45 +80,76 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.Rpc;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Simple implementation providing bulk flows operations.
  */
-public class SalBulkFlowServiceImpl implements SalBulkFlowService {
+@Singleton
+@Component(service = { })
+public final class SalBulkFlowRpcs implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowRpcs.class);
 
-    private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class);
-
-    private final SalFlowService flowService;
-    private final DataBroker dataBroker;
     private final FlowCounter flowCounterBeanImpl = new FlowCounter();
     private final ExecutorService fjService = new ForkJoinPool();
+    private final DataBroker dataBroker;
+    private final AddFlow addFlow;
+    private final RemoveFlow removeFlow;
+    private final Registration reg;
 
-    public SalBulkFlowServiceImpl(final SalFlowService flowService, final DataBroker dataBroker) {
-        this.flowService = requireNonNull(flowService);
+    @Inject
+    @Activate
+    public SalBulkFlowRpcs(@Reference final DataBroker dataBroker, @Reference final RpcConsumerRegistry rpcService,
+            @Reference final RpcProviderService rpcProviderService) {
         this.dataBroker = requireNonNull(dataBroker);
-
+        addFlow = rpcService.getRpc(AddFlow.class);
+        removeFlow = rpcService.getRpc(RemoveFlow.class);
+        reg = rpcProviderService.registerRpcImplementations(ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
+            .put(Register.class, this::register)
+            .put(AddFlowsRpc.class, this::addFlowsRpc)
+            .put(RemoveFlowsRpc.class, this::removeFlowsRpc)
+            .put(AddFlowsDs.class, this::addFlowsDs)
+            .put(RemoveFlowsDs.class, this::removeFlowsDs)
+            .put(FlowTest.class, this::flowTest)
+            .put(ReadFlowTest.class, this::readFlowTest)
+            .put(FlowRpcAddTest.class, this::flowRpcAddTest)
+            .put(FlowRpcAddMultiple.class, this::flowRpcAddMultiple)
+            .put(TableTest.class, this::tableTest)
+            .build());
         LoggingFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register");
     }
 
+    @PreDestroy
+    @Deactivate
     @Override
-    public ListenableFuture<RpcResult<AddFlowsDsOutput>> addFlowsDs(final AddFlowsDsInput input) {
+    public void close() {
+        reg.close();
+    }
+
+    @VisibleForTesting
+    ListenableFuture<RpcResult<AddFlowsDsOutput>> addFlowsDs(final AddFlowsDsInput input) {
         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
         boolean createParentsNextTime = requireNonNullElse(input.getAlwaysCreateParents(), Boolean.FALSE);
         boolean createParents = true;
-        for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
+        for (BulkFlowDsItem bulkFlow : input.nonnullBulkFlowDsItem()) {
             FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
             flowBuilder.setTableId(bulkFlow.getTableId());
             flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
@@ -132,10 +180,10 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
                 .child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId())));
     }
 
-    @Override
-    public ListenableFuture<RpcResult<RemoveFlowsDsOutput>> removeFlowsDs(final RemoveFlowsDsInput input) {
+    @VisibleForTesting
+    ListenableFuture<RpcResult<RemoveFlowsDsOutput>> removeFlowsDs(final RemoveFlowsDsInput input) {
         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
-        for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
+        for (BulkFlowDsItem bulkFlow : input.nonnullBulkFlowDsItem()) {
             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
         }
         return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.commit())), voidRpcResult -> {
@@ -167,8 +215,8 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
         return rpcResult;
     }
 
-    @Override
-    public ListenableFuture<RpcResult<AddFlowsRpcOutput>> addFlowsRpc(final AddFlowsRpcInput input) {
+    @VisibleForTesting
+    ListenableFuture<RpcResult<AddFlowsRpcOutput>> addFlowsRpc(final AddFlowsRpcInput input) {
         List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
 
         for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
@@ -177,7 +225,7 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
             final NodeRef nodeRef = bulkFlow.getNode();
             flowInputBuilder.setNode(nodeRef);
             flowInputBuilder.setTableId(bulkFlow.getTableId());
-            bulkResults.add(flowService.addFlow(flowInputBuilder.build()));
+            bulkResults.add(addFlow.invoke(flowInputBuilder.build()));
         }
         return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
             if (voidRpcResult.isSuccessful()) {
@@ -188,8 +236,8 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
         },MoreExecutors.directExecutor());
     }
 
-    @Override
-    public ListenableFuture<RpcResult<ReadFlowTestOutput>> readFlowTest(final ReadFlowTestInput input) {
+    @VisibleForTesting
+    ListenableFuture<RpcResult<ReadFlowTestOutput>> readFlowTest(final ReadFlowTestInput input) {
         FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(),
                 input.getFlowsPerDpn().intValue(), input.getVerbose(), input.getIsConfigDs(),
                 input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
@@ -199,9 +247,9 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
         return Futures.immediateFuture(rpcResultBuilder.build());
     }
 
-    @Override
-    public ListenableFuture<RpcResult<FlowRpcAddTestOutput>> flowRpcAddTest(final FlowRpcAddTestInput input) {
-        FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
+    @VisibleForTesting
+    ListenableFuture<RpcResult<FlowRpcAddTestOutput>> flowRpcAddTest(final FlowRpcAddTestInput input) {
+        FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, fjService, addFlow);
         flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(),
                 input.getRpcBatchSize().intValue());
 
@@ -209,8 +257,7 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
         return Futures.immediateFuture(rpcResultBuilder.build());
     }
 
-    @Override
-    public ListenableFuture<RpcResult<RegisterOutput>> register(final RegisterInput input) {
+    private ListenableFuture<RpcResult<RegisterOutput>> register(final RegisterInput input) {
         RpcResultBuilder<RegisterOutput> rpcResultBuilder = RpcResultBuilder.success();
         try {
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -226,29 +273,27 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
         return Futures.immediateFuture(rpcResultBuilder.build());
     }
 
-    @Override
-    public ListenableFuture<RpcResult<RemoveFlowsRpcOutput>> removeFlowsRpc(final RemoveFlowsRpcInput input) {
-        List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
-
-        for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
-            RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder(
-                    (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
-            final NodeRef nodeRef = bulkFlow.getNode();
-            flowInputBuilder.setNode(nodeRef);
-            flowInputBuilder.setTableId(bulkFlow.getTableId());
-            bulkResults.add(flowService.removeFlow(flowInputBuilder.build()));
-        }
-        return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
-            if (voidRpcResult.isSuccessful()) {
-                return RpcResultBuilder.<RemoveFlowsRpcOutput>success().build();
-            } else {
-                return RpcResultBuilder.<RemoveFlowsRpcOutput>failed().build();
-            }
-        }, MoreExecutors.directExecutor());
+    @VisibleForTesting
+    ListenableFuture<RpcResult<RemoveFlowsRpcOutput>> removeFlowsRpc(final RemoveFlowsRpcInput input) {
+        return Futures.transform(handleResultFuture(
+            Futures.allAsList(input.nonnullBulkFlowItem().stream()
+                .map(bulkFlow -> removeFlow.invoke(new RemoveFlowInputBuilder(
+                    (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow)
+                    .setNode(bulkFlow.getNode())
+                    .setTableId(bulkFlow.getTableId())
+                    .build()))
+                .collect(Collectors.toList()))),
+            voidRpcResult -> {
+                if (voidRpcResult.isSuccessful()) {
+                    return RpcResultBuilder.<RemoveFlowsRpcOutput>success().build();
+                } else {
+                    return RpcResultBuilder.<RemoveFlowsRpcOutput>failed().build();
+                }
+            }, MoreExecutors.directExecutor());
     }
 
-    @Override
-    public ListenableFuture<RpcResult<FlowTestOutput>> flowTest(final FlowTestInput input) {
+    @VisibleForTesting
+    ListenableFuture<RpcResult<FlowTestOutput>> flowTest(final FlowTestInput input) {
         if (input.getTxChain()) {
             FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
             flowCounterBeanImpl.setWriter(flowTester);
@@ -296,9 +341,9 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
         return Futures.immediateFuture(rpcResultBuilder.build());
     }
 
-    @Override
-    public ListenableFuture<RpcResult<TableTestOutput>> tableTest(final TableTestInput input) {
-        final TableWriter writer = new TableWriter(dataBroker, fjService);
+    @VisibleForTesting
+    ListenableFuture<RpcResult<TableTestOutput>> tableTest(final TableTestInput input) {
+        final var writer = new TableWriter(dataBroker, fjService);
         flowCounterBeanImpl.setWriter(writer);
         switch (input.getOperation()) {
             case Add:
@@ -317,12 +362,10 @@ public class SalBulkFlowServiceImpl implements SalBulkFlowService {
         return Futures.immediateFuture(rpcResultBuilder.build());
     }
 
-    @Override
-    public ListenableFuture<RpcResult<FlowRpcAddMultipleOutput>> flowRpcAddMultiple(
-            final FlowRpcAddMultipleInput input) {
-        FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
-        flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
-        RpcResultBuilder<FlowRpcAddMultipleOutput> rpcResultBuilder = RpcResultBuilder.success();
-        return Futures.immediateFuture(rpcResultBuilder.build());
+    @VisibleForTesting
+    ListenableFuture<RpcResult<FlowRpcAddMultipleOutput>> flowRpcAddMultiple(final FlowRpcAddMultipleInput input) {
+        new FlowWriterDirectOFRpc(dataBroker, fjService, addFlow)
+            .rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
+        return RpcResultBuilder.<FlowRpcAddMultipleOutput>success().buildFuture();
     }
 }
diff --git a/applications/bulk-o-matic/src/main/resources/OSGI-INF/blueprint/bulk-o-matic.xml b/applications/bulk-o-matic/src/main/resources/OSGI-INF/blueprint/bulk-o-matic.xml
deleted file mode 100644 (file)
index 425cd1e..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
-           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
-        odl:use-default-for-reference-types="true">
-
-  <reference id="dataBroker" interface="org.opendaylight.mdsal.binding.api.DataBroker"/>
-
-  <odl:rpc-service id="flowService"
-      interface="org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService"/>
-
-  <bean id="bulkFlowService" class="org.opendaylight.openflowplugin.applications.bulk.o.matic.SalBulkFlowServiceImpl">
-    <argument ref="flowService"/>
-    <argument ref="dataBroker"/>
-  </bean>
-
-  <odl:rpc-implementation ref="bulkFlowService"/>
-</blueprint>
index 2de02cdce88b1c8996f884819146b8da35b92fa2..ce4241b807e3b3277c14020b7adfb16e941be5bc 100644 (file)
@@ -25,7 +25,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.ReadTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
@@ -33,34 +33,30 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Test for {@link FlowWriterDirectOFRpc}.
  */
 @RunWith(MockitoJUnitRunner.class)
 public class FlowWriterDirectOFRpcTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FlowWriterDirectOFRpcTest.class);
     private static final int FLOWS_PER_DPN = 100;
 
     @Mock
     private DataBroker mockDataBroker;
     @Mock
-    private SalFlowService mockSalFlowService;
-    @Mock
     private ExecutorService mockFlowPusher;
     @Mock
     private ReadTransaction readOnlyTransaction;
     @Mock
     private Nodes mockNodes;
+    @Mock
+    private AddFlow addFlow;
 
     private FlowWriterDirectOFRpc flowWriterDirectOFRpc;
 
     @Before
     public void setUp() {
-        doReturn(RpcResultBuilder.success().buildFuture()).when(mockSalFlowService).addFlow(any());
+        doReturn(RpcResultBuilder.success().buildFuture()).when(addFlow).invoke(any());
 
         when(mockDataBroker.newReadOnlyTransaction()).thenReturn(readOnlyTransaction);
         NodeBuilder nodeBuilder = new NodeBuilder()
@@ -80,18 +76,18 @@ public class FlowWriterDirectOFRpcTest {
             return null;
         }).when(mockFlowPusher).execute(ArgumentMatchers.any());
 
-        flowWriterDirectOFRpc = new FlowWriterDirectOFRpc(mockDataBroker, mockSalFlowService, mockFlowPusher);
+        flowWriterDirectOFRpc = new FlowWriterDirectOFRpc(mockDataBroker, mockFlowPusher, addFlow);
     }
 
     @Test
     public void testRpcFlowAdd() {
         flowWriterDirectOFRpc.rpcFlowAdd("1", FLOWS_PER_DPN, 10);
-        Mockito.verify(mockSalFlowService, Mockito.times(FLOWS_PER_DPN)).addFlow(Mockito.any());
+        Mockito.verify(addFlow, Mockito.times(FLOWS_PER_DPN)).invoke(Mockito.any());
     }
 
     @Test
     public void testRpcFlowAddAll() {
         flowWriterDirectOFRpc.rpcFlowAddAll(FLOWS_PER_DPN, 10);
-        Mockito.verify(mockSalFlowService, Mockito.times(FLOWS_PER_DPN)).addFlow(Mockito.any());
+        Mockito.verify(addFlow, Mockito.times(FLOWS_PER_DPN)).invoke(Mockito.any());
     }
 }
@@ -30,6 +30,8 @@ import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
@@ -60,9 +62,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608
 import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.list.grouping.BulkFlowItemBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
@@ -74,15 +77,16 @@ import org.opendaylight.yangtools.yang.common.Uint32;
 import org.opendaylight.yangtools.yang.common.Uint8;
 
 /**
- * Test for {@link SalBulkFlowServiceImpl}.
+ * Test for {@link SalBulkFlowRpcs}.
  */
 @RunWith(MockitoJUnitRunner.class)
-public class SalBulkFlowServiceImplTest {
-
+public class SalBulkFlowRpcsTest {
     @Mock
     private DataBroker mockDataBroker;
     @Mock
-    private SalFlowService mockSalFlowService;
+    private RpcConsumerRegistry mockRpcService;
+    @Mock
+    private RpcProviderService mockRpcProviderService;
     @Mock
     private WriteTransaction writeTransaction;
     @Mock
@@ -91,10 +95,14 @@ public class SalBulkFlowServiceImplTest {
     private Nodes mockNodes;
     @Mock
     private Node mockNode;
+    @Mock
+    private AddFlow addFlow;
+    @Mock
+    private RemoveFlow removeFlow;
     @Captor
     private ArgumentCaptor<Flow> flowArgumentCaptor;
 
-    private SalBulkFlowServiceImpl salBulkFlowService;
+    private SalBulkFlowRpcs salBulkFlowService;
 
     @Before
     public void setUp() {
@@ -103,7 +111,10 @@ public class SalBulkFlowServiceImplTest {
 
         lenient().doReturn(FluentFutures.immediateFluentFuture(Optional.of(mockNode))).when(readOnlyTransaction)
             .read(any(LogicalDatastoreType.class), any());
-        salBulkFlowService = new SalBulkFlowServiceImpl(mockSalFlowService, mockDataBroker);
+
+        doReturn(addFlow).when(mockRpcService).getRpc(AddFlow.class);
+        doReturn(removeFlow).when(mockRpcService).getRpc(RemoveFlow.class);
+        salBulkFlowService = new SalBulkFlowRpcs(mockDataBroker, mockRpcService, mockRpcProviderService);
     }
 
     @Test
@@ -147,11 +158,10 @@ public class SalBulkFlowServiceImplTest {
 
     @Test
     public void testAddRemoveFlowsRpc() {
-        Mockito.when(mockSalFlowService.addFlow(ArgumentMatchers.any()))
+        Mockito.when(addFlow.invoke(ArgumentMatchers.any()))
                 .thenReturn(RpcResultBuilder.success(new AddFlowOutputBuilder().build()).buildFuture());
-
-        Mockito.when(mockSalFlowService.removeFlow(ArgumentMatchers.any()))
-                .thenReturn(RpcResultBuilder.success(new RemoveFlowOutputBuilder().build()).buildFuture());
+        Mockito.when(removeFlow.invoke(ArgumentMatchers.any()))
+            .thenReturn(RpcResultBuilder.success(new RemoveFlowOutputBuilder().build()).buildFuture());
 
         final BulkFlowItemBuilder bulkFlowItemBuilder = new BulkFlowItemBuilder();
         final InstanceIdentifier<Node> nodeId = BulkOMaticUtils.getFlowCapableNodeId("1");
@@ -167,7 +177,7 @@ public class SalBulkFlowServiceImplTest {
         final AddFlowsRpcInput addFlowsRpcInput = addFlowsRpcInputBuilder.build();
         salBulkFlowService.addFlowsRpc(addFlowsRpcInput);
 
-        verify(mockSalFlowService).addFlow(ArgumentMatchers.any());
+        verify(addFlow).invoke(ArgumentMatchers.any());
 
         final RemoveFlowsRpcInputBuilder removeFlowsRpcInputBuilder = new RemoveFlowsRpcInputBuilder();
         removeFlowsRpcInputBuilder.setBulkFlowItem(bulkFlowItems);
@@ -175,7 +185,7 @@ public class SalBulkFlowServiceImplTest {
         final RemoveFlowsRpcInput removeFlowsRpcInput = removeFlowsRpcInputBuilder.build();
         salBulkFlowService.removeFlowsRpc(removeFlowsRpcInput);
 
-        verify(mockSalFlowService).removeFlow(ArgumentMatchers.any());
+        verify(removeFlow).invoke(ArgumentMatchers.any());
     }
 
     @Test