import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.UntypedActor;
import akka.dispatch.Mapper;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Pair;
import akka.pattern.Patterns;
import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import scala.concurrent.Future;
import java.util.Map;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
-public class RpcRegistry extends UntypedActor {
+public class RpcRegistry extends AbstractUntypedActorWithMetering {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
*/
private ActorRef localRouter;
+ private RemoteRpcProviderConfig config;
+
public RpcRegistry() {
bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-
+ this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
log.info("Bucket store path = {}", bucketStore.path().toString());
}
this.bucketStore = bucketStore;
}
- @Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: message [{}]", message);
+ @Override
+ protected void handleReceive(Object message) throws Exception {
//TODO: if sender is remote, reject message
if (message instanceof SetLocalRouter)
Preconditions.checkState(localRouter != null, "Router must be set first");
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
*/
private void receiveRemoveRoutes(RemoveRoutes msg) {
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
private void receiveGetRouter(FindRouters msg) {
final ActorRef sender = getSender();
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
}
* @param routeId
* @return
*/
- private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+ private Messages.FindRoutersReply createReplyWithRouters(
+ Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
List<Pair<ActorRef, Long>> routers = new ArrayList<>();
Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
* @param sender client who asked to find the routers.
* @return
*/
- private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
+ private Mapper<Object, Void> getMapperToGetRouter(
+ final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
return new Mapper<Object, Void>() {
@Override
public Void apply(Object replyMessage) {