Enabling Remote RPC Router module in ODL distribution.
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / impl / SchemaAwareRpcBroker.java
index 22319abb17df7d1dec301105a59c50fad2cb1164..598361c3ae3cbf7e41eed5a69ea9cf0d74727702 100644 (file)
@@ -43,6 +43,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
@@ -53,7 +55,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
             "2013-07-09", "context-reference");
     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
-    
+
 
     private final String identifier;
     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
@@ -280,24 +282,26 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
         }
     }
 
-    private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
+    private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<RpcRoutingContext> {
 
         private final RoutedRpcStrategy strategy;
         private final Set<QName> supportedRpcs;
+        private final RpcRoutingContext identifier;
         private RpcImplementation defaultDelegate;
         private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
-        private SchemaAwareRpcBroker router;
+        private final SchemaAwareRpcBroker router;
 
         public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
             super();
             this.strategy = strategy;
             supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
+            identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier());
             this.router = router;
         }
 
         @Override
-        public QName getIdentifier() {
-            return strategy.getIdentifier();
+        public RpcRoutingContext getIdentifier() {
+            return identifier;
         }
 
         @Override
@@ -382,7 +386,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
             RoutedRpcRegistration {
 
         private final QName type;
-        private RoutedRpcSelector router;
+        private final RoutedRpcSelector router;
 
         public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
             super(implementation);
@@ -424,13 +428,13 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
                 routeListener.getInstance().onRouteChange(change);
             } catch (Exception e) {
                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
-                
+
             }
         }
-        
+
     }
 
-    
+
 
     private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
@@ -443,10 +447,31 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
             }
         }
     }
-    
+
     @Override
     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
             L listener) {
-        return routeChangeListeners.registerWithType(listener);
+        ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
+        RouteChange<RpcRoutingContext, InstanceIdentifier> initial = createInitialRouteChange();
+        try {
+        listener.onRouteChange(initial);
+        } catch (Exception e) {
+            LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener);
+        }
+        return reg;
+    }
+
+    private RouteChange<RpcRoutingContext, InstanceIdentifier> createInitialRouteChange() {
+        FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
+
+
+        ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> announcements = ImmutableMap.builder();
+        ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> removals = ImmutableMap.builder();
+        for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
+            final RpcRoutingContext context = routedRpcSelector.getIdentifier();
+            final Set<InstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
+            announcements.put(context, paths);
+        }
+        return RoutingUtils.change(announcements.build(), removals.build());
     }
 }