14e75e22c6c6572f62f802901bd0e9e356d93097
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMRpcRouter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.broker.impl;
9
10 import com.google.common.base.Function;
11 import com.google.common.base.Predicate;
12 import com.google.common.collect.Collections2;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.ImmutableList.Builder;
15 import com.google.common.collect.ImmutableSet;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.ThreadFactoryBuilder;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.Set;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ThreadFactory;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
28 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
29 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
30 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
33 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
34 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
35 import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMRpcImplementationRegistration;
36 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
37 import org.opendaylight.yangtools.concepts.ListenerRegistration;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
42 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
43
44 public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
45     private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
46     private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
47     @GuardedBy("this")
48     private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
49     private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
50
51     @Override
52     public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final DOMRpcIdentifier... rpcs) {
53         return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
54     }
55
56     private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table, final Collection<DOMRpcIdentifier> candidates) {
57         return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate<DOMRpcIdentifier>() {
58             @Override
59             public boolean apply(final DOMRpcIdentifier input) {
60                 return !table.contains(input);
61             }
62         }));
63     }
64
65     private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, final Set<DOMRpcIdentifier> rpcs) {
66         final DOMRpcRoutingTable oldTable = routingTable;
67         final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
68
69         final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
70         routingTable = newTable;
71         if(!removedRpcs.isEmpty()) {
72             final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
73             listenerNotifier.execute(new Runnable() {
74                 @Override
75                 public void run() {
76                     for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
77                         // Need to ensure removed listeners do not get notified
78                         synchronized (DOMRpcRouter.this) {
79                             if (listeners.contains(l)) {
80                                 l.getInstance().onRpcUnavailable(removedRpcs);
81                             }
82                         }
83                     }
84                 }
85             });
86         }
87     }
88
89     @Override
90     public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
91         final DOMRpcRoutingTable oldTable = routingTable;
92         final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
93
94         final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
95         routingTable = newTable;
96
97         if(!addedRpcs.isEmpty()) {
98             final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
99             listenerNotifier.execute(new Runnable() {
100                 @Override
101                 public void run() {
102                     for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
103                         // Need to ensure removed listeners do not get notified
104                         synchronized (DOMRpcRouter.this) {
105                             if (listeners.contains(l)) {
106                                 l.getInstance().onRpcAvailable(addedRpcs);
107                             }
108                         }
109                     }
110                 }
111             });
112         }
113
114         return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
115             @Override
116             protected void removeRegistration() {
117                 removeRpcImplementation(getInstance(), rpcs);
118             }
119         };
120     }
121
122     @Override
123     public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
124         return routingTable.invokeRpc(type, input);
125     }
126
127     private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
128         listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate<Object>() {
129             @Override
130             public boolean apply(final Object input) {
131                 return !reg.equals(input);
132             }
133         }));
134     }
135
136     @Override
137     public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
138         final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
139             @Override
140             protected void removeRegistration() {
141                 removeListener(this);
142             }
143         };
144
145         final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
146         b.addAll(listeners);
147         b.add(ret);
148         listeners = b.build();
149         final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
150
151         listenerNotifier.execute(new Runnable() {
152             @Override
153             public void run() {
154                 for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
155                     listener.onRpcAvailable(Collections2.transform(e.getValue(), new Function<YangInstanceIdentifier, DOMRpcIdentifier>() {
156                         @Override
157                         public DOMRpcIdentifier apply(final YangInstanceIdentifier input) {
158                             return DOMRpcIdentifier.create(e.getKey(), input);
159                         }
160                     }));
161                 }
162             }
163         });
164
165         return ret;
166     }
167
168     @Override
169     public synchronized void onGlobalContextUpdated(final SchemaContext context) {
170         final DOMRpcRoutingTable oldTable = routingTable;
171         final DOMRpcRoutingTable newTable = oldTable.setSchemaContext(context);
172         routingTable = newTable;
173     }
174
175     @Override
176     public void close() {
177         listenerNotifier.shutdown();
178     }
179
180 }