Bug 849: Fixed NPE in Translated Data Change Events.
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.common.impl.service;
9
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.HashSet;
14 import java.util.Map;
15 import java.util.Map.Entry;
16 import java.util.Set;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Future;
20 import java.util.concurrent.atomic.AtomicLong;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23
24 import org.eclipse.xtext.xbase.lib.Exceptions;
25 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
28 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
29 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher;
30 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
32 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory;
33 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
35 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
36 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
37 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.concepts.Path;
40 import org.opendaylight.yangtools.concepts.Registration;
41 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
42 import org.opendaylight.yangtools.yang.common.RpcResult;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import com.google.common.base.Function;
47 import com.google.common.base.Preconditions;
48 import com.google.common.base.Predicate;
49 import com.google.common.collect.FluentIterable;
50 import com.google.common.collect.HashMultimap;
51 import com.google.common.collect.ImmutableList;
52 import com.google.common.collect.Multimap;
53 import com.google.common.collect.Multimaps;
54 import com.google.common.util.concurrent.MoreExecutors;
55
56 public abstract class AbstractDataBroker<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>>
57         implements DataModificationTransactionFactory<P, D>, DataReader<P, D>, DataChangePublisher<P, D, DCL>,
58         DataProvisionService<P, D> {
59     private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class);
60
61     private ExecutorService executor;
62
63     public ExecutorService getExecutor() {
64         return this.executor;
65     }
66
67     public void setExecutor(final ExecutorService executor) {
68         this.executor = executor;
69     }
70
71     private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor();
72
73     public ExecutorService getNotificationExecutor() {
74         return this.notificationExecutor;
75     }
76
77     public void setNotificationExecutor(final ExecutorService notificationExecutor) {
78         this.notificationExecutor = notificationExecutor;
79     }
80
81     private AbstractDataReadRouter<P, D> dataReadRouter;
82
83     private final AtomicLong submittedTransactionsCount = new AtomicLong();
84
85     private final AtomicLong failedTransactionsCount = new AtomicLong();
86
87     private final AtomicLong finishedTransactionsCount = new AtomicLong();
88
89     public AbstractDataReadRouter<P, D> getDataReadRouter() {
90         return this.dataReadRouter;
91     }
92
93     public void setDataReadRouter(final AbstractDataReadRouter<P, D> dataReadRouter) {
94         this.dataReadRouter = dataReadRouter;
95     }
96
97     public AtomicLong getSubmittedTransactionsCount() {
98         return this.submittedTransactionsCount;
99     }
100
101     public AtomicLong getFailedTransactionsCount() {
102         return this.failedTransactionsCount;
103     }
104
105     public AtomicLong getFinishedTransactionsCount() {
106         return this.finishedTransactionsCount;
107     }
108
109     private final Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps
110             .synchronizedSetMultimap(HashMultimap.<P, DataChangeListenerRegistration<P, D, DCL>> create());
111
112     private final Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps
113             .synchronizedSetMultimap(HashMultimap.<P, DataCommitHandlerRegistrationImpl<P, D>> create());
114
115     private final Lock registrationLock = new ReentrantLock();
116
117     private final ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>> commitHandlerRegistrationListeners = new ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>>();
118
119     public AbstractDataBroker() {
120     }
121
122     protected ImmutableList<DataCommitHandler<P, D>> affectedCommitHandlers(final Set<P> paths) {
123         final Callable<ImmutableList<DataCommitHandler<P, D>>> _function = new Callable<ImmutableList<DataCommitHandler<P, D>>>() {
124             @Override
125             public ImmutableList<DataCommitHandler<P, D>> call() throws Exception {
126                 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
127                 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
128                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
129                         .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
130                 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
131                     @Override
132                     public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
133                         P _key = it.getKey();
134                         boolean _isAffectedBy = isAffectedBy(_key, paths);
135                         return _isAffectedBy;
136                     }
137                 };
138                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
139                         .filter(_function);
140                 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
141                     @Override
142                     public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
143                             final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
144                         Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
145                         return _value;
146                     }
147                 };
148                 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
149                         .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
150                 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
151                     @Override
152                     public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
153                         DataCommitHandler<P, D> _instance = it.getInstance();
154                         return _instance;
155                     }
156                 };
157                 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
158                         .<DataCommitHandler<P, D>> transform(_function_2);
159                 return _transform.toList();
160             }
161         };
162         return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
163     }
164
165     protected ImmutableList<DataCommitHandler<P, D>> probablyAffectedCommitHandlers(final HashSet<P> paths) {
166         final Callable<ImmutableList<DataCommitHandler<P, D>>> _function = new Callable<ImmutableList<DataCommitHandler<P, D>>>() {
167             @Override
168             public ImmutableList<DataCommitHandler<P, D>> call() throws Exception {
169                 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
170                 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
171                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
172                         .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
173                 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
174                     @Override
175                     public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
176                         P _key = it.getKey();
177                         boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths);
178                         return _isProbablyAffectedBy;
179                     }
180                 };
181                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
182                         .filter(_function);
183                 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
184                     @Override
185                     public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
186                             final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
187                         Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
188                         return _value;
189                     }
190                 };
191                 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
192                         .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
193                 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
194                     @Override
195                     public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
196                         DataCommitHandler<P, D> _instance = it.getInstance();
197                         return _instance;
198                     }
199                 };
200                 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
201                         .<DataCommitHandler<P, D>> transform(_function_2);
202                 return _transform.toList();
203             }
204         };
205         return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
206     }
207
208     protected Map<P, D> deepGetBySubpath(final Map<P, D> dataSet, final P path) {
209         return Collections.<P, D> emptyMap();
210     }
211
212     @Override
213     public final D readConfigurationData(final P path) {
214         AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
215         return _dataReadRouter.readConfigurationData(path);
216     }
217
218     @Override
219     public final D readOperationalData(final P path) {
220         AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
221         return _dataReadRouter.readOperationalData(path);
222     }
223
224     private static <T extends Object> T withLock(final Lock lock, final Callable<T> method) {
225         lock.lock();
226         try {
227             return method.call();
228         } catch (Exception e) {
229             throw Exceptions.sneakyThrow(e);
230         } finally {
231             lock.unlock();
232         }
233     }
234
235     @Override
236     public final Registration<DataCommitHandler<P, D>> registerCommitHandler(final P path,
237             final DataCommitHandler<P, D> commitHandler) {
238         synchronized (commitHandler) {
239             final DataCommitHandlerRegistrationImpl<P, D> registration = new DataCommitHandlerRegistrationImpl<P, D>(
240                     path, commitHandler, this);
241             commitHandlers.put(path, registration);
242             LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path);
243             for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
244                 try {
245                     listener.getInstance().onRegister(registration);
246                 } catch (Exception e) {
247                     LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(),
248                             e);
249                 }
250             }
251             return registration;
252         }
253     }
254
255     @Override
256     public final ListenerRegistration<DCL> registerDataChangeListener(final P path, final DCL listener) {
257         synchronized (listeners) {
258             final DataChangeListenerRegistration<P, D, DCL> reg = new DataChangeListenerRegistration<P, D, DCL>(path,
259                     listener, AbstractDataBroker.this);
260             listeners.put(path, reg);
261             final D initialConfig = getDataReadRouter().readConfigurationData(path);
262             final D initialOperational = getDataReadRouter().readOperationalData(path);
263             final DataChangeEvent<P, D> event = createInitialListenerEvent(path, initialConfig, initialOperational);
264             listener.onDataChanged(event);
265             return reg;
266         }
267     }
268
269     public final CompositeObjectRegistration<DataReader<P, D>> registerDataReader(final P path,
270             final DataReader<P, D> reader) {
271
272         final Registration<DataReader<P, D>> confReg = getDataReadRouter().registerConfigurationReader(path, reader);
273         final Registration<DataReader<P, D>> dataReg = getDataReadRouter().registerOperationalReader(path, reader);
274         return new CompositeObjectRegistration<DataReader<P, D>>(reader, Arrays.asList(confReg, dataReg));
275     }
276
277     @Override
278     public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> registerCommitHandlerListener(
279             final RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
280         final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> ret = this.commitHandlerRegistrationListeners
281                 .register(commitHandlerListener);
282         return ret;
283     }
284
285     protected DataChangeEvent<P, D> createInitialListenerEvent(final P path, final D initialConfig,
286             final D initialOperational) {
287         InitialDataChangeEventImpl<P, D> _initialDataChangeEventImpl = new InitialDataChangeEventImpl<P, D>(
288                 initialConfig, initialOperational);
289         return _initialDataChangeEventImpl;
290     }
291
292     protected final void removeListener(final DataChangeListenerRegistration<P, D, DCL> registration) {
293         synchronized (listeners) {
294             listeners.remove(registration.getPath(), registration);
295         }
296     }
297
298     protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl<P, D> registration) {
299         synchronized (commitHandlers) {
300
301             commitHandlers.remove(registration.getPath(), registration);
302             LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath());
303             for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
304                 try {
305                     listener.getInstance().onUnregister(registration);
306                 } catch (Exception e) {
307                     LOG.error("Unexpected exception in listener {} during invoking onUnregister",
308                             listener.getInstance(), e);
309                 }
310             }
311         }
312
313     }
314
315     protected final Collection<Entry<P, DataCommitHandlerRegistrationImpl<P, D>>> getActiveCommitHandlers() {
316         return commitHandlers.entries();
317     }
318
319     protected ImmutableList<ListenerStateCapture<P, D, DCL>> affectedListeners(final Set<P> paths) {
320
321         synchronized (listeners) {
322             return FluentIterable //
323                     .from(listeners.asMap().entrySet()) //
324                     .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
325                         @Override
326                         public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
327                             return isAffectedBy(it.getKey(), paths);
328                         }
329                     }) //
330                     .transform(
331                             new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
332                                 @Override
333                                 public ListenerStateCapture<P, D, DCL> apply(
334                                         final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
335                                     return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
336                                             createContainsPredicate(it.getKey()));
337                                 }
338                             }) //
339                     .toList();
340         }
341     }
342
343     protected ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners(final Set<P> paths) {
344         synchronized (listeners) {
345             return FluentIterable //
346                     .from(listeners.asMap().entrySet()) //
347                     .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
348                         @Override
349                         public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
350                             return isProbablyAffectedBy(it.getKey(), paths);
351                         }
352                     }) //
353                     .transform(
354                             new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
355                                 @Override
356                                 public ListenerStateCapture<P, D, DCL> apply(
357                                         final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
358                                     return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
359                                             createIsContainedPredicate(it.getKey()));
360                                 }
361                             }) //
362                     .toList();
363         }
364     }
365
366     protected Predicate<P> createContainsPredicate(final P key) {
367         return new Predicate<P>() {
368             @Override
369             public boolean apply(final P other) {
370                 return key.contains(other);
371             }
372         };
373     }
374
375     protected Predicate<P> createIsContainedPredicate(final P key) {
376         return new Predicate<P>() {
377             @Override
378             public boolean apply(final P other) {
379                 return other.contains(key);
380             }
381         };
382     }
383
384     protected boolean isAffectedBy(final P key, final Set<P> paths) {
385         final Predicate<P> contains = this.createContainsPredicate(key);
386         if (paths.contains(key)) {
387             return true;
388         }
389         for (final P path : paths) {
390             if (contains.apply(path)) {
391                 return true;
392             }
393         }
394         return false;
395     }
396
397     protected boolean isProbablyAffectedBy(final P key, final Set<P> paths) {
398         final Predicate<P> isContained = this.createIsContainedPredicate(key);
399         for (final P path : paths) {
400             if (isContained.apply(path)) {
401                 return true;
402             }
403         }
404         return false;
405     }
406
407     final Future<RpcResult<TransactionStatus>> commit(final AbstractDataTransaction<P, D> transaction) {
408         Preconditions.checkNotNull(transaction);
409         final TwoPhaseCommit<P, D, DCL> task = new TwoPhaseCommit<P, D, DCL>(transaction, this);
410
411         this.getSubmittedTransactionsCount().getAndIncrement();
412         return this.getExecutor().submit(task);
413     }
414
415     private static class DataCommitHandlerRegistrationImpl<P extends Path<P>, D extends Object> //
416             extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
417             implements DataCommitHandlerRegistration<P, D> {
418
419         private AbstractDataBroker<P, D, ? extends Object> dataBroker;
420         private final P path;
421
422         @Override
423         public P getPath() {
424             return this.path;
425         }
426
427         public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler<P, D> instance,
428                 final AbstractDataBroker<P, D, ? extends Object> broker) {
429             super(instance);
430             this.dataBroker = broker;
431             this.path = path;
432         }
433
434         @Override
435         protected void removeRegistration() {
436             this.dataBroker.removeCommitHandler(this);
437             this.dataBroker = null;
438         }
439     }
440 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.