2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.common.impl.service;
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.HashSet;
15 import java.util.Map.Entry;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Future;
20 import java.util.concurrent.atomic.AtomicLong;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
24 import org.eclipse.xtext.xbase.lib.Exceptions;
25 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
28 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
29 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher;
30 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
32 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory;
33 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
35 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
36 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
37 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.concepts.Path;
40 import org.opendaylight.yangtools.concepts.Registration;
41 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
42 import org.opendaylight.yangtools.yang.common.RpcResult;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 import com.google.common.base.Function;
47 import com.google.common.base.Preconditions;
48 import com.google.common.base.Predicate;
49 import com.google.common.collect.FluentIterable;
50 import com.google.common.collect.HashMultimap;
51 import com.google.common.collect.ImmutableList;
52 import com.google.common.collect.Multimap;
53 import com.google.common.collect.Multimaps;
54 import com.google.common.util.concurrent.MoreExecutors;
56 public abstract class AbstractDataBroker<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>>
57 implements DataModificationTransactionFactory<P, D>, DataReader<P, D>, DataChangePublisher<P, D, DCL>,
58 DataProvisionService<P, D> {
59 private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class);
61 private ExecutorService executor;
63 public ExecutorService getExecutor() {
67 public void setExecutor(final ExecutorService executor) {
68 this.executor = executor;
71 private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor();
73 public ExecutorService getNotificationExecutor() {
74 return this.notificationExecutor;
77 public void setNotificationExecutor(final ExecutorService notificationExecutor) {
78 this.notificationExecutor = notificationExecutor;
81 private AbstractDataReadRouter<P, D> dataReadRouter;
83 private final AtomicLong submittedTransactionsCount = new AtomicLong();
85 private final AtomicLong failedTransactionsCount = new AtomicLong();
87 private final AtomicLong finishedTransactionsCount = new AtomicLong();
89 public AbstractDataReadRouter<P, D> getDataReadRouter() {
90 return this.dataReadRouter;
93 public void setDataReadRouter(final AbstractDataReadRouter<P, D> dataReadRouter) {
94 this.dataReadRouter = dataReadRouter;
97 public AtomicLong getSubmittedTransactionsCount() {
98 return this.submittedTransactionsCount;
101 public AtomicLong getFailedTransactionsCount() {
102 return this.failedTransactionsCount;
105 public AtomicLong getFinishedTransactionsCount() {
106 return this.finishedTransactionsCount;
109 private final Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps
110 .synchronizedSetMultimap(HashMultimap.<P, DataChangeListenerRegistration<P, D, DCL>> create());
112 private final Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps
113 .synchronizedSetMultimap(HashMultimap.<P, DataCommitHandlerRegistrationImpl<P, D>> create());
115 private final Lock registrationLock = new ReentrantLock();
117 private final ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>> commitHandlerRegistrationListeners = new ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>>();
119 public AbstractDataBroker() {
122 protected ImmutableList<DataCommitHandler<P, D>> affectedCommitHandlers(final Set<P> paths) {
123 final Callable<ImmutableList<DataCommitHandler<P, D>>> _function = new Callable<ImmutableList<DataCommitHandler<P, D>>>() {
125 public ImmutableList<DataCommitHandler<P, D>> call() throws Exception {
126 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
127 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
128 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
129 .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
130 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
132 public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
133 P _key = it.getKey();
134 boolean _isAffectedBy = isAffectedBy(_key, paths);
135 return _isAffectedBy;
138 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
140 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
142 public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
143 final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
144 Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
148 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
149 .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
150 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
152 public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
153 DataCommitHandler<P, D> _instance = it.getInstance();
157 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
158 .<DataCommitHandler<P, D>> transform(_function_2);
159 return _transform.toList();
162 return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
165 protected ImmutableList<DataCommitHandler<P, D>> probablyAffectedCommitHandlers(final HashSet<P> paths) {
166 final Callable<ImmutableList<DataCommitHandler<P, D>>> _function = new Callable<ImmutableList<DataCommitHandler<P, D>>>() {
168 public ImmutableList<DataCommitHandler<P, D>> call() throws Exception {
169 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
170 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
171 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
172 .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
173 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
175 public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
176 P _key = it.getKey();
177 boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths);
178 return _isProbablyAffectedBy;
181 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
183 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
185 public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
186 final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
187 Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
191 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
192 .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
193 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
195 public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
196 DataCommitHandler<P, D> _instance = it.getInstance();
200 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
201 .<DataCommitHandler<P, D>> transform(_function_2);
202 return _transform.toList();
205 return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
208 protected Map<P, D> deepGetBySubpath(final Map<P, D> dataSet, final P path) {
209 return Collections.<P, D> emptyMap();
213 public final D readConfigurationData(final P path) {
214 AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
215 return _dataReadRouter.readConfigurationData(path);
219 public final D readOperationalData(final P path) {
220 AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
221 return _dataReadRouter.readOperationalData(path);
224 private static <T extends Object> T withLock(final Lock lock, final Callable<T> method) {
227 return method.call();
228 } catch (Exception e) {
229 throw Exceptions.sneakyThrow(e);
236 public final Registration<DataCommitHandler<P, D>> registerCommitHandler(final P path,
237 final DataCommitHandler<P, D> commitHandler) {
238 synchronized (commitHandler) {
239 final DataCommitHandlerRegistrationImpl<P, D> registration = new DataCommitHandlerRegistrationImpl<P, D>(
240 path, commitHandler, this);
241 commitHandlers.put(path, registration);
242 LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path);
243 for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
245 listener.getInstance().onRegister(registration);
246 } catch (Exception e) {
247 LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(),
256 public final ListenerRegistration<DCL> registerDataChangeListener(final P path, final DCL listener) {
257 synchronized (listeners) {
258 final DataChangeListenerRegistration<P, D, DCL> reg = new DataChangeListenerRegistration<P, D, DCL>(path,
259 listener, AbstractDataBroker.this);
260 listeners.put(path, reg);
261 final D initialConfig = getDataReadRouter().readConfigurationData(path);
262 final D initialOperational = getDataReadRouter().readOperationalData(path);
263 final DataChangeEvent<P, D> event = createInitialListenerEvent(path, initialConfig, initialOperational);
264 listener.onDataChanged(event);
269 public final CompositeObjectRegistration<DataReader<P, D>> registerDataReader(final P path,
270 final DataReader<P, D> reader) {
272 final Registration<DataReader<P, D>> confReg = getDataReadRouter().registerConfigurationReader(path, reader);
273 final Registration<DataReader<P, D>> dataReg = getDataReadRouter().registerOperationalReader(path, reader);
274 return new CompositeObjectRegistration<DataReader<P, D>>(reader, Arrays.asList(confReg, dataReg));
278 public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> registerCommitHandlerListener(
279 final RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
280 final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> ret = this.commitHandlerRegistrationListeners
281 .register(commitHandlerListener);
285 protected DataChangeEvent<P, D> createInitialListenerEvent(final P path, final D initialConfig,
286 final D initialOperational) {
287 InitialDataChangeEventImpl<P, D> _initialDataChangeEventImpl = new InitialDataChangeEventImpl<P, D>(
288 initialConfig, initialOperational);
289 return _initialDataChangeEventImpl;
292 protected final void removeListener(final DataChangeListenerRegistration<P, D, DCL> registration) {
293 synchronized (listeners) {
294 listeners.remove(registration.getPath(), registration);
298 protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl<P, D> registration) {
299 synchronized (commitHandlers) {
301 commitHandlers.remove(registration.getPath(), registration);
302 LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath());
303 for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
305 listener.getInstance().onUnregister(registration);
306 } catch (Exception e) {
307 LOG.error("Unexpected exception in listener {} during invoking onUnregister",
308 listener.getInstance(), e);
315 protected final Collection<Entry<P, DataCommitHandlerRegistrationImpl<P, D>>> getActiveCommitHandlers() {
316 return commitHandlers.entries();
319 protected ImmutableList<ListenerStateCapture<P, D, DCL>> affectedListeners(final Set<P> paths) {
321 synchronized (listeners) {
322 return FluentIterable //
323 .from(listeners.asMap().entrySet()) //
324 .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
326 public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
327 return isAffectedBy(it.getKey(), paths);
331 new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
333 public ListenerStateCapture<P, D, DCL> apply(
334 final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
335 return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
336 createContainsPredicate(it.getKey()));
343 protected ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners(final Set<P> paths) {
344 synchronized (listeners) {
345 return FluentIterable //
346 .from(listeners.asMap().entrySet()) //
347 .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
349 public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
350 return isProbablyAffectedBy(it.getKey(), paths);
354 new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
356 public ListenerStateCapture<P, D, DCL> apply(
357 final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
358 return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
359 createIsContainedPredicate(it.getKey()));
366 protected Predicate<P> createContainsPredicate(final P key) {
367 return new Predicate<P>() {
369 public boolean apply(final P other) {
370 return key.contains(other);
375 protected Predicate<P> createIsContainedPredicate(final P key) {
376 return new Predicate<P>() {
378 public boolean apply(final P other) {
379 return other.contains(key);
384 protected boolean isAffectedBy(final P key, final Set<P> paths) {
385 final Predicate<P> contains = this.createContainsPredicate(key);
386 if (paths.contains(key)) {
389 for (final P path : paths) {
390 if (contains.apply(path)) {
397 protected boolean isProbablyAffectedBy(final P key, final Set<P> paths) {
398 final Predicate<P> isContained = this.createIsContainedPredicate(key);
399 for (final P path : paths) {
400 if (isContained.apply(path)) {
407 final Future<RpcResult<TransactionStatus>> commit(final AbstractDataTransaction<P, D> transaction) {
408 Preconditions.checkNotNull(transaction);
409 final TwoPhaseCommit<P, D, DCL> task = new TwoPhaseCommit<P, D, DCL>(transaction, this);
411 this.getSubmittedTransactionsCount().getAndIncrement();
412 return this.getExecutor().submit(task);
415 private static class DataCommitHandlerRegistrationImpl<P extends Path<P>, D extends Object> //
416 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
417 implements DataCommitHandlerRegistration<P, D> {
419 private AbstractDataBroker<P, D, ? extends Object> dataBroker;
420 private final P path;
427 public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler<P, D> instance,
428 final AbstractDataBroker<P, D, ? extends Object> broker) {
430 this.dataBroker = broker;
435 protected void removeRegistration() {
436 this.dataBroker.removeCommitHandler(this);
437 this.dataBroker = null;