X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Farbitratorreconciliation%2Fimpl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Farbitratorreconciliation%2Fimpl%2FArbitratorReconciliationManagerImpl.java;h=b8e605c3e64a2f52634295ef9af550e229abad9c;hb=ac2ce0f5e05ac0a0b91e614d4bb595d17ea8e1d6;hp=9ff3c889dad462198037166e13c203bd8bda9231;hpb=c9708f464ca7bf76dd30c19f6a830e57e28b8d04;p=openflowplugin.git diff --git a/applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java b/applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java index 9ff3c889da..b8e605c3e6 100644 --- a/applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java +++ b/applications/arbitratorreconciliation/impl/src/main/java/org/opendaylight/openflowplugin/applications/arbitratorreconciliation/impl/ArbitratorReconciliationManagerImpl.java @@ -5,34 +5,33 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl; +import static java.util.Objects.requireNonNull; + import com.google.common.base.Function; -import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableClassToInstanceMap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import java.math.BigInteger; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.inject.Singleton; -import org.apache.aries.blueprint.annotation.service.Reference; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry; +import org.opendaylight.mdsal.binding.api.RpcProviderService; import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration; @@ -40,168 +39,176 @@ import org.opendaylight.openflowplugin.applications.reconciliation.Reconciliatio import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener; import org.opendaylight.serviceutils.upgrade.UpgradeState; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessages; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundle; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCase; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundle; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundle; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.Rpc; +import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.common.Uint32; +import org.opendaylight.yangtools.yang.common.Uint64; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Singleton -public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService, - ReconciliationNotificationListener, AutoCloseable { - +@Component(service = { }) +public final class ArbitratorReconciliationManagerImpl implements ReconciliationNotificationListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class); - private static final int THREAD_POOL_SIZE = 4; private static final AtomicLong BUNDLE_ID = new AtomicLong(); private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true); - private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer - .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/); private static final String SERVICE_NAME = "ArbitratorReconciliationManager"; private static final String SEPARATOR = ":"; - private final SalBundleService salBundleService; - private final ReconciliationManager reconciliationManager; - private final RoutedRpcRegistration routedRpcReg; - private final UpgradeState upgradeState; - private NotificationRegistration registration; + private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder() + .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder() + .setTableId(OFConstants.OFPTT_ALL) + .build()) + .build(); + private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder() + .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder() + .setGroupType(GroupTypes.GroupAll) + .setGroupId(new GroupId(OFConstants.OFPG_ALL)) + .build()).build()) + .build(); + + // FIXME: use CM to control this constant + private static final int THREAD_POOL_SIZE = 4; + // FIXME: use CM to control this constant + private static final int ARBITRATOR_RECONCILIATION_PRIORITY = + Integer.getInteger("arbitrator.reconciliation.manager.priority", 0 /*default*/); + private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE); - private final Map bundleIdMap = new ConcurrentHashMap<>(); + private final Map bundleIdMap = new ConcurrentHashMap<>(); + private final ConcurrentMap rpcRegistrations = new ConcurrentHashMap<>(); - @Inject - public ArbitratorReconciliationManagerImpl(@Reference RpcProviderRegistry rpcRegistry, - @Reference ReconciliationManager reconciliationManager, @Reference UpgradeState upgradeState) { - Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !"); - this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager, - "ReconciliationManager cannot be null!"); - this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class), - "RPC SalBundlService not found."); - this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class, - this); - this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!"); - } + private final RpcProviderService rpcProviderService; + private final UpgradeState upgradeState; + private final NotificationRegistration registration; + private final AddBundleMessages addBundleMessages; + private final ControlBundle controlBundle; - @PostConstruct - public void start() { + @Inject + @Activate + public ArbitratorReconciliationManagerImpl(@Reference final ReconciliationManager reconciliationManager, + @Reference final RpcProviderService rpcProviderService, @Reference final RpcConsumerRegistry rpcService, + @Reference final UpgradeState upgradeState) { + this.rpcProviderService = requireNonNull(rpcProviderService); + this.upgradeState = requireNonNull(upgradeState); + addBundleMessages = requireNonNull(rpcService.getRpc(AddBundleMessages.class)); + controlBundle = requireNonNull(rpcService.getRpc(ControlBundle.class)); registration = reconciliationManager.registerService(this); LOG.info("ArbitratorReconciliationManager has started successfully."); } - @Override + @Deactivate @PreDestroy + @Override public void close() throws Exception { executor.shutdown(); - if (registration != null) { - registration.close(); - registration = null; - } + registration.close(); } - @Override - public ListenableFuture> commitActiveBundle( - CommitActiveBundleInput input) { - BigInteger nodeId = input.getNodeId(); - if (bundleIdMap.containsKey(nodeId)) { - BundleId bundleId = bundleIdMap.get(nodeId).getBundleId(); - if (bundleId != null) { - final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder() - .setNode(input.getNode()).setBundleId(bundleId) - .setFlags(BUNDLE_FLAGS) - .setType(BundleControlType.ONFBCTCOMMITREQUEST).build(); - ListenableFuture> rpcResult = salBundleService - .controlBundle(commitBundleInput); - bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult)); - Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId), - MoreExecutors.directExecutor()); - return Futures.transform( - rpcResult, - this.createRpcResultCondenser("committed active bundle"), - MoreExecutors.directExecutor()); - } + private ListenableFuture> commitActiveBundle( + final CommitActiveBundleInput input) { + final var nodeId = input.getNodeId(); + final var details = bundleIdMap.get(nodeId); + if (details != null) { + final var rpcResult = controlBundle.invoke(new ControlBundleInputBuilder() + .setNode(input.getNode()) + .setBundleId(details.bundleId) + .setFlags(BUNDLE_FLAGS) + .setType(BundleControlType.ONFBCTCOMMITREQUEST) + .build()); + bundleIdMap.put(nodeId, new BundleDetails(details.bundleId, rpcResult)); + + Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId), MoreExecutors.directExecutor()); + return Futures.transform(rpcResult, createRpcResultCondenser("committed active bundle"), + MoreExecutors.directExecutor()); } return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder() .setResult(null).build()) - .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, - null, "No active bundle found for the node" + nodeId.toString()))).buildFuture(); + .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION, + null, "No active bundle found for the node" + nodeId))).buildFuture(); } - @Override - public ListenableFuture> getActiveBundle(GetActiveBundleInput input) { - BigInteger nodeId = input.getNodeId(); + private ListenableFuture> getActiveBundle(final GetActiveBundleInput input) { + Uint64 nodeId = input.getNodeId(); BundleDetails bundleDetails = bundleIdMap.get(nodeId); if (bundleDetails != null) { try { //This blocking call is used to prevent the applications from pushing flows and groups via the default // pipeline when the commit bundle is ongoing. - bundleDetails.getResult().get(); + bundleDetails.result.get(); return RpcResultBuilder.success(new GetActiveBundleOutputBuilder() - .setResult(bundleDetails.getBundleId()).build()).buildFuture(); - } catch (InterruptedException | ExecutionException | NullPointerException e) { + .setResult(bundleDetails.bundleId) + .build()) + .buildFuture(); + } catch (InterruptedException | ExecutionException e) { return RpcResultBuilder.failed() - .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, + .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION, null, e.getMessage()))).buildFuture(); } } - return RpcResultBuilder.success(new GetActiveBundleOutputBuilder() - .setResult(null).build()).buildFuture(); + return RpcResultBuilder.success(new GetActiveBundleOutputBuilder().setResult(null).build()).buildFuture(); } @Override - public ListenableFuture startReconciliation(DeviceInfo node) { + public ListenableFuture startReconciliation(final DeviceInfo node) { registerRpc(node); if (upgradeState.isUpgradeInProgress()) { LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId()); return reconcileConfiguration(node); } LOG.trace("arbitrator reconciliation is disabled"); - return Futures.immediateFuture(true); + return FluentFutures.immediateTrueFluentFuture(); } @Override - public ListenableFuture endReconciliation(DeviceInfo node) { - BigInteger datapathId = node.getDatapathId(); + public ListenableFuture endReconciliation(final DeviceInfo node) { + Uint64 datapathId = node.getDatapathId(); LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId); bundleIdMap.remove(datapathId); deregisterRpc(node); - return Futures.immediateFuture(true); + return FluentFutures.immediateTrueFluentFuture(); } @Override @@ -219,45 +226,25 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS return ResultState.DONOTHING; } - private ListenableFuture reconcileConfiguration(DeviceInfo node) { - LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId()); - ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node); - return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask)); - } - private Messages createMessages(final NodeRef nodeRef) { - final List messages = new ArrayList<>(); - messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage( - new BundleRemoveFlowCaseBuilder() - .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build()) - .build()).build()); - - messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage( - new BundleRemoveGroupCaseBuilder() - .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build()) - .build()).build()); + private static Messages createMessages(final NodeRef nodeRef) { + final var messages = List.of( + new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_FLOW).build(), + new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_GROUP).build()); LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size()); return new MessagesBuilder().setMessage(messages).build(); } - private Flow getDeleteAllFlow() { - final FlowBuilder flowBuilder = new FlowBuilder(); - flowBuilder.setTableId(OFConstants.OFPTT_ALL); - return flowBuilder.build(); - } - - private Group getDeleteAllGroup() { - final GroupBuilder groupBuilder = new GroupBuilder(); - groupBuilder.setGroupType(GroupTypes.GroupAll); - groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL)); - return groupBuilder.build(); + private ListenableFuture reconcileConfiguration(final DeviceInfo node) { + LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId()); + return Futures.submit(new ArbitratorReconciliationTask(node), executor); } - private class ArbitratorReconciliationTask implements Callable { - final DeviceInfo deviceInfo; + private final class ArbitratorReconciliationTask implements Callable { + private final DeviceInfo deviceInfo; ArbitratorReconciliationTask(final DeviceInfo deviceInfo) { - this.deviceInfo = deviceInfo; + this.deviceInfo = requireNonNull(deviceInfo); } @Override @@ -265,44 +252,39 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS InstanceIdentifier nodeIdentity = deviceInfo.getNodeInstanceIdentifier() .augmentation(FlowCapableNode.class); String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue(); - BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement()); + final var bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement())); LOG.debug("Triggering arbitrator reconciliation for device :{}", node); final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class)); - final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) - .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) - .setType(BundleControlType.ONFBCTCLOSEREQUEST).build(); - - final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) - .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) - .setType(BundleControlType.ONFBCTOPENREQUEST).build(); - - final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder() - .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) - .setMessages(createMessages(nodeRef)).build(); - - ListenableFuture> closeBundle = salBundleService - .controlBundle(closeBundleInput); - - ListenableFuture> openBundleMessagesFuture = Futures - .transformAsync(closeBundle, rpcResult -> salBundleService - .controlBundle(openBundleInput), MoreExecutors.directExecutor()); - - ListenableFuture> addBundleMessagesFuture = Futures - .transformAsync(openBundleMessagesFuture, rpcResult -> { - if (rpcResult.isSuccessful()) { - return salBundleService - .addBundleMessages(addBundleMessagesInput); - } - return Futures.immediateFuture(null); - }, MoreExecutors.directExecutor()); - BigInteger nodeId = getDpnIdFromNodeName(node); + final var openBundleMessagesFuture = Futures.transformAsync( + controlBundle.invoke(new ControlBundleInputBuilder() + .setNode(nodeRef) + .setBundleId(bundleIdValue) + .setFlags(BUNDLE_FLAGS) + .setType(BundleControlType.ONFBCTCLOSEREQUEST) + .build()), + rpcResult -> controlBundle.invoke(new ControlBundleInputBuilder() + .setNode(nodeRef) + .setBundleId(bundleIdValue) + .setFlags(BUNDLE_FLAGS) + .setType(BundleControlType.ONFBCTOPENREQUEST) + .build()), MoreExecutors.directExecutor()); + + final var addBundleMessagesFuture = Futures.transformAsync(openBundleMessagesFuture, + rpcResult -> rpcResult.isSuccessful() + ? addBundleMessages.invoke(new AddBundleMessagesInputBuilder() + .setNode(nodeRef) + .setBundleId(bundleIdValue) + .setFlags(BUNDLE_FLAGS) + .setMessages(createMessages(nodeRef)) + .build()) + : FluentFutures.immediateNullFluentFuture(), MoreExecutors.directExecutor()); + final var nodeId = getDpnIdFromNodeName(node); try { if (addBundleMessagesFuture.get().isSuccessful()) { bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue, - Futures.immediateFuture(null))); - LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up" - + " for application programming.", nodeId); + FluentFutures.immediateNullFluentFuture())); + LOG.debug("Arbitrator reconciliation initial task has been completed for node {} ", nodeId); return true; } else { LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId); @@ -316,31 +298,30 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS } public final class CommitActiveBundleCallback implements FutureCallback> { - private final BigInteger nodeId; + private final Uint64 nodeId; - private CommitActiveBundleCallback(final BigInteger nodeId) { + private CommitActiveBundleCallback(final Uint64 nodeId) { this.nodeId = nodeId; } @Override - public void onSuccess(RpcResult rpcResult) { + public void onSuccess(final RpcResult rpcResult) { LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId); bundleIdMap.remove(nodeId); } @Override - public void onFailure(Throwable throwable) { - LOG.error("Error while performing arbitrator reconciliation for device {}", - nodeId, throwable); + public void onFailure(final Throwable throwable) { + LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable); } } - private Function, - RpcResult> createRpcResultCondenser(final String action) { + private static Function, RpcResult> createRpcResultCondenser( + final String action) { return input -> { final RpcResultBuilder resultSink; if (input != null) { - List errors = new ArrayList<>(); + final var errors = new ArrayList(); if (!input.isSuccessful()) { errors.addAll(input.getErrors()); resultSink = RpcResultBuilder.failed().withRpcErrors(errors); @@ -349,47 +330,41 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS } } else { resultSink = RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed"); + .withError(ErrorType.APPLICATION, "action of " + action + " failed"); } return resultSink.build(); }; } - private void registerRpc(DeviceInfo node) { - KeyedInstanceIdentifier path = InstanceIdentifier.create(Nodes.class) - .child(Node.class, new NodeKey(node.getNodeId())); + private void registerRpc(final DeviceInfo node) { + final var path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(node.getNodeId())); LOG.debug("The path is registered : {}", path); - routedRpcReg.registerPath(NodeContext.class, path); + rpcRegistrations.put(node.getNodeId().getValue(), rpcProviderService.registerRpcImplementations( + ImmutableClassToInstanceMap.>builder() + .put(GetActiveBundle.class, this::getActiveBundle) + .put(CommitActiveBundle.class, this::commitActiveBundle) + .build(), Set.of(path))); } - private void deregisterRpc(DeviceInfo node) { - KeyedInstanceIdentifier path = InstanceIdentifier.create(Nodes.class).child(Node.class, - new NodeKey(node.getNodeId())); + private void deregisterRpc(final DeviceInfo node) { + final var path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(node.getNodeId())); LOG.debug("The path is unregistered : {}", path); - routedRpcReg.unregisterPath(NodeContext.class, path); - } - - private static class BundleDetails { - private final BundleId bundleId; - private final ListenableFuture> result; - - BundleDetails(BundleId bundleId, ListenableFuture> result) { - this.bundleId = bundleId; - this.result = result; - } - - public BundleId getBundleId() { - return bundleId; + final var reg = rpcRegistrations.remove(node.getNodeId().getValue()); + if (reg != null) { + reg.close(); } + } - public ListenableFuture> getResult() { - return result; + @NonNullByDefault + record BundleDetails(BundleId bundleId, ListenableFuture> result) { + BundleDetails { + requireNonNull(bundleId); + requireNonNull(result); } } - private BigInteger getDpnIdFromNodeName(String nodeName) { + private static Uint64 getDpnIdFromNodeName(final String nodeName) { String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1); - return new BigInteger(dpnId); + return Uint64.valueOf(dpnId); } - }