Merge "Prevent ConfigPusher from killing its thread"
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.common.impl.service\r
9 \r
10 import com.google.common.collect.FluentIterable\r
11 import com.google.common.collect.HashMultimap\r
12 import com.google.common.collect.ImmutableList\r
13 import com.google.common.collect.Multimap\r
14 import java.util.ArrayList\r
15 import java.util.Arrays\r
16 import java.util.Collection\r
17 import java.util.Collections\r
18 import java.util.HashSet\r
19 import java.util.List\r
20 import java.util.Set\r
21 import java.util.concurrent.Callable\r
22 import java.util.concurrent.ExecutorService\r
23 import java.util.concurrent.Future\r
24 import java.util.concurrent.atomic.AtomicLong\r
25 import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
28 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
30 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
32 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
33 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
35 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
36 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
37 import org.opendaylight.controller.sal.common.util.Rpcs\r
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
39 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
40 import org.opendaylight.yangtools.concepts.ListenerRegistration\r
41 import org.opendaylight.yangtools.concepts.Path\r
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
43 import org.opendaylight.yangtools.yang.common.RpcResult\r
44 import org.slf4j.LoggerFactory\r
45 \r
46 import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
47 import com.google.common.collect.Multimaps
48
/**
 * Base data broker: keeps track of registered data change listeners and commit
 * handlers, delegates reads to the configured {@link AbstractDataReadRouter} and
 * submits transactions to the configured {@link ExecutorService} as
 * {@link TwoPhaseCommit} tasks.
 *
 * NOTE(review): {@code listeners} and {@code commitHandlers} are synchronized
 * multimaps, but their collection views ({@code asMap.entrySet}, {@code entries})
 * are iterated below without holding the multimap's monitor. Guava documents that
 * such iteration must be manually synchronized - confirm whether registrations can
 * change concurrently with a running commit.
 *
 * @param <P> path type used to address data
 * @param <D> data type stored under a path
 * @param <DCL> data change listener type accepted by this broker
 */
abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
DataReader<P, D>, //
DataChangePublisher<P, D, DCL>, //
DataProvisionService<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);

    /** Executor on which submitted {@link TwoPhaseCommit} tasks are run. */
    @Property
    var ExecutorService executor;

    /** Router to which all configuration / operational reads are delegated. */
    @Property
    var AbstractDataReadRouter<P, D> dataReadRouter;

    /** Incremented once for every transaction passed to {@code commit}. */
    @Property
    private val AtomicLong submittedTransactionsCount = new AtomicLong;

    /** Incremented by {@link TwoPhaseCommit} when request-commit or finish fails. */
    @Property
    private val AtomicLong failedTransactionsCount = new AtomicLong

    /** Incremented by {@link TwoPhaseCommit} when a transaction completes successfully. */
    @Property
    private val AtomicLong finishedTransactionsCount = new AtomicLong

    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());
    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());

    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();

    public new() {
    }

    /**
     * Collects the commit handler instances whose registration path is affected
     * by the supplied paths (see {@code isAffectedBy}).
     *
     * @param paths paths touched by a transaction
     * @return list of commit handlers registered under affected paths
     */
    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
        HashSet<P> paths) {
        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
        .transformAndConcat[value] //
        .transform[instance].toList()
    }

    /** Reads configuration data for {@code path} through the data read router. */
    override final readConfigurationData(P path) {
        return dataReadRouter.readConfigurationData(path);
    }

    /** Reads operational data for {@code path} through the data read router. */
    override final readOperationalData(P path) {
        return dataReadRouter.readOperationalData(path);
    }

    /**
     * Registers a commit handler under {@code path} and notifies every commit
     * handler registration listener. A listener that throws is logged and does
     * not prevent the remaining listeners from being notified.
     *
     * @param path path the handler is responsible for
     * @param commitHandler handler to register
     * @return registration which removes the handler again when closed
     */
    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
        commitHandlers.put(path, registration)
        LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
        for(listener : commitHandlerRegistrationListeners) {
            try {
                listener.instance.onRegister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
            }
        }
        return registration;
    }

    /**
     * Registers a data change listener under {@code path} and immediately
     * delivers an initial event built from the current configuration and
     * operational data at that path.
     *
     * The listener is stored before the initial state is read. A failure of the
     * listener while processing the initial event is logged instead of being
     * propagated, so the caller always receives the registration handle and can
     * close it; this matches how the other listener callbacks in this class
     * are guarded.
     *
     * @param path path the listener is interested in
     * @param listener listener to register
     * @return registration which removes the listener again when closed
     */
    override final def registerDataChangeListener(P path, DCL listener) {
        val reg = new DataChangeListenerRegistration(path, listener, this);
        listeners.put(path, reg);
        val initialConfig = dataReadRouter.readConfigurationData(path);
        val initialOperational = dataReadRouter.readOperationalData(path);
        val event = createInitialListenerEvent(path,initialConfig,initialOperational);
        try {
            listener.onDataChanged(event);
        } catch (Exception e) {
            LOG.error("Unexpected exception in listener {} during invoking onDataChanged",listener,e);
        }
        return reg;
    }

    /**
     * Registers {@code reader} as both configuration and operational reader for
     * {@code path}.
     *
     * @return composite registration wrapping both underlying registrations
     */
    final def registerDataReader(P path, DataReader<P, D> reader) {

        val confReg = dataReadRouter.registerConfigurationReader(path, reader);
        val dataReg = dataReadRouter.registerOperationalReader(path, reader);

        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
    }

    /** Registers a listener notified about commit handler (un)registrations. */
    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);

        return ret;
    }

    /**
     * Creates the event delivered to a freshly registered data change listener.
     * This implementation does not use {@code path}; it only wraps the two
     * initial data snapshots.
     */
    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);

    }

    /** Removes a data change listener registration; called by the registration itself. */
    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
        listeners.remove(registration.path, registration);
    }

    /**
     * Removes a commit handler registration and notifies every commit handler
     * registration listener. A listener that throws is logged and does not
     * prevent the remaining listeners from being notified.
     */
    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
        commitHandlers.remove(registration.path, registration);

        LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
        for(listener : commitHandlerRegistrationListeners) {
            try {
                listener.instance.onUnregister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
            }
        }
    }

    /** Returns the (path, registration) entries view of the commit handler multimap. */
    protected final def getActiveCommitHandlers() {
        return commitHandlers.entries;
    }

    /**
     * For every listener path affected by {@code paths}, captures the listeners
     * registered there together with the operational and configuration data
     * currently readable at that path.
     *
     * @param paths paths touched by a transaction
     * @return list of captured listener states, one per affected listener path
     */
    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
        HashSet<P> paths) {
        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
            val operationalState = readOperationalData(key)
            val configurationState = readConfigurationData(key)
            return new ListenerStateCapture(key, value, operationalState, configurationState)
        ].toList()
    }

    /**
     * Decides whether a registration path is affected by a set of changed paths.
     *
     * @return {@code true} when {@code paths} contains {@code key} itself, or when
     *         {@code key.contains(path)} holds for at least one changed path
     */
    protected def boolean isAffectedBy(P key, Set<P> paths) {
        if (paths.contains(key)) {
            return true;
        }
        for (path : paths) {
            if (key.contains(path)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Marks the transaction as submitted, counts it and schedules a
     * {@link TwoPhaseCommit} for it on the executor.
     *
     * @param transaction transaction to commit, must not be null
     * @return future carrying the result of the two phase commit
     */
    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
        checkNotNull(transaction);
        transaction.changeStatus(TransactionStatus.SUBMITED);
        val task = new TwoPhaseCommit(transaction, this);
        submittedTransactionsCount.andIncrement;
        return executor.submit(task);
    }

}
188 \r
/**
 * Value holder pairing the listeners registered under one path with the
 * operational and configuration data read for that path before a commit is
 * applied.
 *
 * The field declaration order is significant: instances are created with the
 * arguments (path, listeners, operational state, configuration state).
 */
@Data
package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {

    /** Path under which the captured listeners are registered. */
    @Property P path

    /** Listener registrations found under {@code path}. */
    @Property Collection<DataChangeListenerRegistration<P, D, DCL>> listeners

    /** Operational data read for {@code path} when the capture was taken. */
    @Property D initialOperationalState

    /** Configuration data read for {@code path} when the capture was taken. */
    @Property D initialConfigurationState
}
204 \r
/**
 * Registration handle for a data change listener. Remembers the path it was
 * registered under and the broker that owns it, so that removing the
 * registration can unregister the listener from that broker.
 */
package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {

    /** Path under which the listener was registered. */
    @Property
    val P path

    /** Owning broker; cleared once the registration has been removed. */
    AbstractDataBroker<P, D, DCL> dataBroker

    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
        super(instance)
        _path = path
        dataBroker = broker
    }

    /** Unregisters this listener from the broker and drops the broker reference. */
    override protected removeRegistration() {
        dataBroker.removeListener(this)
        dataBroker = null
    }

}
224 \r
/**
 * Registration handle for a commit handler. Remembers the path it was
 * registered under and the broker that owns it, so that removing the
 * registration can unregister the handler from that broker.
 */
package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
implements DataCommitHandlerRegistration<P, D> {

    /** Path under which the commit handler was registered. */
    @Property
    val P path

    /** Owning broker; cleared once the registration has been removed. */
    AbstractDataBroker<P, D, ?> dataBroker

    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
        super(instance)
        _path = path
        dataBroker = broker
    }

    /** Unregisters this commit handler from the broker and drops the broker reference. */
    override protected removeRegistration() {
        dataBroker.removeCommitHandler(this)
        dataBroker = null
    }
}
245 \r
/**
 * Task that commits a single transaction in two phases: first every affected
 * commit handler is asked for a commit (request phase), then every handler
 * transaction obtained that way is finished. A failure in either phase rolls
 * back the handler transactions collected so far and yields a FAILED result.
 */
package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {

    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);

    val AbstractDataTransaction<P, D> transaction;
    val AbstractDataBroker<P, D, DCL> dataBroker;

    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs the two phase commit.
     *
     * @return successful result with status COMMITED, or the failed result
     *         produced by {@code rollback} when a phase throws
     */
    override call() throws Exception {

        // get affected paths
        val affectedPaths = new HashSet<P>();

        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
        affectedPaths.addAll(transaction.removedConfigurationData);

        affectedPaths.addAll(transaction.createdOperationalData.keySet);
        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
        affectedPaths.addAll(transaction.removedOperationalData);

        // Captured before any handler runs, so listeners later receive the pre-commit state.
        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);

        val transactionId = transaction.identifier;

        log.trace("Transaction: {} Started.",transactionId);
        // requesting commits
        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            log.error("Transaction: {} Request Commit failed", transactionId,e);
            dataBroker.failedTransactionsCount.andIncrement
            return rollback(handlerTransactions, e);
        }
        val List<RpcResult<Void>> results = new ArrayList();
        try {
            for (subtransaction : handlerTransactions) {
                results.add(subtransaction.finish());
            }
            listeners.publishDataChangeEvent();
        } catch (Exception e) {
            log.error("Transaction: {} Finish Commit failed",transactionId, e);
            dataBroker.failedTransactionsCount.andIncrement
            return rollback(handlerTransactions, e);
        }
        log.trace("Transaction: {} Finished successfully.",transactionId);
        dataBroker.finishedTransactionsCount.andIncrement;
        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());

    }

    /**
     * Notifies every captured listener about the change, passing the state
     * captured before the commit together with the state read from the broker
     * now. A listener that throws is logged and does not prevent the remaining
     * listeners from being notified.
     *
     * @param listeners listener captures taken before the commit was applied
     */
    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
        for (listenerSet : listeners) {
            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);

            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
            for (listener : listenerSet.listeners) {
                try {
                    listener.instance.onDataChanged(changeEvent);

                } catch (Exception e) {
                    // Report through the class logger rather than stderr so the failure reaches the configured log.
                    log.error("Unexpected exception in listener {} during invoking onDataChanged",listener.instance,e);
                }
            }
        }
    }

    /**
     * Rolls back every handler transaction collected so far.
     *
     * @param transactions handler transactions to roll back
     * @param e failure that triggered the rollback (currently not reported to the caller)
     * @return unsuccessful result with status FAILED
     */
    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
        for (handlerTransaction : transactions) {
            handlerTransaction.rollback()
        }

        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}
332 \r
/**
 * Base class for transactions created by an {@link AbstractDataBroker}. Reads
 * are answered from the transaction's own updated data when present and fall
 * back to the broker otherwise; commit is delegated to the broker.
 */
public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);

    /** Identifier of this transaction; used for logging, equality and hashing. */
    @Property
    private val Object identifier;

    var TransactionStatus status;

    var AbstractDataBroker<P, D, ?> broker;

    /**
     * Creates a transaction in status NEW bound to {@code dataBroker}.
     *
     * @param identifier transaction identifier
     * @param dataBroker broker used for fallback reads and for commit
     */
    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
        super(dataBroker);
        _identifier = identifier;
        broker = dataBroker;
        status = TransactionStatus.NEW;
        LOG.debug("Transaction {} Allocated.", identifier);
    }

    /** Submits this transaction to the owning broker. */
    override commit() {
        return broker.commit(this);
    }

    /**
     * Returns the configuration data updated by this transaction for
     * {@code path}, or the broker's data when this transaction holds none.
     */
    override readConfigurationData(P path) {
        val local = this.updatedConfigurationData.get(path);
        if(local != null) {
            return local;
        }

        return broker.readConfigurationData(path);
    }

    /**
     * Returns the operational data updated by this transaction for
     * {@code path}, or the broker's data when this transaction holds none.
     */
    override readOperationalData(P path) {
        val local = this.updatedOperationalData.get(path);
        if(local != null) {
            return local;
        }
        return broker.readOperationalData(path);
    }

    /**
     * Hash based on the identifier only. equals() accepts a null identifier,
     * so a null identifier hashes to 0 instead of failing.
     */
    override hashCode() {
        if (identifier === null) {
            return 0;
        }
        return identifier.hashCode;
    }

    /**
     * Two transactions are equal when they are of the same class and have
     * equal brokers and equal identifiers (null matching only null).
     */
    override equals(Object obj) {
        if (this === obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        val other = (obj as AbstractDataTransaction<P,D>);
        if (broker == null) {
            if (other.broker != null)
                return false;
        } else if (!broker.equals(other.broker))
            return false;
        if (identifier == null) {
            if (other.identifier != null)
                return false;
        } else if (!identifier.equals(other.identifier))
            return false;
        return true;
    }

    /** Returns the current status of this transaction. */
    override TransactionStatus getStatus() {
        return status;
    }

    /** Hook invoked by {@code changeStatus} after the new status has been stored. */
    protected abstract def void onStatusChange(TransactionStatus status);

    /**
     * Logs the transition, stores the new status and then notifies
     * {@code onStatusChange}.
     *
     * @param status status to transition to
     */
    public def changeStatus(TransactionStatus status) {
        LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
        this.status = status;
        onStatusChange(status);
    }

}