2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.broker;
10 import org.opendaylight.mdsal.dom.spi.AbstractDOMRpcImplementationRegistration;
12 import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
13 import org.opendaylight.mdsal.dom.api.DOMRpcException;
14 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
15 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
16 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
17 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
18 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
19 import org.opendaylight.mdsal.dom.api.DOMRpcService;
20 import com.google.common.base.Function;
21 import com.google.common.base.Predicate;
22 import com.google.common.collect.Collections2;
23 import com.google.common.collect.ImmutableList;
24 import com.google.common.collect.ImmutableList.Builder;
25 import com.google.common.collect.ImmutableSet;
26 import com.google.common.util.concurrent.CheckedFuture;
27 import com.google.common.util.concurrent.ThreadFactoryBuilder;
28 import java.util.Collection;
29 import java.util.Collections;
31 import java.util.Map.Entry;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36 import javax.annotation.concurrent.GuardedBy;
37 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
43 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
/**
 * RPC router that implements both {@link DOMRpcService} and {@link DOMRpcProviderService},
 * and receives schema context updates as a {@link SchemaContextListener}.
 */
45 public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
// Factory producing daemon threads named "DOMRpcRouter-listener-%s".
46 private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
// Single-threaded executor to which listener notification tasks are submitted.
47 private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
// Current listener registrations; updated by assigning a freshly built immutable list
// rather than by mutating the collection in place.
49 private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
// Current routing table, initially empty. Written from synchronized methods and read
// without synchronization by invokeRpc, hence volatile.
50 private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
53 public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final DOMRpcIdentifier... rpcs) {
54 return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
57 private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table, final Collection<DOMRpcIdentifier> candidates) {
58 return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate<DOMRpcIdentifier>() {
60 public boolean apply(final DOMRpcIdentifier input) {
61 return !table.contains(input);
66 private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, final Set<DOMRpcIdentifier> rpcs) {
67 final DOMRpcRoutingTable oldTable = routingTable;
68 final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
70 final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
71 routingTable = newTable;
72 if(!removedRpcs.isEmpty()) {
73 final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
74 listenerNotifier.execute(new Runnable() {
77 for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
78 // Need to ensure removed listeners do not get notified
79 synchronized (DOMRpcRouter.this) {
80 if (listeners.contains(l)) {
81 l.getInstance().onRpcUnavailable(removedRpcs);
91 public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
92 final DOMRpcRoutingTable oldTable = routingTable;
93 final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
95 final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
96 routingTable = newTable;
98 if(!addedRpcs.isEmpty()) {
99 final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
100 listenerNotifier.execute(new Runnable() {
103 for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
104 // Need to ensure removed listeners do not get notified
105 synchronized (DOMRpcRouter.this) {
106 if (listeners.contains(l)) {
107 l.getInstance().onRpcAvailable(addedRpcs);
115 return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
117 protected void removeRegistration() {
118 removeRpcImplementation(getInstance(), rpcs);
124 public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
125 return routingTable.invokeRpc(type, input);
128 private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
129 listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate<Object>() {
131 public boolean apply(final Object input) {
132 return !reg.equals(input);
/**
 * Registers an RPC availability listener, replaces the listener collection with a
 * newly built immutable list, and submits a task that reports the RPCs captured
 * from the routing table to the new listener.
 *
 * @param listener listener to register
 */
138 public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
// Registration handle; removing it delegates to removeListener(this).
139 final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
141 protected void removeRegistration() {
142 removeListener(this);
// NOTE(review): the statements that populate this builder are not visible in this view;
// presumably it receives the existing listeners plus ret - confirm against the full file.
146 final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
149 listeners = b.build();
// Snapshot of the routing table's RPCs, taken while this synchronized method holds the router lock.
150 final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
// The initial notification is handed to the listener-notifier executor.
152 listenerNotifier.execute(new Runnable() {
// One onRpcAvailable call per SchemaPath entry in the snapshot.
155 for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
// Each YangInstanceIdentifier is paired with the entry's SchemaPath to form a DOMRpcIdentifier.
156 listener.onRpcAvailable(Collections2.transform(e.getValue(), new Function<YangInstanceIdentifier, DOMRpcIdentifier>() {
158 public DOMRpcIdentifier apply(final YangInstanceIdentifier input) {
159 return DOMRpcIdentifier.create(e.getKey(), input);
170 public synchronized void onGlobalContextUpdated(final SchemaContext context) {
171 final DOMRpcRoutingTable oldTable = routingTable;
172 final DOMRpcRoutingTable newTable = oldTable.setSchemaContext(context);
173 routingTable = newTable;
177 public void close() {
178 listenerNotifier.shutdown();