+ }, service);
+
+ /* Commit the bundle on the openflow switch */
+ ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture
+ = Futures.transformAsync(addbundlesFuture, rpcResult -> {
+ LOG.debug("Adding bundle messages completed for device {}", dpnId);
+ return salBundleService.controlBundle(commitBundleInput);
+ }, service);
+
+ /* Bundles not supported for meters */
+ Collection<Meter> meters = flowNode.get().nonnullMeter().values();
+ Futures.transformAsync(commitBundleFuture,
+ rpcResult -> {
+ if (rpcResult.isSuccessful()) {
+ for (Meter meter : meters) {
+ final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdentity
+ .child(Meter.class, meter.key());
+ provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
+ }
+ }
+ return Futures.immediateFuture(null);
+ }, service);
+ try {
+ RpcResult<ControlBundleOutput> bundleFuture = commitBundleFuture.get();
+ if (bundleFuture != null && bundleFuture.isSuccessful()) {
+ reconciliationState.setState(COMPLETED, LocalDateTime.now());
+ LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
+ OF_EVENT_LOG.debug("Bundle Reconciliation Finish, Node: {}", dpnId);
+ return true;
+ } else {
+ reconciliationState.setState(FAILED, LocalDateTime.now());
+ LOG.error("commit bundle failed for device {} with error {}", dpnId,
+ commitBundleFuture.get().getErrors());
+ return false;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ reconciliationState.setState(FAILED, LocalDateTime.now());
+ LOG.error("commit bundle failed for device {} with error ", dpnId, e);
+ return false;
+ } finally {
+ service.shutdown();
+ }