-public class RpcRegistry extends UntypedActor {
-
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
- /**
- * Store to keep the registry. Bucket store sync's it across nodes in the cluster
- */
- private ActorRef bucketStore;
-
- /**
- * Rpc broker that would use the registry to route requests.
- */
- private ActorRef localRouter;
-
- public RpcRegistry() {
- bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
- }
-
- public RpcRegistry(ActorRef bucketStore) {
- this.bucketStore = bucketStore;
- }
-
- @Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: message [{}]", message);
-
- //TODO: if sender is remote, reject message
-
- if (message instanceof SetLocalRouter)
- receiveSetLocalRouter((SetLocalRouter) message);
-
- if (message instanceof AddOrUpdateRoute)
- receiveAddRoute((AddOrUpdateRoute) message);
-
- else if (message instanceof RemoveRoute)
- receiveRemoveRoute((RemoveRoute) message);
-
- else if (message instanceof Messages.FindRouters)
- receiveGetRouter((Messages.FindRouters) message);
-
- else
- unhandled(message);
+public class RpcRegistry extends BucketStoreActor<RoutingTable> {
+ private final ActorRef rpcRegistrar;
+ private final RemoteRpcRegistryMXBeanImpl mxBean;
+
+ public RpcRegistry(final RemoteOpsProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+ super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker, ImmutableSet.of()));
+ this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
+ this.mxBean = new RemoteRpcRegistryMXBeanImpl(new BucketStoreAccess(self(), getContext().dispatcher(),
+ config.getAskDuration()), config.getAskDuration());