*/
package org.opendaylight.openflowplugin.applications.bulk.o.matic;
+import static java.util.Objects.requireNonNull;
+
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.slf4j.LoggerFactory;
public class FlowWriterDirectOFRpc {
-
private static final Logger LOG = LoggerFactory.getLogger(FlowWriterDirectOFRpc.class);
- private final DataBroker dataBroker;
- private final SalFlowService flowService;
- private final ExecutorService flowPusher;
private static final long PAUSE_BETWEEN_BATCH_MILLIS = 40;
- public FlowWriterDirectOFRpc(final DataBroker dataBroker, final SalFlowService salFlowService,
- final ExecutorService flowPusher) {
- this.dataBroker = dataBroker;
- this.flowService = salFlowService;
- this.flowPusher = flowPusher;
+ private final DataBroker dataBroker;
+ private final Executor flowPusher;
+ private final AddFlow addFlow;
+
+ public FlowWriterDirectOFRpc(final DataBroker dataBroker, final Executor flowPusher, final AddFlow addFlow) {
+ this.dataBroker = requireNonNull(dataBroker);
+ this.flowPusher = requireNonNull(flowPusher);
+ this.addFlow = requireNonNull(addFlow);
}
- public void rpcFlowAdd(String dpId, int flowsPerDpn, int batchSize) {
+ public void rpcFlowAdd(final String dpId, final int flowsPerDpn, final int batchSize) {
if (!getAllNodes().isEmpty() && getAllNodes().contains(dpId)) {
FlowRPCHandlerTask addFlowRpcTask = new FlowRPCHandlerTask(dpId, flowsPerDpn, batchSize);
flowPusher.execute(addFlowRpcTask);
}
}
- public void rpcFlowAddAll(int flowsPerDpn, int batchSize) {
+ public void rpcFlowAddAll(final int flowsPerDpn, final int batchSize) {
Set<String> nodeIdSet = getAllNodes();
if (nodeIdSet.isEmpty()) {
LOG.warn("No nodes seen on OPERATIONAL DS. Aborting !!!!");
AddFlowInput addFlowInput = builder.build();
LOG.debug("RPC invocation for adding flow-id {} with input {}", flowId, addFlowInput);
- LoggingFutures.addErrorLogging(flowService.addFlow(addFlowInput), LOG, "addFlow");
+ LoggingFutures.addErrorLogging(addFlow.invoke(addFlowInput), LOG, "addFlow");
if (i % batchSize == 0) {
try {
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableClassToInstanceMap;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDs;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpc;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultiple;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddMultipleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTest;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowRpcAddTestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTest;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.FlowTestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTest;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.ReadFlowTestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.Register;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RegisterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDs;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsDsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpc;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTest;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.Rpc;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple implementation providing bulk flows operations.
*/
-public class SalBulkFlowServiceImpl implements SalBulkFlowService {
+@Singleton
+@Component(service = { })
+public final class SalBulkFlowRpcs implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowRpcs.class);
- private static final Logger LOG = LoggerFactory.getLogger(SalBulkFlowServiceImpl.class);
-
- private final SalFlowService flowService;
- private final DataBroker dataBroker;
private final FlowCounter flowCounterBeanImpl = new FlowCounter();
private final ExecutorService fjService = new ForkJoinPool();
+ private final DataBroker dataBroker;
+ private final AddFlow addFlow;
+ private final RemoveFlow removeFlow;
+ private final Registration reg;
- public SalBulkFlowServiceImpl(final SalFlowService flowService, final DataBroker dataBroker) {
- this.flowService = requireNonNull(flowService);
+ @Inject
+ @Activate
+ public SalBulkFlowRpcs(@Reference final DataBroker dataBroker, @Reference final RpcConsumerRegistry rpcService,
+ @Reference final RpcProviderService rpcProviderService) {
this.dataBroker = requireNonNull(dataBroker);
-
+ addFlow = rpcService.getRpc(AddFlow.class);
+ removeFlow = rpcService.getRpc(RemoveFlow.class);
+ reg = rpcProviderService.registerRpcImplementations(ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
+ .put(Register.class, this::register)
+ .put(AddFlowsRpc.class, this::addFlowsRpc)
+ .put(RemoveFlowsRpc.class, this::removeFlowsRpc)
+ .put(AddFlowsDs.class, this::addFlowsDs)
+ .put(RemoveFlowsDs.class, this::removeFlowsDs)
+ .put(FlowTest.class, this::flowTest)
+ .put(ReadFlowTest.class, this::readFlowTest)
+ .put(FlowRpcAddTest.class, this::flowRpcAddTest)
+ .put(FlowRpcAddMultiple.class, this::flowRpcAddMultiple)
+ .put(TableTest.class, this::tableTest)
+ .build());
LoggingFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register");
}
+ @PreDestroy
+ @Deactivate
@Override
- public ListenableFuture<RpcResult<AddFlowsDsOutput>> addFlowsDs(final AddFlowsDsInput input) {
+ public void close() {
+ reg.close();
+ }
+
+ @VisibleForTesting
+ ListenableFuture<RpcResult<AddFlowsDsOutput>> addFlowsDs(final AddFlowsDsInput input) {
WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
boolean createParentsNextTime = requireNonNullElse(input.getAlwaysCreateParents(), Boolean.FALSE);
boolean createParents = true;
- for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
+ for (BulkFlowDsItem bulkFlow : input.nonnullBulkFlowDsItem()) {
FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
flowBuilder.setTableId(bulkFlow.getTableId());
flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
.child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId())));
}
- @Override
- public ListenableFuture<RpcResult<RemoveFlowsDsOutput>> removeFlowsDs(final RemoveFlowsDsInput input) {
+ @VisibleForTesting
+ ListenableFuture<RpcResult<RemoveFlowsDsOutput>> removeFlowsDs(final RemoveFlowsDsInput input) {
WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
- for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
+ for (BulkFlowDsItem bulkFlow : input.nonnullBulkFlowDsItem()) {
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
}
return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.commit())), voidRpcResult -> {
return rpcResult;
}
- @Override
- public ListenableFuture<RpcResult<AddFlowsRpcOutput>> addFlowsRpc(final AddFlowsRpcInput input) {
+ @VisibleForTesting
+ ListenableFuture<RpcResult<AddFlowsRpcOutput>> addFlowsRpc(final AddFlowsRpcInput input) {
List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
final NodeRef nodeRef = bulkFlow.getNode();
flowInputBuilder.setNode(nodeRef);
flowInputBuilder.setTableId(bulkFlow.getTableId());
- bulkResults.add(flowService.addFlow(flowInputBuilder.build()));
+ bulkResults.add(addFlow.invoke(flowInputBuilder.build()));
}
return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
if (voidRpcResult.isSuccessful()) {
},MoreExecutors.directExecutor());
}
- @Override
- public ListenableFuture<RpcResult<ReadFlowTestOutput>> readFlowTest(final ReadFlowTestInput input) {
+ @VisibleForTesting
+ ListenableFuture<RpcResult<ReadFlowTestOutput>> readFlowTest(final ReadFlowTestInput input) {
FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(),
input.getFlowsPerDpn().intValue(), input.getVerbose(), input.getIsConfigDs(),
input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
return Futures.immediateFuture(rpcResultBuilder.build());
}
- @Override
- public ListenableFuture<RpcResult<FlowRpcAddTestOutput>> flowRpcAddTest(final FlowRpcAddTestInput input) {
- FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
+ @VisibleForTesting
+ ListenableFuture<RpcResult<FlowRpcAddTestOutput>> flowRpcAddTest(final FlowRpcAddTestInput input) {
+ FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, fjService, addFlow);
flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(),
input.getRpcBatchSize().intValue());
return Futures.immediateFuture(rpcResultBuilder.build());
}
- @Override
- public ListenableFuture<RpcResult<RegisterOutput>> register(final RegisterInput input) {
+ private ListenableFuture<RpcResult<RegisterOutput>> register(final RegisterInput input) {
RpcResultBuilder<RegisterOutput> rpcResultBuilder = RpcResultBuilder.success();
try {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
return Futures.immediateFuture(rpcResultBuilder.build());
}
- @Override
- public ListenableFuture<RpcResult<RemoveFlowsRpcOutput>> removeFlowsRpc(final RemoveFlowsRpcInput input) {
- List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
-
- for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
- RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder(
- (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
- final NodeRef nodeRef = bulkFlow.getNode();
- flowInputBuilder.setNode(nodeRef);
- flowInputBuilder.setTableId(bulkFlow.getTableId());
- bulkResults.add(flowService.removeFlow(flowInputBuilder.build()));
- }
- return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
- if (voidRpcResult.isSuccessful()) {
- return RpcResultBuilder.<RemoveFlowsRpcOutput>success().build();
- } else {
- return RpcResultBuilder.<RemoveFlowsRpcOutput>failed().build();
- }
- }, MoreExecutors.directExecutor());
+ @VisibleForTesting
+ ListenableFuture<RpcResult<RemoveFlowsRpcOutput>> removeFlowsRpc(final RemoveFlowsRpcInput input) {
+ return Futures.transform(handleResultFuture(
+ Futures.allAsList(input.nonnullBulkFlowItem().stream()
+ .map(bulkFlow -> removeFlow.invoke(new RemoveFlowInputBuilder(
+ (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow)
+ .setNode(bulkFlow.getNode())
+ .setTableId(bulkFlow.getTableId())
+ .build()))
+ .collect(Collectors.toList()))),
+ voidRpcResult -> {
+ if (voidRpcResult.isSuccessful()) {
+ return RpcResultBuilder.<RemoveFlowsRpcOutput>success().build();
+ } else {
+ return RpcResultBuilder.<RemoveFlowsRpcOutput>failed().build();
+ }
+ }, MoreExecutors.directExecutor());
}
- @Override
- public ListenableFuture<RpcResult<FlowTestOutput>> flowTest(final FlowTestInput input) {
+ @VisibleForTesting
+ ListenableFuture<RpcResult<FlowTestOutput>> flowTest(final FlowTestInput input) {
if (input.getTxChain()) {
FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
flowCounterBeanImpl.setWriter(flowTester);
return Futures.immediateFuture(rpcResultBuilder.build());
}
- @Override
- public ListenableFuture<RpcResult<TableTestOutput>> tableTest(final TableTestInput input) {
- final TableWriter writer = new TableWriter(dataBroker, fjService);
+ @VisibleForTesting
+ ListenableFuture<RpcResult<TableTestOutput>> tableTest(final TableTestInput input) {
+ final var writer = new TableWriter(dataBroker, fjService);
flowCounterBeanImpl.setWriter(writer);
switch (input.getOperation()) {
case Add:
return Futures.immediateFuture(rpcResultBuilder.build());
}
- @Override
- public ListenableFuture<RpcResult<FlowRpcAddMultipleOutput>> flowRpcAddMultiple(
- final FlowRpcAddMultipleInput input) {
- FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
- flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
- RpcResultBuilder<FlowRpcAddMultipleOutput> rpcResultBuilder = RpcResultBuilder.success();
- return Futures.immediateFuture(rpcResultBuilder.build());
+ @VisibleForTesting
+ ListenableFuture<RpcResult<FlowRpcAddMultipleOutput>> flowRpcAddMultiple(final FlowRpcAddMultipleInput input) {
+ new FlowWriterDirectOFRpc(dataBroker, fjService, addFlow)
+ .rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
+ return RpcResultBuilder.<FlowRpcAddMultipleOutput>success().buildFuture();
}
}
import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.list.grouping.BulkFlowItemBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yangtools.yang.common.Uint8;
/**
- * Test for {@link SalBulkFlowServiceImpl}.
+ * Test for {@link SalBulkFlowRpcs}.
*/
@RunWith(MockitoJUnitRunner.class)
-public class SalBulkFlowServiceImplTest {
-
+public class SalBulkFlowRpcsTest {
@Mock
private DataBroker mockDataBroker;
@Mock
- private SalFlowService mockSalFlowService;
+ private RpcConsumerRegistry mockRpcService;
+ @Mock
+ private RpcProviderService mockRpcProviderService;
@Mock
private WriteTransaction writeTransaction;
@Mock
private Nodes mockNodes;
@Mock
private Node mockNode;
+ @Mock
+ private AddFlow addFlow;
+ @Mock
+ private RemoveFlow removeFlow;
@Captor
private ArgumentCaptor<Flow> flowArgumentCaptor;
- private SalBulkFlowServiceImpl salBulkFlowService;
+ private SalBulkFlowRpcs salBulkFlowService;
@Before
public void setUp() {
lenient().doReturn(FluentFutures.immediateFluentFuture(Optional.of(mockNode))).when(readOnlyTransaction)
.read(any(LogicalDatastoreType.class), any());
- salBulkFlowService = new SalBulkFlowServiceImpl(mockSalFlowService, mockDataBroker);
+
+ doReturn(addFlow).when(mockRpcService).getRpc(AddFlow.class);
+ doReturn(removeFlow).when(mockRpcService).getRpc(RemoveFlow.class);
+ salBulkFlowService = new SalBulkFlowRpcs(mockDataBroker, mockRpcService, mockRpcProviderService);
}
@Test
@Test
public void testAddRemoveFlowsRpc() {
- Mockito.when(mockSalFlowService.addFlow(ArgumentMatchers.any()))
+ Mockito.when(addFlow.invoke(ArgumentMatchers.any()))
.thenReturn(RpcResultBuilder.success(new AddFlowOutputBuilder().build()).buildFuture());
-
- Mockito.when(mockSalFlowService.removeFlow(ArgumentMatchers.any()))
- .thenReturn(RpcResultBuilder.success(new RemoveFlowOutputBuilder().build()).buildFuture());
+ Mockito.when(removeFlow.invoke(ArgumentMatchers.any()))
+ .thenReturn(RpcResultBuilder.success(new RemoveFlowOutputBuilder().build()).buildFuture());
final BulkFlowItemBuilder bulkFlowItemBuilder = new BulkFlowItemBuilder();
final InstanceIdentifier<Node> nodeId = BulkOMaticUtils.getFlowCapableNodeId("1");
final AddFlowsRpcInput addFlowsRpcInput = addFlowsRpcInputBuilder.build();
salBulkFlowService.addFlowsRpc(addFlowsRpcInput);
- verify(mockSalFlowService).addFlow(ArgumentMatchers.any());
+ verify(addFlow).invoke(ArgumentMatchers.any());
final RemoveFlowsRpcInputBuilder removeFlowsRpcInputBuilder = new RemoveFlowsRpcInputBuilder();
removeFlowsRpcInputBuilder.setBulkFlowItem(bulkFlowItems);
final RemoveFlowsRpcInput removeFlowsRpcInput = removeFlowsRpcInputBuilder.build();
salBulkFlowService.removeFlowsRpc(removeFlowsRpcInput);
- verify(mockSalFlowService).removeFlow(ArgumentMatchers.any());
+ verify(removeFlow).invoke(ArgumentMatchers.any());
}
@Test