Moved IT code to separate project, Fix for Bug 184, created test-jar
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 package org.opendaylight.controller.md.sal.common.impl.service\r
2 \r
3 import com.google.common.collect.FluentIterable\r
4 import com.google.common.collect.HashMultimap\r
5 import com.google.common.collect.ImmutableList\r
6 import com.google.common.collect.Multimap\r
7 import java.util.ArrayList\r
8 import java.util.Arrays\r
9 import java.util.Collection\r
10 import java.util.Collections\r
11 import java.util.HashSet\r
12 import java.util.List\r
13 import java.util.Set\r
14 import java.util.concurrent.Callable\r
15 import java.util.concurrent.ExecutorService\r
16 import java.util.concurrent.Future\r
17 import java.util.concurrent.atomic.AtomicLong\r
18 import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
20 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
21 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
23 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
24 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
25 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
26 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
28 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
29 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
30 import org.opendaylight.controller.sal.common.util.Rpcs\r
31 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
32 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
33 import org.opendaylight.yangtools.concepts.ListenerRegistration\r
34 import org.opendaylight.yangtools.concepts.Path\r
35 import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
36 import org.opendaylight.yangtools.yang.common.RpcResult\r
37 import org.slf4j.LoggerFactory\r
38 \r
39 import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
40
/**
 * Base data broker.
 *
 * Keeps data change listeners and commit handlers in multimaps keyed by path,
 * delegates reads to the configured {@link AbstractDataReadRouter} and hands
 * submitted transactions to the configured {@link ExecutorService} wrapped in
 * a {@link TwoPhaseCommit} task.
 */
abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
DataReader<P, D>, //
DataChangePublisher<P, D, DCL>, //
DataProvisionService<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);

    /** Executor on which commit tasks are run. */
    @Property
    var ExecutorService executor;

    /** Router every read request is delegated to. */
    @Property
    var AbstractDataReadRouter<P, D> dataReadRouter;

    /** Incremented once for every transaction passed to {@code commit}. */
    @Property
    private val AtomicLong submittedTransactionsCount = new AtomicLong;

    /** Incremented by the commit task when a commit is rolled back. */
    @Property
    private val AtomicLong failedTransactionsCount = new AtomicLong

    /** Incremented by the commit task when a commit completes successfully. */
    @Property
    private val AtomicLong finishedTransactionsCount = new AtomicLong

    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();

    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();

    public new() {
    }

    /**
     * Collects the commit handler instances registered under every path that
     * is affected by the supplied set of changed paths.
     *
     * @param paths changed paths of a transaction
     * @return immutable list of the matching commit handler instances
     */
    protected def affectedCommitHandlers(HashSet<P> paths) {
        return FluentIterable.from(commitHandlers.asMap.entrySet) //
        .filter[entry|isAffectedBy(entry.key, paths)] //
        .transformAndConcat[entry|entry.value] //
        .transform[registration|registration.instance] //
        .toList()
    }

    /** Reads configuration data for {@code path} through the read router. */
    override final readConfigurationData(P path) {
        dataReadRouter.readConfigurationData(path)
    }

    /** Reads operational data for {@code path} through the read router. */
    override final readOperationalData(P path) {
        dataReadRouter.readOperationalData(path)
    }

    /**
     * Registers a commit handler under {@code path} and informs every
     * registered commit-handler registration listener about it.
     *
     * @return the registration object representing the new handler
     */
    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
        commitHandlers.put(path, registration);
        LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
        notifyRegistered(registration);
        return registration;
    }

    /**
     * Registers a data change listener under {@code path} and immediately
     * delivers an initial event built from the data currently readable at
     * that path.
     *
     * @return the registration object representing the new listener
     */
    override final def registerDataChangeListener(P path, DCL listener) {
        val registration = new DataChangeListenerRegistration(path, listener, this);
        listeners.put(path, registration);

        // Configuration is read before operational data.
        val configSnapshot = dataReadRouter.readConfigurationData(path);
        val operationalSnapshot = dataReadRouter.readOperationalData(path);
        listener.onDataChanged(createInitialListenerEvent(path, configSnapshot, operationalSnapshot));
        return registration;
    }

    /**
     * Registers {@code reader} with the read router as both configuration and
     * operational reader for {@code path}.
     *
     * @return a composite registration wrapping both router registrations
     */
    final def registerDataReader(P path, DataReader<P, D> reader) {
        val configurationRegistration = dataReadRouter.registerConfigurationReader(path, reader);
        val operationalRegistration = dataReadRouter.registerOperationalReader(path, reader);

        return new CompositeObjectRegistration(reader,
            Arrays.asList(configurationRegistration, operationalRegistration));
    }

    /** Adds a listener that is told about commit handler (un)registrations. */
    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
        return commitHandlerRegistrationListeners.register(commitHandlerListener);
    }

    /**
     * Builds the event delivered to a listener right after its registration.
     * The {@code path} argument is not used by this default implementation.
     */
    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
    }

    /** Drops the given listener registration from the listener multimap. */
    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
        listeners.remove(registration.path, registration);
    }

    /**
     * Drops the given commit handler registration and informs every
     * registered commit-handler registration listener about the removal.
     */
    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
        commitHandlers.remove(registration.path, registration);
        LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
        notifyUnregistered(registration);
    }

    /** Returns all (path, registration) entries of the commit handler multimap. */
    protected final def getActiveCommitHandlers() {
        commitHandlers.entries
    }

    /**
     * For every listener path affected by {@code paths}, captures the listener
     * registrations together with the operational and configuration data that
     * is readable at that path at the time of the call.
     *
     * @param paths changed paths of a transaction
     * @return immutable list with one capture per affected listener path
     */
    protected def affectedListenersWithInitialState(HashSet<P> paths) {
        return FluentIterable.from(listeners.asMap.entrySet) //
        .filter[entry|isAffectedBy(entry.key, paths)] //
        .transform [entry|
            // Operational data is read before configuration data.
            val operationalState = readOperationalData(entry.key)
            val configurationState = readConfigurationData(entry.key)
            new ListenerStateCapture(entry.key, entry.value, operationalState, configurationState)
        ].toList()
    }

    /**
     * Tells whether a registration made under {@code key} is affected by a
     * change set: either {@code key} itself was changed, or {@code key}
     * contains at least one of the changed paths.
     */
    protected def boolean isAffectedBy(P key, Set<P> paths) {
        return paths.contains(key) || paths.exists[changedPath|key.contains(changedPath)];
    }

    /**
     * Marks the transaction as submitted, counts it and schedules its
     * two-phase commit on the executor.
     *
     * @return future holding the result of the commit task
     */
    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
        checkNotNull(transaction);
        transaction.changeStatus(TransactionStatus.SUBMITED);
        val commitTask = new TwoPhaseCommit(transaction, this);
        submittedTransactionsCount.getAndIncrement();
        return executor.submit(commitTask);
    }

    /** Calls onRegister on every registration listener; failures are logged and do not stop the loop. */
    private def void notifyRegistered(DataCommitHandlerRegistration<P, D> registration) {
        for (registrationListener : commitHandlerRegistrationListeners) {
            try {
                registrationListener.instance.onRegister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onRegister",registrationListener.instance,e);
            }
        }
    }

    /** Calls onUnregister on every registration listener; failures are logged and do not stop the loop. */
    private def void notifyUnregistered(DataCommitHandlerRegistration<P, D> registration) {
        for (registrationListener : commitHandlerRegistrationListeners) {
            try {
                registrationListener.instance.onUnregister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onUnregister",registrationListener.instance,e);
            }
        }
    }

}
180 \r
/**
 * Snapshot taken for one listener path: the listener registrations found
 * under that path plus the operational and configuration data read for it.
 *
 * The field order matters, because the constructor call in the broker passes
 * path, listeners, operational state and configuration state in this order.
 */
@Data
package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {

    /** Path the captured listeners are registered under. */
    @Property P path;

    /** Listener registrations found under the path. */
    @Property Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;

    /** Operational data read for the path when the capture was created. */
    @Property D initialOperationalState;

    /** Configuration data read for the path when the capture was created. */
    @Property D initialConfigurationState;
}
196 \r
/**
 * Registration handle for a data change listener. Removing the registration
 * detaches the listener from the broker it was created by.
 */
package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {

    /** Owning broker; reset to null once the registration has been removed. */
    AbstractDataBroker<P, D, DCL> dataBroker;

    /** Path the listener was registered under. */
    @Property
    val P path;

    /**
     * @param path path the listener is registered under
     * @param instance the listener itself
     * @param broker broker that owns this registration
     */
    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
        super(instance)
        this._path = path;
        this.dataBroker = broker;
    }

    /** Asks the broker to forget this registration, then drops the broker reference. */
    override protected removeRegistration() {
        this.dataBroker.removeListener(this);
        this.dataBroker = null;
    }

}
216 \r
/**
 * Registration handle for a commit handler. Removing the registration
 * detaches the handler from the broker it was created by.
 */
package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
implements DataCommitHandlerRegistration<P, D> {

    /** Owning broker; reset to null once the registration has been removed. */
    AbstractDataBroker<P, D, ?> dataBroker;

    /** Path the commit handler was registered under. */
    @Property
    val P path;

    /**
     * @param path path the handler is registered under
     * @param instance the commit handler itself
     * @param broker broker that owns this registration
     */
    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
        super(instance)
        this._path = path;
        this.dataBroker = broker;
    }

    /** Asks the broker to forget this handler, then drops the broker reference. */
    override protected removeRegistration() {
        this.dataBroker.removeCommitHandler(this);
        this.dataBroker = null;
    }
}
237 \r
/**
 * Commit task for a single transaction.
 *
 * First every affected commit handler is asked for a handler transaction
 * (request phase), then every handler transaction is finished (finish phase)
 * and the affected data change listeners are notified. A failure in either
 * phase rolls back all handler transactions obtained so far.
 */
package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {

    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);

    val AbstractDataTransaction<P, D> transaction;
    val AbstractDataBroker<P, D, DCL> dataBroker;

    /**
     * @param transaction transaction to be committed
     * @param broker broker supplying commit handlers, listeners and counters
     */
    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs the commit.
     *
     * @return a successful result carrying {@code COMMITED} when both phases
     *         pass, otherwise the failed result produced by {@code rollback}
     */
    override call() throws Exception {

        // get affected paths
        val affectedPaths = new HashSet<P>();

        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
        affectedPaths.addAll(transaction.removedConfigurationData);

        affectedPaths.addAll(transaction.createdOperationalData.keySet);
        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
        affectedPaths.addAll(transaction.removedOperationalData);

        // Captured before any handler is invoked, so each capture holds the
        // data as it was readable before this commit.
        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);

        val transactionId = transaction.identifier;

        log.info("Transaction: {} Started.",transactionId);
        // requesting commits
        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            log.error("Transaction: {} Request Commit failed", transactionId,e);
            dataBroker.failedTransactionsCount.andIncrement
            val failure = rollback(handlerTransactions, e);
            // Keep the transaction's own status in line with the returned result.
            transaction.changeStatus(TransactionStatus.FAILED);
            return failure;
        }
        // Collected results are currently not inspected.
        val List<RpcResult<Void>> results = new ArrayList();
        try {
            for (subtransaction : handlerTransactions) {
                results.add(subtransaction.finish());
            }
            listeners.publishDataChangeEvent();
        } catch (Exception e) {
            log.error("Transaction: {} Finish Commit failed",transactionId, e);
            dataBroker.failedTransactionsCount.andIncrement
            val failure = rollback(handlerTransactions, e);
            transaction.changeStatus(TransactionStatus.FAILED);
            return failure;
        }
        log.info("Transaction: {} Finished successfully.",transactionId);
        dataBroker.finishedTransactionsCount.andIncrement;
        // Without this the transaction would stay in SUBMITED forever.
        transaction.changeStatus(TransactionStatus.COMMITED);
        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());

    }

    /**
     * Notifies every captured listener with an event built from the captured
     * initial state and the data readable at the listener path now.
     *
     * An exception thrown by a single listener is logged and does not prevent
     * the remaining listeners from being notified.
     */
    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
        for (listenerSet : listeners) {
            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);

            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
            for (listener : listenerSet.listeners) {
                try {
                    listener.instance.onDataChanged(changeEvent);

                } catch (Exception e) {
                    // Report through the logger instead of printing to stderr.
                    log.error("Unhandled exception in data change listener {}", listener.instance, e);
                }
            }
        }
    }

    /**
     * Rolls back every supplied handler transaction and returns a failed
     * result carrying {@code FAILED}.
     *
     * A handler whose rollback throws is logged and skipped so that the
     * remaining handler transactions are still rolled back.
     *
     * @param transactions handler transactions obtained during the request phase
     * @param e exception that triggered the rollback (not yet part of the result)
     */
    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
        for (handlerTransaction : transactions) {
            try {
                handlerTransaction.rollback()
            } catch (Exception rollbackFailure) {
                log.error("Transaction: {} Rollback failed", transaction.identifier, rollbackFailure);
            }
        }

        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}
324 \r
/**
 * Base class of transactions created by an {@link AbstractDataBroker}.
 *
 * Holds the transaction identifier and status, forwards {@code commit} to the
 * owning broker and answers reads from its own updated data before falling
 * back to the broker.
 */
public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {

    /** Identifier of this transaction; used by {@code equals} and {@code hashCode}. */
    @Property
    private val Object identifier;

    var TransactionStatus status;

    var AbstractDataBroker<P, D, ?> broker;

    /**
     * Creates the transaction in status {@code NEW}.
     *
     * @param identifier identifier of the transaction
     * @param dataBroker broker used for reads and for committing
     */
    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
        super(dataBroker);
        _identifier = identifier;
        broker = dataBroker;
        status = TransactionStatus.NEW;
    }

    /** Submits this transaction to the owning broker. */
    override commit() {
        return broker.commit(this);
    }

    /**
     * Returns the configuration data updated at {@code path} by this
     * transaction if there is any, otherwise what the broker reads.
     */
    override readConfigurationData(P path) {
        val local = this.updatedConfigurationData.get(path);
        if(local != null) {
            return local;
        }

        return broker.readConfigurationData(path);
    }

    /**
     * Returns the operational data updated at {@code path} by this
     * transaction if there is any, otherwise what the broker reads.
     */
    override readOperationalData(P path) {
        val local = this.updatedOperationalData.get(path);
        if(local != null) {
            return local;
        }
        return broker.readOperationalData(path);
    }

    /**
     * Hash code derived from the identifier only. A null identifier hashes
     * to 0, matching {@code equals}, which accepts null identifiers.
     */
    override hashCode() {
        if (identifier == null) {
            return 0;
        }
        return identifier.hashCode;
    }

    /**
     * Two transactions are equal when they are of the same class and have
     * equal brokers and equal identifiers (null matching only null).
     */
    override equals(Object obj) {
        if (this === obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        val other = (obj as AbstractDataTransaction<P,D>);
        if (broker == null) {
            if (other.broker != null)
                return false;
        } else if (!broker.equals(other.broker))
            return false;
        if (identifier == null) {
            if (other.identifier != null)
                return false;
        } else if (!identifier.equals(other.identifier))
            return false;
        return true;
    }

    /** Returns the status last set through the constructor or {@code changeStatus}. */
    override TransactionStatus getStatus() {
        return status;
    }

    /** Hook invoked by {@code changeStatus} after the new status has been stored. */
    protected abstract def void onStatusChange(TransactionStatus status);

    /** Stores the new status and then invokes {@code onStatusChange} with it. */
    public def changeStatus(TransactionStatus status) {
        this.status = status;
        onStatusChange(status);
    }

}