X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2FRpcRegistry.java;h=095d70926b90d3838a777cc14a6976b31ccd9c97;hb=dd281c0e33267296ad3babbffd03e1122cdb127e;hp=5109d316446b13158e3739824e653e0259929135;hpb=351a78c9840c5b98a478b91ffd50befad998eb0e;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 5109d31644..095d70926b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.remote.rpc.registry; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.dispatch.Mapper; import akka.event.Logging; import akka.event.LoggingAdapter; @@ -18,9 +17,10 @@ import akka.japi.Option; import akka.japi.Pair; import akka.pattern.Patterns; import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; import org.opendaylight.controller.sal.connector.api.RpcRouter; import scala.concurrent.Future; @@ -30,9 +30,9 @@ import java.util.List; import java.util.Map; import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters; import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; @@ -45,7 +45,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this * cluster wide information. */ -public class RpcRegistry extends UntypedActor { +public class RpcRegistry extends AbstractUntypedActorWithMetering { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); @@ -59,9 +59,11 @@ public class RpcRegistry extends UntypedActor { */ private ActorRef localRouter; + private RemoteRpcProviderConfig config; + public RpcRegistry() { bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store"); - + this.config = new RemoteRpcProviderConfig(getContext().system().settings().config()); log.info("Bucket store path = {}", bucketStore.path().toString()); } @@ -69,11 +71,9 @@ public class RpcRegistry extends UntypedActor { this.bucketStore = bucketStore; } - @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: message [{}]", message); + @Override + protected void handleReceive(Object message) throws Exception { //TODO: if sender is remote, reject message if (message instanceof SetLocalRouter) @@ -108,7 +108,7 @@ public class RpcRegistry extends UntypedActor { Preconditions.checkState(localRouter != null, "Router must be set first"); - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis()); + Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); } @@ -117,7 +117,7 @@ public class RpcRegistry extends UntypedActor { */ private void receiveRemoveRoutes(RemoveRoutes msg) { - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis()); + Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration()); futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); } @@ -130,7 +130,7 @@ public class RpcRegistry extends UntypedActor { private void receiveGetRouter(FindRouters msg) { final ActorRef sender = getSender(); - Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis()); + Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration()); futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher()); } @@ -151,7 +151,8 @@ public class RpcRegistry extends UntypedActor { * @param routeId * @return */ - private Messages.FindRoutersReply createReplyWithRouters(Map buckets, RpcRouter.RouteIdentifier routeId) { + private Messages.FindRoutersReply createReplyWithRouters( + Map buckets, RpcRouter.RouteIdentifier routeId) { List> routers = new ArrayList<>(); Option> routerWithUpdateTime = null; @@ -184,7 +185,8 @@ public class RpcRegistry extends UntypedActor { * @param sender client who asked to find the routers. * @return */ - private Mapper getMapperToGetRouter(final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { + private Mapper getMapperToGetRouter( + final RpcRouter.RouteIdentifier routeId, final ActorRef sender) { return new Mapper() { @Override public Void apply(Object replyMessage) {