Bump MRI upstreams
[openflowplugin.git] / applications / arbitratorreconciliation / impl / src / main / java / org / opendaylight / openflowplugin / applications / arbitratorreconciliation / impl / ArbitratorReconciliationManagerImpl.java
index cbf75d6f8a188e05ad975f387d1bad3b676c5adb..8eec6568fb3ecc2c58d8897a42a2926238b13967 100644 (file)
@@ -5,31 +5,33 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-
-import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
-
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
@@ -37,13 +39,9 @@ import org.opendaylight.openflowplugin.applications.reconciliation.Reconciliatio
 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
 import org.opendaylight.serviceutils.upgrade.UpgradeState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
@@ -59,7 +57,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.on
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCase;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
@@ -74,14 +74,21 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
+import org.opendaylight.yangtools.yang.common.Uint64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Singleton
 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
         ReconciliationNotificationListener, AutoCloseable {
 
@@ -94,32 +101,49 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
     private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
     private static final String SEPARATOR = ":";
 
+    private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder()
+            .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder()
+                .setTableId(OFConstants.OFPTT_ALL)
+                .build())
+            .build();
+    private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder()
+            .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
+                    .setGroupType(GroupTypes.GroupAll)
+                    .setGroupId(new GroupId(OFConstants.OFPG_ALL))
+                    .build()).build())
+            .build();
+
     private final SalBundleService salBundleService;
     private final ReconciliationManager reconciliationManager;
-    private final RoutedRpcRegistration routedRpcReg;
+    private final RpcProviderService rpcProviderService;
     private final UpgradeState upgradeState;
     private NotificationRegistration registration;
-    private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
-    private final Map<BigInteger, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
-
-    public ArbitratorReconciliationManagerImpl(final RpcProviderRegistry rpcRegistry,
-            final ReconciliationManager reconciliationManager, final UpgradeState upgradeState) {
+    private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
+            Executors.newFixedThreadPool(THREAD_POOL_SIZE));
+    private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String,
+            ObjectRegistration<? extends RpcService>> rpcRegistrations = new ConcurrentHashMap<>();
+
+    @Inject
+    public ArbitratorReconciliationManagerImpl(final ReconciliationManager reconciliationManager,
+            final RpcProviderService rpcProviderService, final RpcConsumerRegistry rpcRegistry,
+            final UpgradeState upgradeState) {
         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
-        this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
-                "ReconciliationManager cannot be null!");
-        this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
-                "RPC SalBundlService not found.");
-        this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class,
-                this);
-        this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
+        this.reconciliationManager = requireNonNull(reconciliationManager, "ReconciliationManager cannot be null!");
+        salBundleService = requireNonNull(rpcRegistry.getRpcService(SalBundleService.class),
+                "RPC SalBundleService not found.");
+        this.rpcProviderService = rpcProviderService;
+        this.upgradeState = requireNonNull(upgradeState, "UpgradeState cannot be null!");
     }
 
+    @PostConstruct
     public void start() {
         registration = reconciliationManager.registerService(this);
         LOG.info("ArbitratorReconciliationManager has started successfully.");
     }
 
     @Override
+    @PreDestroy
     public void close() throws Exception {
         executor.shutdown();
         if (registration != null) {
@@ -130,72 +154,77 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
 
     @Override
     public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
-            CommitActiveBundleInput input) {
-        BigInteger nodeId = input.getNodeId();
+            final CommitActiveBundleInput input) {
+        Uint64 nodeId = input.getNodeId();
         if (bundleIdMap.containsKey(nodeId)) {
             BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
             if (bundleId != null) {
                 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
-                        .setNode(input.getNode()).setBundleId(bundleId)
+                        .setNode(input.getNode())
+                        .setBundleId(bundleId)
                         .setFlags(BUNDLE_FLAGS)
-                        .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
+                        .setType(BundleControlType.ONFBCTCOMMITREQUEST)
+                        .build();
                 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
                         .controlBundle(commitBundleInput);
                 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
-                Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId),
+
+                Futures.addCallback(rpcResult,
+                        new CommitActiveBundleCallback(nodeId),
                         MoreExecutors.directExecutor());
                 return Futures.transform(
                         rpcResult,
-                        this.<ControlBundleOutput>createRpcResultCondenser("committed active bundle"),
+                        createRpcResultCondenser("committed active bundle"),
                         MoreExecutors.directExecutor());
             }
         }
-        return RpcResultBuilder.success((new CommitActiveBundleOutputBuilder()
-                .setResult(null).build()))
-                .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
-                null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
+        return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
+                .setResult(null).build())
+                .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION,
+                        null, "No active bundle found for the node" + nodeId))).buildFuture();
     }
 
     @Override
-    public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(GetActiveBundleInput input) {
-        BigInteger nodeId = input.getNodeId();
+    public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(final GetActiveBundleInput input) {
+        Uint64 nodeId = input.getNodeId();
         BundleDetails bundleDetails = bundleIdMap.get(nodeId);
         if (bundleDetails != null) {
             try {
                 //This blocking call is used to prevent the applications from pushing flows and groups via the default
                 // pipeline when the commit bundle is ongoing.
                 bundleDetails.getResult().get();
-                return RpcResultBuilder.success((new GetActiveBundleOutputBuilder()
-                        .setResult(bundleDetails.getBundleId()).build())).buildFuture();
-            } catch (InterruptedException | ExecutionException | NullPointerException e) {
+                return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
+                        .setResult(bundleDetails.getBundleId())
+                        .build())
+                        .buildFuture();
+            } catch (InterruptedException | ExecutionException e) {
                 return RpcResultBuilder.<GetActiveBundleOutput>failed()
-                        .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
+                        .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION,
                                 null, e.getMessage()))).buildFuture();
             }
         }
-        return RpcResultBuilder.success((new GetActiveBundleOutputBuilder()
-                .setResult(null).build())).buildFuture();
+        return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
+                .setResult(null).build()).buildFuture();
     }
 
     @Override
-    public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
+    public ListenableFuture<Boolean> startReconciliation(final DeviceInfo node) {
         registerRpc(node);
         if (upgradeState.isUpgradeInProgress()) {
             LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
             return reconcileConfiguration(node);
         }
         LOG.trace("arbitrator reconciliation is disabled");
-        return Futures.immediateFuture(true);
+        return FluentFutures.immediateTrueFluentFuture();
     }
 
     @Override
-    public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
-        LOG.trace("Stopping arbitrator reconciliation for node {}", node.getDatapathId());
-        InstanceIdentifier<FlowCapableNode> connectedNode = node.getNodeInstanceIdentifier()
-                .augmentation(FlowCapableNode.class);
-        bundleIdMap.remove(connectedNode);
+    public ListenableFuture<Boolean> endReconciliation(final DeviceInfo node) {
+        Uint64 datapathId = node.getDatapathId();
+        LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId);
+        bundleIdMap.remove(datapathId);
         deregisterRpc(node);
-        return Futures.immediateFuture(true);
+        return FluentFutures.immediateTrueFluentFuture();
     }
 
     @Override
@@ -213,40 +242,24 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
         return ResultState.DONOTHING;
     }
 
-    private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
+    private ListenableFuture<Boolean> reconcileConfiguration(final DeviceInfo node) {
         LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
         ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
-        return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask));
+        return executor.submit(upgradeReconTask);
     }
 
-    private Messages createMessages(final NodeRef nodeRef) {
+    private static Messages createMessages(final NodeRef nodeRef) {
         final List<Message> messages = new ArrayList<>();
-        messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
-                new BundleRemoveFlowCaseBuilder()
-                .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build())
-                .build()).build());
-
-        messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
-                new BundleRemoveGroupCaseBuilder()
-                .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build())
-                .build()).build());
+        messages.add(new MessageBuilder()
+                .setNode(nodeRef)
+                .setBundleInnerMessage(DELETE_ALL_FLOW).build());
+        messages.add(new MessageBuilder()
+                .setNode(nodeRef)
+                .setBundleInnerMessage(DELETE_ALL_GROUP).build());
         LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
         return new MessagesBuilder().setMessage(messages).build();
     }
 
-    private Flow getDeleteAllFlow() {
-        final FlowBuilder flowBuilder = new FlowBuilder();
-        flowBuilder.setTableId(OFConstants.OFPTT_ALL);
-        return flowBuilder.build();
-    }
-
-    private Group getDeleteAllGroup() {
-        final GroupBuilder groupBuilder = new GroupBuilder();
-        groupBuilder.setGroupType(GroupTypes.GroupAll);
-        groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL));
-        return groupBuilder.build();
-    }
-
     private class ArbitratorReconciliationTask implements Callable<Boolean> {
         final DeviceInfo deviceInfo;
 
@@ -258,22 +271,31 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
         public Boolean call() {
             InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
                     .augmentation(FlowCapableNode.class);
-            String node = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
-            BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
+            String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
+            BundleId bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement()));
             LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
             final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
 
-            final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
-                    .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                    .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
+            final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder()
+                    .setNode(nodeRef)
+                    .setBundleId(bundleIdValue)
+                    .setFlags(BUNDLE_FLAGS)
+                    .setType(BundleControlType.ONFBCTCLOSEREQUEST)
+                    .build();
 
-            final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
-                    .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                    .setType(BundleControlType.ONFBCTOPENREQUEST).build();
+            final ControlBundleInput openBundleInput = new ControlBundleInputBuilder()
+                    .setNode(nodeRef)
+                    .setBundleId(bundleIdValue)
+                    .setFlags(BUNDLE_FLAGS)
+                    .setType(BundleControlType.ONFBCTOPENREQUEST)
+                    .build();
 
             final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
-                    .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
-                    .setMessages(createMessages(nodeRef)).build();
+                    .setNode(nodeRef)
+                    .setBundleId(bundleIdValue)
+                    .setFlags(BUNDLE_FLAGS)
+                    .setMessages(createMessages(nodeRef))
+                    .build();
 
             ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
                     .controlBundle(closeBundleInput);
@@ -288,15 +310,13 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
                             return salBundleService
                                     .addBundleMessages(addBundleMessagesInput);
                         }
-                        return Futures.immediateFuture(null);
+                        return FluentFutures.immediateNullFluentFuture();
                     }, MoreExecutors.directExecutor());
-            BigInteger nodeId = getDpnIdFromNodeName(node);
+            Uint64 nodeId = getDpnIdFromNodeName(node);
             try {
                 if (addBundleMessagesFuture.get().isSuccessful()) {
-                    bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
-                            Futures.immediateFuture(null)));
-                    LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up"
-                            + " for application programming.", nodeId);
+                    bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,FluentFutures.immediateNullFluentFuture()));
+                    LOG.debug("Arbitrator reconciliation initial task has been completed for node {} ", nodeId);
                     return true;
                 } else {
                     LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
@@ -310,26 +330,25 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
     }
 
     public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
-        private final BigInteger nodeId;
+        private final Uint64 nodeId;
 
-        private CommitActiveBundleCallback(final BigInteger nodeId) {
+        private CommitActiveBundleCallback(final Uint64 nodeId) {
             this.nodeId = nodeId;
         }
 
         @Override
-        public void onSuccess(RpcResult<?> rpcResult) {
+        public void onSuccess(final RpcResult<?> rpcResult) {
             LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
             bundleIdMap.remove(nodeId);
         }
 
         @Override
-        public void onFailure(Throwable throwable) {
-            LOG.error("Error while performing arbitrator reconciliation for device {}",
-                    nodeId, throwable);
+        public void onFailure(final Throwable throwable) {
+            LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable);
         }
     }
 
-    private <D> Function<RpcResult<D>,
+    private static <D> Function<RpcResult<D>,
             RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
         return input -> {
             final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
@@ -343,31 +362,37 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
                 }
             } else {
                 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
-                        .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
+                        .withError(ErrorType.APPLICATION, "action of " + action + " failed");
             }
             return resultSink.build();
         };
     }
 
-    private void registerRpc(DeviceInfo node) {
+    private void registerRpc(final DeviceInfo node) {
         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
                 .child(Node.class, new NodeKey(node.getNodeId()));
         LOG.debug("The path is registered : {}", path);
-        routedRpcReg.registerPath(NodeContext.class, path);
+        ObjectRegistration<? extends RpcService> rpcRegistration =
+                rpcProviderService.registerRpcImplementation(ArbitratorReconcileService.class, this, Set.of(path));
+        rpcRegistrations.put(node.getNodeId().getValue(), rpcRegistration);
     }
 
-    private void deregisterRpc(DeviceInfo node) {
-        KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class).child(Node.class,
-                new NodeKey(node.getNodeId()));
+    private void deregisterRpc(final DeviceInfo node) {
+        KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
+                .child(Node.class, new NodeKey(node.getNodeId()));
         LOG.debug("The path is unregistered : {}", path);
-        routedRpcReg.unregisterPath(NodeContext.class, path);
+        ObjectRegistration<? extends RpcService> rpcRegistration = rpcRegistrations.get(node.getNodeId().getValue());
+        if (rpcRegistration != null) {
+            rpcRegistration.close();
+            rpcRegistrations.remove(node.getNodeId().getValue());
+        }
     }
 
     private static class BundleDetails {
         private final BundleId bundleId;
         private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
 
-        BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
+        BundleDetails(final BundleId bundleId, final ListenableFuture<RpcResult<ControlBundleOutput>> result) {
             this.bundleId = bundleId;
             this.result = result;
         }
@@ -381,9 +406,8 @@ public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileS
         }
     }
 
-    private BigInteger getDpnIdFromNodeName(String nodeName) {
+    private static Uint64 getDpnIdFromNodeName(final String nodeName) {
         String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
-        return new BigInteger(dpnId);
+        return Uint64.valueOf(dpnId);
     }
-
 }