Merge "Bug 849: Fixed NPE in Translated Data Change Events."
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.common.impl.service;
9
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.HashSet;
14 import java.util.Map;
15 import java.util.Map.Entry;
16 import java.util.Set;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22
23 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
24 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
25 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher;
28 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
30 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory;
31 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
32 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
33 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
34 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
35 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
36 import org.opendaylight.yangtools.concepts.ListenerRegistration;
37 import org.opendaylight.yangtools.concepts.Path;
38 import org.opendaylight.yangtools.concepts.Registration;
39 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import com.google.common.base.Function;
45 import com.google.common.base.Preconditions;
46 import com.google.common.base.Predicate;
47 import com.google.common.base.Supplier;
48 import com.google.common.collect.FluentIterable;
49 import com.google.common.collect.HashMultimap;
50 import com.google.common.collect.ImmutableList;
51 import com.google.common.collect.Multimap;
52 import com.google.common.collect.Multimaps;
53 import com.google.common.util.concurrent.MoreExecutors;
54
55 public abstract class AbstractDataBroker<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>>
56         implements DataModificationTransactionFactory<P, D>, DataReader<P, D>, DataChangePublisher<P, D, DCL>,
57         DataProvisionService<P, D> {
58     private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class);
59
60     private ExecutorService executor;
61
62     public ExecutorService getExecutor() {
63         return this.executor;
64     }
65
66     public void setExecutor(final ExecutorService executor) {
67         this.executor = executor;
68     }
69
70     private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor();
71
72     public ExecutorService getNotificationExecutor() {
73         return this.notificationExecutor;
74     }
75
76     public void setNotificationExecutor(final ExecutorService notificationExecutor) {
77         this.notificationExecutor = notificationExecutor;
78     }
79
80     private AbstractDataReadRouter<P, D> dataReadRouter;
81
82     private final AtomicLong submittedTransactionsCount = new AtomicLong();
83
84     private final AtomicLong failedTransactionsCount = new AtomicLong();
85
86     private final AtomicLong finishedTransactionsCount = new AtomicLong();
87
88     public AbstractDataReadRouter<P, D> getDataReadRouter() {
89         return this.dataReadRouter;
90     }
91
92     public void setDataReadRouter(final AbstractDataReadRouter<P, D> dataReadRouter) {
93         this.dataReadRouter = dataReadRouter;
94     }
95
96     public AtomicLong getSubmittedTransactionsCount() {
97         return this.submittedTransactionsCount;
98     }
99
100     public AtomicLong getFailedTransactionsCount() {
101         return this.failedTransactionsCount;
102     }
103
104     public AtomicLong getFinishedTransactionsCount() {
105         return this.finishedTransactionsCount;
106     }
107
108     private final Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps
109             .synchronizedSetMultimap(HashMultimap.<P, DataChangeListenerRegistration<P, D, DCL>> create());
110
111     private final Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps
112             .synchronizedSetMultimap(HashMultimap.<P, DataCommitHandlerRegistrationImpl<P, D>> create());
113
114     private final Lock registrationLock = new ReentrantLock();
115
116     private final ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>> commitHandlerRegistrationListeners = new ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>>();
117
118     public AbstractDataBroker() {
119     }
120
121     protected ImmutableList<DataCommitHandler<P, D>> affectedCommitHandlers(final Set<P> paths) {
122         final Supplier<ImmutableList<DataCommitHandler<P, D>>> _function = new Supplier<ImmutableList<DataCommitHandler<P, D>>>() {
123             @Override
124             public ImmutableList<DataCommitHandler<P, D>> get() {
125                 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
126                 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
127                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
128                         .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
129                 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
130                     @Override
131                     public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
132                         P _key = it.getKey();
133                         boolean _isAffectedBy = isAffectedBy(_key, paths);
134                         return _isAffectedBy;
135                     }
136                 };
137                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
138                         .filter(_function);
139                 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
140                     @Override
141                     public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
142                             final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
143                         Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
144                         return _value;
145                     }
146                 };
147                 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
148                         .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
149                 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
150                     @Override
151                     public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
152                         DataCommitHandler<P, D> _instance = it.getInstance();
153                         return _instance;
154                     }
155                 };
156                 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
157                         .<DataCommitHandler<P, D>> transform(_function_2);
158                 return _transform.toList();
159             }
160         };
161         return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
162     }
163
164     protected ImmutableList<DataCommitHandler<P, D>> probablyAffectedCommitHandlers(final HashSet<P> paths) {
165         final Supplier<ImmutableList<DataCommitHandler<P, D>>> _function = new Supplier<ImmutableList<DataCommitHandler<P, D>>>() {
166             @Override
167             public ImmutableList<DataCommitHandler<P, D>> get() {
168                 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
169                 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
170                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
171                         .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
172                 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
173                     @Override
174                     public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
175                         P _key = it.getKey();
176                         boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths);
177                         return _isProbablyAffectedBy;
178                     }
179                 };
180                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
181                         .filter(_function);
182                 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
183                     @Override
184                     public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
185                             final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
186                         Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
187                         return _value;
188                     }
189                 };
190                 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
191                         .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
192                 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
193                     @Override
194                     public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
195                         DataCommitHandler<P, D> _instance = it.getInstance();
196                         return _instance;
197                     }
198                 };
199                 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
200                         .<DataCommitHandler<P, D>> transform(_function_2);
201                 return _transform.toList();
202             }
203         };
204         return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
205     }
206
207     protected Map<P, D> deepGetBySubpath(final Map<P, D> dataSet, final P path) {
208         return Collections.<P, D> emptyMap();
209     }
210
211     @Override
212     public final D readConfigurationData(final P path) {
213         AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
214         return _dataReadRouter.readConfigurationData(path);
215     }
216
217     @Override
218     public final D readOperationalData(final P path) {
219         AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
220         return _dataReadRouter.readOperationalData(path);
221     }
222
223     private static <T extends Object> T withLock(final Lock lock, final Supplier<T> method) {
224         lock.lock();
225         try {
226             return method.get();
227         } finally {
228             lock.unlock();
229         }
230     }
231
232     @Override
233     public final Registration<DataCommitHandler<P, D>> registerCommitHandler(final P path,
234             final DataCommitHandler<P, D> commitHandler) {
235         synchronized (commitHandler) {
236             final DataCommitHandlerRegistrationImpl<P, D> registration = new DataCommitHandlerRegistrationImpl<P, D>(
237                     path, commitHandler, this);
238             commitHandlers.put(path, registration);
239             LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path);
240             for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
241                 try {
242                     listener.getInstance().onRegister(registration);
243                 } catch (Exception e) {
244                     LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(),
245                             e);
246                 }
247             }
248             return registration;
249         }
250     }
251
252     @Override
253     public final ListenerRegistration<DCL> registerDataChangeListener(final P path, final DCL listener) {
254         synchronized (listeners) {
255             final DataChangeListenerRegistration<P, D, DCL> reg = new DataChangeListenerRegistration<P, D, DCL>(path,
256                     listener, AbstractDataBroker.this);
257             listeners.put(path, reg);
258             final D initialConfig = getDataReadRouter().readConfigurationData(path);
259             final D initialOperational = getDataReadRouter().readOperationalData(path);
260             final DataChangeEvent<P, D> event = createInitialListenerEvent(path, initialConfig, initialOperational);
261             listener.onDataChanged(event);
262             return reg;
263         }
264     }
265
266     public final CompositeObjectRegistration<DataReader<P, D>> registerDataReader(final P path,
267             final DataReader<P, D> reader) {
268
269         final Registration<DataReader<P, D>> confReg = getDataReadRouter().registerConfigurationReader(path, reader);
270         final Registration<DataReader<P, D>> dataReg = getDataReadRouter().registerOperationalReader(path, reader);
271         return new CompositeObjectRegistration<DataReader<P, D>>(reader, Arrays.asList(confReg, dataReg));
272     }
273
274     @Override
275     public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> registerCommitHandlerListener(
276             final RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
277         final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> ret = this.commitHandlerRegistrationListeners
278                 .register(commitHandlerListener);
279         return ret;
280     }
281
282     protected DataChangeEvent<P, D> createInitialListenerEvent(final P path, final D initialConfig,
283             final D initialOperational) {
284         InitialDataChangeEventImpl<P, D> _initialDataChangeEventImpl = new InitialDataChangeEventImpl<P, D>(
285                 initialConfig, initialOperational);
286         return _initialDataChangeEventImpl;
287     }
288
289     protected final void removeListener(final DataChangeListenerRegistration<P, D, DCL> registration) {
290         synchronized (listeners) {
291             listeners.remove(registration.getPath(), registration);
292         }
293     }
294
295     protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl<P, D> registration) {
296         synchronized (commitHandlers) {
297
298             commitHandlers.remove(registration.getPath(), registration);
299             LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath());
300             for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
301                 try {
302                     listener.getInstance().onUnregister(registration);
303                 } catch (Exception e) {
304                     LOG.error("Unexpected exception in listener {} during invoking onUnregister",
305                             listener.getInstance(), e);
306                 }
307             }
308         }
309
310     }
311
312     protected final Collection<Entry<P, DataCommitHandlerRegistrationImpl<P, D>>> getActiveCommitHandlers() {
313         return commitHandlers.entries();
314     }
315
316     protected ImmutableList<ListenerStateCapture<P, D, DCL>> affectedListeners(final Set<P> paths) {
317
318         synchronized (listeners) {
319             return FluentIterable //
320                     .from(listeners.asMap().entrySet()) //
321                     .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
322                         @Override
323                         public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
324                             return isAffectedBy(it.getKey(), paths);
325                         }
326                     }) //
327                     .transform(
328                             new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
329                                 @Override
330                                 public ListenerStateCapture<P, D, DCL> apply(
331                                         final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
332                                     return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
333                                             createContainsPredicate(it.getKey()));
334                                 }
335                             }) //
336                     .toList();
337         }
338     }
339
340     protected ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners(final Set<P> paths) {
341         synchronized (listeners) {
342             return FluentIterable //
343                     .from(listeners.asMap().entrySet()) //
344                     .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
345                         @Override
346                         public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
347                             return isProbablyAffectedBy(it.getKey(), paths);
348                         }
349                     }) //
350                     .transform(
351                             new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
352                                 @Override
353                                 public ListenerStateCapture<P, D, DCL> apply(
354                                         final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
355                                     return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
356                                             createIsContainedPredicate(it.getKey()));
357                                 }
358                             }) //
359                     .toList();
360         }
361     }
362
363     protected Predicate<P> createContainsPredicate(final P key) {
364         return new Predicate<P>() {
365             @Override
366             public boolean apply(final P other) {
367                 return key.contains(other);
368             }
369         };
370     }
371
372     protected Predicate<P> createIsContainedPredicate(final P key) {
373         return new Predicate<P>() {
374             @Override
375             public boolean apply(final P other) {
376                 return other.contains(key);
377             }
378         };
379     }
380
381     protected boolean isAffectedBy(final P key, final Set<P> paths) {
382         final Predicate<P> contains = this.createContainsPredicate(key);
383         if (paths.contains(key)) {
384             return true;
385         }
386         for (final P path : paths) {
387             if (contains.apply(path)) {
388                 return true;
389             }
390         }
391         return false;
392     }
393
394     protected boolean isProbablyAffectedBy(final P key, final Set<P> paths) {
395         final Predicate<P> isContained = this.createIsContainedPredicate(key);
396         for (final P path : paths) {
397             if (isContained.apply(path)) {
398                 return true;
399             }
400         }
401         return false;
402     }
403
404     final Future<RpcResult<TransactionStatus>> commit(final AbstractDataTransaction<P, D> transaction) {
405         Preconditions.checkNotNull(transaction);
406         final TwoPhaseCommit<P, D, DCL> task = new TwoPhaseCommit<P, D, DCL>(transaction, this);
407
408         this.getSubmittedTransactionsCount().getAndIncrement();
409         return this.getExecutor().submit(task);
410     }
411
412     private static class DataCommitHandlerRegistrationImpl<P extends Path<P>, D extends Object> //
413             extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
414             implements DataCommitHandlerRegistration<P, D> {
415
416         private AbstractDataBroker<P, D, ? extends Object> dataBroker;
417         private final P path;
418
419         @Override
420         public P getPath() {
421             return this.path;
422         }
423
424         public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler<P, D> instance,
425                 final AbstractDataBroker<P, D, ? extends Object> broker) {
426             super(instance);
427             this.dataBroker = broker;
428             this.path = path;
429         }
430
431         @Override
432         protected void removeRegistration() {
433             this.dataBroker.removeCommitHandler(this);
434             this.dataBroker = null;
435         }
436     }
437 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.