import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
private static final long MAX_ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(20);
private static final String SEPARATOR = ":";
private static final int THREAD_POOL_SIZE = 4;
-
+ private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
+ .setNameFormat("BundleResync-%d")
+ .setDaemon(false)
+ .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
+ .build();
private final DataBroker dataBroker;
private final ForwardingRulesManager provider;
private final String serviceName;
Optional<FlowCapableNode> flowNode = Optional.empty();
BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
BigInteger dpnId = getDpnIdFromNodeName(node);
+ ExecutorService service = Executors.newSingleThreadExecutor(THREAD_FACTORY);
LOG.info("Triggering bundle based reconciliation for device : {}", dpnId);
try (ReadTransaction trans = provider.getReadTransaction()) {
flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get();
} catch (ExecutionException | InterruptedException e) {
LOG.error("Error occurred while reading the configuration data store for node {}", nodeIdentity, e);
}
-
- if (flowNode.isPresent()) {
- LOG.debug("FlowNode present for Datapath ID {}", dpnId);
- OF_EVENT_LOG.debug("Bundle Reconciliation Start, Node: {}", dpnId);
- final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
-
- final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
- .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
- .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
-
- final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
- .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS).setType(BundleControlType.ONFBCTOPENREQUEST)
- .build();
-
- final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
- .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
- .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
-
- final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder()
- .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
- .setMessages(createMessages(nodeRef)).build();
-
- LOG.debug("Closing openflow bundle for device {}", dpnId);
- /* Close previously opened bundle on the openflow switch if any */
- ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
- = salBundleService.controlBundle(closeBundleInput);
-
- /* Open a new bundle on the switch */
- ListenableFuture<RpcResult<ControlBundleOutput>> openBundle =
+ try {
+ if (flowNode.isPresent()) {
+ LOG.debug("FlowNode present for Datapath ID {}", dpnId);
+ OF_EVENT_LOG.debug("Bundle Reconciliation Start, Node: {}", dpnId);
+ final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
+
+ final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+ .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+ .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
+
+ final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+ .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+ .setType(BundleControlType.ONFBCTOPENREQUEST).build();
+
+ final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
+ .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+ .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
+
+ final AddBundleMessagesInput deleteAllFlowGroupsInput = new AddBundleMessagesInputBuilder()
+ .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
+ .setMessages(createMessages(nodeRef)).build();
+
+ LOG.debug("Closing openflow bundle for device {}", dpnId);
+ /* Close previously opened bundle on the openflow switch if any */
+ ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
+ = salBundleService.controlBundle(closeBundleInput);
+
+ /* Open a new bundle on the switch */
+ ListenableFuture<RpcResult<ControlBundleOutput>> openBundle =
Futures.transformAsync(closeBundle,
rpcResult -> salBundleService.controlBundle(openBundleInput),
- MoreExecutors.directExecutor());
-
- /* Push groups and flows via bundle add messages */
- ListenableFuture<RpcResult<AddBundleMessagesOutput>> deleteAllFlowGroupsFuture
- = Futures.transformAsync(openBundle, rpcResult -> {
- if (rpcResult.isSuccessful()) {
- return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
- }
- return Futures.immediateFuture(null);
- }, MoreExecutors.directExecutor());
+ service);
- /* Push flows and groups via bundle add messages */
- Optional<FlowCapableNode> finalFlowNode = flowNode;
- ListenableFuture<List<RpcResult<AddBundleMessagesOutput>>> addbundlesFuture
- = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> {
+ /* Push groups and flows via bundle add messages */
+ ListenableFuture<RpcResult<AddBundleMessagesOutput>> deleteAllFlowGroupsFuture
+ = Futures.transformAsync(openBundle, rpcResult -> {
+ if (rpcResult.isSuccessful()) {
+ return salBundleService.addBundleMessages(deleteAllFlowGroupsInput);
+ }
+ return Futures.immediateFuture(null);
+ }, service);
+
+ /* Push flows and groups via bundle add messages */
+ Optional<FlowCapableNode> finalFlowNode = flowNode;
+ ListenableFuture<List<RpcResult<AddBundleMessagesOutput>>> addbundlesFuture
+ = Futures.transformAsync(deleteAllFlowGroupsFuture, rpcResult -> {
+ if (rpcResult.isSuccessful()) {
+ LOG.debug("Adding delete all flow/group message is successful for device {}",dpnId);
+ return Futures.allAsList(addBundleMessages(finalFlowNode.get(), bundleIdValue,
+ nodeIdentity));
+ }
+ return Futures.immediateFuture(null);
+ }, service);
+
+ /* Commit the bundle on the openflow switch */
+ ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture = Futures.transformAsync(
+ addbundlesFuture, rpcResult -> {
+ LOG.debug("Adding bundle messages completed for device {}", dpnId);
+ return salBundleService.controlBundle(commitBundleInput);
+ }, service);
+
+ /* Bundles not supported for meters */
+ List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
+ : Collections.emptyList();
+ Futures.transformAsync(commitBundleFuture,
+ rpcResult -> {
if (rpcResult.isSuccessful()) {
- LOG.debug("Adding delete all flow/group message is successful for device {}", dpnId);
- return Futures.allAsList(addBundleMessages(finalFlowNode.get(), bundleIdValue,
- nodeIdentity));
+ for (Meter meter : meters) {
+ final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdentity
+ .child(Meter.class, meter.key());
+ provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
+ }
}
return Futures.immediateFuture(null);
- }, MoreExecutors.directExecutor());
-
- /* Commit the bundle on the openflow switch */
- ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture = Futures.transformAsync(
- addbundlesFuture, rpcResult -> {
- LOG.debug("Adding bundle messages completed for device {}", dpnId);
- return salBundleService.controlBundle(commitBundleInput);
- }, MoreExecutors.directExecutor());
-
- /* Bundles not supported for meters */
- List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
- : Collections.emptyList();
- Futures.transformAsync(commitBundleFuture,
- rpcResult -> {
- if (rpcResult.isSuccessful()) {
- for (Meter meter : meters) {
- final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdentity
- .child(Meter.class, meter.key());
- provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
- }
+ }, service);
+ try {
+ RpcResult<ControlBundleOutput> bundleFuture = commitBundleFuture.get();
+ if (bundleFuture != null && bundleFuture.isSuccessful()) {
+ LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
+ OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId);
+ return true;
+ } else {
+ LOG.error("commit bundle failed for device {} with error {}", dpnId,
+ commitBundleFuture.get().getErrors());
+ return false;
}
- return Futures.immediateFuture(null);
- }, MoreExecutors.directExecutor());
- try {
- RpcResult<ControlBundleOutput> bundleFuture = commitBundleFuture.get();
- if (bundleFuture != null && bundleFuture.isSuccessful()) {
- LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
- OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId);
- return true;
- } else {
- LOG.error("commit bundle failed for device {} with error {}", dpnId,
- commitBundleFuture.get().getErrors());
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error while doing bundle based reconciliation for device ID:{}", dpnId);
return false;
}
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Error while doing bundle based reconciliation for device ID:{}", dpnId);
- return false;
}
+ LOG.error("FlowNode not present for Datapath ID {}", dpnId);
+ return false;
+ } finally {
+ service.shutdown();
}
- LOG.error("FlowNode not present for Datapath ID {}", dpnId);
- return false;
}
}