X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=2c89f1426072a6e945de5ed107fd3b07a53ef7df;hp=805e47a0dd05e7b52082f705f523b8870cd5bb02;hb=73b088ee110766618a8728eed653b15cef896cf1;hpb=5b66dd8f5e3467a07e77b20fe696b29993ce5565 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 805e47a0dd..2c89f14260 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -7,10 +7,13 @@ */ package org.opendaylight.controller.remote.rpc.registry; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; -import com.google.common.base.Preconditions; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -21,13 +24,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; -import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor; +import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl; +import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier; /** * Registry to look up cluster nodes that have registered for a given RPC. @@ -38,10 +42,12 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor; */ public class RpcRegistry extends BucketStoreActor { private final ActorRef rpcRegistrar; + private RemoteRpcRegistryMXBeanImpl mxBean; - public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { + public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) { super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of())); - this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar); + this.rpcRegistrar = requireNonNull(rpcRegistrar); + } /** @@ -52,11 +58,27 @@ public class RpcRegistry extends BucketStoreActor { * @param rpcInvoker Actor handling RPC invocation requests from remote nodes * @return A new {@link Props} instance */ - public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, - final ActorRef rpcRegistrar) { + public static Props props(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, + final ActorRef rpcRegistrar) { return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar); } + @Override + public void preStart() { + super.preStart(); + mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(), + getConfig().getAskDuration()), getConfig().getAskDuration()); + } + + @Override + public void postStop() throws Exception { + if (mxBean != null) { + mxBean.unregister(); + mxBean = null; + } + super.postStop(); + } + @Override protected void handleCommand(final Object message) throws Exception { if (message instanceof AddOrUpdateRoutes) { @@ -85,7 +107,8 @@ public class RpcRegistry extends BucketStoreActor { @Override protected void onBucketRemoved(final Address address, final Bucket bucket) { - rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender()); + rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), + ActorRef.noSender()); } @Override @@ -95,13 +118,13 @@ public class RpcRegistry extends BucketStoreActor { for (Entry> e : buckets.entrySet()) { final RoutingTable table = e.getValue().getData(); - final Collection rpcs = table.getRoutes(); + final Collection rpcs = table.getItems(); endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty() - : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs))); + : Optional.of(new RemoteRpcEndpoint(table.getInvoker(), rpcs))); } if (!endpoints.isEmpty()) { - rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); + rpcRegistrar.tell(new Messages.UpdateRemoteEndpoints(endpoints), ActorRef.noSender()); } } @@ -109,8 +132,9 @@ public class RpcRegistry extends BucketStoreActor { private final Set rpcs; private final ActorRef router; - RemoteRpcEndpoint(final ActorRef router, final Collection rpcs) { - this.router = Preconditions.checkNotNull(router); + @VisibleForTesting + public RemoteRpcEndpoint(final ActorRef router, final Collection rpcs) { + this.router = requireNonNull(router); this.rpcs = ImmutableSet.copyOf(rpcs); } @@ -128,45 +152,48 @@ public class RpcRegistry extends BucketStoreActor { */ public static class Messages { abstract static class AbstractRouteMessage { - final List routeIdentifiers; + final List rpcRouteIdentifiers; - AbstractRouteMessage(final Collection routeIdentifiers) { - Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(), + AbstractRouteMessage(final Collection rpcRouteIdentifiers) { + checkArgument(rpcRouteIdentifiers != null && !rpcRouteIdentifiers.isEmpty(), "Route Identifiers must be supplied"); - this.routeIdentifiers = ImmutableList.copyOf(routeIdentifiers); + this.rpcRouteIdentifiers = ImmutableList.copyOf(rpcRouteIdentifiers); } List getRouteIdentifiers() { - return this.routeIdentifiers; + return this.rpcRouteIdentifiers; } @Override public String toString() { - return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}'; + return "ContainsRoute{" + "routeIdentifiers=" + rpcRouteIdentifiers + '}'; } } - public static final class AddOrUpdateRoutes extends AbstractRouteMessage { - public AddOrUpdateRoutes(final Collection routeIdentifiers) { - super(routeIdentifiers); + public static final class AddOrUpdateRoutes extends Messages.AbstractRouteMessage { + public AddOrUpdateRoutes(final Collection rpcRouteIdentifiers) { + super(rpcRouteIdentifiers); } + } public static final class RemoveRoutes extends AbstractRouteMessage { - public RemoveRoutes(final Collection routeIdentifiers) { - super(routeIdentifiers); + public RemoveRoutes(final Collection rpcRouteIdentifiers) { + super(rpcRouteIdentifiers); } } public static final class UpdateRemoteEndpoints { - private final Map> endpoints; + private final Map> rpcEndpoints; + - UpdateRemoteEndpoints(final Map> endpoints) { - this.endpoints = ImmutableMap.copyOf(endpoints); + @VisibleForTesting + public UpdateRemoteEndpoints(final Map> rpcEndpoints) { + this.rpcEndpoints = ImmutableMap.copyOf(rpcEndpoints); } - public Map> getEndpoints() { - return endpoints; + public Map> getRpcEndpoints() { + return rpcEndpoints; } } }