2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.dom.broker.impl;
10 import com.google.common.base.Function;
11 import com.google.common.base.Predicate;
12 import com.google.common.collect.Collections2;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.ImmutableList.Builder;
15 import com.google.common.collect.ImmutableSet;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.ThreadFactoryBuilder;
18 import java.util.Collection;
19 import java.util.Collections;
21 import java.util.Map.Entry;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ThreadFactory;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
28 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
29 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
30 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
33 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
34 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
35 import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMRpcImplementationRegistration;
36 import org.opendaylight.controller.sal.core.api.model.SchemaService;
37 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
43 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
45 public final class DOMRpcRouter implements AutoCloseable, DOMRpcService, DOMRpcProviderService, SchemaContextListener {
46 private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("DOMRpcRouter-listener-%s").setDaemon(true).build();
47 private final ExecutorService listenerNotifier = Executors.newSingleThreadExecutor(THREAD_FACTORY);
49 private Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> listeners = Collections.emptyList();
50 private volatile DOMRpcRoutingTable routingTable = DOMRpcRoutingTable.EMPTY;
52 public static DOMRpcRouter newInstance(SchemaService schemaService) {
53 final DOMRpcRouter rpcRouter = new DOMRpcRouter();
54 schemaService.registerSchemaContextListener(rpcRouter);
59 public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final DOMRpcIdentifier... rpcs) {
60 return registerRpcImplementation(implementation, ImmutableSet.copyOf(rpcs));
63 private static Collection<DOMRpcIdentifier> notPresentRpcs(final DOMRpcRoutingTable table, final Collection<DOMRpcIdentifier> candidates) {
64 return ImmutableSet.copyOf(Collections2.filter(candidates, new Predicate<DOMRpcIdentifier>() {
66 public boolean apply(final DOMRpcIdentifier input) {
67 return !table.contains(input);
72 private synchronized void removeRpcImplementation(final DOMRpcImplementation implementation, final Set<DOMRpcIdentifier> rpcs) {
73 final DOMRpcRoutingTable oldTable = routingTable;
74 final DOMRpcRoutingTable newTable = oldTable.remove(implementation, rpcs);
76 final Collection<DOMRpcIdentifier> removedRpcs = notPresentRpcs(newTable, rpcs);
77 routingTable = newTable;
78 if(!removedRpcs.isEmpty()) {
79 final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
80 listenerNotifier.execute(new Runnable() {
83 for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
84 // Need to ensure removed listeners do not get notified
85 synchronized (DOMRpcRouter.this) {
86 if (listeners.contains(l)) {
87 l.getInstance().onRpcUnavailable(removedRpcs);
97 public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(final T implementation, final Set<DOMRpcIdentifier> rpcs) {
98 final DOMRpcRoutingTable oldTable = routingTable;
99 final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
101 final Collection<DOMRpcIdentifier> addedRpcs = notPresentRpcs(oldTable, rpcs);
102 routingTable = newTable;
104 if(!addedRpcs.isEmpty()) {
105 final Collection<ListenerRegistration<? extends DOMRpcAvailabilityListener>> capturedListeners = listeners;
106 listenerNotifier.execute(new Runnable() {
109 for (final ListenerRegistration<? extends DOMRpcAvailabilityListener> l : capturedListeners) {
110 // Need to ensure removed listeners do not get notified
111 synchronized (DOMRpcRouter.this) {
112 if (listeners.contains(l)) {
113 l.getInstance().onRpcAvailable(addedRpcs);
121 return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
123 protected void removeRegistration() {
124 removeRpcImplementation(getInstance(), rpcs);
130 public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
131 return routingTable.invokeRpc(type, input);
134 private synchronized void removeListener(final ListenerRegistration<? extends DOMRpcAvailabilityListener> reg) {
135 listeners = ImmutableList.copyOf(Collections2.filter(listeners, new Predicate<Object>() {
137 public boolean apply(final Object input) {
138 return !reg.equals(input);
144 public synchronized <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(final T listener) {
145 final ListenerRegistration<T> ret = new AbstractListenerRegistration<T>(listener) {
147 protected void removeRegistration() {
148 removeListener(this);
152 final Builder<ListenerRegistration<? extends DOMRpcAvailabilityListener>> b = ImmutableList.builder();
155 listeners = b.build();
156 final Map<SchemaPath, Set<YangInstanceIdentifier>> capturedRpcs = routingTable.getRpcs();
158 listenerNotifier.execute(new Runnable() {
161 for (final Entry<SchemaPath, Set<YangInstanceIdentifier>> e : capturedRpcs.entrySet()) {
162 listener.onRpcAvailable(Collections2.transform(e.getValue(), new Function<YangInstanceIdentifier, DOMRpcIdentifier>() {
164 public DOMRpcIdentifier apply(final YangInstanceIdentifier input) {
165 return DOMRpcIdentifier.create(e.getKey(), input);
176 public synchronized void onGlobalContextUpdated(final SchemaContext context) {
177 final DOMRpcRoutingTable oldTable = routingTable;
178 final DOMRpcRoutingTable newTable = oldTable.setSchemaContext(context);
179 routingTable = newTable;
183 public void close() {
184 listenerNotifier.shutdown();