Bulk-add copyright headers to .xtend files
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.common.impl.service\r
9 \r
10 import com.google.common.collect.FluentIterable\r
11 import com.google.common.collect.HashMultimap\r
12 import com.google.common.collect.ImmutableList\r
13 import com.google.common.collect.Multimap\r
14 import java.util.ArrayList\r
15 import java.util.Arrays\r
16 import java.util.Collection\r
17 import java.util.Collections\r
18 import java.util.HashSet\r
19 import java.util.List\r
20 import java.util.Set\r
21 import java.util.concurrent.Callable\r
22 import java.util.concurrent.ExecutorService\r
23 import java.util.concurrent.Future\r
24 import java.util.concurrent.atomic.AtomicLong\r
25 import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
28 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
30 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
32 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
33 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
35 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
36 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
37 import org.opendaylight.controller.sal.common.util.Rpcs\r
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
39 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
40 import org.opendaylight.yangtools.concepts.ListenerRegistration\r
41 import org.opendaylight.yangtools.concepts.Path\r
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
43 import org.opendaylight.yangtools.yang.common.RpcResult\r
44 import org.slf4j.LoggerFactory\r
45 \r
46 import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
47
/**
 * Base class of a data broker: it serves reads, keeps the registries of data
 * change listeners and commit handlers, and submits transactions for commit.
 *
 * Reads are delegated to the configured {@link AbstractDataReadRouter}. A commit
 * wraps the transaction into a {@link TwoPhaseCommit} task and submits it to the
 * configured {@link ExecutorService}.
 *
 * NOTE(review): both registries are plain {@link HashMultimap}s and no
 * synchronization is visible in this class, although they are also read from the
 * commit task. Confirm that callers serialize access.
 *
 * @param <P> type of the path used to address data
 * @param <D> type of the data stored under a path
 * @param <DCL> type of data change listener accepted by this broker
 */
abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
DataReader<P, D>, //
DataChangePublisher<P, D, DCL>, //
DataProvisionService<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);

    /** Executor to which commit tasks are submitted. */
    @Property
    var ExecutorService executor;

    /** Router to which every read of this broker is delegated. */
    @Property
    var AbstractDataReadRouter<P, D> dataReadRouter;

    /** Incremented once for every transaction passed to {@link #commit}. */
    @Property
    private val AtomicLong submittedTransactionsCount = new AtomicLong;

    /** Incremented by the commit task when a commit is rolled back. */
    @Property
    private val AtomicLong failedTransactionsCount = new AtomicLong

    /** Incremented by the commit task when a commit completes successfully. */
    @Property
    private val AtomicLong finishedTransactionsCount = new AtomicLong

    /** Data change listener registrations keyed by the path they were registered for. */
    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();

    /** Commit handler registrations keyed by the path they were registered for. */
    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();

    /** Listeners notified whenever a commit handler is registered or unregistered. */
    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();

    public new() {
    }

    /**
     * Collects the commit handlers whose registration path is affected by the
     * supplied paths (see {@link #isAffectedBy}).
     *
     * @param paths paths touched by a transaction
     * @return list of the affected commit handler instances
     */
    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
        HashSet<P> paths) {
        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
        .transformAndConcat[value] //
        .transform[instance].toList()
    }

    /** Reads configuration data for the path via the read router. */
    override final readConfigurationData(P path) {
        return dataReadRouter.readConfigurationData(path);
    }

    /** Reads operational data for the path via the read router. */
    override final readOperationalData(P path) {
        return dataReadRouter.readOperationalData(path);
    }

    /**
     * Registers a commit handler for the path and notifies all commit handler
     * registration listeners. A failing listener is logged and does not prevent
     * the remaining listeners from being notified.
     *
     * @param path path the handler is responsible for
     * @param commitHandler handler to register
     * @return registration which removes the handler when closed
     */
    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
        commitHandlers.put(path, registration)
        LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
        for(listener : commitHandlerRegistrationListeners) {
            try {
                listener.instance.onRegister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
            }
        }
        return registration;
    }

    /**
     * Registers a data change listener for the path and immediately delivers an
     * initial event built from the current configuration and operational data.
     *
     * @param path path the listener is interested in
     * @param listener listener to register
     * @return registration which removes the listener when closed
     */
    override final def registerDataChangeListener(P path, DCL listener) {
        val reg = new DataChangeListenerRegistration(path, listener, this);
        listeners.put(path, reg);
        val initialConfig = dataReadRouter.readConfigurationData(path);
        val initialOperational = dataReadRouter.readOperationalData(path);
        val event = createInitialListenerEvent(path,initialConfig,initialOperational);
        try {
            listener.onDataChanged(event);
        } catch (Exception e) {
            // The registration is already stored; letting the exception escape would
            // leave the listener registered while the caller never receives the
            // handle needed to close it. Log it like the other listener callbacks.
            LOG.error("Unexpected exception in listener {} during invoking initial onDataChanged",listener,e);
        }
        return reg;
    }

    /**
     * Registers the reader with the read router for both configuration and
     * operational data.
     *
     * @param path path served by the reader
     * @param reader reader to register
     * @return composite registration covering both underlying registrations
     */
    final def registerDataReader(P path, DataReader<P, D> reader) {

        val confReg = dataReadRouter.registerConfigurationReader(path, reader);
        val dataReg = dataReadRouter.registerOperationalReader(path, reader);

        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
    }

    /**
     * Registers a listener that is notified about commit handler registrations
     * and unregistrations.
     */
    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
        return commitHandlerRegistrationListeners.register(commitHandlerListener);
    }

    /**
     * Builds the event delivered to a listener right after its registration.
     * This implementation does not use the path argument.
     *
     * @param path path the listener was registered for
     * @param initialConfig configuration data read for the path
     * @param initialOperational operational data read for the path
     */
    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
    }

    /** Removes the data change listener registration from the registry. */
    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
        listeners.remove(registration.path, registration);
    }

    /**
     * Removes the commit handler registration and notifies all commit handler
     * registration listeners. A failing listener is logged and does not prevent
     * the remaining listeners from being notified.
     */
    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
        commitHandlers.remove(registration.path, registration);

        LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
        for(listener : commitHandlerRegistrationListeners) {
            try {
                listener.instance.onUnregister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
            }
        }
    }

    /** Returns all (path, registration) entries of the commit handler registry. */
    protected final def getActiveCommitHandlers() {
        return commitHandlers.entries;
    }

    /**
     * Collects the listener groups whose registration path is affected by the
     * supplied paths and captures, for each group, the operational and
     * configuration data currently readable under that path.
     *
     * @param paths paths touched by a transaction
     * @return list of {@link ListenerStateCapture}, one per affected registration path
     */
    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
        HashSet<P> paths) {
        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
            val operationalState = readOperationalData(key)
            val configurationState = readConfigurationData(key)
            return new ListenerStateCapture(key, value, operationalState, configurationState)
        ].toList()
    }

    /**
     * Decides whether a registration path is affected by a set of changed paths.
     *
     * @param key registration path
     * @param paths changed paths
     * @return true if the set contains the key itself, or the key
     *         {@code contains} at least one of the changed paths
     */
    protected def boolean isAffectedBy(P key, Set<P> paths) {
        if (paths.contains(key)) {
            return true;
        }
        for (path : paths) {
            if (key.contains(path)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Marks the transaction as submitted and schedules its two-phase commit on
     * the executor.
     *
     * @param transaction transaction to commit, must not be null
     * @return future holding the result of the commit task
     */
    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
        checkNotNull(transaction);
        transaction.changeStatus(TransactionStatus.SUBMITED);
        val task = new TwoPhaseCommit(transaction, this);
        submittedTransactionsCount.andIncrement;
        return executor.submit(task);
    }

}
187 \r
/**
 * Value object holding the listeners registered for one path together with the
 * operational and configuration data that was read for that path when the
 * capture was created.
 */
@Data
package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {

    /** Path the listeners are registered for. */
    @Property P path;

    /** Listener registrations stored under the path. */
    @Property Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;

    /** Operational data read for the path at capture time. */
    @Property D initialOperationalState;

    /** Configuration data read for the path at capture time. */
    @Property D initialConfigurationState;
}
203 \r
/**
 * Registration handle of a data change listener. Removing the registration
 * detaches the listener from the broker it was created by.
 */
package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {

    /** Owning broker; reset to null once the registration has been removed. */
    AbstractDataBroker<P, D, DCL> dataBroker;

    /** Path the listener was registered for. */
    @Property
    val P path;

    /**
     * @param path path the listener is registered for
     * @param instance the registered listener
     * @param broker broker that owns this registration
     */
    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
        super(instance)
        this.dataBroker = broker;
        this._path = path;
    }

    /** Removes this registration from the owning broker and drops the broker reference. */
    override protected removeRegistration() {
        this.dataBroker.removeListener(this);
        this.dataBroker = null;
    }

}
223 \r
/**
 * Registration handle of a commit handler. Removing the registration detaches
 * the handler from the broker it was created by.
 */
package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
implements DataCommitHandlerRegistration<P, D> {

    /** Owning broker; reset to null once the registration has been removed. */
    AbstractDataBroker<P, D, ?> dataBroker;

    /** Path the commit handler was registered for. */
    @Property
    val P path;

    /**
     * @param path path the handler is registered for
     * @param instance the registered commit handler
     * @param broker broker that owns this registration
     */
    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
        super(instance)
        this.dataBroker = broker;
        this._path = path;
    }

    /** Removes this registration from the owning broker and drops the broker reference. */
    override protected removeRegistration() {
        this.dataBroker.removeCommitHandler(this);
        this.dataBroker = null;
    }
}
244 \r
/**
 * Commit task for a single transaction.
 *
 * The task first asks every affected commit handler for a handler transaction
 * (request phase), then finishes all of them (finish phase) and publishes data
 * change events to the affected listeners. If either phase throws, the handler
 * transactions collected so far are rolled back and a failed result is returned.
 */
package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {

    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);

    /** Transaction being committed. */
    val AbstractDataTransaction<P, D> transaction;

    /** Broker providing the commit handlers, the listeners and the counters. */
    val AbstractDataBroker<P, D, DCL> dataBroker;

    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs the commit.
     *
     * @return successful result with status COMMITED, or the failed result
     *         produced by {@link #rollback} when a phase throws
     */
    override call() throws Exception {

        // get affected paths
        val affectedPaths = new HashSet<P>();

        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
        affectedPaths.addAll(transaction.removedConfigurationData);

        affectedPaths.addAll(transaction.createdOperationalData.keySet);
        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
        affectedPaths.addAll(transaction.removedOperationalData);

        // Capture the listeners and the pre-commit state before any handler runs.
        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);

        val transactionId = transaction.identifier;

        log.trace("Transaction: {} Started.",transactionId);
        // requesting commits
        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            log.error("Transaction: {} Request Commit failed", transactionId,e);
            dataBroker.failedTransactionsCount.andIncrement
            return rollback(handlerTransactions, e);
        }
        val List<RpcResult<Void>> results = new ArrayList();
        try {
            for (subtransaction : handlerTransactions) {
                results.add(subtransaction.finish());
            }
            listeners.publishDataChangeEvent();
        } catch (Exception e) {
            log.error("Transaction: {} Finish Commit failed",transactionId, e);
            dataBroker.failedTransactionsCount.andIncrement
            return rollback(handlerTransactions, e);
        }
        log.trace("Transaction: {} Finished successfully.",transactionId);
        dataBroker.finishedTransactionsCount.andIncrement;
        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());

    }

    /**
     * Delivers a data change event to every captured listener. The event carries
     * the captured pre-commit state and the state read from the broker now.
     * A failing listener is logged and does not prevent the remaining listeners
     * from being notified.
     *
     * @param listeners listener groups captured before the commit started
     */
    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
        for (listenerSet : listeners) {
            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);

            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
            for (listener : listenerSet.listeners) {
                try {
                    listener.instance.onDataChanged(changeEvent);

                } catch (Exception e) {
                    // Report through the logger, like every other failure in this class,
                    // instead of dumping the stack trace to stderr.
                    log.error("Unexpected exception in listener {} during invoking onDataChanged",listener.instance,e);
                }
            }
        }
    }

    /**
     * Rolls back every supplied handler transaction.
     *
     * @param transactions handler transactions to roll back
     * @param e exception that caused the rollback; currently not propagated into the result
     * @return unsuccessful result with status FAILED
     */
    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
        for (handlerTransaction : transactions) {
            handlerTransaction.rollback()
        }

        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}
331 \r
/**
 * Base class of transactions created by an {@link AbstractDataBroker}.
 *
 * Reads return the value updated in this transaction if there is one and
 * otherwise fall back to the broker. Commit is delegated to the broker.
 * Equality is based on the owning broker and the identifier; the hash code is
 * based on the identifier only.
 */
public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);

    /** Identifier supplied at construction; used in log messages, equals and hashCode. */
    @Property
    private val Object identifier;

    /** Current status; starts as NEW and changes through {@link #changeStatus}. */
    var TransactionStatus status;

    /** Broker that created this transaction and to which commit and reads are delegated. */
    var AbstractDataBroker<P, D, ?> broker;

    /**
     * @param identifier identifier of the transaction
     * @param dataBroker broker owning the transaction
     */
    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
        super(dataBroker);
        _identifier = identifier;
        broker = dataBroker;
        status = TransactionStatus.NEW;
        LOG.debug("Transaction {} Allocated.", identifier);
    }

    /** Submits this transaction to the owning broker for commit. */
    override commit() {
        return broker.commit(this);
    }

    /**
     * Returns the configuration data updated under the path in this transaction,
     * or the broker's configuration data when there is no such update.
     */
    override readConfigurationData(P path) {
        val local = this.updatedConfigurationData.get(path);
        if(local != null) {
            return local;
        }

        return broker.readConfigurationData(path);
    }

    /**
     * Returns the operational data updated under the path in this transaction,
     * or the broker's operational data when there is no such update.
     */
    override readOperationalData(P path) {
        val local = this.updatedOperationalData.get(path);
        if(local != null) {
            return local;
        }
        return broker.readOperationalData(path);
    }

    /**
     * Hash code derived from the identifier. A null identifier hashes to 0,
     * because {@link #equals} accepts a null identifier.
     */
    override hashCode() {
        return if (identifier === null) 0 else identifier.hashCode;
    }

    /**
     * Two transactions are equal when they are of the same class and have equal
     * brokers and equal identifiers (null matches only null).
     */
    override equals(Object obj) {
        if (this === obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        val other = (obj as AbstractDataTransaction<P,D>);
        if (broker == null) {
            if (other.broker != null)
                return false;
        } else if (!broker.equals(other.broker))
            return false;
        if (identifier == null) {
            if (other.identifier != null)
                return false;
        } else if (!identifier.equals(other.identifier))
            return false;
        return true;
    }

    /** Returns the current status of the transaction. */
    override TransactionStatus getStatus() {
        return status;
    }

    /**
     * Invoked by {@link #changeStatus} after the new status has been stored.
     *
     * @param status the new status
     */
    protected abstract def void onStatusChange(TransactionStatus status);

    /**
     * Logs the transition, stores the new status and invokes
     * {@link #onStatusChange}.
     *
     * @param status the new status
     */
    public def changeStatus(TransactionStatus status) {
        LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
        this.status = status;
        onStatusChange(status);
    }

}