import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.Future;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsDsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.bulk.flow.service.rev150608.AddFlowsRpcInput;
this.flowService = Preconditions.checkNotNull(flowService);
this.dataBroker = Preconditions.checkNotNull(dataBroker);
- JdkFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register");
+ LoggingFutures.addErrorLogging(register(new RegisterInputBuilder().build()), LOG, "register");
}
@Override
FlowBuilder flowBuilder = new FlowBuilder(bulkFlow);
flowBuilder.setTableId(bulkFlow.getTableId());
flowBuilder.setId(new FlowId(bulkFlow.getFlowId()));
- writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
- flowBuilder.build(), createParents);
+ if (createParents) {
+ writeTransaction.mergeParentStructurePut(LogicalDatastoreType.CONFIGURATION,
+ getFlowInstanceIdentifier(bulkFlow),
+ flowBuilder.build());
+ } else {
+ writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow),
+ flowBuilder.build());
+ }
createParents = createParentsNextTime;
}
- ListenableFuture<Void> submitFuture = writeTransaction.submit();
+ FluentFuture<?> submitFuture = writeTransaction.commit();
return Futures.transform(handleResultFuture(Futures.allAsList(submitFuture)), voidRpcResult -> {
if (voidRpcResult.isSuccessful()) {
return RpcResultBuilder.<AddFlowsDsOutput>success().build();
for (BulkFlowDsItem bulkFlow : input.getBulkFlowDsItem()) {
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, getFlowInstanceIdentifier(bulkFlow));
}
- return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.submit())), voidRpcResult -> {
+ return Futures.transform(handleResultFuture(Futures.allAsList(writeTransaction.commit())), voidRpcResult -> {
if (voidRpcResult.isSuccessful()) {
return RpcResultBuilder.<RemoveFlowsDsOutput>success().build();
} else {
final NodeRef nodeRef = bulkFlow.getNode();
flowInputBuilder.setNode(nodeRef);
flowInputBuilder.setTableId(bulkFlow.getTableId());
- Future<RpcResult<AddFlowOutput>> rpcAddFlowResult = flowService.addFlow(flowInputBuilder.build());
- bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
+ bulkResults.add(flowService.addFlow(flowInputBuilder.build()));
}
return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
if (voidRpcResult.isSuccessful()) {
} catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException
| NotCompliantMBeanException e) {
rpcResultBuilder = RpcResultBuilder.failed();
- LOG.warn("Exception occurred: {} ", e);
+ LOG.warn("Exception occurred", e);
}
return Futures.immediateFuture(rpcResultBuilder.build());
}
final NodeRef nodeRef = bulkFlow.getNode();
flowInputBuilder.setNode(nodeRef);
flowInputBuilder.setTableId(bulkFlow.getTableId());
- Future<RpcResult<RemoveFlowOutput>> rpcAddFlowResult = flowService.removeFlow(flowInputBuilder.build());
- bulkResults.add(JdkFutureAdapters.listenInPoolThread(rpcAddFlowResult));
+ bulkResults.add(flowService.removeFlow(flowInputBuilder.build()));
}
return Futures.transform(handleResultFuture(Futures.allAsList(bulkResults)), voidRpcResult -> {
if (voidRpcResult.isSuccessful()) {