Merge "Bug 164"
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 package org.opendaylight.controller.md.sal.common.impl.service
2
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
7 import org.opendaylight.yangtools.concepts.ListenerRegistration
8 import com.google.common.collect.Multimap
9 import static com.google.common.base.Preconditions.*;
10 import java.util.List
11 import com.google.common.collect.HashMultimap
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Callable
14 import org.opendaylight.yangtools.yang.common.RpcResult
15 import java.util.Collections
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
17 import java.util.ArrayList
18 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
19 import java.util.Arrays
20 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
21 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
22 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
23 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
24 import org.opendaylight.controller.sal.common.util.Rpcs
25 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
26 import java.util.concurrent.Future
27 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
28 import org.opendaylight.yangtools.concepts.Path
29 import org.slf4j.LoggerFactory
30 import java.util.HashSet
31 import java.util.Map.Entry
32 import java.util.Iterator
33 import java.util.Collection
34 import com.google.common.collect.FluentIterable;
35 import java.util.Set
36 import com.google.common.collect.ImmutableList
37 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
38 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
39 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
40 import java.util.concurrent.atomic.AtomicLong
41
42 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
43 DataReader<P, D>, //
44 DataChangePublisher<P, D, DCL>, //
45 DataProvisionService<P, D> {
46
47     private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
48
49     @Property
50     var ExecutorService executor;
51
52     @Property
53     var AbstractDataReadRouter<P, D> dataReadRouter;
54
55     Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
56     Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
57     
58     val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
59     public new() {
60     }
61
62     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
63         HashSet<P> paths) {
64         return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
65         .transformAndConcat[value] //
66         .transform[instance].toList()
67     }
68
69     override final readConfigurationData(P path) {
70         return dataReadRouter.readConfigurationData(path);
71     }
72
73     override final readOperationalData(P path) {
74         return dataReadRouter.readOperationalData(path);
75     }
76
77     override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
78         val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
79         commitHandlers.put(path, registration)
80         LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
81         for(listener : commitHandlerRegistrationListeners) {
82             try {
83                 listener.instance.onRegister(registration);
84             } catch (Exception e) {
85                 LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
86             }
87         }
88         return registration;
89     }
90
91     override final def registerDataChangeListener(P path, DCL listener) {
92         val reg = new DataChangeListenerRegistration(path, listener, this);
93         listeners.put(path, reg);
94         return reg;
95     }
96
97     final def registerDataReader(P path, DataReader<P, D> reader) {
98
99         val confReg = dataReadRouter.registerConfigurationReader(path, reader);
100         val dataReg = dataReadRouter.registerOperationalReader(path, reader);
101
102         return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
103     }
104     
105     override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
106         val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
107         
108         return ret;
109     }
110     
111
112     protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
113         listeners.remove(registration.path, registration);
114     }
115
116     protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
117         commitHandlers.remove(registration.path, registration);
118         
119          LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
120         for(listener : commitHandlerRegistrationListeners) {
121             try {
122                 listener.instance.onUnregister(registration);
123             } catch (Exception e) {
124                 LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
125             }
126         }
127     }
128
129     protected final def getActiveCommitHandlers() {
130         return commitHandlers.entries;
131     }
132
133     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
134         HashSet<P> paths) {
135         return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
136             val operationalState = readOperationalData(key)
137             val configurationState = readConfigurationData(key)
138             return new ListenerStateCapture(key, value, operationalState, configurationState)
139         ].toList()
140     }
141
142     protected def boolean isAffectedBy(P key, Set<P> paths) {
143         if (paths.contains(key)) {
144             return true;
145         }
146         for (path : paths) {
147             if (key.contains(path)) {
148                 return true;
149             }
150         }
151
152         return false;
153     }
154
155     package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
156         checkNotNull(transaction);
157         transaction.changeStatus(TransactionStatus.SUBMITED);
158         val task = new TwoPhaseCommit(transaction, this);
159         return executor.submit(task);
160     }
161
162 }
163
164 @Data
165 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
166
167     @Property
168     P path;
169
170     @Property
171     Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
172
173     @Property
174     D initialOperationalState;
175
176     @Property
177     D initialConfigurationState;
178 }
179
180 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
181
182     AbstractDataBroker<P, D, DCL> dataBroker;
183
184     @Property
185     val P path;
186
187     new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
188         super(instance)
189         dataBroker = broker;
190         _path = path;
191     }
192
193     override protected removeRegistration() {
194         dataBroker.removeListener(this);
195         dataBroker = null;
196     }
197
198 }
199
200 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
201 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
202 implements DataCommitHandlerRegistration<P, D> {
203
204     AbstractDataBroker<P, D, ?> dataBroker;
205
206     @Property
207     val P path;
208
209     new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
210         super(instance)
211         dataBroker = broker;
212         _path = path;
213     }
214
215     override protected removeRegistration() {
216         dataBroker.removeCommitHandler(this);
217         dataBroker = null;
218     }
219 }
220
221 package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
222
223     private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
224
225     val AbstractDataTransaction<P, D> transaction;
226     val AbstractDataBroker<P, D, DCL> dataBroker;
227
228     new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
229         this.transaction = transaction;
230         this.dataBroker = broker;
231     }
232
233     override call() throws Exception {
234
235         // get affected paths
236         val affectedPaths = new HashSet<P>();
237
238         affectedPaths.addAll(transaction.createdConfigurationData.keySet);
239         affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
240         affectedPaths.addAll(transaction.removedConfigurationData);
241
242         affectedPaths.addAll(transaction.createdOperationalData.keySet);
243         affectedPaths.addAll(transaction.updatedOperationalData.keySet);
244         affectedPaths.addAll(transaction.removedOperationalData);
245
246         val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
247
248         val transactionId = transaction.identifier;
249
250         log.info("Transaction: {} Started.",transactionId);
251         // requesting commits
252         val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
253         val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
254         try {
255             for (handler : commitHandlers) {
256                 handlerTransactions.add(handler.requestCommit(transaction));
257             }
258         } catch (Exception e) {
259             log.error("Transaction: {} Request Commit failed", transactionId,e);
260             return rollback(handlerTransactions, e);
261         }
262         val List<RpcResult<Void>> results = new ArrayList();
263         try {
264             for (subtransaction : handlerTransactions) {
265                 results.add(subtransaction.finish());
266             }
267             listeners.publishDataChangeEvent();
268         } catch (Exception e) {
269             log.error("Transaction: {} Finish Commit failed",transactionId, e);
270             return rollback(handlerTransactions, e);
271         }
272         log.info("Transaction: {} Finished succesfully.",transactionId);
273         return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
274
275     }
276
277     def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
278         for (listenerSet : listeners) {
279             val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
280             val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
281
282             val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
283                 listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
284             for (listener : listenerSet.listeners) {
285                 try {
286                     listener.instance.onDataChanged(changeEvent);
287
288                 } catch (Exception e) {
289                     e.printStackTrace();
290                 }
291             }
292         }
293     }
294
295     def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
296         for (transaction : transactions) {
297             transaction.rollback()
298         }
299
300         // FIXME return encountered error.
301         return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
302     }
303 }
304
305 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
306
307     @Property
308     private val Object identifier;
309
310     var TransactionStatus status;
311
312     var AbstractDataBroker<P, D, ?> broker;
313
314     protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
315         super(dataBroker);
316         _identifier = identifier;
317         broker = dataBroker;
318         status = TransactionStatus.NEW;
319
320     //listeners = new ListenerRegistry<>();
321     }
322
323     override commit() {
324         return broker.commit(this);
325     }
326
327     override readConfigurationData(P path) {
328         return broker.readConfigurationData(path);
329     }
330
331     override readOperationalData(P path) {
332         return broker.readOperationalData(path);
333     }
334
335     override hashCode() {
336         return identifier.hashCode;
337     }
338
339     override equals(Object obj) {
340         if (this === obj)
341             return true;
342         if (obj == null)
343             return false;
344         if (getClass() != obj.getClass())
345             return false;
346         val other = (obj as AbstractDataTransaction<P,D>);
347         if (broker == null) {
348             if (other.broker != null)
349                 return false;
350         } else if (!broker.equals(other.broker))
351             return false;
352         if (identifier == null) {
353             if (other.identifier != null)
354                 return false;
355         } else if (!identifier.equals(other.identifier))
356             return false;
357         return true;
358     }
359
360     override TransactionStatus getStatus() {
361         return status;
362     }
363
364     protected abstract def void onStatusChange(TransactionStatus status);
365
366     public def changeStatus(TransactionStatus status) {
367         this.status = status;
368         onStatusChange(status);
369     }
370
371 }