/*
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
import akka.dispatch.Mapper;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Option;
import akka.japi.Pair;
import akka.pattern.Patterns;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import scala.concurrent.Future;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
/**
* Registry to look up cluster nodes that have registered for a given rpc.
* <p>
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster-wide information.
*/
public class RpcRegistry extends AbstractUntypedActorWithMetering {
/**
* Logging adapter obtained from this actor's actor system.
*/
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
/**
* Store that keeps the registry. The bucket store syncs it across nodes in the cluster.
*/
private ActorRef bucketStore;
/**
* Rpc broker that would use the registry to route requests.
* Assigned when a {@code SetLocalRouter} message is received.
*/
private ActorRef localRouter;
/**
* Remote rpc provider configuration; the no-arg constructor builds it from the
* actor system's config.
*/
private RemoteRpcProviderConfig config;
/**
 * Creates a registry that owns its bucket store: a {@link BucketStore} child actor
 * named {@code "store"} is started under this actor, and the provider configuration
 * is read from the actor system's config.
 */
public RpcRegistry() {
    bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
    this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
    log.info("Bucket store path = {}", bucketStore.path().toString());
}

/**
 * Creates a registry that uses an externally supplied bucket store instead of
 * starting its own child actor.
 *
 * @param bucketStore actor to use as the backing bucket store
 */
public RpcRegistry(ActorRef bucketStore) {
    this.bucketStore = bucketStore;
    // Build the configuration exactly as the no-arg constructor does; previously this
    // constructor left config null.
    this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
}
/**
 * Dispatches an incoming message to the handler that matches its type.
 * <p>
 * Exactly one branch runs per message; any message type not listed here is passed
 * to {@code unhandled(message)}.
 *
 * @param message the message delivered to this actor
 * @throws Exception if the selected handler fails
 */
@Override
protected void handleReceive(Object message) throws Exception {
    //TODO: if sender is remote, reject message
    // A single if/else-if chain: SetLocalRouter used to sit in a separate `if`, so after
    // being handled it also fell through to the final `else` and was reported as unhandled.
    if (message instanceof SetLocalRouter) {
        receiveSetLocalRouter((SetLocalRouter) message);
    } else if (message instanceof AddOrUpdateRoutes) {
        receiveAddRoutes((AddOrUpdateRoutes) message);
    } else if (message instanceof RemoveRoutes) {
        receiveRemoveRoutes((RemoveRoutes) message);
    } else if (message instanceof FindRouters) {
        receiveGetRouter((FindRouters) message);
    } else {
        unhandled(message);
    }
}
/**
 * Registers the rpc broker by remembering the router carried in the message.
 *
 * @param message contains the {@link akka.actor.ActorRef} for the rpc broker
 */
private void receiveSetLocalRouter(SetLocalRouter message) {
    final ActorRef router = message.getRouter();
    this.localRouter = router;
}
/**
* @param msg
*/
private void receiveAddRoutes(AddOrUpdateRoutes msg) {
Preconditions.checkState(localRouter != null, "Router must be set first");
Future