package org.opendaylight.controller.remote.rpc.registry.mbeans;
import akka.actor.Address;
+import akka.util.Timeout;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
import org.opendaylight.controller.remote.rpc.registry.RoutingTable;
-import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean {
private static final String NAME_CONSTANT = " | name:";
- private final RpcRegistry rpcRegistry;
+ private final BucketStoreAccess rpcRegistryAccess;
+ private final Timeout timeout;
- public RemoteRpcRegistryMXBeanImpl(final RpcRegistry rpcRegistry) {
+ public RemoteRpcRegistryMXBeanImpl(final BucketStoreAccess rpcRegistryAccess, Timeout timeout) {
super("RemoteRpcRegistry", "RemoteRpcBroker", null);
- this.rpcRegistry = rpcRegistry;
+ this.rpcRegistryAccess = rpcRegistryAccess;
+ this.timeout = timeout;
registerMBean();
}
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+ private RoutingTable getLocalData() {
+ try {
+ return (RoutingTable) Await.result((Future) rpcRegistryAccess.getLocalData(), timeout.duration());
+ } catch (Exception e) {
+ throw new RuntimeException("getLocalData failed", e);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
+ private Map<Address, Bucket<RoutingTable>> getRemoteBuckets() {
+ try {
+ return (Map<Address, Bucket<RoutingTable>>) Await.result((Future)rpcRegistryAccess.getRemoteBuckets(),
+ timeout.duration());
+ } catch (Exception e) {
+ throw new RuntimeException("getRemoteBuckets failed", e);
+ }
+ }
+
@Override
public Set<String> getGlobalRpc() {
- RoutingTable table = rpcRegistry.getLocalData();
+ RoutingTable table = getLocalData();
Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
for (DOMRpcIdentifier route : table.getRoutes()) {
if (route.getContextReference().isEmpty()) {
@Override
public Set<String> getLocalRegisteredRoutedRpc() {
- RoutingTable table = rpcRegistry.getLocalData();
+ RoutingTable table = getLocalData();
Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
for (DOMRpcIdentifier route : table.getRoutes()) {
if (!route.getContextReference().isEmpty()) {
@Override
public Map<String, String> findRpcByName(final String name) {
- RoutingTable localTable = rpcRegistry.getLocalData();
+ RoutingTable localTable = getLocalData();
// Get all RPCs from local bucket
Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
// Get all RPCs from remote bucket
- Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+ Map<Address, Bucket<RoutingTable>> buckets = getRemoteBuckets();
for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
RoutingTable table = entry.getValue().getData();
rpcMap.putAll(getRpcMemberMapByName(table, name, entry.getKey().toString()));
@Override
public Map<String, String> findRpcByRoute(final String routeId) {
- RoutingTable localTable = rpcRegistry.getLocalData();
+ RoutingTable localTable = getLocalData();
Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
- Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
+ Map<Address, Bucket<RoutingTable>> buckets = getRemoteBuckets();
for (Entry<Address, Bucket<RoutingTable>> entry : buckets.entrySet()) {
RoutingTable table = entry.getValue().getData();
rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, entry.getKey().toString()));
}
@Override
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch", "rawtypes"})
public String getBucketVersions() {
- return rpcRegistry.getVersions().toString();
+ try {
+ return Await.result((Future)rpcRegistryAccess.getBucketVersions(), timeout.duration()).toString();
+ } catch (Exception e) {
+ throw new RuntimeException("getVersions failed", e);
+ }
}
}