2c3b0188f48096d34ea45485fe34ec1a1711efe1
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.common.impl.service\r
9 \r
10 import com.google.common.collect.FluentIterable\r
11 import com.google.common.collect.HashMultimap\r
12 import com.google.common.collect.ImmutableList\r
13 import com.google.common.collect.Multimap\r
14 import java.util.ArrayList\r
15 import java.util.Arrays\r
16 import java.util.Collection\r
17 import java.util.Collections\r
18 import java.util.HashSet\r
19 import java.util.List\r
20 import java.util.Set\r
21 import java.util.concurrent.Callable\r
22 import java.util.concurrent.ExecutorService\r
23 import java.util.concurrent.Future\r
24 import java.util.concurrent.atomic.AtomicLong\r
25 import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
28 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
30 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
32 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
33 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
35 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
36 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
37 import org.opendaylight.controller.sal.common.util.Rpcs\r
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
39 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
40 import org.opendaylight.yangtools.concepts.ListenerRegistration\r
41 import org.opendaylight.yangtools.concepts.Path\r
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
43 import org.opendaylight.yangtools.yang.common.RpcResult\r
44 import org.slf4j.LoggerFactory\r
45 \r
46 import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
47 import com.google.common.collect.Multimaps
48 import java.util.concurrent.locks.Lock
49 import java.util.concurrent.locks.ReentrantLock
50
/**
 * Base implementation of a data broker.
 *
 * The broker delegates reads to the configured {@code dataReadRouter}, keeps
 * per-path registries of data change listeners and commit handlers, and
 * executes submitted transactions as {@link TwoPhaseCommit} tasks on the
 * configured {@code executor}.
 *
 * All changes to the listener and commit handler registries made by this
 * class are performed while holding {@code registrationLock}.
 *
 * @param <P> type of the path identifying a piece of data
 * @param <D> type of the data
 * @param <DCL> type of the data change listeners accepted by this broker
 */
abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
DataReader<P, D>, //
DataChangePublisher<P, D, DCL>, //
DataProvisionService<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);

    /** Executor on which submitted transactions are committed. */
    @Property
    var ExecutorService executor;

    /** Router answering configuration and operational reads. */
    @Property
    var AbstractDataReadRouter<P, D> dataReadRouter;

    /** Number of transactions handed over to {@link #commit}. */
    @Property
    private val AtomicLong submittedTransactionsCount = new AtomicLong;

    /** Number of transactions whose commit failed. */
    @Property
    private val AtomicLong failedTransactionsCount = new AtomicLong

    /** Number of transactions whose commit finished successfully. */
    @Property
    private val AtomicLong finishedTransactionsCount = new AtomicLong

    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());
    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());

    /** Guards every modification and every iteration of the two registries above. */
    private val Lock registrationLock = new ReentrantLock;

    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();

    public new() {
    }

    /**
     * Collects the commit handlers registered on a path affected by the
     * supplied set of paths.
     *
     * @param paths paths touched by a transaction
     * @return snapshot list of the affected commit handler instances
     */
    protected def affectedCommitHandlers(HashSet<P> paths) {
        return withLock(registrationLock) [|
            return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
                .transformAndConcat[value] //
                .transform[instance].toList()
        ]
    }

    override final readConfigurationData(P path) {
        return dataReadRouter.readConfigurationData(path);
    }

    override final readOperationalData(P path) {
        return dataReadRouter.readOperationalData(path);
    }

    /**
     * Runs the supplied callable while holding the supplied lock; the lock
     * is released even when the callable throws.
     */
    private static def <T> withLock(Lock lock,Callable<T> method) {
        lock.lock
        try {
            return method.call
        } finally {
            lock.unlock
        }
    }

    /**
     * Registers a commit handler for the supplied path and notifies all
     * commit handler registration listeners. An exception thrown by one of
     * those listeners is logged and does not abort the registration.
     *
     * @param path path the handler is responsible for
     * @param commitHandler handler to register
     * @return registration which removes the handler again when closed
     */
    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
        return withLock(registrationLock) [|
            val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
            commitHandlers.put(path, registration)
            LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
            for(listener : commitHandlerRegistrationListeners) {
                try {
                    listener.instance.onRegister(registration);
                } catch (Exception e) {
                    LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
                }
            }
            return registration;
        ]
    }

    /**
     * Registers a data change listener for the supplied path and immediately
     * delivers an initial event built from the current configuration and
     * operational data at that path.
     *
     * If reading the initial state or delivering the initial event fails,
     * the registration is removed again before the exception is rethrown, so
     * a listener can never remain registered without its caller holding the
     * registration handle.
     *
     * @param path path to listen on
     * @param listener listener to register
     * @return registration which removes the listener again when closed
     */
    override final def registerDataChangeListener(P path, DCL listener) {
        return withLock(registrationLock) [|
            val reg = new DataChangeListenerRegistration(path, listener, this);
            // Register before reading, exactly as before, so the order of
            // registration and initial read is unchanged.
            listeners.put(path, reg);
            try {
                val initialConfig = dataReadRouter.readConfigurationData(path);
                val initialOperational = dataReadRouter.readOperationalData(path);
                val event = createInitialListenerEvent(path,initialConfig,initialOperational);
                // NOTE(review): the listener is invoked while registrationLock is held.
                listener.onDataChanged(event);
            } catch (Exception e) {
                // The caller will not receive the handle; undo the registration.
                listeners.remove(path, reg);
                throw e;
            }
            return reg;
        ]
    }

    /**
     * Registers the supplied reader as both configuration and operational
     * reader for the supplied path.
     *
     * @return composite registration wrapping both reader registrations
     */
    final def registerDataReader(P path, DataReader<P, D> reader) {
        return withLock(registrationLock) [|
            val confReg = dataReadRouter.registerConfigurationReader(path, reader);
            val dataReg = dataReadRouter.registerOperationalReader(path, reader);

            return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
        ]
    }

    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
        return ret;
    }

    /**
     * Creates the event delivered to a listener right after registration.
     * Subclasses may override this to supply a different event implementation.
     */
    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);

    }

    /** Removes the supplied data change listener registration from the registry. */
    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
        return withLock(registrationLock) [|
            listeners.remove(registration.path, registration);
        ]
    }

    /**
     * Removes the supplied commit handler registration and notifies all
     * commit handler registration listeners. An exception thrown by one of
     * those listeners is logged and otherwise ignored.
     */
    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
        return withLock(registrationLock) [|
            commitHandlers.remove(registration.path, registration);
             LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
            for(listener : commitHandlerRegistrationListeners) {
                try {
                    listener.instance.onUnregister(registration);
                } catch (Exception e) {
                    LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
                }
            }
            return null;
        ]
    }

    /**
     * Returns the currently registered commit handlers as (path, registration)
     * entries.
     *
     * The result is an immutable snapshot taken under the registration lock.
     * Handing out the live {@code entries} view of the synchronized multimap
     * would let callers iterate it while another thread registers or removes
     * a handler.
     */
    protected final def getActiveCommitHandlers() {
        return withLock(registrationLock) [|
            return ImmutableList.copyOf(commitHandlers.entries);
        ]
    }

    /**
     * Collects the listeners registered on a path affected by the supplied
     * set of paths, together with the operational and configuration data
     * currently readable at each such path.
     *
     * @param paths paths touched by a transaction
     * @return snapshot list with one {@link ListenerStateCapture} per affected path
     */
    protected def affectedListenersWithInitialState(HashSet<P> paths) {
        return withLock(registrationLock) [|
            return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
                val operationalState = readOperationalData(key)
                val configurationState = readConfigurationData(key)
                return new ListenerStateCapture(key, value, operationalState, configurationState)
            ].toList()
        ]
    }

    /**
     * Decides whether a registration on {@code key} is affected by a change
     * of the supplied paths: true when {@code paths} contains {@code key}
     * itself, or when {@code key.contains(path)} holds for any of them.
     * NOTE(review): the exact meaning of {@code Path.contains} is defined
     * outside this file.
     */
    protected def boolean isAffectedBy(P key, Set<P> paths) {
        if (paths.contains(key)) {
            return true;
        }
        for (path : paths) {
            if (key.contains(path)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Marks the transaction as submitted and schedules its two-phase commit
     * on the executor.
     *
     * @param transaction transaction to commit, must not be null
     * @return future completing with the result of the commit
     */
    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
        checkNotNull(transaction);
        transaction.changeStatus(TransactionStatus.SUBMITED);
        val task = new TwoPhaseCommit(transaction, this);
        submittedTransactionsCount.andIncrement;
        return executor.submit(task);
    }

}
213 \r
/**
 * Value object pairing the listeners registered on one path with the
 * operational and configuration data that was read for that path when the
 * capture was created.
 *
 * The field order is significant: the constructor generated by {@code @Data}
 * takes its arguments in declaration order.
 */
@Data
package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {

    /** Path the captured listeners are registered on. */
    @Property P path;

    /** Listener registrations present on the path. */
    @Property Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;

    /** Operational data read for the path at capture time. */
    @Property D initialOperationalState;

    /** Configuration data read for the path at capture time. */
    @Property D initialConfigurationState;
}
229 \r
/**
 * Registration handle for a data change listener registered on a path of an
 * {@link AbstractDataBroker}. Removing the registration takes the listener
 * out of the broker and drops the reference to the broker.
 */
package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {

    /** Path the listener is registered on. */
    @Property
    val P path;

    /** Broker owning this registration; cleared once the registration is removed. */
    AbstractDataBroker<P, D, DCL> dataBroker;

    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
        super(instance)
        _path = path;
        this.dataBroker = broker;
    }

    override protected removeRegistration() {
        this.dataBroker.removeListener(this);
        this.dataBroker = null;
    }

}
249 \r
/**
 * Registration handle for a commit handler registered on a path of an
 * {@link AbstractDataBroker}. Removing the registration takes the handler
 * out of the broker and drops the reference to the broker.
 */
package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
implements DataCommitHandlerRegistration<P, D> {

    /** Path the commit handler is registered on. */
    @Property
    val P path;

    /** Broker owning this registration; cleared once the registration is removed. */
    AbstractDataBroker<P, D, ?> dataBroker;

    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
        super(instance)
        _path = path;
        this.dataBroker = broker;
    }

    override protected removeRegistration() {
        this.dataBroker.removeCommitHandler(this);
        this.dataBroker = null;
    }
}
270 \r
/**
 * Task committing a single transaction against all affected commit handlers.
 *
 * The task first asks every affected handler for a commit
 * ({@code requestCommit}), then finishes all resulting sub-transactions and
 * publishes data change events to the affected listeners. If either phase
 * throws, the sub-transactions obtained so far are rolled back and the
 * transaction is marked as failed.
 */
package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {

    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);

    val AbstractDataTransaction<P, D> transaction;
    val AbstractDataBroker<P, D, DCL> dataBroker;

    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Executes the commit.
     *
     * @return a successful result with status {@code COMMITED}, or an
     *         unsuccessful result with status {@code FAILED} when the request
     *         or finish phase threw an exception
     */
    override call() throws Exception {

        // get affected paths
        val affectedPaths = new HashSet<P>();

        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
        affectedPaths.addAll(transaction.removedConfigurationData);

        affectedPaths.addAll(transaction.createdOperationalData.keySet);
        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
        affectedPaths.addAll(transaction.removedOperationalData);

        // Captured before any handler runs, so listeners later receive the
        // state as it was before this transaction was applied.
        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);

        val transactionId = transaction.identifier;

        log.trace("Transaction: {} Started.",transactionId);
        log.trace("Transaction: {} Affected Subtrees: {}",transactionId,affectedPaths);
        // requesting commits
        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            log.error("Transaction: {} Request Commit failed", transactionId,e);
            dataBroker.failedTransactionsCount.andIncrement
            transaction.changeStatus(TransactionStatus.FAILED)
            return rollback(handlerTransactions, e);
        }
        val List<RpcResult<Void>> results = new ArrayList();
        try {
            for (subtransaction : handlerTransactions) {
                results.add(subtransaction.finish());
            }
            listeners.publishDataChangeEvent();
        } catch (Exception e) {
            log.error("Transaction: {} Finish Commit failed",transactionId, e);
            dataBroker.failedTransactionsCount.andIncrement
            transaction.changeStatus(TransactionStatus.FAILED)
            return rollback(handlerTransactions, e);
        }
        log.trace("Transaction: {} Finished successfully.",transactionId);
        dataBroker.finishedTransactionsCount.andIncrement;
        transaction.changeStatus(TransactionStatus.COMMITED)
        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());

    }

    /**
     * Delivers a data change event to every captured listener. The event
     * combines the captured initial state with the data readable from the
     * broker now. An exception thrown by a listener is logged and does not
     * prevent delivery to the remaining listeners.
     */
    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
        for (listenerSet : listeners) {
            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);

            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
            for (listener : listenerSet.listeners) {
                try {
                    listener.instance.onDataChanged(changeEvent);

                } catch (Exception e) {
                    log.error("Unhandled exception was thrown by listener {}", listener.instance, e);
                }
            }
        }
    }

    /**
     * Rolls back every supplied sub-transaction. A failure to roll back one
     * sub-transaction is logged and the remaining ones are still rolled back.
     *
     * @param transactions sub-transactions obtained from the commit handlers
     * @param e exception which caused the rollback (currently not reported)
     * @return unsuccessful result with status {@code FAILED}
     */
    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
        for (subtransaction : transactions) {
            try {
                subtransaction.rollback()
            } catch (Exception rollbackFailure) {
                log.error("Transaction: {} Rollback of {} failed", transaction.identifier, subtransaction, rollbackFailure);
            }
        }

        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}
361 \r
/**
 * Base class of transactions created by an {@link AbstractDataBroker}.
 *
 * Reads are answered from the data updated in this transaction when present
 * and from the broker otherwise. Committing hands the transaction over to
 * the broker it was created for.
 *
 * @param <P> type of the path identifying a piece of data
 * @param <D> type of the data
 */
public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);

    /** Identifier of this transaction, used in log messages and for equality. */
    @Property
    private val Object identifier;

    var TransactionStatus status;

    var AbstractDataBroker<P, D, ?> broker;

    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
        super(dataBroker);
        _identifier = identifier;
        broker = dataBroker;
        status = TransactionStatus.NEW;
        LOG.debug("Transaction {} Allocated.", identifier);
    }

    override commit() {
        return broker.commit(this);
    }

    /**
     * Returns the configuration data updated at the path by this
     * transaction, or the broker's current data when there is none.
     */
    override readConfigurationData(P path) {
        val local = this.updatedConfigurationData.get(path);
        if(local != null) {
            return local;
        }

        return broker.readConfigurationData(path);
    }

    /**
     * Returns the operational data updated at the path by this transaction,
     * or the broker's current data when there is none.
     */
    override readOperationalData(P path) {
        val local = this.updatedOperationalData.get(path);
        if(local != null) {
            return local;
        }
        return broker.readOperationalData(path);
    }

    /**
     * Hash code derived from the identifier only. A null identifier hashes
     * to 0, matching {@link #equals}, which accepts a null identifier.
     */
    override hashCode() {
        if (identifier == null) {
            return 0;
        }
        return identifier.hashCode;
    }

    /**
     * Two transactions are equal when they are of the same class and have
     * equal brokers and equal identifiers.
     */
    override equals(Object obj) {
        if (this === obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        val other = (obj as AbstractDataTransaction<P,D>);
        if (broker == null) {
            if (other.broker != null)
                return false;
        } else if (!broker.equals(other.broker))
            return false;
        if (identifier == null) {
            if (other.identifier != null)
                return false;
        } else if (!identifier.equals(other.identifier))
            return false;
        return true;
    }

    override TransactionStatus getStatus() {
        return status;
    }

    /** Hook invoked after every status change with the new status. */
    protected abstract def void onStatusChange(TransactionStatus status);

    /**
     * Records the new status, logs the transition and notifies
     * {@link #onStatusChange}.
     */
    public def changeStatus(TransactionStatus status) {
        LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
        this.status = status;
        onStatusChange(status);
    }

}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.