checkStyleViolationSeverity=error implemented for mdsal-dom-broker
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMRpcRouter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import com.google.common.base.Function;
11 import com.google.common.base.Predicate;
12 import com.google.common.collect.Collections2;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.ImmutableList.Builder;
15 import com.google.common.collect.ImmutableSet;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.ThreadFactoryBuilder;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.Set;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ThreadFactory;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
28 import org.opendaylight.mdsal.dom.api.DOMRpcException;
29 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
30 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
31 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
32 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
33 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
34 import org.opendaylight.mdsal.dom.api.DOMRpcService;
35 import org.opendaylight.mdsal.dom.spi.AbstractDOMRpcImplementationRegistration;
36 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
37 import org.opendaylight.yangtools.concepts.ListenerRegistration;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
42 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
43
44 public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
45     private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat(
46             "DOMRpcRouter-listener-%s").setDaemon(true).build();
47     private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
48     @GuardedBy("this")
49     private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
50     private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
51
52     private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table,
53             final Collection<DOMRpcIdentifier> candidates) {
54         return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate<DOMRpcIdentifier>() {
55             @Override
56             public boolean apply(final DOMRpcIdentifier input) {
57                 return !table.contains(input);
58             }
59         }));
60     }
61
62     private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation,
63             final Set<DOMRpcIdentifier> rpcs) {
64         final DOMRpcRoutingTable oldTable = routingTable;
65         final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
66
67         final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
68         routingTable = newTable;
69         if (!removedRpcs.isEmpty()) {
70             final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
71             listenerNotifier.execute(new Runnable() {
72                 @Override
73                 public void run() {
74                     for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
75                         // Need to ensure removed listeners do not get notified
76                         synchronized (DOMRpcRouter.this) {
77                             if (listeners.contains(l)) {
78                                 l.getInstance().onRpcUnavailable(removedRpcs);
79                             }
80                         }
81                     }
82                 }
83             });
84         }
85     }
86
87     @Override
88     public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
89             final T implementation, final DOMRpcIdentifier... rpcs) {
90         return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
91     }
92
93     @Override
94     public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T>
95             registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
96         final DOMRpcRoutingTable oldTable = routingTable;
97         final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
98
99         final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
100         routingTable = newTable;
101
102         if (!addedRpcs.isEmpty()) {
103             final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
104             listenerNotifier.execute(new Runnable() {
105                 @Override
106                 public void run() {
107                     for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
108                         // Need to ensure removed listeners do not get notified
109                         synchronized (DOMRpcRouter.this) {
110                             if (listeners.contains(l)) {
111                                 l.getInstance().onRpcAvailable(addedRpcs);
112                             }
113                         }
114                     }
115                 }
116             });
117         }
118
119         return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
120             @Override
121             protected void removeRegistration() {
122                 removeRpcImplementation(getInstance(), rpcs);
123             }
124         };
125     }
126
127     @Override
128     public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type,
129             final NormalizedNode<?, ?> input) {
130         return routingTable.invokeRpc(type, input);
131     }
132
133     private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
134         listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate<Object>() {
135             @Override
136             public boolean apply(final Object input) {
137                 return !reg.equals(input);
138             }
139         }));
140     }
141
142     @Override
143     public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
144             final T listener) {
145         final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
146             @Override
147             protected void removeRegistration() {
148                 removeListener(this);
149             }
150         };
151
152         final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
153         b.addAll(listeners);
154         b.add(ret);
155         listeners = b.build();
156         final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
157
158         listenerNotifier.execute(new Runnable() {
159             @Override
160             public void run() {
161                 for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
162                     listener.onRpcAvailable(Collections2.transform(e.getValue(),
163                             new Function<YangInstanceIdentifier, DOMRpcIdentifier>() {
164                             @Override
165                             public DOMRpcIdentifier apply(final YangInstanceIdentifier input) {
166                                 return DOMRpcIdentifier.create(e.getKey(), input);
167                             }
168                         }));
169                 }
170             }
171         });
172
173         return ret;
174     }
175
176     @Override
177     public synchronized void onGlobalContextUpdated(final SchemaContext context) {
178         final DOMRpcRoutingTable oldTable = routingTable;
179         final DOMRpcRoutingTable newTable = oldTable.setSchemaContext(context);
180         routingTable = newTable;
181     }
182
183     @Override
184     public void close() {
185         listenerNotifier.shutdown();
186     }
187
188 }