Bug 560: Fixed Mount Point RPC registration service
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / impl / SchemaAwareRpcBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
16
17 import com.google.common.base.Preconditions;
18 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
19 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
20 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
21 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
22 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
23 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
24 import org.opendaylight.controller.sal.core.api.RpcImplementation;
25 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
26 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
27 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
28 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
29 import org.opendaylight.yangtools.concepts.Identifiable;
30 import org.opendaylight.yangtools.concepts.ListenerRegistration;
31 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
32 import org.opendaylight.yangtools.yang.common.QName;
33 import org.opendaylight.yangtools.yang.common.RpcResult;
34 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
35 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
37 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
38 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
39 import org.opendaylight.yangtools.yang.model.api.Module;
40 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import com.google.common.base.Optional;
47 import com.google.common.collect.FluentIterable;
48 import com.google.common.collect.ImmutableMap;
49 import com.google.common.collect.ImmutableSet;
50 import com.google.common.util.concurrent.ListenableFuture;
51
52 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
53
54     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
55
56     private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
57             "2013-07-09", "context-reference");
58     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
59     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
60
61
62     private final String identifier;
63     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
64     private RpcImplementation defaultImplementation;
65     private SchemaContextProvider schemaProvider;
66     private RoutedRpcDefaultImplementation defaultDelegate;
67
68     public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
69         super();
70         this.identifier = identifier;
71         this.schemaProvider = schemaProvider;
72     }
73
74     public RpcImplementation getDefaultImplementation() {
75         return defaultImplementation;
76     }
77
78     public void setDefaultImplementation(RpcImplementation defaultImplementation) {
79         this.defaultImplementation = defaultImplementation;
80     }
81
82     public SchemaContextProvider getSchemaProvider() {
83         return schemaProvider;
84     }
85
86     public void setSchemaProvider(SchemaContextProvider schemaProvider) {
87         this.schemaProvider = schemaProvider;
88     }
89
90     public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
91         return defaultDelegate;
92     }
93
94     @Override
95     public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
96         this.defaultDelegate = defaultDelegate;
97     }
98
99     @Override
100     public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
101         checkArgument(rpcType != null, "RPC Type should not be null");
102         checkArgument(implementation != null, "RPC Implementatoin should not be null");
103         return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
104     }
105
106     private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) {
107         RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
108         if (potential != null) {
109             return potential;
110         }
111         synchronized (implementations) {
112             potential = getRoutedRpcRouter(rpcType);
113             if (potential != null) {
114                 return potential;
115             }
116             RpcDefinition definition = findRpcDefinition(rpcType);
117             RoutingStrategy strategy = getRoutingStrategy(definition);
118             checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType);
119             potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this);
120             implementations.put(rpcType, potential);
121             return potential;
122         }
123     }
124
125     private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) {
126         RpcImplementation potential = implementations.get(rpcType);
127         if (potential != null) {
128             checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
129             return (RoutedRpcSelector) potential;
130         }
131         return null;
132
133     }
134
135     @Override
136     public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
137             throws IllegalArgumentException {
138         checkArgument(rpcType != null, "RPC Type should not be null");
139         checkArgument(implementation != null, "RPC Implementatoin should not be null");
140         checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
141         RpcDefinition definition = findRpcDefinition(rpcType);
142         checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed.");
143         GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
144         final RpcImplementation previous = implementations.putIfAbsent(rpcType, implementation);
145         Preconditions.checkState(previous == null, "Rpc %s is already registered.",rpcType);
146         notifyRpcAdded(rpcType);
147         return reg;
148     }
149
150     private void notifyRpcAdded(QName rpcType) {
151         for (ListenerRegistration<RpcRegistrationListener> listener : rpcRegistrationListeners) {
152             try {
153                 listener.getInstance().onRpcImplementationAdded(rpcType);
154             } catch (Exception ex) {
155                 LOG.error("Unhandled exception during invoking listener {}", listener.getInstance(), ex);
156             }
157
158         }
159     }
160
161     private boolean isRoutedRpc(RpcDefinition definition) {
162         return getRoutingStrategy(definition) instanceof RoutedRpcStrategy;
163     }
164
165     @Override
166     public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
167         ListenerRegistration<RpcRegistrationListener> reg = rpcRegistrationListeners.register(listener);
168         for (QName impl : implementations.keySet()) {
169             listener.onRpcImplementationAdded(impl);
170         }
171         return reg;
172     }
173
174     @Override
175     public String getIdentifier() {
176         return identifier;
177     }
178
179     @Override
180     public Set<QName> getSupportedRpcs() {
181         return ImmutableSet.copyOf(implementations.keySet());
182     }
183
184     @Override
185     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
186         return findRpcImplemention(rpc).invokeRpc(rpc, input);
187     }
188
189     private RpcImplementation findRpcImplemention(QName rpc) {
190         checkArgument(rpc != null, "Rpc name should not be null");
191         RpcImplementation potentialImpl = implementations.get(rpc);
192         if (potentialImpl != null) {
193             return potentialImpl;
194         }
195
196         potentialImpl = defaultImplementation;
197         if( potentialImpl == null ) {
198             throw new UnsupportedOperationException( "No implementation for this operation is available." );
199         }
200
201         return potentialImpl;
202     }
203
204     private boolean hasRpcImplementation(QName rpc) {
205         return implementations.containsKey(rpc);
206     }
207
208     private RpcDefinition findRpcDefinition(QName rpcType) {
209         checkArgument(rpcType != null, "Rpc name must be supplied.");
210         checkState(schemaProvider != null, "Schema Provider is not available.");
211         SchemaContext ctx = schemaProvider.getSchemaContext();
212         checkState(ctx != null, "YANG Schema Context is not available.");
213         Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
214         checkState(module != null, "YANG Module is not available.");
215         return findRpcDefinition(rpcType, module.getRpcs());
216     }
217
218     static private RpcDefinition findRpcDefinition(QName rpcType, Set<RpcDefinition> rpcs) {
219         checkState(rpcs != null, "Rpc schema is not available.");
220         for (RpcDefinition rpc : rpcs) {
221             if (rpcType.equals(rpc.getQName())) {
222                 return rpc;
223             }
224         }
225         throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
226     }
227
228     private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) {
229         ContainerSchemaNode input = rpc.getInput();
230         if (input != null) {
231             for (DataSchemaNode schemaNode : input.getChildNodes()) {
232                 Optional<QName> context = getRoutingContext(schemaNode);
233                 if (context.isPresent()) {
234                     return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
235                 }
236             }
237         }
238         return createGlobalStrategy(rpc);
239     }
240
241     private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) {
242         return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
243     }
244
245     private Optional<QName> getRoutingContext(DataSchemaNode schemaNode) {
246         for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
247             if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
248                 return Optional.fromNullable(extension.getQName());
249             }
250         }
251         return Optional.absent();
252     }
253
254     private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) {
255         GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
256         return ret;
257     }
258
259     @Override
260     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
261       checkState(defaultDelegate != null);
262       return defaultDelegate.invokeRpc(rpc, identifier, input);
263     }
264
265     private static abstract class RoutingStrategy implements Identifiable<QName> {
266
267         private final QName identifier;
268
269         public RoutingStrategy(QName identifier) {
270             super();
271             this.identifier = identifier;
272         }
273
274         @Override
275         public QName getIdentifier() {
276             return identifier;
277         }
278     }
279
280     private static class GlobalRpcStrategy extends RoutingStrategy {
281
282         public GlobalRpcStrategy(QName identifier) {
283             super(identifier);
284         }
285     }
286
287     private static class RoutedRpcStrategy extends RoutingStrategy {
288
289         private final QName context;
290         private final QName leaf;
291
292         public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) {
293             super(identifier);
294             this.context = ctx;
295             this.leaf = leaf;
296         }
297
298         public QName getContext() {
299             return context;
300         }
301
302         public QName getLeaf() {
303             return leaf;
304         }
305     }
306
307     private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<RpcRoutingContext> {
308
309         private final RoutedRpcStrategy strategy;
310         private final Set<QName> supportedRpcs;
311         private final RpcRoutingContext identifier;
312         private RpcImplementation defaultDelegate;
313         private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
314         private final SchemaAwareRpcBroker router;
315
316         public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
317             super();
318             this.strategy = strategy;
319             supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
320             identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier());
321             this.router = router;
322         }
323
324         @Override
325         public RpcRoutingContext getIdentifier() {
326             return identifier;
327         }
328
329         @Override
330         public void close() throws Exception {
331
332         }
333
334         @Override
335         public Set<QName> getSupportedRpcs() {
336             return supportedRpcs;
337         }
338
339         public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
340             return new RoutedRpcRegImpl(rpcType, implementation, this);
341         }
342
343         @Override
344         public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
345             CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
346             checkArgument(inputContainer != null, "Rpc payload must contain input element");
347             SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
348             checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
349             Object route = routeContainer.getValue();
350             checkArgument(route instanceof InstanceIdentifier,
351                           "The routed node %s is not an instance identifier", route);
352             RpcImplementation potential = null;
353             if (route != null) {
354                 RoutedRpcRegImpl potentialReg = implementations.get(route);
355                 if (potentialReg != null) {
356                     potential = potentialReg.getInstance();
357                 }
358             }
359             if (potential == null) {
360                 return router.invokeRpc(rpc, (InstanceIdentifier) route, input);
361             }
362             checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
363             return potential.invokeRpc(rpc, input);
364         }
365
366         public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
367             //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported.");
368             RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl);
369             if (previous == null) {
370                 router.notifyPathAnnouncement(context,strategy.getIdentifier(), path);
371             }
372
373         }
374
375         public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
376             boolean removed = implementations.remove(path, routedRpcRegImpl);
377             if (removed) {
378                 router.notifyPathWithdrawal(context, strategy.getIdentifier(), path);
379             }
380         }
381     }
382
383     private static class GlobalRpcRegistration extends AbstractObjectRegistration<RpcImplementation> implements
384             RpcRegistration {
385         private final QName type;
386         private SchemaAwareRpcBroker router;
387
388         public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) {
389             super(instance);
390             this.type = type;
391             this.router = router;
392         }
393
394         @Override
395         public QName getType() {
396             return type;
397         }
398
399         @Override
400         protected void removeRegistration() {
401             if (router != null) {
402                 router.remove(this);
403                 router = null;
404             }
405         }
406     }
407
408     private static class RoutedRpcRegImpl extends AbstractObjectRegistration<RpcImplementation> implements
409             RoutedRpcRegistration {
410
411         private final QName type;
412         private final RoutedRpcSelector router;
413
414         public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
415             super(implementation);
416             this.type = rpcType;
417             router = routedRpcSelector;
418         }
419
420         @Override
421         public void registerPath(QName context, InstanceIdentifier path) {
422             router.addPath(context, path, this);
423         }
424
425         @Override
426         public void unregisterPath(QName context, InstanceIdentifier path) {
427             router.removePath(context, path, this);
428         }
429
430         @Override
431         protected void removeRegistration() {
432
433         }
434
435         @Override
436         public QName getType() {
437             return type;
438         }
439
440     }
441
442     private void remove(GlobalRpcRegistration registration) {
443         implementations.remove(registration.getType(), registration);
444     }
445
446     private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) {
447         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
448         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
449         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
450             try {
451                 routeListener.getInstance().onRouteChange(change);
452             } catch (Exception e) {
453                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
454
455             }
456         }
457
458     }
459
460
461
462     private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
463         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
464         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
465         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
466             try {
467                 routeListener.getInstance().onRouteChange(change);
468             } catch (Exception e) {
469                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
470             }
471         }
472     }
473
474     @Override
475     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
476             L listener) {
477         ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
478         RouteChange<RpcRoutingContext, InstanceIdentifier> initial = createInitialRouteChange();
479         try {
480         listener.onRouteChange(initial);
481         } catch (Exception e) {
482             LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e);
483         }
484         return reg;
485     }
486
487     private RouteChange<RpcRoutingContext, InstanceIdentifier> createInitialRouteChange() {
488         FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
489
490
491         ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> announcements = ImmutableMap.builder();
492         ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> removals = ImmutableMap.builder();
493         for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
494             final RpcRoutingContext context = routedRpcSelector.getIdentifier();
495             final Set<InstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
496             announcements.put(context, paths);
497         }
498         return RoutingUtils.change(announcements.build(), removals.build());
499     }
500 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.