/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.md.sal.common.impl.service;
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.HashSet;
15 import java.util.Map.Entry;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
23 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
24 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
25 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher;
28 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
30 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory;
31 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
32 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
33 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
34 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
35 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
36 import org.opendaylight.yangtools.concepts.ListenerRegistration;
37 import org.opendaylight.yangtools.concepts.Path;
38 import org.opendaylight.yangtools.concepts.Registration;
39 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import com.google.common.base.Function;
45 import com.google.common.base.Preconditions;
46 import com.google.common.base.Predicate;
47 import com.google.common.base.Supplier;
48 import com.google.common.collect.FluentIterable;
49 import com.google.common.collect.HashMultimap;
50 import com.google.common.collect.ImmutableList;
51 import com.google.common.collect.Multimap;
52 import com.google.common.collect.Multimaps;
53 import com.google.common.util.concurrent.MoreExecutors;
55 public abstract class AbstractDataBroker<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>>
56 implements DataModificationTransactionFactory<P, D>, DataReader<P, D>, DataChangePublisher<P, D, DCL>,
57 DataProvisionService<P, D> {
58 private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class);
60 private ExecutorService executor;
62 public ExecutorService getExecutor() {
66 public void setExecutor(final ExecutorService executor) {
67 this.executor = executor;
70 private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor();
72 public ExecutorService getNotificationExecutor() {
73 return this.notificationExecutor;
76 public void setNotificationExecutor(final ExecutorService notificationExecutor) {
77 this.notificationExecutor = notificationExecutor;
80 private AbstractDataReadRouter<P, D> dataReadRouter;
82 private final AtomicLong submittedTransactionsCount = new AtomicLong();
84 private final AtomicLong failedTransactionsCount = new AtomicLong();
86 private final AtomicLong finishedTransactionsCount = new AtomicLong();
88 public AbstractDataReadRouter<P, D> getDataReadRouter() {
89 return this.dataReadRouter;
92 public void setDataReadRouter(final AbstractDataReadRouter<P, D> dataReadRouter) {
93 this.dataReadRouter = dataReadRouter;
96 public AtomicLong getSubmittedTransactionsCount() {
97 return this.submittedTransactionsCount;
100 public AtomicLong getFailedTransactionsCount() {
101 return this.failedTransactionsCount;
104 public AtomicLong getFinishedTransactionsCount() {
105 return this.finishedTransactionsCount;
108 private final Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps
109 .synchronizedSetMultimap(HashMultimap.<P, DataChangeListenerRegistration<P, D, DCL>> create());
111 private final Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps
112 .synchronizedSetMultimap(HashMultimap.<P, DataCommitHandlerRegistrationImpl<P, D>> create());
114 private final Lock registrationLock = new ReentrantLock();
116 private final ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>> commitHandlerRegistrationListeners = new ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>>();
118 public AbstractDataBroker() {
121 protected ImmutableList<DataCommitHandler<P, D>> affectedCommitHandlers(final Set<P> paths) {
122 final Supplier<ImmutableList<DataCommitHandler<P, D>>> _function = new Supplier<ImmutableList<DataCommitHandler<P, D>>>() {
124 public ImmutableList<DataCommitHandler<P, D>> get() {
125 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
126 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
127 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
128 .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
129 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
131 public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
132 P _key = it.getKey();
133 boolean _isAffectedBy = isAffectedBy(_key, paths);
134 return _isAffectedBy;
137 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
139 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
141 public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
142 final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
143 Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
147 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
148 .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
149 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
151 public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
152 DataCommitHandler<P, D> _instance = it.getInstance();
156 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
157 .<DataCommitHandler<P, D>> transform(_function_2);
158 return _transform.toList();
161 return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
164 protected ImmutableList<DataCommitHandler<P, D>> probablyAffectedCommitHandlers(final HashSet<P> paths) {
165 final Supplier<ImmutableList<DataCommitHandler<P, D>>> _function = new Supplier<ImmutableList<DataCommitHandler<P, D>>>() {
167 public ImmutableList<DataCommitHandler<P, D>> get() {
168 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
169 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
170 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
171 .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
172 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
174 public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
175 P _key = it.getKey();
176 boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths);
177 return _isProbablyAffectedBy;
180 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
182 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
184 public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
185 final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
186 Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
190 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
191 .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
192 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
194 public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
195 DataCommitHandler<P, D> _instance = it.getInstance();
199 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
200 .<DataCommitHandler<P, D>> transform(_function_2);
201 return _transform.toList();
204 return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
207 protected Map<P, D> deepGetBySubpath(final Map<P, D> dataSet, final P path) {
208 return Collections.<P, D> emptyMap();
212 public final D readConfigurationData(final P path) {
213 AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
214 return _dataReadRouter.readConfigurationData(path);
218 public final D readOperationalData(final P path) {
219 AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
220 return _dataReadRouter.readOperationalData(path);
223 private static <T extends Object> T withLock(final Lock lock, final Supplier<T> method) {
233 public final Registration registerCommitHandler(final P path,
234 final DataCommitHandler<P, D> commitHandler) {
235 synchronized (commitHandler) {
236 final DataCommitHandlerRegistrationImpl<P, D> registration = new DataCommitHandlerRegistrationImpl<P, D>(
237 path, commitHandler, this);
238 commitHandlers.put(path, registration);
239 LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path);
240 for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
242 listener.getInstance().onRegister(registration);
243 } catch (Exception e) {
244 LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(),
253 public final ListenerRegistration<DCL> registerDataChangeListener(final P path, final DCL listener) {
254 synchronized (listeners) {
255 final DataChangeListenerRegistration<P, D, DCL> reg = new DataChangeListenerRegistration<P, D, DCL>(path,
256 listener, AbstractDataBroker.this);
257 listeners.put(path, reg);
258 final D initialConfig = getDataReadRouter().readConfigurationData(path);
259 final D initialOperational = getDataReadRouter().readOperationalData(path);
260 final DataChangeEvent<P, D> event = createInitialListenerEvent(path, initialConfig, initialOperational);
261 listener.onDataChanged(event);
266 public final CompositeObjectRegistration<DataReader<P, D>> registerDataReader(final P path,
267 final DataReader<P, D> reader) {
269 final Registration confReg = getDataReadRouter().registerConfigurationReader(path, reader);
270 final Registration dataReg = getDataReadRouter().registerOperationalReader(path, reader);
271 return new CompositeObjectRegistration<DataReader<P, D>>(reader, Arrays.asList(confReg, dataReg));
275 public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> registerCommitHandlerListener(
276 final RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
277 final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> ret = this.commitHandlerRegistrationListeners
278 .register(commitHandlerListener);
282 protected DataChangeEvent<P, D> createInitialListenerEvent(final P path, final D initialConfig,
283 final D initialOperational) {
284 InitialDataChangeEventImpl<P, D> _initialDataChangeEventImpl = new InitialDataChangeEventImpl<P, D>(
285 initialConfig, initialOperational);
286 return _initialDataChangeEventImpl;
289 protected final void removeListener(final DataChangeListenerRegistration<P, D, DCL> registration) {
290 synchronized (listeners) {
291 listeners.remove(registration.getPath(), registration);
295 protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl<P, D> registration) {
296 synchronized (commitHandlers) {
298 commitHandlers.remove(registration.getPath(), registration);
299 LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath());
300 for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
302 listener.getInstance().onUnregister(registration);
303 } catch (Exception e) {
304 LOG.error("Unexpected exception in listener {} during invoking onUnregister",
305 listener.getInstance(), e);
312 protected final Collection<Entry<P, DataCommitHandlerRegistrationImpl<P, D>>> getActiveCommitHandlers() {
313 return commitHandlers.entries();
316 protected ImmutableList<ListenerStateCapture<P, D, DCL>> affectedListeners(final Set<P> paths) {
318 synchronized (listeners) {
319 return FluentIterable //
320 .from(listeners.asMap().entrySet()) //
321 .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
323 public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
324 return isAffectedBy(it.getKey(), paths);
328 new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
330 public ListenerStateCapture<P, D, DCL> apply(
331 final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
332 return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
333 createContainsPredicate(it.getKey()));
340 protected ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners(final Set<P> paths) {
341 synchronized (listeners) {
342 return FluentIterable //
343 .from(listeners.asMap().entrySet()) //
344 .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
346 public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
347 return isProbablyAffectedBy(it.getKey(), paths);
351 new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
353 public ListenerStateCapture<P, D, DCL> apply(
354 final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
355 return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
356 createIsContainedPredicate(it.getKey()));
363 protected Predicate<P> createContainsPredicate(final P key) {
364 return new Predicate<P>() {
366 public boolean apply(final P other) {
367 return key.contains(other);
372 protected Predicate<P> createIsContainedPredicate(final P key) {
373 return new Predicate<P>() {
375 public boolean apply(final P other) {
376 return other.contains(key);
381 protected boolean isAffectedBy(final P key, final Set<P> paths) {
382 final Predicate<P> contains = this.createContainsPredicate(key);
383 if (paths.contains(key)) {
386 for (final P path : paths) {
387 if (contains.apply(path)) {
394 protected boolean isProbablyAffectedBy(final P key, final Set<P> paths) {
395 final Predicate<P> isContained = this.createIsContainedPredicate(key);
396 for (final P path : paths) {
397 if (isContained.apply(path)) {
404 final Future<RpcResult<TransactionStatus>> commit(final AbstractDataTransaction<P, D> transaction) {
405 Preconditions.checkNotNull(transaction);
406 final TwoPhaseCommit<P, D, DCL> task = new TwoPhaseCommit<P, D, DCL>(transaction, this);
408 this.getSubmittedTransactionsCount().getAndIncrement();
409 return this.getExecutor().submit(task);
412 private static class DataCommitHandlerRegistrationImpl<P extends Path<P>, D extends Object> //
413 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
414 implements DataCommitHandlerRegistration<P, D> {
416 private AbstractDataBroker<P, D, ? extends Object> dataBroker;
417 private final P path;
424 public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler<P, D> instance,
425 final AbstractDataBroker<P, D, ? extends Object> broker) {
427 this.dataBroker = broker;
432 protected void removeRegistration() {
433 this.dataBroker.removeCommitHandler(this);
434 this.dataBroker = null;