0c8f6109ed843ac84bdc5d4fef814e1e98518c7d
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.common.impl.service\r
9 \r
10 import com.google.common.collect.FluentIterable\r
11 import com.google.common.collect.HashMultimap\r
12 import com.google.common.collect.ImmutableList\r
13 import com.google.common.collect.Multimap\r
14 import java.util.ArrayList\r
15 import java.util.Arrays\r
16 import java.util.Collection\r
17 import java.util.Collections\r
18 import java.util.HashSet\r
19 import java.util.List\r
20 import java.util.Set\r
21 import java.util.concurrent.Callable\r
22 import java.util.concurrent.ExecutorService\r
23 import java.util.concurrent.Future\r
24 import java.util.concurrent.atomic.AtomicLong\r
25 import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
28 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
30 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
32 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
33 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
35 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
36 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
37 import org.opendaylight.controller.sal.common.util.Rpcs\r
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
39 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
40 import org.opendaylight.yangtools.concepts.ListenerRegistration\r
41 import org.opendaylight.yangtools.concepts.Path\r
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
43 import org.opendaylight.yangtools.yang.common.RpcResult\r
44 import org.slf4j.LoggerFactory\r
45 \r
46 import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
47 import com.google.common.collect.Multimaps
48
/**
 * Base data broker that tracks data change listeners and commit handlers per path,
 * delegates all reads to the configured {@link AbstractDataReadRouter} and submits
 * transactions to the configured executor as {@link TwoPhaseCommit} tasks.
 *
 * @param <P> path type used to address data
 * @param <D> data type stored under a path
 * @param <DCL> data change listener type accepted by this broker
 */
abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
DataReader<P, D>, //
DataChangePublisher<P, D, DCL>, //
DataProvisionService<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);

    /** Executor to which {@link #commit} submits the commit task. */
    @Property
    var ExecutorService executor;

    /** Router to which configuration and operational reads are delegated. */
    @Property
    var AbstractDataReadRouter<P, D> dataReadRouter;

    /** Incremented once for every transaction passed to {@link #commit}. */
    @Property
    private val AtomicLong submittedTransactionsCount = new AtomicLong;

    /** Incremented by {@link TwoPhaseCommit} when a commit is rolled back. */
    @Property
    private val AtomicLong failedTransactionsCount = new AtomicLong

    /** Incremented by {@link TwoPhaseCommit} when a commit completes successfully. */
    @Property
    private val AtomicLong finishedTransactionsCount = new AtomicLong

    // NOTE(review): Guava documents that collection views of a synchronized multimap
    // (asMap, entries, ...) must be iterated while holding the multimap's lock. The
    // affected* methods and getActiveCommitHandlers below hand out or iterate such views
    // without doing so - confirm whether concurrent (un)registration is possible here.
    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());
    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());

    /** Listeners notified whenever a commit handler is registered or removed. */
    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();

    public new() {
    }

    /**
     * Collects the commit handler instances whose registration path is affected
     * (see {@link #isAffectedBy}) by any of the supplied paths.
     *
     * @param paths paths touched by a transaction
     * @return list of the matching commit handler instances
     */
    protected def affectedCommitHandlers(
        HashSet<P> paths) {
        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
        .transformAndConcat[value] //
        .transform[instance].toList()
    }

    /** Reads configuration data for the path through the data read router. */
    override final readConfigurationData(P path) {
        return dataReadRouter.readConfigurationData(path);
    }

    /** Reads operational data for the path through the data read router. */
    override final readOperationalData(P path) {
        return dataReadRouter.readOperationalData(path);
    }

    /**
     * Registers a commit handler for the path and notifies every commit handler
     * registration listener. An exception thrown by one listener is logged and does
     * not prevent the remaining listeners from being notified.
     *
     * @param path path the handler is responsible for
     * @param commitHandler handler to register
     * @return registration that removes the handler again
     */
    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
        commitHandlers.put(path, registration)
        LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
        for(listener : commitHandlerRegistrationListeners) {
            try {
                listener.instance.onRegister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
            }
        }
        return registration;
    }

    /**
     * Registers a data change listener for the path and immediately delivers an
     * initial event built from the current configuration and operational data.
     *
     * If reading the initial state or delivering the initial event throws, the
     * registration is removed again before the exception is propagated.
     *
     * @param path path the listener is interested in
     * @param listener listener to register
     * @return registration that removes the listener again
     */
    override final def registerDataChangeListener(P path, DCL listener) {
        val reg = new DataChangeListenerRegistration(path, listener, this);
        // Registered before the initial read, as in the original ordering.
        listeners.put(path, reg);
        try {
            val initialConfig = dataReadRouter.readConfigurationData(path);
            val initialOperational = dataReadRouter.readOperationalData(path);
            val event = createInitialListenerEvent(path,initialConfig,initialOperational);
            listener.onDataChanged(event);
        } catch (Exception e) {
            // The caller never receives the handle when we throw, so a registration
            // left behind here could never be closed.
            listeners.remove(path, reg);
            throw e;
        }
        return reg;
    }

    /**
     * Registers the reader with the data read router for both configuration and
     * operational data.
     *
     * @param path path served by the reader
     * @param reader reader to register
     * @return composite registration wrapping both underlying registrations
     */
    final def registerDataReader(P path, DataReader<P, D> reader) {

        val confReg = dataReadRouter.registerConfigurationReader(path, reader);
        val dataReg = dataReadRouter.registerOperationalReader(path, reader);

        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
    }

    /** Registers a listener notified about commit handler registration and removal. */
    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);

        return ret;
    }

    /**
     * Creates the event delivered to a listener right after registration.
     * The path argument is not used by this default implementation.
     */
    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);

    }

    /** Removes the data change listener registration from the listener map. */
    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
        listeners.remove(registration.path, registration);
    }

    /**
     * Removes the commit handler registration and notifies every commit handler
     * registration listener; listener exceptions are logged and otherwise ignored.
     */
    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
        commitHandlers.remove(registration.path, registration);

        LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
        for(listener : commitHandlerRegistrationListeners) {
            try {
                listener.instance.onUnregister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
            }
        }
    }

    /** Returns the entries view (path to registration) of the commit handler map. */
    protected final def getActiveCommitHandlers() {
        return commitHandlers.entries;
    }

    /**
     * For every listener path affected by the supplied paths, reads the current
     * operational and configuration data and captures it together with the listeners
     * registered under that path.
     *
     * @param paths paths touched by a transaction
     * @return list of {@link ListenerStateCapture}, one per affected listener path
     */
    protected def affectedListenersWithInitialState(
        HashSet<P> paths) {
        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
            val operationalState = readOperationalData(key)
            val configurationState = readConfigurationData(key)
            return new ListenerStateCapture(key, value, operationalState, configurationState)
        ].toList()
    }

    /**
     * Tells whether a registration key is affected by a set of changed paths: either
     * the key itself is in the set, or {@code key.contains(path)} holds for some path.
     * NOTE(review): presumably contains() means the path lies under the key - confirm
     * against the Path contract.
     */
    protected def boolean isAffectedBy(P key, Set<P> paths) {
        if (paths.contains(key)) {
            return true;
        }
        for (path : paths) {
            if (key.contains(path)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Marks the transaction as submitted and hands a {@link TwoPhaseCommit} task for
     * it to the executor.
     *
     * @param transaction transaction to commit, must not be null
     * @return future holding the result of the commit task
     */
    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
        checkNotNull(transaction);
        transaction.changeStatus(TransactionStatus.SUBMITED);
        val task = new TwoPhaseCommit(transaction, this);
        submittedTransactionsCount.andIncrement;
        return executor.submit(task);
    }

}
188 \r
/**
 * Value holder pairing the listeners registered under one path with the data read
 * for that path at capture time. Instances are created by
 * AbstractDataBroker.affectedListenersWithInitialState with the arguments
 * (path, listeners, operational state, configuration state).
 */
@Data
package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {

    // Path under which the captured listeners are registered.
    @Property
    P path;

    // Listener registrations stored under the path.
    @Property
    Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;

    // Operational data read for the path when the capture was taken.
    @Property
    D initialOperationalState;

    // Configuration data read for the path when the capture was taken.
    @Property
    D initialConfigurationState;
}
204 \r
/**
 * Registration handle for a data change listener. It remembers the path the listener
 * was registered under and the broker that owns it, and detaches the listener from
 * that broker when the registration is removed.
 */
package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {

    /** Path the listener was registered for. */
    @Property
    val P path;

    /** Owning broker; cleared once the registration has been removed. */
    AbstractDataBroker<P, D, DCL> dataBroker;

    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
        super(instance)
        this._path = path;
        this.dataBroker = broker;
    }

    /** Removes this registration from the owning broker and drops the broker reference. */
    override protected removeRegistration() {
        val owner = this.dataBroker;
        owner.removeListener(this);
        this.dataBroker = null;
    }

}
224 \r
/**
 * Registration handle for a commit handler. It remembers the path the handler was
 * registered under and the broker that owns it, and detaches the handler from that
 * broker when the registration is removed.
 */
package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
implements DataCommitHandlerRegistration<P, D> {

    /** Path the commit handler was registered for. */
    @Property
    val P path;

    /** Owning broker; cleared once the registration has been removed. */
    AbstractDataBroker<P, D, ?> dataBroker;

    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
        super(instance)
        this._path = path;
        this.dataBroker = broker;
    }

    /** Removes this registration from the owning broker and drops the broker reference. */
    override protected removeRegistration() {
        val owner = this.dataBroker;
        owner.removeCommitHandler(this);
        this.dataBroker = null;
    }
}
245 \r
/**
 * Commit task for a single transaction. It asks every affected commit handler to
 * prepare the commit, finishes all prepared sub-transactions, and then publishes
 * data change events to the affected listeners. A failure in either phase rolls
 * back the prepared sub-transactions and yields a FAILED result.
 */
package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {

    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);

    val AbstractDataTransaction<P, D> transaction;
    val AbstractDataBroker<P, D, DCL> dataBroker;

    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs the commit.
     *
     * @return successful result with status COMMITED, or the result of
     *         {@link #rollback} when requesting or finishing the commit throws
     */
    override call() throws Exception {

        // Every path created, updated or removed by the transaction, in either datastore.
        val affectedPaths = new HashSet<P>();

        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
        affectedPaths.addAll(transaction.removedConfigurationData);

        affectedPaths.addAll(transaction.createdOperationalData.keySet);
        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
        affectedPaths.addAll(transaction.removedOperationalData);

        // Captured before any handler runs, so the events can carry the prior state.
        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);

        val transactionId = transaction.identifier;

        log.trace("Transaction: {} Started.",transactionId);
        // Phase one: request a commit from every affected handler.
        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            log.error("Transaction: {} Request Commit failed", transactionId,e);
            dataBroker.failedTransactionsCount.andIncrement
            return rollback(handlerTransactions, e);
        }
        // Phase two: finish every prepared sub-transaction, then notify listeners.
        val List<RpcResult<Void>> results = new ArrayList();
        try {
            for (subtransaction : handlerTransactions) {
                results.add(subtransaction.finish());
            }
            listeners.publishDataChangeEvent();
        } catch (Exception e) {
            log.error("Transaction: {} Finish Commit failed",transactionId, e);
            dataBroker.failedTransactionsCount.andIncrement
            return rollback(handlerTransactions, e);
        }
        log.trace("Transaction: {} Finished successfully.",transactionId);
        dataBroker.finishedTransactionsCount.andIncrement;
        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());

    }

    /**
     * Submits a task to the broker's executor that re-reads the data for every
     * captured listener path and delivers a change event to each listener.
     * An exception thrown by one listener is logged and does not stop delivery
     * to the remaining listeners.
     *
     * @param listeners listener captures taken before the commit
     */
    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
        dataBroker.executor.submit [|
            for (listenerSet : listeners) {
                val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
                val updatedOperational = dataBroker.readOperationalData(listenerSet.path);

                val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
                    listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
                for (listener : listenerSet.listeners) {
                    try {
                        listener.instance.onDataChanged(changeEvent);
                    } catch (Exception e) {
                        // Report through the logger rather than stderr.
                        log.error("Unhandled exception in listener {} during invoking onDataChanged", listener.instance, e);
                    }
                }
            }
        ]
    }

    /**
     * Rolls back every supplied sub-transaction. A sub-transaction whose rollback
     * throws is logged and skipped so that the remaining ones are still rolled back.
     *
     * @param transactions sub-transactions obtained from the commit handlers
     * @param e failure that triggered the rollback (currently not reported back)
     * @return unsuccessful result with status FAILED
     */
    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
        for (handlerTransaction : transactions) {
            try {
                handlerTransaction.rollback()
            } catch (Exception rollbackFailure) {
                log.error("Transaction: {} Rollback failed", transaction.identifier, rollbackFailure);
            }
        }

        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}
334 \r
/**
 * Base class for transactions created by an {@link AbstractDataBroker}. It carries
 * an identifier and a status, answers reads from its own pending updates before
 * falling back to the broker, and commits by handing itself to the broker.
 */
public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);

    /** Identifier used in log messages, hashCode and equals. */
    @Property
    private val Object identifier;

    var TransactionStatus status;

    var AbstractDataBroker<P, D, ?> broker;

    /**
     * Creates a transaction in status NEW.
     *
     * @param identifier identifier of the transaction
     * @param dataBroker broker used for reads that miss locally and for commit
     */
    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
        super(dataBroker);
        _identifier = identifier;
        broker = dataBroker;
        status = TransactionStatus.NEW;
        LOG.debug("Transaction {} Allocated.", identifier);
    }

    /** Submits this transaction to the broker. */
    override commit() {
        return broker.commit(this);
    }

    /**
     * Returns the configuration data this transaction has pending as an update for
     * the path, or the broker's data when there is no such update.
     */
    override readConfigurationData(P path) {
        val local = this.updatedConfigurationData.get(path);
        if(local != null) {
            return local;
        }

        return broker.readConfigurationData(path);
    }

    /**
     * Returns the operational data this transaction has pending as an update for
     * the path, or the broker's data when there is no such update.
     */
    override readOperationalData(P path) {
        val local = this.updatedOperationalData.get(path);
        if(local != null) {
            return local;
        }
        return broker.readOperationalData(path);
    }

    /** Hash of the identifier; 0 when the identifier is null. */
    override hashCode() {
        // equals() accepts a null identifier, so hashCode() must not throw for one.
        if (identifier == null) {
            return 0;
        }
        return identifier.hashCode;
    }

    /**
     * Two transactions are equal when they are of the same class and have equal
     * brokers and equal identifiers (null matching only null).
     */
    override equals(Object obj) {
        if (this === obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        val other = (obj as AbstractDataTransaction<P,D>);
        if (broker == null) {
            if (other.broker != null) {
                return false;
            }
        } else if (!broker.equals(other.broker)) {
            return false;
        }
        if (identifier == null) {
            if (other.identifier != null) {
                return false;
            }
        } else if (!identifier.equals(other.identifier)) {
            return false;
        }
        return true;
    }

    /** Returns the current status of this transaction. */
    override TransactionStatus getStatus() {
        return status;
    }

    /** Hook invoked by {@link #changeStatus} after the status has been updated. */
    protected abstract def void onStatusChange(TransactionStatus status);

    /**
     * Logs the transition, stores the new status and invokes {@link #onStatusChange}.
     *
     * @param status status to switch to
     */
    public def changeStatus(TransactionStatus status) {
        LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
        this.status = status;
        onStatusChange(status);
    }

}