BUG-8: mark deprecated classes as such
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.common.impl.service;
9
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Collections;
13 import java.util.HashSet;
14 import java.util.Map;
15 import java.util.Map.Entry;
16 import java.util.Set;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22
23 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
24 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
25 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
26 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher;
28 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
30 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory;
31 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
32 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
33 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
34 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
35 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
36 import org.opendaylight.yangtools.concepts.ListenerRegistration;
37 import org.opendaylight.yangtools.concepts.Path;
38 import org.opendaylight.yangtools.concepts.Registration;
39 import org.opendaylight.yangtools.util.ListenerRegistry;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import com.google.common.base.Function;
45 import com.google.common.base.Preconditions;
46 import com.google.common.base.Predicate;
47 import com.google.common.base.Supplier;
48 import com.google.common.collect.FluentIterable;
49 import com.google.common.collect.HashMultimap;
50 import com.google.common.collect.ImmutableList;
51 import com.google.common.collect.Multimap;
52 import com.google.common.collect.Multimaps;
53 import com.google.common.util.concurrent.MoreExecutors;
54
55 @Deprecated
56 public abstract class AbstractDataBroker<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>>
57         implements DataModificationTransactionFactory<P, D>, DataReader<P, D>, DataChangePublisher<P, D, DCL>,
58         DataProvisionService<P, D> {
59     private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class);
60
61     private ExecutorService executor;
62
63     public ExecutorService getExecutor() {
64         return this.executor;
65     }
66
67     public void setExecutor(final ExecutorService executor) {
68         this.executor = executor;
69     }
70
71     private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor();
72
73     public ExecutorService getNotificationExecutor() {
74         return this.notificationExecutor;
75     }
76
77     public void setNotificationExecutor(final ExecutorService notificationExecutor) {
78         this.notificationExecutor = notificationExecutor;
79     }
80
81     private AbstractDataReadRouter<P, D> dataReadRouter;
82
83     private final AtomicLong submittedTransactionsCount = new AtomicLong();
84
85     private final AtomicLong failedTransactionsCount = new AtomicLong();
86
87     private final AtomicLong finishedTransactionsCount = new AtomicLong();
88
89     public AbstractDataReadRouter<P, D> getDataReadRouter() {
90         return this.dataReadRouter;
91     }
92
93     public void setDataReadRouter(final AbstractDataReadRouter<P, D> dataReadRouter) {
94         this.dataReadRouter = dataReadRouter;
95     }
96
97     public AtomicLong getSubmittedTransactionsCount() {
98         return this.submittedTransactionsCount;
99     }
100
101     public AtomicLong getFailedTransactionsCount() {
102         return this.failedTransactionsCount;
103     }
104
105     public AtomicLong getFinishedTransactionsCount() {
106         return this.finishedTransactionsCount;
107     }
108
109     private final Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps
110             .synchronizedSetMultimap(HashMultimap.<P, DataChangeListenerRegistration<P, D, DCL>> create());
111
112     private final Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps
113             .synchronizedSetMultimap(HashMultimap.<P, DataCommitHandlerRegistrationImpl<P, D>> create());
114
115     private final Lock registrationLock = new ReentrantLock();
116
117     private final ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>> commitHandlerRegistrationListeners = new ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>>();
118
119     public AbstractDataBroker() {
120     }
121
122     protected ImmutableList<DataCommitHandler<P, D>> affectedCommitHandlers(final Set<P> paths) {
123         final Supplier<ImmutableList<DataCommitHandler<P, D>>> _function = new Supplier<ImmutableList<DataCommitHandler<P, D>>>() {
124             @Override
125             public ImmutableList<DataCommitHandler<P, D>> get() {
126                 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
127                 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
128                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
129                         .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
130                 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
131                     @Override
132                     public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
133                         P _key = it.getKey();
134                         boolean _isAffectedBy = isAffectedBy(_key, paths);
135                         return _isAffectedBy;
136                     }
137                 };
138                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
139                         .filter(_function);
140                 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
141                     @Override
142                     public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
143                             final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
144                         Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
145                         return _value;
146                     }
147                 };
148                 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
149                         .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
150                 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
151                     @Override
152                     public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
153                         DataCommitHandler<P, D> _instance = it.getInstance();
154                         return _instance;
155                     }
156                 };
157                 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
158                         .<DataCommitHandler<P, D>> transform(_function_2);
159                 return _transform.toList();
160             }
161         };
162         return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
163     }
164
165     protected ImmutableList<DataCommitHandler<P, D>> probablyAffectedCommitHandlers(final HashSet<P> paths) {
166         final Supplier<ImmutableList<DataCommitHandler<P, D>>> _function = new Supplier<ImmutableList<DataCommitHandler<P, D>>>() {
167             @Override
168             public ImmutableList<DataCommitHandler<P, D>> get() {
169                 Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
170                 Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
171                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
172                         .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
173                 final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
174                     @Override
175                     public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
176                         P _key = it.getKey();
177                         boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths);
178                         return _isProbablyAffectedBy;
179                     }
180                 };
181                 FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
182                         .filter(_function);
183                 final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
184                     @Override
185                     public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
186                             final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
187                         Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
188                         return _value;
189                     }
190                 };
191                 FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
192                         .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
193                 final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
194                     @Override
195                     public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
196                         DataCommitHandler<P, D> _instance = it.getInstance();
197                         return _instance;
198                     }
199                 };
200                 FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
201                         .<DataCommitHandler<P, D>> transform(_function_2);
202                 return _transform.toList();
203             }
204         };
205         return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
206     }
207
208     protected Map<P, D> deepGetBySubpath(final Map<P, D> dataSet, final P path) {
209         return Collections.<P, D> emptyMap();
210     }
211
212     @Override
213     public final D readConfigurationData(final P path) {
214         AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
215         return _dataReadRouter.readConfigurationData(path);
216     }
217
218     @Override
219     public final D readOperationalData(final P path) {
220         AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
221         return _dataReadRouter.readOperationalData(path);
222     }
223
224     private static <T extends Object> T withLock(final Lock lock, final Supplier<T> method) {
225         lock.lock();
226         try {
227             return method.get();
228         } finally {
229             lock.unlock();
230         }
231     }
232
233     @Override
234     public final Registration registerCommitHandler(final P path,
235             final DataCommitHandler<P, D> commitHandler) {
236         synchronized (commitHandler) {
237             final DataCommitHandlerRegistrationImpl<P, D> registration = new DataCommitHandlerRegistrationImpl<P, D>(
238                     path, commitHandler, this);
239             commitHandlers.put(path, registration);
240             LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path);
241             for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
242                 try {
243                     listener.getInstance().onRegister(registration);
244                 } catch (Exception e) {
245                     LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(),
246                             e);
247                 }
248             }
249             return registration;
250         }
251     }
252
253     @Override
254     public final ListenerRegistration<DCL> registerDataChangeListener(final P path, final DCL listener) {
255         synchronized (listeners) {
256             final DataChangeListenerRegistration<P, D, DCL> reg = new DataChangeListenerRegistration<P, D, DCL>(path,
257                     listener, AbstractDataBroker.this);
258             listeners.put(path, reg);
259             final D initialConfig = getDataReadRouter().readConfigurationData(path);
260             final D initialOperational = getDataReadRouter().readOperationalData(path);
261             final DataChangeEvent<P, D> event = createInitialListenerEvent(path, initialConfig, initialOperational);
262             listener.onDataChanged(event);
263             return reg;
264         }
265     }
266
267     public final CompositeObjectRegistration<DataReader<P, D>> registerDataReader(final P path,
268             final DataReader<P, D> reader) {
269
270         final Registration confReg = getDataReadRouter().registerConfigurationReader(path, reader);
271         final Registration dataReg = getDataReadRouter().registerOperationalReader(path, reader);
272         return new CompositeObjectRegistration<DataReader<P, D>>(reader, Arrays.asList(confReg, dataReg));
273     }
274
275     @Override
276     public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> registerCommitHandlerListener(
277             final RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
278         final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> ret = this.commitHandlerRegistrationListeners
279                 .register(commitHandlerListener);
280         return ret;
281     }
282
283     protected DataChangeEvent<P, D> createInitialListenerEvent(final P path, final D initialConfig,
284             final D initialOperational) {
285         InitialDataChangeEventImpl<P, D> _initialDataChangeEventImpl = new InitialDataChangeEventImpl<P, D>(
286                 initialConfig, initialOperational);
287         return _initialDataChangeEventImpl;
288     }
289
290     protected final void removeListener(final DataChangeListenerRegistration<P, D, DCL> registration) {
291         synchronized (listeners) {
292             listeners.remove(registration.getPath(), registration);
293         }
294     }
295
296     protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl<P, D> registration) {
297         synchronized (commitHandlers) {
298
299             commitHandlers.remove(registration.getPath(), registration);
300             LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath());
301             for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
302                 try {
303                     listener.getInstance().onUnregister(registration);
304                 } catch (Exception e) {
305                     LOG.error("Unexpected exception in listener {} during invoking onUnregister",
306                             listener.getInstance(), e);
307                 }
308             }
309         }
310
311     }
312
313     protected final Collection<Entry<P, DataCommitHandlerRegistrationImpl<P, D>>> getActiveCommitHandlers() {
314         return commitHandlers.entries();
315     }
316
317     protected ImmutableList<ListenerStateCapture<P, D, DCL>> affectedListeners(final Set<P> paths) {
318
319         synchronized (listeners) {
320             return FluentIterable //
321                     .from(listeners.asMap().entrySet()) //
322                     .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
323                         @Override
324                         public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
325                             return isAffectedBy(it.getKey(), paths);
326                         }
327                     }) //
328                     .transform(
329                             new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
330                                 @Override
331                                 public ListenerStateCapture<P, D, DCL> apply(
332                                         final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
333                                     return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
334                                             createContainsPredicate(it.getKey()));
335                                 }
336                             }) //
337                     .toList();
338         }
339     }
340
341     protected ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners(final Set<P> paths) {
342         synchronized (listeners) {
343             return FluentIterable //
344                     .from(listeners.asMap().entrySet()) //
345                     .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
346                         @Override
347                         public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
348                             return isProbablyAffectedBy(it.getKey(), paths);
349                         }
350                     }) //
351                     .transform(
352                             new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
353                                 @Override
354                                 public ListenerStateCapture<P, D, DCL> apply(
355                                         final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
356                                     return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
357                                             createIsContainedPredicate(it.getKey()));
358                                 }
359                             }) //
360                     .toList();
361         }
362     }
363
364     protected Predicate<P> createContainsPredicate(final P key) {
365         return new Predicate<P>() {
366             @Override
367             public boolean apply(final P other) {
368                 return key.contains(other);
369             }
370         };
371     }
372
373     protected Predicate<P> createIsContainedPredicate(final P key) {
374         return new Predicate<P>() {
375             @Override
376             public boolean apply(final P other) {
377                 return other.contains(key);
378             }
379         };
380     }
381
382     protected boolean isAffectedBy(final P key, final Set<P> paths) {
383         final Predicate<P> contains = this.createContainsPredicate(key);
384         if (paths.contains(key)) {
385             return true;
386         }
387         for (final P path : paths) {
388             if (contains.apply(path)) {
389                 return true;
390             }
391         }
392         return false;
393     }
394
395     protected boolean isProbablyAffectedBy(final P key, final Set<P> paths) {
396         final Predicate<P> isContained = this.createIsContainedPredicate(key);
397         for (final P path : paths) {
398             if (isContained.apply(path)) {
399                 return true;
400             }
401         }
402         return false;
403     }
404
405     final Future<RpcResult<TransactionStatus>> commit(final AbstractDataTransaction<P, D> transaction) {
406         Preconditions.checkNotNull(transaction);
407         final TwoPhaseCommit<P, D, DCL> task = new TwoPhaseCommit<P, D, DCL>(transaction, this);
408
409         this.getSubmittedTransactionsCount().getAndIncrement();
410         return this.getExecutor().submit(task);
411     }
412
413     private static class DataCommitHandlerRegistrationImpl<P extends Path<P>, D extends Object> //
414             extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
415             implements DataCommitHandlerRegistration<P, D> {
416
417         private AbstractDataBroker<P, D, ? extends Object> dataBroker;
418         private final P path;
419
420         @Override
421         public P getPath() {
422             return this.path;
423         }
424
425         public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler<P, D> instance,
426                 final AbstractDataBroker<P, D, ? extends Object> broker) {
427             super(instance);
428             this.dataBroker = broker;
429             this.path = path;
430         }
431
432         @Override
433         protected void removeRegistration() {
434             this.dataBroker.removeCommitHandler(this);
435             this.dataBroker = null;
436         }
437     }
438 }