b34421910ac1c9d06de43be3f273187048c1f1af
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMRpcRouter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import org.opendaylight.mdsal.dom.spi.AbstractDOMRpcImplementationRegistration;
11
12 import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
13 import org.opendaylight.mdsal.dom.api.DOMRpcException;
14 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
15 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
16 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
17 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
18 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
19 import org.opendaylight.mdsal.dom.api.DOMRpcService;
20 import com.google.common.base.Function;
21 import com.google.common.base.Predicate;
22 import com.google.common.collect.Collections2;
23 import com.google.common.collect.ImmutableList;
24 import com.google.common.collect.ImmutableList.Builder;
25 import com.google.common.collect.ImmutableSet;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.google.common.util.concurrent.ThreadFactoryBuilder;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.Map;
31 import java.util.Map.Entry;
32 import java.util.Set;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36 import javax.annotation.concurrent.GuardedBy;
37 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
43 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
44
45 public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
46     private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
47     private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
48     @GuardedBy("this")
49     private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
50     private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
51
52     @Override
53     public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final DOMRpcIdentifier... rpcs) {
54         return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
55     }
56
57     private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table, final Collection<DOMRpcIdentifier> candidates) {
58         return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate<DOMRpcIdentifier>() {
59             @Override
60             public boolean apply(final DOMRpcIdentifier input) {
61                 return !table.contains(input);
62             }
63         }));
64     }
65
66     private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, final Set<DOMRpcIdentifier> rpcs) {
67         final DOMRpcRoutingTable oldTable = routingTable;
68         final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
69
70         final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
71         routingTable = newTable;
72         if(!removedRpcs.isEmpty()) {
73             final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
74             listenerNotifier.execute(new Runnable() {
75                 @Override
76                 public void run() {
77                     for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
78                         // Need to ensure removed listeners do not get notified
79                         synchronized (DOMRpcRouter.this) {
80                             if (listeners.contains(l)) {
81                                 l.getInstance().onRpcUnavailable(removedRpcs);
82                             }
83                         }
84                     }
85                 }
86             });
87         }
88     }
89
90     @Override
91     public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
92         final DOMRpcRoutingTable oldTable = routingTable;
93         final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
94
95         final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
96         routingTable = newTable;
97
98         if(!addedRpcs.isEmpty()) {
99             final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
100             listenerNotifier.execute(new Runnable() {
101                 @Override
102                 public void run() {
103                     for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
104                         // Need to ensure removed listeners do not get notified
105                         synchronized (DOMRpcRouter.this) {
106                             if (listeners.contains(l)) {
107                                 l.getInstance().onRpcAvailable(addedRpcs);
108                             }
109                         }
110                     }
111                 }
112             });
113         }
114
115         return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
116             @Override
117             protected void removeRegistration() {
118                 removeRpcImplementation(getInstance(), rpcs);
119             }
120         };
121     }
122
123     @Override
124     public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
125         return routingTable.invokeRpc(type, input);
126     }
127
128     private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
129         listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate<Object>() {
130             @Override
131             public boolean apply(final Object input) {
132                 return !reg.equals(input);
133             }
134         }));
135     }
136
137     @Override
138     public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
139         final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
140             @Override
141             protected void removeRegistration() {
142                 removeListener(this);
143             }
144         };
145
146         final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
147         b.addAll(listeners);
148         b.add(ret);
149         listeners = b.build();
150         final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
151
152         listenerNotifier.execute(new Runnable() {
153             @Override
154             public void run() {
155                 for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
156                     listener.onRpcAvailable(Collections2.transform(e.getValue(), new Function<YangInstanceIdentifier, DOMRpcIdentifier>() {
157                         @Override
158                         public DOMRpcIdentifier apply(final YangInstanceIdentifier input) {
159                             return DOMRpcIdentifier.create(e.getKey(), input);
160                         }
161                     }));
162                 }
163             }
164         });
165
166         return ret;
167     }
168
169     @Override
170     public synchronized void onGlobalContextUpdated(final SchemaContext context) {
171         final DOMRpcRoutingTable oldTable = routingTable;
172         final DOMRpcRoutingTable newTable = oldTable.setSchemaContext(context);
173         routingTable = newTable;
174     }
175
176     @Override
177     public void close() {
178         listenerNotifier.shutdown();
179     }
180
181 }