48c8525ba056e7e64e272277d1e6a8df2df38a20
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMRpcRouter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.collect.Collections2;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.ImmutableList.Builder;
17 import com.google.common.collect.ImmutableSet;
18 import com.google.common.collect.MapDifference;
19 import com.google.common.collect.MapDifference.ValueDifference;
20 import com.google.common.collect.Maps;
21 import com.google.common.collect.Sets;
22 import com.google.common.util.concurrent.CheckedFuture;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ThreadFactoryBuilder;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Map;
29 import java.util.Map.Entry;
30 import java.util.Set;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.ThreadFactory;
34 import javax.annotation.concurrent.GuardedBy;
35 import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
36 import org.opendaylight.mdsal.dom.api.DOMRpcException;
37 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
38 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
39 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationNotAvailableException;
40 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
41 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
42 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
43 import org.opendaylight.mdsal.dom.api.DOMRpcService;
44 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
45 import org.opendaylight.mdsal.dom.spi.AbstractDOMRpcImplementationRegistration;
46 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
47 import org.opendaylight.yangtools.concepts.AbstractRegistration;
48 import org.opendaylight.yangtools.concepts.ListenerRegistration;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
53 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
54
55 public final class DOMRpcRouter extends AbstractRegistration implements SchemaContextListener {
56     private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat(
57             "DOMRpcRouter-listener-%s").setDaemon(true).build();
58
59     private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
60     private final DOMRpcProviderService rpcProviderService = new RpcProviderServiceFacade();
61     private final DOMRpcService rpcService = new RpcServiceFacade();
62
63     @GuardedBy("this")
64     private Collection<Registration<?>> listeners = Collections.emptyList();
65
66     private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
67
68     private ListenerRegistration<?> listenerRegistration;
69
70     public static DOMRpcRouter newInstance(final DOMSchemaService schemaService) {
71         final DOMRpcRouter rpcRouter = new DOMRpcRouter();
72         rpcRouter.listenerRegistration = schemaService.registerSchemaContextListener(rpcRouter);
73         return rpcRouter;
74     }
75
76     public DOMRpcService getRpcService() {
77         return rpcService;
78     }
79
80     public DOMRpcProviderService getRpcProviderService() {
81         return rpcProviderService;
82     }
83
84     private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation,
85             final Set<DOMRpcIdentifier> rpcs) {
86         final DOMRpcRoutingTable oldTable = routingTable;
87         final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
88         routingTable = newTable;
89
90         listenerNotifier.execute(() -> notifyRemoved(newTable, implementation));
91     }
92
93     private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
94         listeners = ImmutableList.copyOf(Collections2.filter(listeners, input -> !reg.equals(input)));
95     }
96
97     private synchronized void notifyAdded(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
98         for (Registration<?> l : listeners) {
99             l.addRpc(newTable, impl);
100         }
101     }
102
103     private synchronized void notifyRemoved(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
104         for (Registration<?> l : listeners) {
105             l.removeRpc(newTable, impl);
106         }
107     }
108
109     @Override
110     public synchronized void onGlobalContextUpdated(final SchemaContext context) {
111         final DOMRpcRoutingTable oldTable = routingTable;
112         final DOMRpcRoutingTable newTable = oldTable.setSchemaContext(context);
113         routingTable = newTable;
114     }
115
116     @Override
117     protected void removeRegistration() {
118         if (listenerRegistration != null) {
119             listenerRegistration.close();
120             listenerRegistration = null;
121         }
122         listenerNotifier.shutdown();
123     }
124
125     @VisibleForTesting
126     Collection<?> listeners() {
127         return listeners;
128     }
129
130     @VisibleForTesting
131     DOMRpcRoutingTable routingTable() {
132         return routingTable;
133     }
134
135     private static final class Registration<T extends DOMRpcAvailabilityListener>
136         extends AbstractListenerRegistration<T> {
137
138         private Map<SchemaPath, Set<YangInstanceIdentifier>> prevRpcs;
139         private DOMRpcRouter router;
140
141         Registration(final DOMRpcRouter router, final T listener,
142                 final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs) {
143             super(listener);
144             this.router = requireNonNull(router);
145             this.prevRpcs = requireNonNull(rpcs);
146         }
147
148         @Override
149         protected void removeRegistration() {
150             router.removeListener(this);
151             router = null;
152         }
153
154         void initialTable() {
155             final Collection<DOMRpcIdentifier> added = new ArrayList<>();
156             for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : prevRpcs.entrySet()) {
157                 added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
158             }
159             if (!added.isEmpty()) {
160                 getInstance().onRpcAvailable(added);
161             }
162         }
163
164         void addRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
165             final T l = getInstance();
166             if (!l.acceptsImplementation(impl)) {
167                 return;
168             }
169
170             final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = verifyNotNull(newTable.getRpcs(l));
171             final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
172
173             final Collection<DOMRpcIdentifier> added = new ArrayList<>();
174             for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnRight().entrySet()) {
175                 added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
176             }
177             for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e :
178                     diff.entriesDiffering().entrySet()) {
179                 for (YangInstanceIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) {
180                     added.add(DOMRpcIdentifier.create(e.getKey(), i));
181                 }
182             }
183
184             prevRpcs = rpcs;
185             if (!added.isEmpty()) {
186                 l.onRpcAvailable(added);
187             }
188         }
189
190         void removeRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
191             final T l = getInstance();
192             if (!l.acceptsImplementation(impl)) {
193                 return;
194             }
195
196             final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = verifyNotNull(newTable.getRpcs(l));
197             final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);
198
199             final Collection<DOMRpcIdentifier> removed = new ArrayList<>();
200             for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnLeft().entrySet()) {
201                 removed.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
202             }
203             for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e :
204                     diff.entriesDiffering().entrySet()) {
205                 for (YangInstanceIdentifier i : Sets.difference(e.getValue().leftValue(), e.getValue().rightValue())) {
206                     removed.add(DOMRpcIdentifier.create(e.getKey(), i));
207                 }
208             }
209
210             prevRpcs = rpcs;
211             if (!removed.isEmpty()) {
212                 l.onRpcUnavailable(removed);
213             }
214         }
215     }
216
217     private final class RpcServiceFacade implements DOMRpcService {
218         @Override
219         public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type,
220                 final NormalizedNode<?, ?> input) {
221             final AbstractDOMRpcRoutingTableEntry entry = routingTable.getEntry(type);
222             if (entry == null) {
223                 return Futures.immediateFailedCheckedFuture(
224                     new DOMRpcImplementationNotAvailableException("No implementation of RPC %s available", type));
225             }
226
227             return entry.invokeRpc(input);
228         }
229
230         @Override
231         public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
232             synchronized (DOMRpcRouter.this) {
233                 final Registration<T> ret = new Registration<>(DOMRpcRouter.this, listener,
234                         routingTable.getRpcs(listener));
235                 final Builder<Registration<?>> b = ImmutableList.builder();
236                 b.addAll(listeners);
237                 b.add(ret);
238                 listeners = b.build();
239
240                 listenerNotifier.execute(() -> ret.initialTable());
241                 return ret;
242             }
243         }
244     }
245
246     private final class RpcProviderServiceFacade implements DOMRpcProviderService {
247         @Override
248         public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
249                 final T implementation, final DOMRpcIdentifier... rpcs) {
250             return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
251         }
252
253         @Override
254         public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
255                 final T implementation, final Set<DOMRpcIdentifier> rpcs) {
256
257             synchronized (DOMRpcRouter.this) {
258                 final DOMRpcRoutingTable oldTable = routingTable;
259                 final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
260                 routingTable = newTable;
261
262                 listenerNotifier.execute(() -> notifyAdded(newTable, implementation));
263             }
264
265             return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
266                 @Override
267                 protected void removeRegistration() {
268                     removeRpcImplementation(getInstance(), rpcs);
269                 }
270             };
271         }
272     }
273 }