7c6f52f110fd1771650c9670c43a8136d8999a2b
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.common.impl.service\r
9 \r
10 import com.google.common.collect.FluentIterable\r
11 import com.google.common.collect.HashMultimap\r
12 import com.google.common.collect.ImmutableList\r
13 import com.google.common.collect.Multimap\r
14 import java.util.ArrayList\r
15 import java.util.Arrays\r
16 import java.util.Collection\r
17 import java.util.Collections\r
18 import java.util.HashSet\r
19 import java.util.List\r
20 import java.util.Set\r
21 import java.util.concurrent.Callable\r
22 import java.util.concurrent.ExecutorService\r
23 import java.util.concurrent.Future\r
24 import java.util.concurrent.atomic.AtomicLong\r
25 import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
28 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
30 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
32 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
33 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
35 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
36 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
37 import org.opendaylight.controller.sal.common.util.Rpcs\r
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
39 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
40 import org.opendaylight.yangtools.concepts.ListenerRegistration\r
41 import org.opendaylight.yangtools.concepts.Path\r
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
43 import org.opendaylight.yangtools.yang.common.RpcResult\r
44 import org.slf4j.LoggerFactory\r
45 \r
46 import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
47 import com.google.common.collect.Multimaps
48 import java.util.concurrent.locks.Lock
49 import java.util.concurrent.locks.ReentrantLock
50
51 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
52 DataReader<P, D>, //\r
53 DataChangePublisher<P, D, DCL>, //\r
54 DataProvisionService<P, D> {\r
55 \r
56     private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);\r
57 \r
58     @Property\r
59     var ExecutorService executor;\r
60 \r
61     @Property\r
62     var AbstractDataReadRouter<P, D> dataReadRouter;\r
63     \r
64     @Property\r
65     private val AtomicLong submittedTransactionsCount = new AtomicLong;\r
66     \r
67     @Property\r
68     private val AtomicLong failedTransactionsCount = new AtomicLong\r
69     \r
70     @Property\r
71     private val AtomicLong finishedTransactionsCount = new AtomicLong\r
72 \r
73     Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
74     Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
75     
76     private val Lock registrationLock = new ReentrantLock;
77     \r
78     val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
79     public new() {\r
80     }\r
81 \r
82     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
83         HashSet<P> paths) {
84         return withLock(registrationLock) [|\r
85             return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
86                 .transformAndConcat[value] //\r
87                 .transform[instance].toList()
88         ]\r
89     }\r
90 \r
91     override final readConfigurationData(P path) {\r
92         return dataReadRouter.readConfigurationData(path);\r
93     }\r
94 \r
95     override final readOperationalData(P path) {\r
96         return dataReadRouter.readOperationalData(path);\r
97     }
98     
99     private static def <T> withLock(Lock lock,Callable<T> method) {
100         lock.lock
101         try {
102             return method.call
103         } finally {
104             lock.unlock
105         }
106     } \r
107 \r
108     override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
109         return withLock(registrationLock) [|\r
110             val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
111             commitHandlers.put(path, registration)\r
112             LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
113             for(listener : commitHandlerRegistrationListeners) {\r
114                 try {\r
115                     listener.instance.onRegister(registration);\r
116                 } catch (Exception e) {\r
117                     LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
118                 }\r
119             }
120             return registration;
121         ]\r
122     }\r
123 \r
124     override final def registerDataChangeListener(P path, DCL listener) {\r
125         return withLock(registrationLock) [|
126             val reg = new DataChangeListenerRegistration(path, listener, this);\r
127             listeners.put(path, reg);\r
128             val initialConfig = dataReadRouter.readConfigurationData(path);\r
129             val initialOperational = dataReadRouter.readOperationalData(path);\r
130             val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
131             listener.onDataChanged(event);\r
132             return reg;
133         ]\r
134     }\r
135 \r
136     final def registerDataReader(P path, DataReader<P, D> reader) {\r
137         return withLock(registrationLock) [|\r
138             val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
139             val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
140     \r
141             return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
142         ]\r
143     }\r
144     \r
145     override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
146         val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
147         return ret;\r
148     }\r
149     \r
150     protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {\r
151         return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);\r
152         \r
153     }\r
154 \r
155     protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
156         return withLock(registrationLock) [|\r
157             listeners.remove(registration.path, registration);
158         ]\r
159     }\r
160 \r
161     protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
162         return withLock(registrationLock) [|
163             commitHandlers.remove(registration.path, registration);\r
164              LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
165             for(listener : commitHandlerRegistrationListeners) {\r
166                 try {\r
167                     listener.instance.onUnregister(registration);\r
168                 } catch (Exception e) {\r
169                     LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
170                 }\r
171             }
172             return null;
173         ]\r
174     }\r
175 \r
176     protected final def getActiveCommitHandlers() {\r
177         return commitHandlers.entries;\r
178     }\r
179 \r
180     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
181         HashSet<P> paths) {
182         return withLock(registrationLock) [|\r
183             return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
184                 val operationalState = readOperationalData(key)\r
185                 val configurationState = readConfigurationData(key)\r
186                 return new ListenerStateCapture(key, value, operationalState, configurationState)\r
187             ].toList()
188         ]\r
189     }\r
190 \r
191     protected def boolean isAffectedBy(P key, Set<P> paths) {\r
192         if (paths.contains(key)) {\r
193             return true;\r
194         }\r
195         for (path : paths) {\r
196             if (key.contains(path)) {\r
197                 return true;\r
198             }\r
199         }\r
200 \r
201         return false;\r
202     }\r
203 \r
204     package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {\r
205         checkNotNull(transaction);\r
206         transaction.changeStatus(TransactionStatus.SUBMITED);\r
207         val task = new TwoPhaseCommit(transaction, this);\r
208         submittedTransactionsCount.andIncrement;\r
209         return executor.submit(task);\r
210     }\r
211 \r
212 }\r
213 \r
214 @Data\r
215 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {\r
216 \r
217     @Property\r
218     P path;\r
219 \r
220     @Property\r
221     Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;\r
222 \r
223     @Property\r
224     D initialOperationalState;\r
225 \r
226     @Property\r
227     D initialConfigurationState;\r
228 }\r
229 \r
230 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {\r
231 \r
232     AbstractDataBroker<P, D, DCL> dataBroker;\r
233 \r
234     @Property\r
235     val P path;\r
236 \r
237     new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {\r
238         super(instance)\r
239         dataBroker = broker;\r
240         _path = path;\r
241     }\r
242 \r
243     override protected removeRegistration() {\r
244         dataBroker.removeListener(this);\r
245         dataBroker = null;\r
246     }\r
247 \r
248 }\r
249 \r
250 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //\r
251 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //\r
252 implements DataCommitHandlerRegistration<P, D> {\r
253 \r
254     AbstractDataBroker<P, D, ?> dataBroker;\r
255 \r
256     @Property\r
257     val P path;\r
258 \r
259     new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {\r
260         super(instance)\r
261         dataBroker = broker;\r
262         _path = path;\r
263     }\r
264 \r
265     override protected removeRegistration() {\r
266         dataBroker.removeCommitHandler(this);\r
267         dataBroker = null;\r
268     }\r
269 }\r
270 \r
271 package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {\r
272 \r
273     private static val log = LoggerFactory.getLogger(TwoPhaseCommit);\r
274 \r
275     val AbstractDataTransaction<P, D> transaction;\r
276     val AbstractDataBroker<P, D, DCL> dataBroker;\r
277     \r
278     new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {\r
279         this.transaction = transaction;\r
280         this.dataBroker = broker;\r
281     }\r
282 \r
283     override call() throws Exception {\r
284 \r
285         // get affected paths\r
286         val affectedPaths = new HashSet<P>();\r
287 \r
288         affectedPaths.addAll(transaction.createdConfigurationData.keySet);\r
289         affectedPaths.addAll(transaction.updatedConfigurationData.keySet);\r
290         affectedPaths.addAll(transaction.removedConfigurationData);\r
291 \r
292         affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
293         affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
294         affectedPaths.addAll(transaction.removedOperationalData);\r
295
296         val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
297 \r
298         val transactionId = transaction.identifier;\r
299 \r
300         log.trace("Transaction: {} Started.",transactionId);\r
301         log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
302         // requesting commits\r
303         val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
304         val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
305         try {\r
306             for (handler : commitHandlers) {\r
307                 handlerTransactions.add(handler.requestCommit(transaction));\r
308             }\r
309         } catch (Exception e) {\r
310             log.error("Transaction: {} Request Commit failed", transactionId,e);\r
311             dataBroker.failedTransactionsCount.andIncrement\r
312             transaction.changeStatus(TransactionStatus.FAILED)
313             return rollback(handlerTransactions, e);\r
314         }\r
315         val List<RpcResult<Void>> results = new ArrayList();\r
316         try {\r
317             for (subtransaction : handlerTransactions) {\r
318                 results.add(subtransaction.finish());\r
319             }\r
320             listeners.publishDataChangeEvent();\r
321         } catch (Exception e) {\r
322             log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
323             dataBroker.failedTransactionsCount.andIncrement
324             transaction.changeStatus(TransactionStatus.FAILED)\r
325             return rollback(handlerTransactions, e);\r
326         }\r
327         log.trace("Transaction: {} Finished successfully.",transactionId);\r
328         dataBroker.finishedTransactionsCount.andIncrement;
329         transaction.changeStatus(TransactionStatus.COMMITED)\r
330         return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
331 \r
332     }\r
333 \r
334     def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
335         dataBroker.executor.submit [|\r
336             for (listenerSet : listeners) {
337                 val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
338                 val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
339
340                 val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
341                     listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
342                 for (listener : listenerSet.listeners) {
343                     try {
344                         listener.instance.onDataChanged(changeEvent);
345
346                     } catch (Exception e) {
347                         e.printStackTrace();
348                     }
349                 }
350             }        \r
351         ]\r
352     }\r
353 \r
354     def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r
355         for (transaction : transactions) {\r
356             transaction.rollback()\r
357         }\r
358 \r
359         // FIXME return encountered error.\r
360         return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());\r
361     }\r
362 }\r
363 \r
364 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {\r
365 \r
366     private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
367
368     @Property\r
369     private val Object identifier;\r
370 \r
371     var TransactionStatus status;\r
372 \r
373     var AbstractDataBroker<P, D, ?> broker;\r
374 \r
375     protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {\r
376         super(dataBroker);\r
377         _identifier = identifier;\r
378         broker = dataBroker;\r
379         status = TransactionStatus.NEW;\r
380         LOG.debug("Transaction {} Allocated.", identifier);
381 \r
382     //listeners = new ListenerRegistry<>();\r
383     }\r
384 \r
385     override commit() {\r
386         return broker.commit(this);\r
387     }\r
388 \r
389     override readConfigurationData(P path) {\r
390         val local = this.updatedConfigurationData.get(path);\r
391         if(local != null) {\r
392             return local;\r
393         }\r
394         \r
395         return broker.readConfigurationData(path);\r
396     }\r
397 \r
398     override readOperationalData(P path) {\r
399         val local = this.updatedOperationalData.get(path);\r
400         if(local != null) {\r
401             return local;\r
402         }\r
403         return broker.readOperationalData(path);\r
404     }\r
405 \r
406     override hashCode() {\r
407         return identifier.hashCode;\r
408     }\r
409 \r
410     override equals(Object obj) {\r
411         if (this === obj)\r
412             return true;\r
413         if (obj == null)\r
414             return false;\r
415         if (getClass() != obj.getClass())\r
416             return false;\r
417         val other = (obj as AbstractDataTransaction<P,D>);\r
418         if (broker == null) {\r
419             if (other.broker != null)\r
420                 return false;\r
421         } else if (!broker.equals(other.broker))\r
422             return false;\r
423         if (identifier == null) {\r
424             if (other.identifier != null)\r
425                 return false;\r
426         } else if (!identifier.equals(other.identifier))\r
427             return false;\r
428         return true;\r
429     }\r
430 \r
431     override TransactionStatus getStatus() {\r
432         return status;\r
433     }\r
434 \r
435     protected abstract def void onStatusChange(TransactionStatus status);\r
436 \r
437     public def changeStatus(TransactionStatus status) {\r
438         LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
439         this.status = status;\r
440         onStatusChange(status);\r
441     }\r
442 \r
443 }\r