* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.openflowplugin.applications.bulk.o.matic;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+import static java.util.Objects.requireNonNullElse;
+
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.Future;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
private final FlowCounter flowCounterBeanImpl = new FlowCounter();
private final ExecutorService fjService = new ForkJoinPool();
- public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
- this.flowService = Preconditions.checkNotNull(flowService);
- this.dataBroker = Preconditions.checkNotNull(dataBroker);
+ public SalBulkFlowServiceImpl(final SalFlowService flowService, final DataBroker dataBroker) {
+ this.flowService = requireNonNull(flowService);
+ this.dataBroker = requireNonNull(dataBroker);
- JdkFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register");
+ LoggingFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register");
}
@Override
- public ListenableFuture<RpcResult<AddFlowsDsOutput>> addFlowsDs(AddFlowsDsInput input) {
+ public ListenableFuture<RpcResult<AddFlowsDsOutput>> addFlowsDs(final AddFlowsDsInput input) {
WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
- boolean createParentsNextTime = MoreObjects.firstNonNull(input.isAlwaysCreateParents(), Boolean.FALSE);
+ boolean createParentsNextTime = requireNonNullElse(input.getAlwaysCreateParents(), Boolean.FALSE);
boolean createParents = true;
for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
flowBuilder.setTableId(bulkFlow.getTableId());
flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
- writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
- flowBuilder.build(), createParents);
+ if (createParents) {
+ writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION,
+ getFlowInstanceIdentifier(bulkFlow),
+ flowBuilder.build());
+ } else {
+ writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
+ flowBuilder.build());
+ }
createParents = createParentsNextTime;
}
- ListenableFuture<Void> submitFuture = writeTransaction.submit();
+ FluentFuture<?> submitFuture = writeTransaction.commit();
return Futures.transform(handleResultFuture(Futures.allAsList(submitFuture)), voidRpcResult -> {
if (voidRpcResult.isSuccessful()) {
return RpcResultBuilder.<AddFlowsDsOutput>success().build();
},MoreExecutors.directExecutor());
}
- private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
+ private static InstanceIdentifier<Flow> getFlowInstanceIdentifier(final BulkFlowDsItem bulkFlow) {
final NodeRef nodeRef = bulkFlow.getNode();
return ((InstanceIdentifier<Node>) nodeRef.getValue()).augmentation(FlowCapableNode.class)
.child(Table.class, new TableKey(bulkFlow.getTableId()))
}
@Override
- public ListenableFuture<RpcResult<RemoveFlowsDsOutput>> removeFlowsDs(RemoveFlowsDsInput input) {
+ public ListenableFuture<RpcResult<RemoveFlowsDsOutput>> removeFlowsDs(final RemoveFlowsDsInput input) {
WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
}
- return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.submit())), voidRpcResult -> {
+ return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.commit())), voidRpcResult -> {
if (voidRpcResult.isSuccessful()) {
return RpcResultBuilder.<RemoveFlowsDsOutput>success().build();
} else {
}, MoreExecutors.directExecutor());
}
- private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
+ private static <T> ListenableFuture<RpcResult<Void>> handleResultFuture(
+ final ListenableFuture<List<T>> submitFuture) {
final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
Futures.addCallback(submitFuture, new FutureCallback<List<T>>() {
@Override
- public void onSuccess(List<T> result) {
+ public void onSuccess(final List<T> result) {
rpcResult.set(RpcResultBuilder.success((Void) null).build());
}
@Override
- public void onFailure(Throwable throwable) {
+ public void onFailure(final Throwable throwable) {
RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
- .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
+ .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(ErrorType.APPLICATION,
null, throwable.getMessage())));
rpcResult.set(rpcResultBld.build());
}
}
@Override
- public ListenableFuture<RpcResult<AddFlowsRpcOutput>> addFlowsRpc(AddFlowsRpcInput input) {
+ public ListenableFuture<RpcResult<AddFlowsRpcOutput>> addFlowsRpc(final AddFlowsRpcInput input) {
List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
final NodeRef nodeRef = bulkFlow.getNode();
flowInputBuilder.setNode(nodeRef);
flowInputBuilder.setTableId(bulkFlow.getTableId());
- Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
- bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
+ bulkResults.add(flowService.addFlow(flowInputBuilder.build()));
}
return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
if (voidRpcResult.isSuccessful()) {
}
@Override
- public ListenableFuture<RpcResult<ReadFlowTestOutput>> readFlowTest(ReadFlowTestInput input) {
+ public ListenableFuture<RpcResult<ReadFlowTestOutput>> readFlowTest(final ReadFlowTestInput input) {
FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(),
- input.getFlowsPerDpn().intValue(), input.isVerbose(), input.isIsConfigDs(),
+ input.getFlowsPerDpn().intValue(), input.getVerbose(), input.getIsConfigDs(),
input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
flowCounterBeanImpl.setReader(flowReader);
fjService.execute(flowReader);
}
@Override
- public ListenableFuture<RpcResult<FlowRpcAddTestOutput>> flowRpcAddTest(FlowRpcAddTestInput input) {
+ public ListenableFuture<RpcResult<FlowRpcAddTestOutput>> flowRpcAddTest(final FlowRpcAddTestInput input) {
FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(),
input.getRpcBatchSize().intValue());
}
@Override
- public ListenableFuture<RpcResult<RegisterOutput>> register(RegisterInput input) {
+ public ListenableFuture<RpcResult<RegisterOutput>> register(final RegisterInput input) {
RpcResultBuilder<RegisterOutput> rpcResultBuilder = RpcResultBuilder.success();
try {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
} catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException
| NotCompliantMBeanException e) {
rpcResultBuilder = RpcResultBuilder.failed();
- LOG.warn("Exception occurred: {} ", e.getMessage(), e);
+ LOG.warn("Exception occurred", e);
}
return Futures.immediateFuture(rpcResultBuilder.build());
}
@Override
- public ListenableFuture<RpcResult<RemoveFlowsRpcOutput>> removeFlowsRpc(RemoveFlowsRpcInput input) {
+ public ListenableFuture<RpcResult<RemoveFlowsRpcOutput>> removeFlowsRpc(final RemoveFlowsRpcInput input) {
List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
final NodeRef nodeRef = bulkFlow.getNode();
flowInputBuilder.setNode(nodeRef);
flowInputBuilder.setTableId(bulkFlow.getTableId());
- Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
- bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
+ bulkResults.add(flowService.removeFlow(flowInputBuilder.build()));
}
return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
if (voidRpcResult.isSuccessful()) {
}
@Override
- public ListenableFuture<RpcResult<FlowTestOutput>> flowTest(FlowTestInput input) {
- if (input.isTxChain()) {
+ public ListenableFuture<RpcResult<FlowTestOutput>> flowTest(final FlowTestInput input) {
+ if (input.getTxChain()) {
FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
flowCounterBeanImpl.setWriter(flowTester);
- if (input.isIsAdd()) {
+ if (input.getIsAdd()) {
flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getSleepFor().intValue(),
input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
- input.getEndTableId().shortValue(), input.isCreateParents());
+ input.getEndTableId().shortValue(), input.getCreateParents());
} else {
flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
RpcResultBuilder<FlowTestOutput> rpcResultBuilder = RpcResultBuilder.success();
return Futures.immediateFuture(rpcResultBuilder.build());
}
- if (input.isSeq()) {
+ if (input.getSeq()) {
FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
flowCounterBeanImpl.setWriter(flowTester);
- if (input.isIsAdd()) {
+ if (input.getIsAdd()) {
flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getSleepFor().intValue(),
input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
- input.isCreateParents());
+ input.getCreateParents());
} else {
flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
} else {
FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
flowCounterBeanImpl.setWriter(flowTester);
- if (input.isIsAdd()) {
+ if (input.getIsAdd()) {
flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getSleepFor().intValue(),
input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
- input.getEndTableId().shortValue(), input.isCreateParents());
+ input.getEndTableId().shortValue(), input.getCreateParents());
} else {
flowTester.deleteFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getStartTableId().shortValue(),
}
@Override
- public ListenableFuture<RpcResult<FlowRpcAddMultipleOutput>> flowRpcAddMultiple(FlowRpcAddMultipleInput input) {
+ public ListenableFuture<RpcResult<FlowRpcAddMultipleOutput>> flowRpcAddMultiple(
+ final FlowRpcAddMultipleInput input) {
FlowWriterDirectOFRpc flowTesterRPC = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
flowTesterRPC.rpcFlowAddAll(input.getFlowCount().intValue(), input.getRpcBatchSize().intValue());
RpcResultBuilder<FlowRpcAddMultipleOutput> rpcResultBuilder = RpcResultBuilder.success();