import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
"2013-07-09", "context-reference");
private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
-
+
private final String identifier;
private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
}
}
- private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
+ private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<RpcRoutingContext> {
private final RoutedRpcStrategy strategy;
private final Set<QName> supportedRpcs;
+ private final RpcRoutingContext identifier;
private RpcImplementation defaultDelegate;
private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
- private SchemaAwareRpcBroker router;
+ private final SchemaAwareRpcBroker router;
public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
super();
this.strategy = strategy;
supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
+ identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier());
this.router = router;
}
@Override
- public QName getIdentifier() {
- return strategy.getIdentifier();
+ public RpcRoutingContext getIdentifier() {
+ return identifier;
}
@Override
RoutedRpcRegistration {
private final QName type;
- private RoutedRpcSelector router;
+ private final RoutedRpcSelector router;
public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
super(implementation);
routeListener.getInstance().onRouteChange(change);
} catch (Exception e) {
LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
-
+
}
}
-
+
}
-
+
private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
}
}
}
-
+
@Override
public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
L listener) {
- return routeChangeListeners.registerWithType(listener);
+ ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
+ RouteChange<RpcRoutingContext, InstanceIdentifier> initial = createInitialRouteChange();
+ try {
+ listener.onRouteChange(initial);
+ } catch (Exception e) {
+ LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener);
+ }
+ return reg;
+ }
+
+ private RouteChange<RpcRoutingContext, InstanceIdentifier> createInitialRouteChange() {
+ FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
+
+
+ ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> announcements = ImmutableMap.builder();
+ ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> removals = ImmutableMap.builder();
+ for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
+ final RpcRoutingContext context = routedRpcSelector.getIdentifier();
+ final Set<InstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
+ announcements.put(context, paths);
+ }
+ return RoutingUtils.change(announcements.build(), removals.build());
}
}