import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
-import akka.japi.Creator;
import akka.japi.Option;
import akka.japi.Pair;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean;
-import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
import scala.concurrent.duration.FiniteDuration;
/**
* Registry to look up cluster nodes that have registered for a given rpc.
- * <p/>
+ *
+ * <p>
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
*/
}
public static Props props(RemoteRpcProviderConfig config) {
- return Props.create(new RpcRegistryCreator(config));
+ return Props.create(RpcRegistry.class, config);
}
@Override
}
/**
- * Register's rpc broker
+ * Registers a rpc broker.
*
* @param message contains {@link akka.actor.ActorRef} for rpc broker
*/
getLocalBucket().getData().setRouter(message.getRouter());
}
- /**
- * @param msg
- */
private void receiveAddRoutes(AddOrUpdateRoutes msg) {
log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
RoutingTable table = getLocalBucket().getData().copy();
- for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
table.addRoute(routeId);
}
}
/**
+ * Processes a RemoveRoutes message.
+ *
* @param msg contains list of route ids to remove
*/
private void receiveRemoveRoutes(RemoveRoutes msg) {
/**
* Finds routers for the given rpc.
*
- * @param findRouters
+ * @param findRouters the FindRouters request
*/
private void receiveGetRouter(final FindRouters findRouters) {
log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
final ActorRef sender = getSender();
- if(!findRouters(findRouters, sender)) {
+ if (!findRouters(findRouters, sender)) {
log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
findRouterTimeout.toMillis());
final Runnable routesUpdatedRunnable = new Runnable() {
@Override
public void run() {
- if(findRouters(findRouters, sender)) {
+ if (findRouters(findRouters, sender)) {
routesUpdatedCallbacks.remove(this);
timer.get().cancel();
}
routesUpdatedCallbacks.add(routesUpdatedRunnable);
- Runnable timerRunnable = new Runnable() {
- @Override
- public void run() {
- log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
+ Runnable timerRunnable = () -> {
+ log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
- routesUpdatedCallbacks.remove(routesUpdatedRunnable);
- sender.tell(new Messages.FindRoutersReply(
- Collections.<Pair<ActorRef, Long>>emptyList()), self());
- }
+ routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+ sender.tell(new Messages.FindRoutersReply(
+ Collections.<Pair<ActorRef, Long>>emptyList()), self());
};
timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
findRoutes(getLocalBucket().getData(), routeId, routers);
- for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+ for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
findRoutes(bucket.getData(), routeId, routers);
}
log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
boolean foundRouters = !routers.isEmpty();
- if(foundRouters) {
+ if (foundRouters) {
sender.tell(new Messages.FindRoutersReply(routers), getSelf());
}
}
Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
- if(!routerWithUpdateTime.isEmpty()) {
+ if (!routerWithUpdateTime.isEmpty()) {
routers.add(routerWithUpdateTime.get());
}
}
@Override
protected void onBucketsUpdated() {
- for(Runnable callBack: routesUpdatedCallbacks) {
+ if (routesUpdatedCallbacks.isEmpty()) {
+ return;
+ }
+
+ for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
callBack.run();
}
}
/**
- * All messages used by the RpcRegistry
+ * All messages used by the RpcRegistry.
*/
public static class Messages {
final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
- Preconditions.checkArgument(routeIdentifiers != null &&
- !routeIdentifiers.isEmpty(),
- "Route Identifiers must be supplied");
+ Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
this.routeIdentifiers = routeIdentifiers;
}
@Override
public String toString() {
- return "ContainsRoute{" +
- "routeIdentifiers=" + routeIdentifiers +
- '}';
+ return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
}
}
@Override
public String toString() {
- return "SetLocalRouter{" +
- "router=" + router +
- '}';
+ return "SetLocalRouter{" + "router=" + router + '}';
}
}
@Override
public String toString() {
- return "FindRouters{" +
- "routeIdentifier=" + routeIdentifier +
- '}';
+ return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
}
}
@Override
public String toString() {
- return "FindRoutersReply{" +
- "routerWithUpdateTime=" + routerWithUpdateTime +
- '}';
+ return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
}
}
}
-
- private static class RpcRegistryCreator implements Creator<RpcRegistry> {
- private static final long serialVersionUID = 1L;
- private final RemoteRpcProviderConfig config;
-
- private RpcRegistryCreator(RemoteRpcProviderConfig config) {
- this.config = config;
- }
-
- @Override
- public RpcRegistry create() throws Exception {
- RpcRegistry registry = new RpcRegistry(config);
- RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);
- return registry;
- }
- }
}