Remove AbstractDOMRpcRoutingTableEntry.invokeRpc
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMRpcRouter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.collect.Collections2;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.ImmutableList.Builder;
17 import com.google.common.collect.ImmutableSet;
18 import com.google.common.collect.MapDifference;
19 import com.google.common.collect.MapDifference.ValueDifference;
20 import com.google.common.collect.Maps;
21 import com.google.common.collect.Sets;
22 import com.google.common.util.concurrent.FluentFuture;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Map.Entry;
30 import java.util.Optional;
31 import java.util.Set;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.ThreadFactory;
35 import javax.annotation.concurrent.GuardedBy;
36 import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
37 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
38 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
39 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationNotAvailableException;
40 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
41 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
42 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
43 import org.opendaylight.mdsal.dom.api.DOMRpcService;
44 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
45 import org.opendaylight.mdsal.dom.spi.AbstractDOMRpcImplementationRegistration;
46 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
47 import org.opendaylight.yangtools.concepts.AbstractRegistration;
48 import org.opendaylight.yangtools.concepts.ListenerRegistration;
49 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodes;
53 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
55 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 public final class DOMRpcRouter extends AbstractRegistration implements SchemaContextListener {
60     private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat(
61             "DOMRpcRouter-listener-%s").setDaemon(true).build();
62
63     private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
64     private final DOMRpcProviderService rpcProviderService = new RpcProviderServiceFacade();
65     private final DOMRpcService rpcService = new RpcServiceFacade();
66
67     @GuardedBy("this")
68     private Collection<Registration<?>> listeners = Collections.emptyList();
69
70     private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
71
72     private ListenerRegistration<?> listenerRegistration;
73
74     public static DOMRpcRouter newInstance(final DOMSchemaService schemaService) {
75         final DOMRpcRouter rpcRouter = new DOMRpcRouter();
76         rpcRouter.listenerRegistration = schemaService.registerSchemaContextListener(rpcRouter);
77         return rpcRouter;
78     }
79
80     public DOMRpcService getRpcService() {
81         return rpcService;
82     }
83
84     public DOMRpcProviderService getRpcProviderService() {
85         return rpcProviderService;
86     }
87
88     private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation,
89             final Set<DOMRpcIdentifier> rpcs) {
90         final DOMRpcRoutingTable oldTable = routingTable;
91         final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
92         routingTable = newTable;
93
94         listenerNotifier.execute(() -> notifyRemoved(newTable, implementation));
95     }
96
97     private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
98         listeners = ImmutableList.copyOf(Collections2.filter(listeners, input -> !reg.equals(input)));
99     }
100
101     private synchronized void notifyAdded(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
102         for (Registration<?> l : listeners) {
103             l.addRpc(newTable, impl);
104         }
105     }
106
107     private synchronized void notifyRemoved(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
108         for (Registration<?> l : listeners) {
109             l.removeRpc(newTable, impl);
110         }
111     }
112
113     @Override
114     public synchronized void onGlobalContextUpdated(final SchemaContext context) {
115         final DOMRpcRoutingTable oldTable = routingTable;
116         final DOMRpcRoutingTable newTable = oldTable.setSchemaContext(context);
117         routingTable = newTable;
118     }
119
120     @Override
121     protected void removeRegistration() {
122         if (listenerRegistration != null) {
123             listenerRegistration.close();
124             listenerRegistration = null;
125         }
126         listenerNotifier.shutdown();
127     }
128
129     @VisibleForTesting
130     Collection<?> listeners() {
131         return listeners;
132     }
133
134     @VisibleForTesting
135     DOMRpcRoutingTable routingTable() {
136         return routingTable;
137     }
138
139     private static final class Registration<T extends DOMRpcAvailabilityListener>
140         extends AbstractListenerRegistration<T> {
141
142         private Map<SchemaPath, Set<YangInstanceIdentifier>> prevRpcs;
143         private DOMRpcRouter router;
144
145         Registration(final DOMRpcRouter router, final T listener,
146                 final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs) {
147             super(listener);
148             this.router = requireNonNull(router);
149             this.prevRpcs = requireNonNull(rpcs);
150         }
151
152         @Override
153         protected void removeRegistration() {
154             router.removeListener(this);
155             router = null;
156         }
157
158         void initialTable() {
159             final Collection<DOMRpcIdentifier> added = new ArrayList<>();
160             for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : prevRpcs.entrySet()) {
161                 added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
162             }
163             if (!added.isEmpty()) {
164                 getInstance().onRpcAvailable(added);
165             }
166         }
167
168         void addRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
169             final T l = getInstance();
170             if (!l.acceptsImplementation(impl)) {
171                 return;
172             }
173
174             final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = verifyNotNull(newTable.getRpcs(l));
175             final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
176
177             final Collection<DOMRpcIdentifier> added = new ArrayList<>();
178             for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnRight().entrySet()) {
179                 added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
180             }
181             for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e :
182                     diff.entriesDiffering().entrySet()) {
183                 for (YangInstanceIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) {
184                     added.add(DOMRpcIdentifier.create(e.getKey(), i));
185                 }
186             }
187
188             prevRpcs = rpcs;
189             if (!added.isEmpty()) {
190                 l.onRpcAvailable(added);
191             }
192         }
193
194         void removeRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
195             final T l = getInstance();
196             if (!l.acceptsImplementation(impl)) {
197                 return;
198             }
199
200             final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = verifyNotNull(newTable.getRpcs(l));
201             final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
202
203             final Collection<DOMRpcIdentifier> removed = new ArrayList<>();
204             for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnLeft().entrySet()) {
205                 removed.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
206             }
207             for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e :
208                     diff.entriesDiffering().entrySet()) {
209                 for (YangInstanceIdentifier i : Sets.difference(e.getValue().leftValue(), e.getValue().rightValue())) {
210                     removed.add(DOMRpcIdentifier.create(e.getKey(), i));
211                 }
212             }
213
214             prevRpcs = rpcs;
215             if (!removed.isEmpty()) {
216                 l.onRpcUnavailable(removed);
217             }
218         }
219     }
220
221     private final class RpcServiceFacade implements DOMRpcService {
222         @Override
223         public FluentFuture<DOMRpcResult> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
224             final AbstractDOMRpcRoutingTableEntry entry = routingTable.getEntry(type);
225             if (entry == null) {
226                 return FluentFutures.immediateFailedFluentFuture(
227                     new DOMRpcImplementationNotAvailableException("No implementation of RPC %s available", type));
228             }
229
230             return RpcInvocation.invoke(entry, input);
231         }
232
233         @Override
234         public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
235             synchronized (DOMRpcRouter.this) {
236                 final Registration<T> ret = new Registration<>(DOMRpcRouter.this, listener,
237                         routingTable.getRpcs(listener));
238                 final Builder<Registration<?>> b = ImmutableList.builder();
239                 b.addAll(listeners);
240                 b.add(ret);
241                 listeners = b.build();
242
243                 listenerNotifier.execute(ret::initialTable);
244                 return ret;
245             }
246         }
247     }
248
249     private final class RpcProviderServiceFacade implements DOMRpcProviderService {
250         @Override
251         public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
252                 final T implementation, final DOMRpcIdentifier... rpcs) {
253             return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
254         }
255
256         @Override
257         public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
258                 final T implementation, final Set<DOMRpcIdentifier> rpcs) {
259
260             synchronized (DOMRpcRouter.this) {
261                 final DOMRpcRoutingTable oldTable = routingTable;
262                 final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
263                 routingTable = newTable;
264
265                 listenerNotifier.execute(() -> notifyAdded(newTable, implementation));
266             }
267
268             return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
269                 @Override
270                 protected void removeRegistration() {
271                     removeRpcImplementation(getInstance(), rpcs);
272                 }
273             };
274         }
275     }
276
277     static final class RpcInvocation {
278         private static final Logger LOG = LoggerFactory.getLogger(RpcInvocation.class);
279
280         static FluentFuture<DOMRpcResult> invoke(final AbstractDOMRpcRoutingTableEntry entry,
281             final NormalizedNode<?, ?> input) {
282             if (entry instanceof UnknownDOMRpcRoutingTableEntry) {
283                 return FluentFutures.immediateFailedFluentFuture(
284                     new DOMRpcImplementationNotAvailableException("SchemaPath %s is not resolved to an RPC",
285                         entry.getSchemaPath()));
286             } else if (entry instanceof RoutedDOMRpcRoutingTableEntry) {
287                 return invokeRoutedRpc((RoutedDOMRpcRoutingTableEntry) entry, input);
288             } else if (entry instanceof GlobalDOMRpcRoutingTableEntry) {
289                 return invokeGlobalRpc((GlobalDOMRpcRoutingTableEntry) entry, input);
290             }
291
292             return FluentFutures.immediateFailedFluentFuture(
293                 new DOMRpcImplementationNotAvailableException("Unsupported RPC entry."));
294         }
295
296         private static FluentFuture<DOMRpcResult> invokeRoutedRpc(final RoutedDOMRpcRoutingTableEntry entry,
297             final NormalizedNode<?, ?> input) {
298             final Optional<NormalizedNode<?, ?>> maybeKey = NormalizedNodes.findNode(input,
299                 entry.getRpcId().getContextReference());
300
301             // Routing key is present, attempt to deliver as a routed RPC
302             if (maybeKey.isPresent()) {
303                 final NormalizedNode<?, ?> key = maybeKey.get();
304                 final Object value = key.getValue();
305                 if (value instanceof YangInstanceIdentifier) {
306                     final YangInstanceIdentifier iid = (YangInstanceIdentifier) value;
307
308                     // Find a DOMRpcImplementation for a specific iid
309                     final List<DOMRpcImplementation> specificImpls = entry.getImplementations(iid);
310                     if (specificImpls != null) {
311                         return specificImpls.get(0)
312                             .invokeRpc(DOMRpcIdentifier.create(entry.getSchemaPath(), iid), input);
313                     }
314
315                     LOG.debug("No implementation for context {} found will now look for wildcard id", iid);
316
317                     // Find a DOMRpcImplementation for a wild card. Usually remote-rpc-connector would register an
318                     // implementation this way
319                     final List<DOMRpcImplementation> mayBeRemoteImpls =
320                         entry.getImplementations(YangInstanceIdentifier.EMPTY);
321
322                     if (mayBeRemoteImpls != null) {
323                         return mayBeRemoteImpls.get(0)
324                             .invokeRpc(DOMRpcIdentifier.create(entry.getSchemaPath(), iid), input);
325                     }
326
327                 } else {
328                     LOG.warn("Ignoring wrong context value {}", value);
329                 }
330             }
331
332             final List<DOMRpcImplementation> impls = entry.getImplementations(null);
333             if (impls != null) {
334                 return impls.get(0).invokeRpc(entry.getRpcId(), input);
335             }
336
337             return FluentFutures.immediateFailedFluentFuture(
338                 new DOMRpcImplementationNotAvailableException("No implementation of RPC %s available",
339                     entry.getSchemaPath()));
340         }
341
342         private static FluentFuture<DOMRpcResult> invokeGlobalRpc(final GlobalDOMRpcRoutingTableEntry entry,
343                 final NormalizedNode<?, ?> input) {
344             return entry.getImplementations(YangInstanceIdentifier.EMPTY).get(0).invokeRpc(entry.getRpcId(), input);
345         }
346     }
347 }