import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-public class RoutedRpcListener implements RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
+public class RoutedRpcListener implements RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>{
private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class);
private final ActorRef rpcRegistry;
- private final String actorPath;
- public RoutedRpcListener(ActorRef rpcRegistry, String actorPath) {
+ public RoutedRpcListener(ActorRef rpcRegistry) {
+ Preconditions.checkNotNull(rpcRegistry, "rpc registry actor should not be null");
+
this.rpcRegistry = rpcRegistry;
- this.actorPath = actorPath;
}
@Override
- public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
- Map<RpcRoutingContext, Set<InstanceIdentifier>> announcements = routeChange.getAnnouncements();
- announce(getRouteIdentifiers(announcements));
+ public void onRouteChange(RouteChange<RpcRoutingContext, YangInstanceIdentifier> routeChange) {
+ Map<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = routeChange.getAnnouncements();
+ if(announcements != null && announcements.size() > 0){
+ announce(getRouteIdentifiers(announcements));
+ }
- Map<RpcRoutingContext, Set<InstanceIdentifier>> removals = routeChange.getRemovals();
- remove(getRouteIdentifiers(removals));
+ Map<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = routeChange.getRemovals();
+ if(removals != null && removals.size() > 0 ) {
+ remove(getRouteIdentifiers(removals));
+ }
}
/**
* @param announcements
*/
private void announce(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
- LOG.debug("Announcing [{}]", announcements);
- AddRoutedRpc addRpcMsg = new AddRoutedRpc(announcements, actorPath);
- try {
- ActorUtil.executeLocalOperation(rpcRegistry, addRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
- } catch (Exception e) {
- // Just logging it because Akka API throws this exception
- LOG.error(e.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Announcing [{}]", announcements);
}
+ RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(new ArrayList<>(announcements));
+ rpcRegistry.tell(addRpcMsg, ActorRef.noSender());
}
/**
* @param removals
*/
private void remove(Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
- LOG.debug("Removing [{}]", removals);
- RemoveRoutedRpc removeRpcMsg = new RemoveRoutedRpc(removals, actorPath);
- try {
- ActorUtil.executeLocalOperation(rpcRegistry, removeRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
- } catch (Exception e) {
- // Just logging it because Akka API throws this exception
- LOG.error(e.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Removing [{}]", removals);
}
+ RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(new ArrayList<>(removals));
+ rpcRegistry.tell(removeRpcMsg, ActorRef.noSender());
}
/**
* @param changes
* @return
*/
- private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(Map<RpcRoutingContext, Set<InstanceIdentifier>> changes) {
+ private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(Map<RpcRoutingContext, Set<YangInstanceIdentifier>> changes) {
RouteIdentifierImpl routeId = null;
Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdSet = new HashSet<>();
for (RpcRoutingContext context : changes.keySet()){
- for (InstanceIdentifier instanceId : changes.get(context)){
+ for (YangInstanceIdentifier instanceId : changes.get(context)){
routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId);
routeIdSet.add(routeId);
}