Merge "Gerrit contains following fixes: 1) Fix for bug 284 - Added custom ip matching...
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 package org.opendaylight.controller.md.sal.common.impl.service\r
2 \r
3 import com.google.common.collect.FluentIterable\r
4 import com.google.common.collect.HashMultimap\r
5 import com.google.common.collect.ImmutableList\r
6 import com.google.common.collect.Multimap\r
7 import java.util.ArrayList\r
8 import java.util.Arrays\r
9 import java.util.Collection\r
10 import java.util.Collections\r
11 import java.util.HashSet\r
12 import java.util.List\r
13 import java.util.Set\r
14 import java.util.concurrent.Callable\r
15 import java.util.concurrent.ExecutorService\r
16 import java.util.concurrent.Future\r
17 import java.util.concurrent.atomic.AtomicLong\r
18 import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
20 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
21 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
23 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
24 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
25 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
26 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
28 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
29 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
30 import org.opendaylight.controller.sal.common.util.Rpcs\r
31 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
32 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
33 import org.opendaylight.yangtools.concepts.ListenerRegistration\r
34 import org.opendaylight.yangtools.concepts.Path\r
35 import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
36 import org.opendaylight.yangtools.yang.common.RpcResult\r
37 import org.slf4j.LoggerFactory\r
38 \r
39 import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
40 import com.google.common.collect.Multimaps
41
/**
 * Base data broker: routes reads through a {@link AbstractDataReadRouter}, keeps track of
 * registered data change listeners and commit handlers per path, and submits transactions
 * to an executor where they are processed by {@link TwoPhaseCommit}.
 *
 * @param <P> path type used to address data
 * @param <D> data type stored under a path
 * @param <DCL> data change listener type accepted by this broker
 */
abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
DataReader<P, D>, //
DataChangePublisher<P, D, DCL>, //
DataProvisionService<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);

    /** Executor to which commit tasks are submitted by {@link #commit}. */
    @Property
    var ExecutorService executor;

    /** Router that all configuration / operational reads are delegated to. */
    @Property
    var AbstractDataReadRouter<P, D> dataReadRouter;

    /** Incremented once for every transaction handed to {@link #commit}. */
    @Property
    private val AtomicLong submittedTransactionsCount = new AtomicLong;

    /** Incremented by {@link TwoPhaseCommit} when a commit has to be rolled back. */
    @Property
    private val AtomicLong failedTransactionsCount = new AtomicLong

    /** Incremented by {@link TwoPhaseCommit} when a commit completes successfully. */
    @Property
    private val AtomicLong finishedTransactionsCount = new AtomicLong

    // Registrations keyed by the path they were registered on.
    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());
    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());

    // Listeners notified whenever a commit handler is registered or unregistered.
    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();

    public new() {
    }

    /**
     * Collects the commit handler instances registered on any path that
     * {@link #isAffectedBy} reports as affected by the supplied paths.
     *
     * @param paths paths touched by a transaction
     * @return list of affected {@link DataCommitHandler} instances
     */
    protected def /* ImmutableList<DataCommitHandler<P, D>> */ affectedCommitHandlers(
        HashSet<P> paths) {
        // NOTE(review): this iterates a collection view of a synchronized multimap without
        // holding the multimap's lock; Guava asks callers to synchronize manually - confirm.
        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
        .transformAndConcat[value] //
        .transform[instance].toList()
    }

    /** Reads configuration data for the path via the data read router. */
    override final readConfigurationData(P path) {
        return dataReadRouter.readConfigurationData(path);
    }

    /** Reads operational data for the path via the data read router. */
    override final readOperationalData(P path) {
        return dataReadRouter.readOperationalData(path);
    }

    /**
     * Registers a commit handler for a path and notifies all commit handler
     * registration listeners. Exceptions thrown by those listeners are logged
     * and do not abort the registration.
     *
     * @param path path the handler is responsible for
     * @param commitHandler handler to register
     * @return registration which removes the handler again when closed
     */
    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
        commitHandlers.put(path, registration)
        LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
        for(listener : commitHandlerRegistrationListeners) {
            try {
                listener.instance.onRegister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
            }
        }
        return registration;
    }

    /**
     * Registers a data change listener for a path and immediately delivers an
     * initial event built from the current configuration and operational data.
     *
     * If reading the initial state or delivering the initial event fails, the
     * registration is removed again before the exception is rethrown, so a
     * failed call does not leave behind a registration nobody can close.
     *
     * @param path path to listen on
     * @param listener listener to register
     * @return registration which removes the listener again when closed
     */
    override final def registerDataChangeListener(P path, DCL listener) {
        val reg = new DataChangeListenerRegistration(path, listener, this);
        listeners.put(path, reg);
        try {
            val initialConfig = dataReadRouter.readConfigurationData(path);
            val initialOperational = dataReadRouter.readOperationalData(path);
            val event = createInitialListenerEvent(path,initialConfig,initialOperational);
            listener.onDataChanged(event);
        } catch (Exception e) {
            // The caller never receives 'reg' on this path, so undo the registration here.
            listeners.remove(path, reg);
            throw e;
        }
        return reg;
    }

    /**
     * Registers the reader for both configuration and operational data on the path.
     *
     * @return composite registration wrapping both underlying reader registrations
     */
    final def registerDataReader(P path, DataReader<P, D> reader) {

        val confReg = dataReadRouter.registerConfigurationReader(path, reader);
        val dataReg = dataReadRouter.registerOperationalReader(path, reader);

        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
    }

    /** Registers a listener that is told about commit handler (un)registrations. */
    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);

        return ret;
    }

    /**
     * Builds the event delivered to a listener right after its registration.
     * The path argument is not used by this default implementation.
     */
    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);

    }

    /** Removes a data change listener registration from the listener map. */
    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
        listeners.remove(registration.path, registration);
    }

    /**
     * Removes a commit handler registration and notifies all commit handler
     * registration listeners; listener exceptions are logged and otherwise ignored.
     */
    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
        commitHandlers.remove(registration.path, registration);

        LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
        for(listener : commitHandlerRegistrationListeners) {
            try {
                listener.instance.onUnregister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
            }
        }
    }

    /** Returns the (path, registration) entries of all currently registered commit handlers. */
    protected final def getActiveCommitHandlers() {
        return commitHandlers.entries;
    }

    /**
     * For every listener path affected by the supplied paths, reads the current
     * operational and configuration data and bundles it with the listeners
     * registered on that path.
     *
     * @param paths paths touched by a transaction
     * @return list of {@link ListenerStateCapture}, one per affected listener path
     */
    protected def /* ImmutableList<ListenerStateCapture<P, D, DCL>> */ affectedListenersWithInitialState(
        HashSet<P> paths) {
        // NOTE(review): same unsynchronized iteration over a synchronized multimap view as in
        // affectedCommitHandlers; the captured 'value' collection is also a live view - confirm.
        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
            val operationalState = readOperationalData(key)
            val configurationState = readConfigurationData(key)
            return new ListenerStateCapture(key, value, operationalState, configurationState)
        ].toList()
    }

    /**
     * Tells whether a registration key is affected by a set of changed paths.
     *
     * @return true if the set contains the key itself, or if the key
     *         {@code contains} at least one of the paths
     */
    protected def boolean isAffectedBy(P key, Set<P> paths) {
        if (paths.contains(key)) {
            return true;
        }
        for (path : paths) {
            if (key.contains(path)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Marks the transaction as submitted, counts it and schedules a
     * {@link TwoPhaseCommit} task for it on the executor.
     *
     * @param transaction transaction to commit, must not be null
     * @return future carrying the outcome of the commit task
     */
    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
        checkNotNull(transaction);
        transaction.changeStatus(TransactionStatus.SUBMITED);
        val task = new TwoPhaseCommit(transaction, this);
        submittedTransactionsCount.andIncrement;
        return executor.submit(task);
    }

}
181 \r
/**
 * Value holder pairing the listeners registered on one path with the
 * operational and configuration data read for that path before a commit.
 *
 * Field order is significant: the broker constructs instances positionally as
 * (path, listeners, operational state, configuration state).
 */
@Data
package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {

    /** Path the captured listeners are registered on. */
    @Property P path;

    /** Listener registrations found on the path. */
    @Property Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;

    /** Operational data read for the path when the capture was created. */
    @Property D initialOperationalState;

    /** Configuration data read for the path when the capture was created. */
    @Property D initialConfigurationState;
}
197 \r
/**
 * Registration of a data change listener on a single path of an
 * {@link AbstractDataBroker}. Removing the registration detaches the listener
 * from the broker.
 */
package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {

    // Broker this registration belongs to; null once the registration was removed.
    AbstractDataBroker<P, D, DCL> dataBroker;

    /** Path the listener is registered on. */
    @Property
    val P path;

    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
        super(instance)
        dataBroker = broker;
        _path = path;
    }

    /**
     * Removes this registration from the broker and drops the broker reference.
     * Repeated invocations are no-ops instead of failing on the cleared reference.
     */
    override protected removeRegistration() {
        if (dataBroker != null) {
            dataBroker.removeListener(this);
            dataBroker = null;
        }
    }

}
217 \r
/**
 * Registration of a commit handler on a single path of an
 * {@link AbstractDataBroker}. Removing the registration detaches the handler
 * from the broker.
 */
package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
implements DataCommitHandlerRegistration<P, D> {

    // Broker this registration belongs to; null once the registration was removed.
    AbstractDataBroker<P, D, ?> dataBroker;

    /** Path the commit handler is registered on. */
    @Property
    val P path;

    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
        super(instance)
        dataBroker = broker;
        _path = path;
    }

    /**
     * Removes this registration from the broker and drops the broker reference.
     * Repeated invocations are no-ops instead of failing on the cleared reference.
     */
    override protected removeRegistration() {
        if (dataBroker != null) {
            dataBroker.removeCommitHandler(this);
            dataBroker = null;
        }
    }
}
238 \r
/**
 * Commit task for a single transaction: asks every affected commit handler to
 * prepare the change, finishes all prepared sub-transactions, then publishes
 * data change events to the affected listeners. Any failure triggers a
 * rollback of the sub-transactions obtained so far.
 */
package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {

    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);

    val AbstractDataTransaction<P, D> transaction;
    val AbstractDataBroker<P, D, DCL> dataBroker;

    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs the commit.
     *
     * @return result carrying {@code COMMITED} on success, or {@code FAILED}
     *         after a rollback when requesting or finishing a commit failed
     */
    override call() throws Exception {

        // get affected paths
        val affectedPaths = new HashSet<P>();

        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
        affectedPaths.addAll(transaction.removedConfigurationData);

        affectedPaths.addAll(transaction.createdOperationalData.keySet);
        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
        affectedPaths.addAll(transaction.removedOperationalData);

        // Capture the pre-commit state before any handler applies the change.
        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);

        val transactionId = transaction.identifier;

        log.trace("Transaction: {} Started.",transactionId);
        // requesting commits
        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            log.error("Transaction: {} Request Commit failed", transactionId,e);
            dataBroker.failedTransactionsCount.andIncrement
            return rollback(handlerTransactions, e);
        }
        val List<RpcResult<Void>> results = new ArrayList();
        try {
            for (subtransaction : handlerTransactions) {
                results.add(subtransaction.finish());
            }
            listeners.publishDataChangeEvent();
        } catch (Exception e) {
            log.error("Transaction: {} Finish Commit failed",transactionId, e);
            dataBroker.failedTransactionsCount.andIncrement
            return rollback(handlerTransactions, e);
        }
        log.trace("Transaction: {} Finished successfully.",transactionId);
        dataBroker.finishedTransactionsCount.andIncrement;
        // NOTE(review): the transaction's own status is not changed here after SUBMITED;
        // confirm whether changeStatus(COMMITED / FAILED) is expected on completion.
        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());

    }

    /**
     * Delivers a change event to every captured listener, built from the
     * captured pre-commit state and the data read after the commit.
     * An exception thrown by one listener is logged and does not prevent the
     * remaining listeners from being notified.
     */
    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
        for (listenerSet : listeners) {
            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);

            // NOTE(review): argument order is (config, operational, operational, config);
            // verify against the DataChangeEventImpl constructor.
            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
            for (listener : listenerSet.listeners) {
                try {
                    listener.instance.onDataChanged(changeEvent);

                } catch (Exception e) {
                    log.error("Unexpected exception in listener {} during invoking onDataChanged", listener.instance, e);
                }
            }
        }
    }

    /**
     * Rolls back every sub-transaction obtained so far. A failing rollback is
     * logged and the remaining sub-transactions are still rolled back, so the
     * task always reports {@code FAILED} rather than dying with an exception.
     *
     * @param transactions sub-transactions to roll back
     * @param e failure that caused the rollback (already logged by the caller)
     * @return unsuccessful result carrying {@code FAILED}
     */
    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
        for (handlerTransaction : transactions) {
            try {
                handlerTransaction.rollback()
            } catch (Exception rollbackFailure) {
                log.error("Transaction: {} Rollback failed", transaction.identifier, rollbackFailure);
            }
        }

        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}
325 \r
/**
 * Data modification bound to an {@link AbstractDataBroker}. Reads consult the
 * data updated within this transaction first and fall back to the broker;
 * committing hands the transaction over to the broker.
 */
public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);

    /** Identifier of this transaction; used for logging, equality and hashing. */
    @Property
    private val Object identifier;

    var TransactionStatus status;

    var AbstractDataBroker<P, D, ?> broker;

    /**
     * Creates a transaction in the {@code NEW} state.
     *
     * @param identifier identifier of the transaction
     * @param dataBroker broker used for fallback reads and for committing
     */
    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
        super(dataBroker);
        _identifier = identifier;
        broker = dataBroker;
        status = TransactionStatus.NEW;
        LOG.debug("Transaction {} Allocated.", identifier);
    }

    /** Submits this transaction to the broker for committing. */
    override commit() {
        return broker.commit(this);
    }

    /**
     * Returns configuration data updated in this transaction for the path,
     * or the broker's current data when the transaction holds none.
     */
    override readConfigurationData(P path) {
        val local = this.updatedConfigurationData.get(path);
        if(local != null) {
            return local;
        }

        return broker.readConfigurationData(path);
    }

    /**
     * Returns operational data updated in this transaction for the path,
     * or the broker's current data when the transaction holds none.
     */
    override readOperationalData(P path) {
        val local = this.updatedOperationalData.get(path);
        if(local != null) {
            return local;
        }
        return broker.readOperationalData(path);
    }

    /**
     * Hash derived from the identifier only. A null identifier hashes to 0,
     * matching {@link #equals}, which also tolerates a null identifier.
     */
    override hashCode() {
        if (identifier == null) {
            return 0;
        }
        return identifier.hashCode;
    }

    /**
     * Two transactions are equal when they are of the same class and have
     * equal brokers and equal identifiers (null-safe on both).
     */
    override equals(Object obj) {
        if (this === obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        val other = (obj as AbstractDataTransaction<P,D>);
        // Xtend's '==' is a null-safe equals() comparison.
        return broker == other.broker && identifier == other.identifier;
    }

    override TransactionStatus getStatus() {
        return status;
    }

    /** Hook invoked by {@link #changeStatus} after the status has been updated. */
    protected abstract def void onStatusChange(TransactionStatus status);

    /**
     * Logs the transition, stores the new status and notifies the
     * {@link #onStatusChange} hook.
     */
    public def changeStatus(TransactionStatus status) {
        LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
        this.status = status;
        onStatusChange(status);
    }

}