Convert arbitratorreconciliation to OSGi DS
[openflowplugin.git] / applications / arbitratorreconciliation / impl / src / main / java / org / opendaylight / openflowplugin / applications / arbitratorreconciliation / impl / ArbitratorReconciliationManagerImpl.java
index 8eec6568fb3ecc2c58d8897a42a2926238b13967..b8e605c3e64a2f52634295ef9af550e229abad9c 100644 (file)
@@ -10,11 +10,10 @@ package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.im
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableClassToInstanceMap;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.List;
@@ -24,12 +23,13 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.inject.Singleton;
+import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.mdsal.binding.api.RpcProviderService;
 import org.opendaylight.openflowplugin.api.OFConstants;
@@ -46,16 +46,13 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessages;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundle;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
@@ -66,117 +63,106 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.on
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundle;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundle;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
-import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.opendaylight.yangtools.yang.binding.Rpc;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.common.Uint32;
 import org.opendaylight.yangtools.yang.common.Uint64;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Singleton
-public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
-        ReconciliationNotificationListener, AutoCloseable {
-
+@Component(service = { })
+public final class ArbitratorReconciliationManagerImpl implements ReconciliationNotificationListener, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
-    private static final int THREAD_POOL_SIZE = 4;
     private static final AtomicLong BUNDLE_ID = new AtomicLong();
     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
-    private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
-            .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
     private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
     private static final String SEPARATOR = ":";
 
     private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder()
-            .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder()
-                .setTableId(OFConstants.OFPTT_ALL)
-                .build())
-            .build();
+        .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder()
+            .setTableId(OFConstants.OFPTT_ALL)
+            .build())
+        .build();
     private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder()
-            .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
-                    .setGroupType(GroupTypes.GroupAll)
-                    .setGroupId(new GroupId(OFConstants.OFPG_ALL))
-                    .build()).build())
-            .build();
-
-    private final SalBundleService salBundleService;
-    private final ReconciliationManager reconciliationManager;
+        .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
+            .setGroupType(GroupTypes.GroupAll)
+            .setGroupId(new GroupId(OFConstants.OFPG_ALL))
+            .build()).build())
+        .build();
+
+    // FIXME: use CM to control this constant
+    private static final int THREAD_POOL_SIZE = 4;
+    // FIXME: use CM to control this constant
+    private static final int ARBITRATOR_RECONCILIATION_PRIORITY =
+        Integer.getInteger("arbitrator.reconciliation.manager.priority", 0 /*default*/);
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+    private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Registration> rpcRegistrations = new ConcurrentHashMap<>();
+
     private final RpcProviderService rpcProviderService;
     private final UpgradeState upgradeState;
-    private NotificationRegistration registration;
-    private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
-            Executors.newFixedThreadPool(THREAD_POOL_SIZE));
-    private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String,
-            ObjectRegistration<? extends RpcService>> rpcRegistrations = new ConcurrentHashMap<>();
+    private final NotificationRegistration registration;
+    private final AddBundleMessages addBundleMessages;
+    private final ControlBundle controlBundle;
 
     @Inject
-    public ArbitratorReconciliationManagerImpl(final ReconciliationManager reconciliationManager,
-            final RpcProviderService rpcProviderService, final RpcConsumerRegistry rpcRegistry,
-            final UpgradeState upgradeState) {
-        Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
-        this.reconciliationManager = requireNonNull(reconciliationManager, "ReconciliationManager cannot be null!");
-        salBundleService = requireNonNull(rpcRegistry.getRpcService(SalBundleService.class),
-                "RPC SalBundleService not found.");
-        this.rpcProviderService = rpcProviderService;
-        this.upgradeState = requireNonNull(upgradeState, "UpgradeState cannot be null!");
-    }
-
-    @PostConstruct
-    public void start() {
+    @Activate
+    public ArbitratorReconciliationManagerImpl(@Reference final ReconciliationManager reconciliationManager,
+            @Reference final RpcProviderService rpcProviderService, @Reference final RpcConsumerRegistry rpcService,
+            @Reference final UpgradeState upgradeState) {
+        this.rpcProviderService = requireNonNull(rpcProviderService);
+        this.upgradeState = requireNonNull(upgradeState);
+        addBundleMessages = requireNonNull(rpcService.getRpc(AddBundleMessages.class));
+        controlBundle = requireNonNull(rpcService.getRpc(ControlBundle.class));
         registration = reconciliationManager.registerService(this);
         LOG.info("ArbitratorReconciliationManager has started successfully.");
     }
 
-    @Override
+    @Deactivate
     @PreDestroy
+    @Override
     public void close() throws Exception {
         executor.shutdown();
-        if (registration != null) {
-            registration.close();
-            registration = null;
-        }
+        registration.close();
     }
 
-    @Override
-    public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
+    private ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
             final CommitActiveBundleInput input) {
-        Uint64 nodeId = input.getNodeId();
-        if (bundleIdMap.containsKey(nodeId)) {
-            BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
-            if (bundleId != null) {
-                final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
-                        .setNode(input.getNode())
-                        .setBundleId(bundleId)
-                        .setFlags(BUNDLE_FLAGS)
-                        .setType(BundleControlType.ONFBCTCOMMITREQUEST)
-                        .build();
-                ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
-                        .controlBundle(commitBundleInput);
-                bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
-
-                Futures.addCallback(rpcResult,
-                        new CommitActiveBundleCallback(nodeId),
-                        MoreExecutors.directExecutor());
-                return Futures.transform(
-                        rpcResult,
-                        createRpcResultCondenser("committed active bundle"),
-                        MoreExecutors.directExecutor());
-            }
+        final var nodeId = input.getNodeId();
+        final var details = bundleIdMap.get(nodeId);
+        if (details != null) {
+            final var rpcResult = controlBundle.invoke(new ControlBundleInputBuilder()
+                .setNode(input.getNode())
+                .setBundleId(details.bundleId)
+                .setFlags(BUNDLE_FLAGS)
+                .setType(BundleControlType.ONFBCTCOMMITREQUEST)
+                .build());
+            bundleIdMap.put(nodeId, new BundleDetails(details.bundleId, rpcResult));
+
+            Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId), MoreExecutors.directExecutor());
+            return Futures.transform(rpcResult, createRpcResultCondenser("committed active bundle"),
+                MoreExecutors.directExecutor());
         }
         return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
                 .setResult(null).build())
@@ -184,17 +170,16 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
                         null, "No active bundle found for the node" + nodeId))).buildFuture();
     }
 
-    @Override
-    public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(final GetActiveBundleInput input) {
+    private ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(final GetActiveBundleInput input) {
         Uint64 nodeId = input.getNodeId();
         BundleDetails bundleDetails = bundleIdMap.get(nodeId);
         if (bundleDetails != null) {
             try {
                 //This blocking call is used to prevent the applications from pushing flows and groups via the default
                 // pipeline when the commit bundle is ongoing.
-                bundleDetails.getResult().get();
+                bundleDetails.result.get();
                 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
-                        .setResult(bundleDetails.getBundleId())
+                        .setResult(bundleDetails.bundleId)
                         .build())
                         .buildFuture();
             } catch (InterruptedException | ExecutionException e) {
@@ -203,8 +188,7 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
                                 null, e.getMessage()))).buildFuture();
             }
         }
-        return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
-                .setResult(null).build()).buildFuture();
+        return RpcResultBuilder.success(new GetActiveBundleOutputBuilder().setResult(null).build()).buildFuture();
     }
 
     @Override
@@ -242,29 +226,25 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
         return ResultState.DONOTHING;
     }
 
-    private ListenableFuture<Boolean> reconcileConfiguration(final DeviceInfo node) {
-        LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
-        ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
-        return executor.submit(upgradeReconTask);
-    }
 
     private static Messages createMessages(final NodeRef nodeRef) {
-        final List<Message> messages = new ArrayList<>();
-        messages.add(new MessageBuilder()
-                .setNode(nodeRef)
-                .setBundleInnerMessage(DELETE_ALL_FLOW).build());
-        messages.add(new MessageBuilder()
-                .setNode(nodeRef)
-                .setBundleInnerMessage(DELETE_ALL_GROUP).build());
+        final var messages = List.of(
+            new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_FLOW).build(),
+            new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_GROUP).build());
         LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
         return new MessagesBuilder().setMessage(messages).build();
     }
 
-    private class ArbitratorReconciliationTask implements Callable<Boolean> {
-        final DeviceInfo deviceInfo;
+    private ListenableFuture<Boolean> reconcileConfiguration(final DeviceInfo node) {
+        LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
+        return Futures.submit(new ArbitratorReconciliationTask(node), executor);
+    }
+
+    private final class ArbitratorReconciliationTask implements Callable<Boolean> {
+        private final DeviceInfo deviceInfo;
 
         ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
-            this.deviceInfo = deviceInfo;
+            this.deviceInfo = requireNonNull(deviceInfo);
         }
 
         @Override
@@ -272,50 +252,38 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
             InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
                     .augmentation(FlowCapableNode.class);
             String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
-            BundleId bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement()));
+            final var bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement()));
             LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
             final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
 
-            final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder()
+            final var openBundleMessagesFuture = Futures.transformAsync(
+                controlBundle.invoke(new ControlBundleInputBuilder()
                     .setNode(nodeRef)
                     .setBundleId(bundleIdValue)
                     .setFlags(BUNDLE_FLAGS)
                     .setType(BundleControlType.ONFBCTCLOSEREQUEST)
-                    .build();
-
-            final ControlBundleInput openBundleInput = new ControlBundleInputBuilder()
+                    .build()),
+                rpcResult -> controlBundle.invoke(new ControlBundleInputBuilder()
                     .setNode(nodeRef)
                     .setBundleId(bundleIdValue)
                     .setFlags(BUNDLE_FLAGS)
                     .setType(BundleControlType.ONFBCTOPENREQUEST)
-                    .build();
+                    .build()), MoreExecutors.directExecutor());
 
-            final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
-                    .setNode(nodeRef)
-                    .setBundleId(bundleIdValue)
-                    .setFlags(BUNDLE_FLAGS)
-                    .setMessages(createMessages(nodeRef))
-                    .build();
-
-            ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
-                    .controlBundle(closeBundleInput);
-
-            ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
-                    .transformAsync(closeBundle, rpcResult -> salBundleService
-                            .controlBundle(openBundleInput), MoreExecutors.directExecutor());
-
-            ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
-                    .transformAsync(openBundleMessagesFuture, rpcResult -> {
-                        if (rpcResult.isSuccessful()) {
-                            return salBundleService
-                                    .addBundleMessages(addBundleMessagesInput);
-                        }
-                        return FluentFutures.immediateNullFluentFuture();
-                    }, MoreExecutors.directExecutor());
-            Uint64 nodeId = getDpnIdFromNodeName(node);
+            final var addBundleMessagesFuture = Futures.transformAsync(openBundleMessagesFuture,
+                rpcResult -> rpcResult.isSuccessful()
+                    ? addBundleMessages.invoke(new AddBundleMessagesInputBuilder()
+                        .setNode(nodeRef)
+                        .setBundleId(bundleIdValue)
+                        .setFlags(BUNDLE_FLAGS)
+                        .setMessages(createMessages(nodeRef))
+                        .build())
+                    : FluentFutures.immediateNullFluentFuture(), MoreExecutors.directExecutor());
+            final var nodeId = getDpnIdFromNodeName(node);
             try {
                 if (addBundleMessagesFuture.get().isSuccessful()) {
-                    bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,FluentFutures.immediateNullFluentFuture()));
+                    bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
+                        FluentFutures.immediateNullFluentFuture()));
                     LOG.debug("Arbitrator reconciliation initial task has been completed for node {} ", nodeId);
                     return true;
                 } else {
@@ -348,12 +316,12 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
         }
     }
 
-    private static <D> Function<RpcResult<D>,
-            RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
+    private static <D> Function<RpcResult<D>, RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(
+            final String action) {
         return input -> {
             final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
             if (input != null) {
-                List<RpcError> errors = new ArrayList<>();
+                final var errors = new ArrayList<RpcError>();
                 if (!input.isSuccessful()) {
                     errors.addAll(input.getErrors());
                     resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
@@ -369,40 +337,29 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
     }
 
     private void registerRpc(final DeviceInfo node) {
-        KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
-                .child(Node.class, new NodeKey(node.getNodeId()));
+        final var path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(node.getNodeId()));
         LOG.debug("The path is registered : {}", path);
-        ObjectRegistration<? extends RpcService> rpcRegistration =
-                rpcProviderService.registerRpcImplementation(ArbitratorReconcileService.class, this, Set.of(path));
-        rpcRegistrations.put(node.getNodeId().getValue(), rpcRegistration);
+        rpcRegistrations.put(node.getNodeId().getValue(), rpcProviderService.registerRpcImplementations(
+            ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
+                .put(GetActiveBundle.class, this::getActiveBundle)
+                .put(CommitActiveBundle.class, this::commitActiveBundle)
+                .build(), Set.of(path)));
     }
 
     private void deregisterRpc(final DeviceInfo node) {
-        KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
-                .child(Node.class, new NodeKey(node.getNodeId()));
+        final var path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(node.getNodeId()));
         LOG.debug("The path is unregistered : {}", path);
-        ObjectRegistration<? extends RpcService> rpcRegistration = rpcRegistrations.get(node.getNodeId().getValue());
-        if (rpcRegistration != null) {
-            rpcRegistration.close();
-            rpcRegistrations.remove(node.getNodeId().getValue());
+        final var reg = rpcRegistrations.remove(node.getNodeId().getValue());
+        if (reg != null) {
+            reg.close();
         }
     }
 
-    private static class BundleDetails {
-        private final BundleId bundleId;
-        private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
-
-        BundleDetails(final BundleId bundleId, final ListenableFuture<RpcResult<ControlBundleOutput>> result) {
-            this.bundleId = bundleId;
-            this.result = result;
-        }
-
-        public BundleId getBundleId() {
-            return bundleId;
-        }
-
-        public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
-            return result;
+    @NonNullByDefault
+    record BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
+        BundleDetails {
+            requireNonNull(bundleId);
+            requireNonNull(result);
         }
     }