Fixed bug in Data store where multiple readers could overwrite
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 package org.opendaylight.controller.md.sal.common.impl.service
2
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
7 import org.opendaylight.yangtools.concepts.ListenerRegistration
8 import com.google.common.collect.Multimap
9 import static com.google.common.base.Preconditions.*;
10 import java.util.List
11 import com.google.common.collect.HashMultimap
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Callable
14 import org.opendaylight.yangtools.yang.common.RpcResult
15 import java.util.Collections
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
17 import java.util.ArrayList
18 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
19 import java.util.Arrays
20 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
21 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
22 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
23 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
24 import org.opendaylight.controller.sal.common.util.Rpcs
25 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
26 import java.util.concurrent.Future
27 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
28 import org.opendaylight.yangtools.concepts.Path
29 import org.slf4j.LoggerFactory
30 import java.util.HashSet
31 import java.util.Collection
32 import com.google.common.collect.FluentIterable;
33 import java.util.Set
34 import com.google.common.collect.ImmutableList
35 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
36 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
37 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
38 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
39
/**
 * Base data broker. Reads are delegated to the configured
 * {@link AbstractDataReadRouter}; data change listeners and commit handlers
 * are tracked per path; transaction commits are wrapped in a
 * {@link TwoPhaseCommit} task and handed to the configured executor.
 *
 * @param <P> path type used to address data
 * @param <D> type of the data stored under a path
 * @param <DCL> type of data change listener accepted by this broker
 */
abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
DataReader<P, D>, //
DataChangePublisher<P, D, DCL>, //
DataProvisionService<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);

    /** Executor that runs the commit tasks created by {@link #commit}. */
    @Property
    var ExecutorService executor;

    /** Router every configuration / operational read is delegated to. */
    @Property
    var AbstractDataReadRouter<P, D> dataReadRouter;

    /** Data change listener registrations, keyed by the path they were registered for. */
    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();

    /** Commit handler registrations, keyed by the path they were registered for. */
    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();

    /** Listeners notified whenever a commit handler is registered or removed. */
    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();

    public new() {
    }

    /**
     * Collects the commit handlers whose registration path is affected by
     * the supplied paths (see {@link #isAffectedBy}).
     *
     * @param paths paths touched by a transaction
     * @return list of the affected commit handler instances
     */
    protected def affectedCommitHandlers(HashSet<P> paths) {
        val affectedEntries = FluentIterable.from(commitHandlers.asMap.entrySet) //
        .filter[entry|isAffectedBy(entry.key, paths)]
        return affectedEntries //
        .transformAndConcat[entry|entry.value] //
        .transform[handlerRegistration|handlerRegistration.instance].toList()
    }

    /** Reads configuration data for the path through the read router. */
    override final readConfigurationData(P path) {
        return dataReadRouter.readConfigurationData(path);
    }

    /** Reads operational data for the path through the read router. */
    override final readOperationalData(P path) {
        return dataReadRouter.readOperationalData(path);
    }

    /**
     * Registers a commit handler for the path and notifies every commit
     * handler registration listener. An exception thrown by one listener is
     * logged and does not prevent the remaining listeners from being called.
     *
     * @param path path the handler is responsible for
     * @param commitHandler handler to register
     * @return the registration that was created
     */
    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
        val handlerRegistration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
        commitHandlers.put(path, handlerRegistration)
        LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
        for (registrationListener : commitHandlerRegistrationListeners) {
            try {
                registrationListener.instance.onRegister(handlerRegistration);
            } catch (Exception ex) {
                LOG.error("Unexpected exception in listener {} during invoking onRegister",registrationListener.instance,ex);
            }
        }
        return handlerRegistration;
    }

    /**
     * Registers a data change listener for the path and immediately delivers
     * an initial event built from the configuration and operational data
     * currently readable at that path.
     *
     * @param path path to listen on
     * @param listener listener to register
     * @return the registration that was created
     */
    override final def registerDataChangeListener(P path, DCL listener) {
        val listenerRegistration = new DataChangeListenerRegistration(path, listener, this);
        listeners.put(path, listenerRegistration);

        val currentConfig = dataReadRouter.readConfigurationData(path);
        val currentOperational = dataReadRouter.readOperationalData(path);
        listener.onDataChanged(createInitialListenerEvent(path, currentConfig, currentOperational));
        return listenerRegistration;
    }

    /**
     * Registers the reader with the read router as both a configuration and
     * an operational reader for the path.
     *
     * @return a composite registration wrapping both router registrations
     */
    final def registerDataReader(P path, DataReader<P, D> reader) {
        val configurationRegistration = dataReadRouter.registerConfigurationReader(path, reader);
        val operationalRegistration = dataReadRouter.registerOperationalReader(path, reader);

        return new CompositeObjectRegistration(reader,
            Arrays.asList(configurationRegistration, operationalRegistration));
    }

    /** Registers a listener for commit handler registration and removal events. */
    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
        return commitHandlerRegistrationListeners.register(commitHandlerListener);
    }

    /**
     * Builds the event delivered to a freshly registered data change
     * listener. The path argument is not used by this implementation.
     */
    protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
    }

    /** Drops the given data change listener registration from the listener map. */
    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
        listeners.remove(registration.path, registration);
    }

    /**
     * Drops the given commit handler registration and notifies every commit
     * handler registration listener. Listener exceptions are logged and
     * otherwise ignored.
     */
    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
        commitHandlers.remove(registration.path, registration);

        LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
        for (registrationListener : commitHandlerRegistrationListeners) {
            try {
                registrationListener.instance.onUnregister(registration);
            } catch (Exception ex) {
                LOG.error("Unexpected exception in listener {} during invoking onUnregister",registrationListener.instance,ex);
            }
        }
    }

    /** Returns the (path, registration) entries of all registered commit handlers. */
    protected final def getActiveCommitHandlers() {
        return commitHandlers.entries;
    }

    /**
     * For every listener path affected by the supplied paths, captures the
     * listeners registered there together with the operational and
     * configuration data readable at that path at the time of the call.
     *
     * @param paths paths touched by a transaction
     * @return list with one state capture per affected listener path
     */
    protected def affectedListenersWithInitialState(HashSet<P> paths) {
        val affectedEntries = FluentIterable.from(listeners.asMap.entrySet) //
        .filter[entry|isAffectedBy(entry.key, paths)]
        return affectedEntries.transform [ entry |
            val operationalBefore = readOperationalData(entry.key)
            val configurationBefore = readConfigurationData(entry.key)
            return new ListenerStateCapture(entry.key, entry.value, operationalBefore, configurationBefore)
        ].toList()
    }

    /**
     * Decides whether a registration path is affected by a set of paths:
     * true when the set contains the key itself, or when the key
     * {@code contains} at least one path of the set.
     */
    protected def boolean isAffectedBy(P key, Set<P> paths) {
        return paths.contains(key) || paths.exists[changedPath|key.contains(changedPath)];
    }

    /**
     * Marks the transaction as submitted and schedules its commit on the
     * executor.
     *
     * @param transaction transaction to commit, must not be null
     * @return future completing with the result of the commit task
     */
    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
        checkNotNull(transaction);
        transaction.changeStatus(TransactionStatus.SUBMITED);
        val commitTask = new TwoPhaseCommit(transaction, this);
        return executor.submit(commitTask);
    }

}
169
/**
 * Value holder pairing the listeners registered for one path with the
 * operational and configuration data passed in when the capture was created.
 *
 * NOTE: field declaration order is significant - instances are created
 * positionally as (path, listeners, operational state, configuration state).
 */
@Data
package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {

    /** Path the captured listeners are registered for. */
    @Property P path;

    /** Listener registrations associated with the path. */
    @Property Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;

    /** Operational data supplied at capture time. */
    @Property D initialOperationalState;

    /** Configuration data supplied at capture time. */
    @Property D initialConfigurationState;
}
185
/**
 * Registration of a data change listener for a single path. Removing the
 * registration unregisters it from the owning broker and then drops the
 * reference to that broker.
 */
package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {

    /** Path the listener was registered for. */
    @Property
    val P path;

    /** Owning broker; set to null once the registration has been removed. */
    AbstractDataBroker<P, D, DCL> dataBroker;

    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
        super(instance)
        _path = path;
        dataBroker = broker;
    }

    /** Removes this registration from the broker and forgets the broker. */
    override protected removeRegistration() {
        val owner = dataBroker;
        owner.removeListener(this);
        dataBroker = null;
    }

}
205
/**
 * Registration of a commit handler for a single path. Removing the
 * registration unregisters it from the owning broker and then drops the
 * reference to that broker.
 */
package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
implements DataCommitHandlerRegistration<P, D> {

    /** Path the commit handler was registered for. */
    @Property
    val P path;

    /** Owning broker; set to null once the registration has been removed. */
    AbstractDataBroker<P, D, ?> dataBroker;

    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
        super(instance)
        _path = path;
        dataBroker = broker;
    }

    /** Removes this registration from the broker and forgets the broker. */
    override protected removeRegistration() {
        val owner = dataBroker;
        owner.removeCommitHandler(this);
        dataBroker = null;
    }
}
226
/**
 * Commit task for a single transaction. It asks every affected commit
 * handler for a sub-transaction, finishes all sub-transactions and then
 * publishes data change events to the affected listeners. If requesting or
 * finishing fails, the sub-transactions obtained so far are rolled back and
 * a FAILED result is returned.
 */
package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {

    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);

    val AbstractDataTransaction<P, D> transaction;
    val AbstractDataBroker<P, D, DCL> dataBroker;

    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs the commit.
     *
     * @return a successful result with status COMMITED when every handler
     *         finished, otherwise the unsuccessful FAILED result produced by
     *         {@link #rollback}
     */
    override call() throws Exception {

        // get affected paths
        val affectedPaths = new HashSet<P>();

        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
        affectedPaths.addAll(transaction.removedConfigurationData);

        affectedPaths.addAll(transaction.createdOperationalData.keySet);
        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
        affectedPaths.addAll(transaction.removedOperationalData);

        // Captured before any handler runs, so listeners later receive the
        // state as it was prior to this commit.
        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);

        val transactionId = transaction.identifier;

        log.info("Transaction: {} Started.",transactionId);
        // requesting commits
        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            log.error("Transaction: {} Request Commit failed", transactionId,e);
            return rollback(handlerTransactions, e);
        }
        val List<RpcResult<Void>> results = new ArrayList();
        try {
            for (subtransaction : handlerTransactions) {
                results.add(subtransaction.finish());
            }
            listeners.publishDataChangeEvent();
        } catch (Exception e) {
            log.error("Transaction: {} Finish Commit failed",transactionId, e);
            return rollback(handlerTransactions, e);
        }
        log.info("Transaction: {} Finished succesfully.",transactionId);
        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());

    }

    /**
     * Delivers a change event to every captured listener. The event carries
     * the state captured before the commit and the state re-read from the
     * broker now. A listener that throws is logged and skipped so the
     * remaining listeners are still notified.
     */
    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
        for (listenerSet : listeners) {
            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);

            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
            for (listener : listenerSet.listeners) {
                try {
                    listener.instance.onDataChanged(changeEvent);
                } catch (Exception e) {
                    // Report through the logger (not stderr), consistent with
                    // the listener error handling in AbstractDataBroker.
                    log.error("Unexpected exception in listener {} during invoking onDataChanged",listener.instance,e);
                }
            }
        }
    }

    /**
     * Rolls back every supplied sub-transaction. A failure to roll back one
     * sub-transaction is logged and does not stop the remaining ones from
     * being rolled back.
     *
     * @param transactions sub-transactions obtained from commit handlers
     * @param e the failure that triggered the rollback (currently not
     *        reported in the result)
     * @return an unsuccessful result with status FAILED
     */
    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
        for (handlerTransaction : transactions) {
            try {
                handlerTransaction.rollback()
            } catch (Exception rollbackFailure) {
                log.error("Transaction: {} Rollback of handler transaction failed",transaction.identifier,rollbackFailure);
            }
        }

        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}
310
/**
 * Base class for transactions created by an {@link AbstractDataBroker}.
 * Reads and commit are delegated to the broker that created the
 * transaction. Two transactions are equal when they are of the same class
 * and have equal broker and identifier.
 */
public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {

    /** Identifier of this transaction; participates in equals / hashCode. */
    @Property
    private val Object identifier;

    var TransactionStatus status;

    var AbstractDataBroker<P, D, ?> broker;

    /**
     * Creates a transaction in status NEW.
     *
     * @param identifier identifier of the transaction
     * @param dataBroker broker used for reads and for committing
     */
    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
        super(dataBroker);
        _identifier = identifier;
        broker = dataBroker;
        status = TransactionStatus.NEW;
    }

    /** Submits this transaction to the broker for commit. */
    override commit() {
        return broker.commit(this);
    }

    /** Reads configuration data through the broker. */
    override readConfigurationData(P path) {
        return broker.readConfigurationData(path);
    }

    /** Reads operational data through the broker. */
    override readOperationalData(P path) {
        return broker.readOperationalData(path);
    }

    /**
     * Hash code derived from the identifier only. Returns 0 for a null
     * identifier, because equals accepts null identifiers and hashCode must
     * not throw where equals succeeds.
     */
    override hashCode() {
        if (identifier === null) {
            return 0;
        }
        return identifier.hashCode;
    }

    /**
     * Two transactions are equal when they have the same runtime class and
     * their brokers and identifiers are equal (null-safe comparison).
     */
    override equals(Object obj) {
        if (this === obj)
            return true;
        if (obj === null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        val other = (obj as AbstractDataTransaction<P,D>);
        // Xtend '==' is a null-safe equals(), covering the null cases of both fields.
        return broker == other.broker && identifier == other.identifier;
    }

    override TransactionStatus getStatus() {
        return status;
    }

    /** Hook invoked after the status has been changed by {@link #changeStatus}. */
    protected abstract def void onStatusChange(TransactionStatus status);

    /** Stores the new status and then notifies {@link #onStatusChange}. */
    public def changeStatus(TransactionStatus status) {
        this.status = status;
        onStatusChange(status);
    }

}