import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(THREAD_POOL_SIZE));
private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
- private ObjectRegistration<? extends RpcService> rpcRegistration;
+ private final ConcurrentMap<String,
+ ObjectRegistration<? extends RpcService>> rpcRegistrations = new ConcurrentHashMap<>();
@Inject
public ArbitratorReconciliationManagerImpl(
KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(node.getNodeId()));
LOG.debug("The path is registered : {}", path);
- rpcRegistration = rpcProviderService.registerRpcImplementation(ArbitratorReconcileService.class,
- this, ImmutableSet.of(path));
+ ObjectRegistration<? extends RpcService> rpcRegistration =
+ rpcProviderService.registerRpcImplementation(ArbitratorReconcileService.class,
+ this, ImmutableSet.of(path));
+ rpcRegistrations.put(node.getNodeId().getValue(), rpcRegistration);
}
private void deregisterRpc(DeviceInfo node) {
KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(node.getNodeId()));
LOG.debug("The path is unregistered : {}", path);
+ ObjectRegistration<? extends RpcService> rpcRegistration = rpcRegistrations.get(node.getNodeId().getValue());
if (rpcRegistration != null) {
rpcRegistration.close();
+ rpcRegistrations.remove(node.getNodeId().getValue());
}
}