2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.common.impl.service;
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.HashSet;
15 import java.util.Map.Entry;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
23 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
24 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
25 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher;
28 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
30 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory;
31 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
32 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
33 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
34 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
35 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
36 import org.opendaylight.yangtools.concepts.ListenerRegistration;
37 import org.opendaylight.yangtools.concepts.Path;
38 import org.opendaylight.yangtools.concepts.Registration;
39 import org.opendaylight.yangtools.util.ListenerRegistry;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import com.google.common.base.Function;
45 import com.google.common.base.Preconditions;
46 import com.google.common.base.Predicate;
47 import com.google.common.base.Supplier;
48 import com.google.common.collect.FluentIterable;
49 import com.google.common.collect.HashMultimap;
50 import com.google.common.collect.ImmutableList;
51 import com.google.common.collect.Multimap;
52 import com.google.common.collect.Multimaps;
53 import com.google.common.util.concurrent.MoreExecutors;
56 public abstract class AbstractDataBroker<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>>
57 implements DataModificationTransactionFactory<P, D>, DataReader<P, D>, DataChangePublisher<P, D, DCL>,
58 DataProvisionService<P, D> {
59 private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class);
61 private ExecutorService executor;
63 public ExecutorService getExecutor() {
67 public void setExecutor(final ExecutorService executor) {
68 this.executor = executor;
71 private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor();
73 public ExecutorService getNotificationExecutor() {
74 return this.notificationExecutor;
77 public void setNotificationExecutor(final ExecutorService notificationExecutor) {
78 this.notificationExecutor = notificationExecutor;
81 private AbstractDataReadRouter<P, D> dataReadRouter;
83 private final AtomicLong submittedTransactionsCount = new AtomicLong();
85 private final AtomicLong failedTransactionsCount = new AtomicLong();
87 private final AtomicLong finishedTransactionsCount = new AtomicLong();
89 public AbstractDataReadRouter<P, D> getDataReadRouter() {
90 return this.dataReadRouter;
93 public void setDataReadRouter(final AbstractDataReadRouter<P, D> dataReadRouter) {
94 this.dataReadRouter = dataReadRouter;
97 public AtomicLong getSubmittedTransactionsCount() {
98 return this.submittedTransactionsCount;
101 public AtomicLong getFailedTransactionsCount() {
102 return this.failedTransactionsCount;
105 public AtomicLong getFinishedTransactionsCount() {
106 return this.finishedTransactionsCount;
109 private final Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps
110 .synchronizedSetMultimap(HashMultimap.<P, DataChangeListenerRegistration<P, D, DCL>> create());
112 private final Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps
113 .synchronizedSetMultimap(HashMultimap.<P, DataCommitHandlerRegistrationImpl<P, D>> create());
115 private final Lock registrationLock = new ReentrantLock();
117 private final ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>> commitHandlerRegistrationListeners = new ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>>();
119 public AbstractDataBroker() {
122 protected ImmutableList<DataCommitHandler<P, D>> affectedCommitHandlers(final Set<P> paths) {
123 final Supplier<ImmutableList<DataCommitHandler<P, D>>> _function = new Supplier<ImmutableList<DataCommitHandler<P, D>>>() {
125 public ImmutableList<DataCommitHandler<P, D>> get() {
126 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
127 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
128 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
129 .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
130 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
132 public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
133 P _key = it.getKey();
134 boolean _isAffectedBy = isAffectedBy(_key, paths);
135 return _isAffectedBy;
138 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
140 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
142 public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
143 final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
144 Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
148 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
149 .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
150 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
152 public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
153 DataCommitHandler<P, D> _instance = it.getInstance();
157 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
158 .<DataCommitHandler<P, D>> transform(_function_2);
159 return _transform.toList();
162 return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
165 protected ImmutableList<DataCommitHandler<P, D>> probablyAffectedCommitHandlers(final HashSet<P> paths) {
166 final Supplier<ImmutableList<DataCommitHandler<P, D>>> _function = new Supplier<ImmutableList<DataCommitHandler<P, D>>>() {
168 public ImmutableList<DataCommitHandler<P, D>> get() {
169 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
170 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
171 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
172 .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
173 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
175 public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
176 P _key = it.getKey();
177 boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths);
178 return _isProbablyAffectedBy;
181 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
183 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
185 public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
186 final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
187 Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
191 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
192 .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
193 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
195 public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
196 DataCommitHandler<P, D> _instance = it.getInstance();
200 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
201 .<DataCommitHandler<P, D>> transform(_function_2);
202 return _transform.toList();
205 return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
208 protected Map<P, D> deepGetBySubpath(final Map<P, D> dataSet, final P path) {
209 return Collections.<P, D> emptyMap();
213 public final D readConfigurationData(final P path) {
214 AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
215 return _dataReadRouter.readConfigurationData(path);
219 public final D readOperationalData(final P path) {
220 AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
221 return _dataReadRouter.readOperationalData(path);
224 private static <T extends Object> T withLock(final Lock lock, final Supplier<T> method) {
234 public final Registration registerCommitHandler(final P path,
235 final DataCommitHandler<P, D> commitHandler) {
236 synchronized (commitHandler) {
237 final DataCommitHandlerRegistrationImpl<P, D> registration = new DataCommitHandlerRegistrationImpl<P, D>(
238 path, commitHandler, this);
239 commitHandlers.put(path, registration);
240 LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path);
241 for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
243 listener.getInstance().onRegister(registration);
244 } catch (Exception e) {
245 LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(),
254 public final ListenerRegistration<DCL> registerDataChangeListener(final P path, final DCL listener) {
255 synchronized (listeners) {
256 final DataChangeListenerRegistration<P, D, DCL> reg = new DataChangeListenerRegistration<P, D, DCL>(path,
257 listener, AbstractDataBroker.this);
258 listeners.put(path, reg);
259 final D initialConfig = getDataReadRouter().readConfigurationData(path);
260 final D initialOperational = getDataReadRouter().readOperationalData(path);
261 final DataChangeEvent<P, D> event = createInitialListenerEvent(path, initialConfig, initialOperational);
262 listener.onDataChanged(event);
267 public final CompositeObjectRegistration<DataReader<P, D>> registerDataReader(final P path,
268 final DataReader<P, D> reader) {
270 final Registration confReg = getDataReadRouter().registerConfigurationReader(path, reader);
271 final Registration dataReg = getDataReadRouter().registerOperationalReader(path, reader);
272 return new CompositeObjectRegistration<DataReader<P, D>>(reader, Arrays.asList(confReg, dataReg));
276 public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> registerCommitHandlerListener(
277 final RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
278 final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> ret = this.commitHandlerRegistrationListeners
279 .register(commitHandlerListener);
283 protected DataChangeEvent<P, D> createInitialListenerEvent(final P path, final D initialConfig,
284 final D initialOperational) {
285 InitialDataChangeEventImpl<P, D> _initialDataChangeEventImpl = new InitialDataChangeEventImpl<P, D>(
286 initialConfig, initialOperational);
287 return _initialDataChangeEventImpl;
290 protected final void removeListener(final DataChangeListenerRegistration<P, D, DCL> registration) {
291 synchronized (listeners) {
292 listeners.remove(registration.getPath(), registration);
296 protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl<P, D> registration) {
297 synchronized (commitHandlers) {
299 commitHandlers.remove(registration.getPath(), registration);
300 LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath());
301 for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
303 listener.getInstance().onUnregister(registration);
304 } catch (Exception e) {
305 LOG.error("Unexpected exception in listener {} during invoking onUnregister",
306 listener.getInstance(), e);
313 protected final Collection<Entry<P, DataCommitHandlerRegistrationImpl<P, D>>> getActiveCommitHandlers() {
314 return commitHandlers.entries();
317 protected ImmutableList<ListenerStateCapture<P, D, DCL>> affectedListeners(final Set<P> paths) {
319 synchronized (listeners) {
320 return FluentIterable //
321 .from(listeners.asMap().entrySet()) //
322 .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
324 public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
325 return isAffectedBy(it.getKey(), paths);
329 new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
331 public ListenerStateCapture<P, D, DCL> apply(
332 final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
333 return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
334 createContainsPredicate(it.getKey()));
341 protected ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners(final Set<P> paths) {
342 synchronized (listeners) {
343 return FluentIterable //
344 .from(listeners.asMap().entrySet()) //
345 .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
347 public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
348 return isProbablyAffectedBy(it.getKey(), paths);
352 new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
354 public ListenerStateCapture<P, D, DCL> apply(
355 final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
356 return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
357 createIsContainedPredicate(it.getKey()));
364 protected Predicate<P> createContainsPredicate(final P key) {
365 return new Predicate<P>() {
367 public boolean apply(final P other) {
368 return key.contains(other);
373 protected Predicate<P> createIsContainedPredicate(final P key) {
374 return new Predicate<P>() {
376 public boolean apply(final P other) {
377 return other.contains(key);
382 protected boolean isAffectedBy(final P key, final Set<P> paths) {
383 final Predicate<P> contains = this.createContainsPredicate(key);
384 if (paths.contains(key)) {
387 for (final P path : paths) {
388 if (contains.apply(path)) {
395 protected boolean isProbablyAffectedBy(final P key, final Set<P> paths) {
396 final Predicate<P> isContained = this.createIsContainedPredicate(key);
397 for (final P path : paths) {
398 if (isContained.apply(path)) {
405 final Future<RpcResult<TransactionStatus>> commit(final AbstractDataTransaction<P, D> transaction) {
406 Preconditions.checkNotNull(transaction);
407 final TwoPhaseCommit<P, D, DCL> task = new TwoPhaseCommit<P, D, DCL>(transaction, this);
409 this.getSubmittedTransactionsCount().getAndIncrement();
410 return this.getExecutor().submit(task);
413 private static class DataCommitHandlerRegistrationImpl<P extends Path<P>, D extends Object> //
414 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
415 implements DataCommitHandlerRegistration<P, D> {
417 private AbstractDataBroker<P, D, ? extends Object> dataBroker;
418 private final P path;
425 public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler<P, D> instance,
426 final AbstractDataBroker<P, D, ? extends Object> broker) {
428 this.dataBroker = broker;
433 protected void removeRegistration() {
434 this.dataBroker.removeCommitHandler(this);
435 this.dataBroker = null;