/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.BulkFlowBaseContentGrouping;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.RemoveFlowsRpcInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.SalBulkFlowService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.TableTestInput.Operation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.bulk.flow.ds.list.grouping.BulkFlowDsItem;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
private final DataBroker dataBroker;
private final FlowCounter flowCounterBeanImpl = new FlowCounter();
private final ExecutorService fjService = new ForkJoinPool();
+
public SalBulkFlowServiceImpl(SalFlowService flowService, DataBroker dataBroker) {
this.flowService = Preconditions.checkNotNull(flowService);
this.dataBroker = Preconditions.checkNotNull(dataBroker);
flowBuilder.build(), createParents);
createParents = createParentsNextTime;
}
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
- return handleResultFuture(submitFuture);
+ ListenableFuture<Void> submitFuture = writeTransaction.submit();
+ return handleResultFuture(Futures.allAsList(submitFuture));
}
private InstanceIdentifier<Flow> getFlowInstanceIdentifier(BulkFlowDsItem bulkFlow) {
final NodeRef nodeRef = bulkFlow.getNode();
- return ((InstanceIdentifier<Node>) nodeRef.getValue())
- .augmentation(FlowCapableNode.class)
+ return ((InstanceIdentifier<Node>) nodeRef.getValue()).augmentation(FlowCapableNode.class)
.child(Table.class, new TableKey(bulkFlow.getTableId()))
- .child(Flow.class,
- new FlowKey(new FlowId(bulkFlow.getFlowId())));
+ .child(Flow.class, new FlowKey(new FlowId(bulkFlow.getFlowId())));
}
@Override
for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
}
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
- return handleResultFuture(submitFuture);
- }
-
- private ListenableFuture<RpcResult<Void>> handleResultFuture(CheckedFuture<Void,
- TransactionCommitFailedException> submitFuture) {
- final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
- Futures.addCallback(submitFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- rpcResult.set(RpcResultBuilder.success(result).build());
- }
-
- @Override
- public void onFailure(Throwable t) {
- RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
- .withRpcErrors(Collections.singleton(
- RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
- ));
- rpcResult.set(rpcResultBld.build());
- }
- });
- return rpcResult;
+ return handleResultFuture(Futures.allAsList(writeTransaction.submit()));
}
private <T> ListenableFuture<RpcResult<Void>> handleResultFuture(ListenableFuture<List<T>> submitFuture) {
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(Throwable throwable) {
RpcResultBuilder<Void> rpcResultBld = RpcResultBuilder.<Void>failed()
- .withRpcErrors(Collections.singleton(
- RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, null, t.getMessage())
- ));
+ .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
+ null, throwable.getMessage())));
rpcResult.set(rpcResultBld.build());
}
- });
+ }, MoreExecutors.directExecutor());
return rpcResult;
}
List<ListenableFuture<RpcResult<AddFlowOutput>>> bulkResults = new ArrayList<>();
for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
- AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder((org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
+ AddFlowInputBuilder flowInputBuilder = new AddFlowInputBuilder(
+ (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
final NodeRef nodeRef = bulkFlow.getNode();
flowInputBuilder.setNode(nodeRef);
flowInputBuilder.setTableId(bulkFlow.getTableId());
@Override
public Future<RpcResult<Void>> readFlowTest(ReadFlowTestInput input) {
- FlowReader flowReader = FlowReader.getNewInstance(dataBroker,
- input.getDpnCount().intValue(),
- input.getFlowsPerDpn().intValue(), input.isVerbose(),
- input.isIsConfigDs(),input.getStartTableId().shortValue(),
- input.getEndTableId().shortValue());
+ FlowReader flowReader = FlowReader.getNewInstance(dataBroker, input.getDpnCount().intValue(),
+ input.getFlowsPerDpn().intValue(), input.isVerbose(), input.isIsConfigDs(),
+ input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
flowCounterBeanImpl.setReader(flowReader);
fjService.execute(flowReader);
RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
@Override
public Future<RpcResult<Void>> flowRpcAddTest(FlowRpcAddTestInput input) {
FlowWriterDirectOFRpc flowAddRpcTestImpl = new FlowWriterDirectOFRpc(dataBroker, flowService, fjService);
- flowAddRpcTestImpl.rpcFlowAdd(
- input.getDpnId(),
- input.getFlowCount().intValue(),
+ flowAddRpcTestImpl.rpcFlowAdd(input.getDpnId(), input.getFlowCount().intValue(),
input.getRpcBatchSize().intValue());
-
RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
return Futures.immediateFuture(rpcResultBuilder.build());
}
public Future<RpcResult<Void>> register() {
RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.success();
try {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- String pathToMBean = String.format("%s:type=%s",
- FlowCounter.class.getPackage().getName(),
- FlowCounter.class.getSimpleName());
- ObjectName name = null;
-
- name = new ObjectName(pathToMBean);
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ String pathToMBean = String.format("%s:type=%s", FlowCounter.class.getPackage().getName(),
+ FlowCounter.class.getSimpleName());
+ ObjectName name = new ObjectName(pathToMBean);
mbs.registerMBean(flowCounterBeanImpl, name);
- } catch (MalformedObjectNameException | InstanceAlreadyExistsException
- | MBeanRegistrationException | NotCompliantMBeanException e) {
+ } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException
+ | NotCompliantMBeanException e) {
rpcResultBuilder = RpcResultBuilder.failed();
LOG.warn("Exception occurred: {} ", e.getMessage(), e);
}
List<ListenableFuture<RpcResult<RemoveFlowOutput>>> bulkResults = new ArrayList<>();
for (BulkFlowBaseContentGrouping bulkFlow : input.getBulkFlowItem()) {
- RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder((org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
+ RemoveFlowInputBuilder flowInputBuilder = new RemoveFlowInputBuilder(
+ (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) bulkFlow);
final NodeRef nodeRef = bulkFlow.getNode();
flowInputBuilder.setNode(nodeRef);
flowInputBuilder.setTableId(bulkFlow.getTableId());
if (input.isTxChain()) {
FlowWriterTxChain flowTester = new FlowWriterTxChain(dataBroker, fjService);
flowCounterBeanImpl.setWriter(flowTester);
- if (input.isIsAdd()){
+ if (input.isIsAdd()) {
flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getSleepFor().intValue(),
input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
if (input.isSeq()) {
FlowWriterSequential flowTester = new FlowWriterSequential(dataBroker, fjService);
flowCounterBeanImpl.setWriter(flowTester);
- if (input.isIsAdd()){
+ if (input.isIsAdd()) {
flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getSleepFor().intValue(),
input.getStartTableId().shortValue(), input.getEndTableId().shortValue(),
} else {
FlowWriterConcurrent flowTester = new FlowWriterConcurrent(dataBroker, fjService);
flowCounterBeanImpl.setWriter(flowTester);
- if (input.isIsAdd()){
+ if (input.isIsAdd()) {
flowTester.addFlows(input.getDpnCount().intValue(), input.getFlowsPerDpn().intValue(),
input.getBatchSize().intValue(), input.getSleepFor().intValue(),
input.getSleepAfter().intValue(), input.getStartTableId().shortValue(),
flowCounterBeanImpl.setWriter(writer);
switch (input.getOperation()) {
case Add:
- writer.addTables(input.getDpnCount().intValue(),
- input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
+ writer.addTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
+ input.getEndTableId().shortValue());
break;
case Delete:
- writer.deleteTables(input.getDpnCount().intValue(),
- input.getStartTableId().shortValue(), input.getEndTableId().shortValue());
+ writer.deleteTables(input.getDpnCount().intValue(), input.getStartTableId().shortValue(),
+ input.getEndTableId().shortValue());
break;
default:
RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.failed();