Bump versions to 13.0.4-SNAPSHOT
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMRpcRouter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Verify.verifyNotNull;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.collect.Collections2;
16 import com.google.common.collect.ImmutableList;
17 import com.google.common.collect.ImmutableSet;
18 import com.google.common.collect.ImmutableTable;
19 import com.google.common.collect.MapDifference;
20 import com.google.common.collect.MapDifference.ValueDifference;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Map.Entry;
32 import java.util.Set;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36 import javax.annotation.PreDestroy;
37 import javax.inject.Inject;
38 import javax.inject.Singleton;
39 import org.checkerframework.checker.lock.qual.GuardedBy;
40 import org.eclipse.jdt.annotation.NonNull;
41 import org.eclipse.jdt.annotation.NonNullByDefault;
42 import org.opendaylight.mdsal.dom.api.DOMActionAvailabilityExtension;
43 import org.opendaylight.mdsal.dom.api.DOMActionAvailabilityExtension.AvailabilityListener;
44 import org.opendaylight.mdsal.dom.api.DOMActionImplementation;
45 import org.opendaylight.mdsal.dom.api.DOMActionInstance;
46 import org.opendaylight.mdsal.dom.api.DOMActionNotAvailableException;
47 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
48 import org.opendaylight.mdsal.dom.api.DOMActionService;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
50 import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
51 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
52 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
53 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationNotAvailableException;
54 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
55 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
56 import org.opendaylight.mdsal.dom.api.DOMRpcService;
57 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
58 import org.opendaylight.yangtools.concepts.AbstractRegistration;
59 import org.opendaylight.yangtools.concepts.Registration;
60 import org.opendaylight.yangtools.yang.common.QName;
61 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
62 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
63 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
64 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
65 import org.osgi.service.component.annotations.Activate;
66 import org.osgi.service.component.annotations.Component;
67 import org.osgi.service.component.annotations.Deactivate;
68 import org.osgi.service.component.annotations.Reference;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 @Singleton
73 @Component(service = DOMRpcRouter.class)
74 public final class DOMRpcRouter extends AbstractRegistration {
75     private static final Logger LOG = LoggerFactory.getLogger(DOMRpcRouter.class);
76     private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat(
77             "DOMRpcRouter-listener-%s").setDaemon(true).build();
78
79     private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
80     private final @NonNull DOMActionProviderService actionProviderService = new ActionProviderServiceFacade();
81     private final @NonNull DOMActionService actionService = new ActionServiceFacade();
82     private final @NonNull DOMRpcProviderService rpcProviderService = new RpcProviderServiceFacade();
83     private final @NonNull DOMRpcService rpcService = new RpcServiceFacade();
84
85     @GuardedBy("this")
86     private ImmutableList<RpcAvailReg> listeners = ImmutableList.of();
87
88     @GuardedBy("this")
89     private ImmutableList<ActionAvailReg> actionListeners = ImmutableList.of();
90
91     private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
92
93     private volatile DOMActionRoutingTable actionRoutingTable = DOMActionRoutingTable.EMPTY;
94
95     private Registration listenerRegistration;
96
97     @Deprecated
98     @VisibleForTesting
99     // FIXME: 9.0.0: make this constructor package-private
100     public DOMRpcRouter() {
101
102     }
103
104     @Inject
105     @Activate
106     public DOMRpcRouter(@Reference final DOMSchemaService schemaService) {
107         listenerRegistration = schemaService.registerSchemaContextListener(this::onModelContextUpdated);
108         LOG.info("DOM RPC/Action router started");
109     }
110
111     @Deprecated(forRemoval = true)
112     public static DOMRpcRouter newInstance(final DOMSchemaService schemaService) {
113         return new DOMRpcRouter(schemaService);
114     }
115
116     @PreDestroy
117     @Deactivate
118     public void shutdown() {
119         close();
120     }
121
122     public @NonNull DOMActionService actionService() {
123         return actionService;
124     }
125
126     public @NonNull DOMActionProviderService actionProviderService() {
127         return actionProviderService;
128     }
129
130     public @NonNull DOMRpcService rpcService() {
131         return rpcService;
132     }
133
134     public @NonNull DOMRpcProviderService rpcProviderService() {
135         return rpcProviderService;
136     }
137
138     private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation,
139             final Set<DOMRpcIdentifier> rpcs) {
140         final DOMRpcRoutingTable oldTable = routingTable;
141         final DOMRpcRoutingTable newTable = (DOMRpcRoutingTable) oldTable.remove(implementation, rpcs);
142         routingTable = newTable;
143
144         listenerNotifier.execute(() -> notifyRemoved(newTable, implementation));
145     }
146
147     private synchronized void removeRpcImplementations(
148             final ImmutableTable<QName, YangInstanceIdentifier, DOMRpcImplementation> implTable) {
149         final DOMRpcRoutingTable oldTable = routingTable;
150         final DOMRpcRoutingTable newTable = (DOMRpcRoutingTable) oldTable.removeAll(implTable);
151         routingTable = newTable;
152
153         listenerNotifier.execute(() -> notifyRemoved(newTable, implTable.values()));
154     }
155
156     private synchronized void removeActionImplementation(final DOMActionImplementation implementation,
157             final Set<DOMActionInstance> actions) {
158         final DOMActionRoutingTable oldTable = actionRoutingTable;
159         final DOMActionRoutingTable newTable = (DOMActionRoutingTable) oldTable.remove(implementation, actions);
160         actionRoutingTable = newTable;
161
162         listenerNotifier.execute(() -> notifyActionChanged(newTable, implementation));
163     }
164
165     private synchronized void removeListener(final RpcAvailReg reg) {
166         listeners = ImmutableList.copyOf(Collections2.filter(listeners, input -> !reg.equals(input)));
167     }
168
169     private synchronized void removeActionListener(final ActionAvailReg reg) {
170         actionListeners = ImmutableList.copyOf(Collections2.filter(actionListeners, input -> !reg.equals(input)));
171     }
172
173     private synchronized void notifyAdded(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
174         for (var l : listeners) {
175             l.addRpc(newTable, impl);
176         }
177     }
178
179     private synchronized void notifyAdded(final DOMRpcRoutingTable newTable,
180             final Collection<? extends DOMRpcImplementation> impls) {
181         for (var l : listeners) {
182             for (var impl : impls) {
183                 l.addRpc(newTable, impl);
184             }
185         }
186     }
187
188     private synchronized void notifyRemoved(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
189         for (var l : listeners) {
190             l.removeRpc(newTable, impl);
191         }
192     }
193
194     private synchronized void notifyRemoved(final DOMRpcRoutingTable newTable,
195             final Collection<? extends DOMRpcImplementation> impls) {
196         for (var l : listeners) {
197             for (DOMRpcImplementation impl : impls) {
198                 l.removeRpc(newTable, impl);
199             }
200         }
201     }
202
203     private synchronized void notifyActionChanged(final DOMActionRoutingTable newTable,
204             final DOMActionImplementation impl) {
205         for (var l : actionListeners) {
206             l.actionChanged(newTable, impl);
207         }
208     }
209
210
211     synchronized void onModelContextUpdated(final @NonNull EffectiveModelContext newModelContext) {
212         final DOMRpcRoutingTable oldTable = routingTable;
213         final DOMRpcRoutingTable newTable = (DOMRpcRoutingTable) oldTable.setSchemaContext(newModelContext);
214         routingTable = newTable;
215
216         final DOMActionRoutingTable oldActionTable = actionRoutingTable;
217         final DOMActionRoutingTable newActionTable =
218                 (DOMActionRoutingTable) oldActionTable.setSchemaContext(newModelContext);
219         actionRoutingTable = newActionTable;
220     }
221
222     @Override
223     protected void removeRegistration() {
224         if (listenerRegistration != null) {
225             listenerRegistration.close();
226             listenerRegistration = null;
227         }
228         listenerNotifier.shutdown();
229         LOG.info("DOM RPC/Action router stopped");
230     }
231
232     @VisibleForTesting
233     synchronized List<?> listeners() {
234         return listeners;
235     }
236
237     @VisibleForTesting
238     synchronized List<?> actionListeners() {
239         return actionListeners;
240     }
241
242     @VisibleForTesting
243     DOMRpcRoutingTable routingTable() {
244         return routingTable;
245     }
246
247     private static final class RpcAvailReg extends AbstractRegistration {
248         private final DOMRpcAvailabilityListener listener;
249
250         private Map<QName, Set<YangInstanceIdentifier>> prevRpcs;
251         private DOMRpcRouter router;
252
253         RpcAvailReg(final DOMRpcRouter router, final DOMRpcAvailabilityListener listener,
254                 final Map<QName, Set<YangInstanceIdentifier>> rpcs) {
255             this.listener = requireNonNull(listener);
256             this.router = requireNonNull(router);
257             prevRpcs = requireNonNull(rpcs);
258         }
259
260         @Override
261         protected void removeRegistration() {
262             router.removeListener(this);
263             router = null;
264         }
265
266         void initialTable() {
267             final List<DOMRpcIdentifier> added = new ArrayList<>();
268             for (Entry<QName, Set<YangInstanceIdentifier>> e : prevRpcs.entrySet()) {
269                 added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
270             }
271             if (!added.isEmpty()) {
272                 listener.onRpcAvailable(added);
273             }
274         }
275
276         void addRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
277             if (!listener.acceptsImplementation(impl)) {
278                 return;
279             }
280
281             final Map<QName, Set<YangInstanceIdentifier>> rpcs = verifyNotNull(newTable.getOperations(listener));
282             final MapDifference<QName, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
283
284             final List<DOMRpcIdentifier> added = new ArrayList<>();
285             for (Entry<QName, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnRight().entrySet()) {
286                 added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
287             }
288             for (Entry<QName, ValueDifference<Set<YangInstanceIdentifier>>> e : diff.entriesDiffering().entrySet()) {
289                 for (YangInstanceIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) {
290                     added.add(DOMRpcIdentifier.create(e.getKey(), i));
291                 }
292             }
293
294             prevRpcs = rpcs;
295             if (!added.isEmpty()) {
296                 listener.onRpcAvailable(added);
297             }
298         }
299
300         void removeRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
301             if (!listener.acceptsImplementation(impl)) {
302                 return;
303             }
304
305             final Map<QName, Set<YangInstanceIdentifier>> rpcs = verifyNotNull(newTable.getOperations(listener));
306             final MapDifference<QName, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
307
308             final List<DOMRpcIdentifier> removed = new ArrayList<>();
309             for (Entry<QName, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnLeft().entrySet()) {
310                 removed.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
311             }
312             for (Entry<QName, ValueDifference<Set<YangInstanceIdentifier>>> e : diff.entriesDiffering().entrySet()) {
313                 for (YangInstanceIdentifier i : Sets.difference(e.getValue().leftValue(), e.getValue().rightValue())) {
314                     removed.add(DOMRpcIdentifier.create(e.getKey(), i));
315                 }
316             }
317
318             prevRpcs = rpcs;
319             if (!removed.isEmpty()) {
320                 listener.onRpcUnavailable(removed);
321             }
322         }
323     }
324
325     private static final class ActionAvailReg extends AbstractRegistration {
326         private final AvailabilityListener listener;
327
328         private Map<Absolute, Set<DOMDataTreeIdentifier>> prevActions;
329         private DOMRpcRouter router;
330
331         ActionAvailReg(final DOMRpcRouter router, final AvailabilityListener listener,
332                 final Map<Absolute, Set<DOMDataTreeIdentifier>> actions) {
333             this.listener = requireNonNull(listener);
334             this.router = requireNonNull(router);
335             prevActions = requireNonNull(actions);
336         }
337
338         @Override
339         protected void removeRegistration() {
340             router.removeActionListener(this);
341             router = null;
342         }
343
344         void initialTable() {
345             final var added = new ArrayList<DOMActionInstance>();
346             for (var e : prevActions.entrySet()) {
347                 added.addAll(Collections2.transform(e.getValue(), i -> DOMActionInstance.of(e.getKey(), i)));
348             }
349             if (!added.isEmpty()) {
350                 listener.onActionsChanged(ImmutableSet.of(), ImmutableSet.copyOf(added));
351             }
352         }
353
354         void actionChanged(final DOMActionRoutingTable newTable, final DOMActionImplementation impl) {
355             if (!listener.acceptsImplementation(impl)) {
356                 return;
357             }
358
359             final Map<Absolute, Set<DOMDataTreeIdentifier>> actions = verifyNotNull(newTable.getOperations(listener));
360             final MapDifference<Absolute, Set<DOMDataTreeIdentifier>> diff = Maps.difference(prevActions, actions);
361
362             final Set<DOMActionInstance> removed = new HashSet<>();
363             final Set<DOMActionInstance> added = new HashSet<>();
364
365             for (Entry<Absolute, Set<DOMDataTreeIdentifier>> e : diff.entriesOnlyOnLeft().entrySet()) {
366                 removed.addAll(Collections2.transform(e.getValue(), i -> DOMActionInstance.of(e.getKey(), i)));
367             }
368
369             for (Entry<Absolute, Set<DOMDataTreeIdentifier>> e : diff.entriesOnlyOnRight().entrySet()) {
370                 added.addAll(Collections2.transform(e.getValue(), i -> DOMActionInstance.of(e.getKey(), i)));
371             }
372
373             for (Entry<Absolute, ValueDifference<Set<DOMDataTreeIdentifier>>> e : diff.entriesDiffering().entrySet()) {
374                 for (DOMDataTreeIdentifier i : Sets.difference(e.getValue().leftValue(), e.getValue().rightValue())) {
375                     removed.add(DOMActionInstance.of(e.getKey(), i));
376                 }
377
378                 for (DOMDataTreeIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) {
379                     added.add(DOMActionInstance.of(e.getKey(), i));
380                 }
381             }
382
383             prevActions = actions;
384             if (!removed.isEmpty() || !added.isEmpty()) {
385                 listener.onActionsChanged(removed, added);
386             }
387         }
388     }
389
390     @NonNullByDefault
391     private final class ActionServiceFacade implements DOMActionService, DOMActionAvailabilityExtension {
392         @Override
393         public List<Extension> supportedExtensions() {
394             return List.of(this);
395         }
396
397         @Override
398         public ListenableFuture<? extends DOMRpcResult> invokeAction(final Absolute type,
399                 final DOMDataTreeIdentifier path, final ContainerNode input) {
400             final YangInstanceIdentifier pathRoot = path.path();
401             checkArgument(!pathRoot.isEmpty(), "Action path must not be empty");
402
403             final DOMActionRoutingTableEntry entry = (DOMActionRoutingTableEntry) actionRoutingTable.getEntry(type);
404             return entry != null ? OperationInvocation.invoke(entry, type, path, requireNonNull(input))
405                 : Futures.immediateFailedFuture(
406                     new DOMActionNotAvailableException("No implementation of Action %s available", type));
407         }
408
409         @Override
410         public Registration registerAvailabilityListener(final AvailabilityListener listener) {
411             synchronized (DOMRpcRouter.this) {
412                 final var ret = new ActionAvailReg(DOMRpcRouter.this, listener,
413                     actionRoutingTable.getOperations(listener));
414                 actionListeners = ImmutableList.<ActionAvailReg>builder()
415                     .addAll(actionListeners)
416                     .add(ret)
417                     .build();
418
419                 listenerNotifier.execute(ret::initialTable);
420                 return ret;
421             }
422         }
423     }
424
425     @NonNullByDefault
426     private final class ActionProviderServiceFacade implements DOMActionProviderService {
427         @Override
428         public Registration registerActionImplementation(final DOMActionImplementation implementation,
429                 final Set<DOMActionInstance> instances) {
430             checkArgument(!instances.isEmpty(), "Instances must not be empty");
431
432             synchronized (DOMRpcRouter.this) {
433                 final DOMActionRoutingTable oldTable = actionRoutingTable;
434                 final DOMActionRoutingTable newTable = (DOMActionRoutingTable) oldTable.add(implementation, instances);
435                 actionRoutingTable = newTable;
436
437                 listenerNotifier.execute(() -> notifyActionChanged(newTable, implementation));
438             }
439
440             return new AbstractRegistration() {
441                 @Override
442                 protected void removeRegistration() {
443                     removeActionImplementation(implementation, instances);
444                 }
445             };
446         }
447     }
448
449     private final class RpcServiceFacade implements DOMRpcService {
450         @Override
451         public ListenableFuture<? extends DOMRpcResult> invokeRpc(final QName type, final ContainerNode input) {
452             final var entry = (AbstractDOMRpcRoutingTableEntry) routingTable.getEntry(type);
453             if (entry == null) {
454                 return Futures.immediateFailedFuture(
455                     new DOMRpcImplementationNotAvailableException("No implementation of RPC %s available", type));
456             }
457
458             return OperationInvocation.invoke(entry, requireNonNull(input));
459         }
460
461         @Override
462         public Registration registerRpcListener(final DOMRpcAvailabilityListener listener) {
463             synchronized (DOMRpcRouter.this) {
464                 final var ret = new RpcAvailReg(DOMRpcRouter.this, listener, routingTable.getOperations(listener));
465                 listeners = ImmutableList.<RpcAvailReg>builder().addAll(listeners).add(ret).build();
466
467                 listenerNotifier.execute(ret::initialTable);
468                 return ret;
469             }
470         }
471     }
472
473     private final class RpcProviderServiceFacade implements DOMRpcProviderService {
474         @Override
475         public Registration registerRpcImplementation(final DOMRpcImplementation implementation,
476                 final Set<DOMRpcIdentifier> rpcs) {
477
478             synchronized (DOMRpcRouter.this) {
479                 final DOMRpcRoutingTable oldTable = routingTable;
480                 final DOMRpcRoutingTable newTable = (DOMRpcRoutingTable) oldTable.add(implementation, rpcs);
481                 routingTable = newTable;
482
483                 listenerNotifier.execute(() -> notifyAdded(newTable, implementation));
484             }
485
486             return new AbstractRegistration() {
487                 @Override
488                 protected void removeRegistration() {
489                     removeRpcImplementation(implementation, rpcs);
490                 }
491             };
492         }
493
494         @Override
495         public Registration registerRpcImplementations(final Map<DOMRpcIdentifier, DOMRpcImplementation> map) {
496             checkArgument(!map.isEmpty());
497
498             final var builder = ImmutableTable.<QName, YangInstanceIdentifier, DOMRpcImplementation>builder();
499             for (var entry : map.entrySet()) {
500                 final var id = entry.getKey();
501                 builder.put(id.getType(), id.getContextReference(), entry.getValue());
502             }
503             final var implTable = builder.build();
504
505             synchronized (DOMRpcRouter.this) {
506                 final DOMRpcRoutingTable oldTable = routingTable;
507                 final DOMRpcRoutingTable newTable = (DOMRpcRoutingTable) oldTable.addAll(implTable);
508                 routingTable = newTable;
509
510                 listenerNotifier.execute(() -> notifyAdded(newTable, implTable.values()));
511             }
512
513             return new AbstractRegistration() {
514                 @Override
515                 protected void removeRegistration() {
516                     removeRpcImplementations(implTable);
517                 }
518             };
519         }
520     }
521 }