import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
- import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
- import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
connectedNode.toString());
reconciliationPreProcess(connectedNode);
}
- LOG.debug("Bundle based reconciliation status : {}",
- provider.isBundleBasedReconciliationEnabled() ? "Enable" : "Disable");
if (provider.isBundleBasedReconciliationEnabled()) {
BundleBasedReconciliationTask bundleBasedReconTask = new BundleBasedReconciliationTask(connectedNode);
return JdkFutureAdapters.listenInPoolThread(executor.submit(bundleBasedReconTask));
Optional<FlowCapableNode> flowNode = Optional.absent();
BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
BigInteger dpnId = getDpnIdFromNodeName(node);
- LOG.debug("Triggering bundle based reconciliation for device :{}", dpnId);
+ LOG.info("Triggering bundle based reconciliation for device : {}", dpnId);
ReadOnlyTransaction trans = provider.getReadTranaction();
try {
flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get();
LOG.debug("FlowNode present for Datapath ID {}", dpnId);
final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
+ final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+ .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+ .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
+
final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
.setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS).setType(BundleControlType.ONFBCTOPENREQUEST)
.build();
.setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
.setMessages(createMessages(nodeRef, flowNode)).build();
- ListenableFuture<RpcResult<ControlBundleOutput>> openBundle
- = salBundleService.controlBundle(openBundleInput);
+ /* Close previously opened bundle on the openflow switch if any */
+ ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
+ = salBundleService.controlBundle(closeBundleInput);
- ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
- .transformAsync(JdkFutureAdapters.listenInPoolThread(openBundle), rpcResult -> {
+ /* Open a new bundle on the switch */
+ ListenableFuture<RpcResult<ControlBundleOutput>> openBundle =
+ Futures.transformAsync(closeBundle,
+ rpcResult -> salBundleService.controlBundle(openBundleInput),
+ MoreExecutors.directExecutor());
+
+ /* Push groups and flows via bundle add messages */
+ ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture
+ = Futures.transformAsync(openBundle, rpcResult -> {
if (rpcResult.isSuccessful()) {
- return JdkFutureAdapters
- .listenInPoolThread(salBundleService.addBundleMessages(addBundleMessagesInput));
+ return salBundleService.addBundleMessages(addBundleMessagesInput);
}
return Futures.immediateFuture(null);
}, MoreExecutors.directExecutor());
+
+ /* Commit the bundle on the openflow switch */
ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture
= Futures.transformAsync(addBundleMessagesFuture, rpcResult -> {
if (rpcResult.isSuccessful()) {
- return JdkFutureAdapters
- .listenInPoolThread(salBundleService.controlBundle(commitBundleInput));
+ return salBundleService.controlBundle(commitBundleInput);
}
return Futures.immediateFuture(null);
}, MoreExecutors.directExecutor());
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleFlowIId);
}
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
+ ListenableFuture<Void> submitFuture = writeTransaction.submit();
handleStaleEntityDeletionResultFuture(submitFuture);
}
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleGroupIId);
}
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
+ ListenableFuture<Void> submitFuture = writeTransaction.submit();
handleStaleEntityDeletionResultFuture(submitFuture);
}
writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleMeterIId);
}
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
+ ListenableFuture<Void> submitFuture = writeTransaction.submit();
handleStaleEntityDeletionResultFuture(submitFuture);
}
return nodeIdent.child(StaleMeter.class, new StaleMeterKey(new MeterId(staleMeter.getMeterId())));
}
- private void handleStaleEntityDeletionResultFuture(
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture) {
+ private void handleStaleEntityDeletionResultFuture(ListenableFuture<Void> submitFuture) {
Futures.addCallback(submitFuture, new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {