Merge "Table features : modified yang model. Patch set 2: Modified match types as...
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 package org.opendaylight.controller.md.sal.common.impl.service
2
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
7 import org.opendaylight.yangtools.concepts.ListenerRegistration
8 import com.google.common.collect.Multimap
9 import static com.google.common.base.Preconditions.*;
10 import java.util.List
11 import com.google.common.collect.HashMultimap
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Callable
14 import org.opendaylight.yangtools.yang.common.RpcResult
15 import java.util.Collections
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
17 import java.util.ArrayList
18 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
19 import java.util.Arrays
20 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
21 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
22 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
23 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
24 import org.opendaylight.controller.sal.common.util.Rpcs
25 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
26 import java.util.concurrent.Future
27 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
28 import org.opendaylight.yangtools.concepts.Path
29 import org.slf4j.LoggerFactory
30 import java.util.HashSet
31 import java.util.Map.Entry
32 import java.util.Iterator
33 import java.util.Collection
34 import com.google.common.collect.FluentIterable;
35 import java.util.Set
36 import com.google.common.collect.ImmutableList
37 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
38 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
39 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
40 import java.util.concurrent.atomic.AtomicLong
41 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
42
/**
 * Base class for data brokers keyed by a path type {@code P} and carrying data of type {@code D}.
 *
 * The broker keeps two registries, both keyed by path: data change listeners and commit
 * handlers. Reads are delegated to {@link #getDataReadRouter()}; commits are wrapped in a
 * {@link TwoPhaseCommit} task and submitted to {@link #getExecutor()}.
 *
 * NOTE(review): the registries are plain {@link HashMultimap}s and no synchronization is
 * visible in this class, while commit tasks read them from the executor. Confirm that
 * registration and commit are never performed concurrently.
 */
abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
DataReader<P, D>, //
DataChangePublisher<P, D, DCL>, //
DataProvisionService<P, D> {

    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);

    /** Executor to which commit tasks are submitted; must be set before {@code commit} is used. */
    @Property
    var ExecutorService executor;

    /** Router serving every configuration / operational read issued through this broker. */
    @Property
    var AbstractDataReadRouter<P, D> dataReadRouter;

    /** Data change listener registrations, keyed by the path they were registered for. */
    val Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();

    /** Commit handler registrations, keyed by the path they were registered for. */
    val Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();

    /** Observers that are told whenever a commit handler is registered or unregistered. */
    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();

    public new() {
    }

    /**
     * Collects the commit handlers whose registration path is affected by the given paths.
     *
     * @param paths paths touched by a transaction
     * @return immutable list of the handler instances registered under every affected path
     */
    protected def affectedCommitHandlers(HashSet<P> paths) {
        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
        .transformAndConcat[value] //
        .transform[instance].toList()
    }

    /** Reads configuration data for {@code path} through the data read router. */
    override final readConfigurationData(P path) {
        return dataReadRouter.readConfigurationData(path);
    }

    /** Reads operational data for {@code path} through the data read router. */
    override final readOperationalData(P path) {
        return dataReadRouter.readOperationalData(path);
    }

    /**
     * Registers a commit handler for {@code path} and notifies the commit handler
     * registration listeners. A listener that throws is logged and does not prevent the
     * remaining listeners from being notified.
     *
     * @return registration object for the newly added handler
     */
    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
        commitHandlers.put(path, registration)
        LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
        for(listener : commitHandlerRegistrationListeners) {
            try {
                listener.instance.onRegister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
            }
        }
        return registration;
    }

    /**
     * Registers a data change listener for {@code path} and immediately delivers an initial
     * event built from the current configuration and operational data at that path.
     *
     * If reading the initial state or delivering the initial event fails, the registration
     * is removed again before the exception is rethrown; otherwise the caller would receive
     * an exception while the listener stayed registered with no handle to close it.
     *
     * @return registration object for the newly added listener
     */
    override final def registerDataChangeListener(P path, DCL listener) {
        val reg = new DataChangeListenerRegistration(path, listener, this);
        listeners.put(path, reg);
        try {
            val initialConfig = dataReadRouter.readConfigurationData(path);
            val initialOperational = dataReadRouter.readOperationalData(path);
            val event = createInitialListenerEvent(path,initialConfig,initialOperational);
            listener.onDataChanged(event);
        } catch (Exception e) {
            // Undo the registration so a failed registerDataChangeListener leaves no trace.
            listeners.remove(path, reg);
            throw e;
        }
        return reg;
    }

    /**
     * Registers {@code reader} with the data read router as both configuration and
     * operational reader for {@code path}.
     *
     * @return composite registration wrapping both underlying registrations
     */
    final def registerDataReader(P path, DataReader<P, D> reader) {

        val confReg = dataReadRouter.registerConfigurationReader(path, reader);
        val dataReg = dataReadRouter.registerOperationalReader(path, reader);

        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
    }

    /** Registers an observer of commit handler registrations and unregistrations. */
    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
        return commitHandlerRegistrationListeners.register(commitHandlerListener);
    }

    /**
     * Builds the event delivered to a listener right after it registers. Subclasses may
     * override this to supply a different event implementation.
     */
    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
    }

    /** Removes a data change listener registration from the registry. */
    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
        listeners.remove(registration.path, registration);
    }

    /**
     * Removes a commit handler registration and notifies the commit handler registration
     * listeners. A listener that throws is logged and does not prevent the remaining
     * listeners from being notified.
     */
    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
        commitHandlers.remove(registration.path, registration);

        LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
        for(listener : commitHandlerRegistrationListeners) {
            try {
                listener.instance.onUnregister(registration);
            } catch (Exception e) {
                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
            }
        }
    }

    /** Returns the (path, registration) entries of all currently registered commit handlers. */
    protected final def getActiveCommitHandlers() {
        return commitHandlers.entries;
    }

    /**
     * For every listener path affected by {@code paths}, reads the current operational and
     * configuration data and bundles it with the listeners registered under that path.
     *
     * @param paths paths touched by a transaction
     * @return immutable list of {@link ListenerStateCapture}, one per affected listener path
     */
    protected def affectedListenersWithInitialState(HashSet<P> paths) {
        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
            val operationalState = readOperationalData(key)
            val configurationState = readConfigurationData(key)
            return new ListenerStateCapture(key, value, operationalState, configurationState)
        ].toList()
    }

    /**
     * Decides whether a registration path is affected by a set of changed paths.
     *
     * @return {@code true} if {@code paths} contains {@code key}, or if {@code key.contains(path)}
     *         holds for at least one element of {@code paths}
     */
    protected def boolean isAffectedBy(P key, Set<P> paths) {
        if (paths.contains(key)) {
            return true;
        }
        for (path : paths) {
            if (key.contains(path)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Marks the transaction as submitted and schedules its two-phase commit on the executor.
     *
     * @param transaction transaction to commit; must not be null
     * @return future completing with the result of the commit task
     */
    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
        checkNotNull(transaction);
        transaction.changeStatus(TransactionStatus.SUBMITED);
        val task = new TwoPhaseCommit(transaction, this);
        return executor.submit(task);
    }

}
172
/**
 * Value holder pairing the listeners registered under one path with the operational and
 * configuration data read for that path when the holder was created.
 *
 * The field order is significant: it is the argument order used when instances are
 * constructed (path, listeners, operational state, configuration state).
 */
@Data
package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {

    /** Path the captured listeners are registered under. */
    @Property P path;

    /** Listener registrations stored under {@code path}. */
    @Property Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;

    /** Operational data read for {@code path} at capture time. */
    @Property D initialOperationalState;

    /** Configuration data read for {@code path} at capture time. */
    @Property D initialConfigurationState;
}
188
/**
 * Registration handle for a data change listener registered with an {@link AbstractDataBroker}.
 * Removing the registration takes the listener out of the broker's registry.
 */
package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {

    /** Owning broker; reset to null once the registration has been removed. */
    AbstractDataBroker<P, D, DCL> dataBroker;

    /** Path the listener was registered for. */
    @Property
    val P path;

    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
        super(instance)
        dataBroker = broker;
        _path = path;
    }

    /**
     * Removes this registration from the owning broker and drops the broker reference.
     *
     * The null guard makes a repeated invocation a no-op instead of a NullPointerException.
     * NOTE(review): whether the superclass already prevents repeated invocation is not
     * visible from this file.
     */
    override protected removeRegistration() {
        if (dataBroker !== null) {
            dataBroker.removeListener(this);
            dataBroker = null;
        }
    }

}
208
/**
 * Registration handle for a commit handler registered with an {@link AbstractDataBroker}.
 * Removing the registration takes the handler out of the broker's registry.
 */
package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
implements DataCommitHandlerRegistration<P, D> {

    /** Owning broker; reset to null once the registration has been removed. */
    AbstractDataBroker<P, D, ?> dataBroker;

    /** Path the commit handler was registered for. */
    @Property
    val P path;

    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
        super(instance)
        dataBroker = broker;
        _path = path;
    }

    /**
     * Removes this registration from the owning broker and drops the broker reference.
     *
     * The null guard makes a repeated invocation a no-op instead of a NullPointerException.
     * NOTE(review): whether the superclass already prevents repeated invocation is not
     * visible from this file.
     */
    override protected removeRegistration() {
        if (dataBroker !== null) {
            dataBroker.removeCommitHandler(this);
            dataBroker = null;
        }
    }
}
229
/**
 * Commit task for a single transaction. It asks every affected commit handler to prepare
 * the commit, then finishes all prepared sub-transactions and publishes data change events
 * to the affected listeners. A failure in either phase rolls back the sub-transactions
 * collected so far and yields a {@code FAILED} result.
 *
 * NOTE(review): this task never calls {@code changeStatus} on the transaction, so within
 * this file the status set at submission is not advanced when the commit completes.
 */
package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {

    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);

    val AbstractDataTransaction<P, D> transaction;
    val AbstractDataBroker<P, D, DCL> dataBroker;

    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs the commit.
     *
     * @return result carrying {@code COMMITED} on success, or {@code FAILED} when requesting
     *         or finishing the commit threw and the sub-transactions were rolled back
     */
    override call() throws Exception {

        // Every path created, updated or removed in either data store is affected.
        val affectedPaths = new HashSet<P>();

        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
        affectedPaths.addAll(transaction.removedConfigurationData);

        affectedPaths.addAll(transaction.createdOperationalData.keySet);
        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
        affectedPaths.addAll(transaction.removedOperationalData);

        // Captured before any handler runs, so listeners can be given the pre-commit state.
        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);

        val transactionId = transaction.identifier;

        log.info("Transaction: {} Started.",transactionId);

        // Phase one: request a commit from every affected handler.
        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            log.error("Transaction: {} Request Commit failed", transactionId,e);
            return rollback(handlerTransactions, e);
        }

        // Phase two: finish every prepared sub-transaction, then notify listeners.
        val List<RpcResult<Void>> results = new ArrayList();
        try {
            for (subtransaction : handlerTransactions) {
                results.add(subtransaction.finish());
            }
            publishDataChangeEvent(listeners);
        } catch (Exception e) {
            log.error("Transaction: {} Finish Commit failed",transactionId, e);
            return rollback(handlerTransactions, e);
        }
        log.info("Transaction: {} Finished succesfully.",transactionId);
        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());

    }

    /**
     * Delivers one change event per captured listener path, built from the captured initial
     * state and the data read from the broker now. A listener that throws is logged and does
     * not prevent the remaining listeners from being notified.
     */
    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
        for (listenerSet : listeners) {
            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);

            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
            for (listener : listenerSet.listeners) {
                try {
                    listener.instance.onDataChanged(changeEvent);
                } catch (Exception e) {
                    // Use the class logger rather than printing the stack trace to stderr.
                    log.error("Unexpected exception in listener {} during invoking onDataChanged", listener.instance, e);
                }
            }
        }
    }

    /**
     * Rolls back every given sub-transaction and reports the transaction as failed.
     *
     * Each rollback is isolated: a handler whose rollback throws is logged and the remaining
     * handlers are still rolled back, rather than being skipped.
     *
     * @param transactions sub-transactions obtained from the commit handlers so far
     * @param e failure that triggered the rollback (already logged by the caller)
     * @return result carrying {@code FAILED}
     */
    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
        for (handlerTransaction : transactions) {
            try {
                handlerTransaction.rollback()
            } catch (Exception rollbackFailure) {
                log.error("Transaction: {} Rollback failed", transaction.identifier, rollbackFailure);
            }
        }

        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}
313
/**
 * Data modification bound to the {@link AbstractDataBroker} that created it. Commits and
 * reads are delegated to that broker; the transaction itself tracks its identifier and
 * current {@link TransactionStatus}.
 *
 * Two transactions are equal when they are of the same class and have equal brokers and
 * equal identifiers.
 */
public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {

    /** Identifier supplied at construction; used for equality, hashing and logging. */
    @Property
    private val Object identifier;

    var TransactionStatus status;

    var AbstractDataBroker<P, D, ?> broker;

    /**
     * @param identifier identifier of this transaction
     * @param dataBroker broker this transaction commits to and reads from; it is also passed
     *        to the superclass constructor
     */
    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
        super(dataBroker);
        _identifier = identifier;
        broker = dataBroker;
        status = TransactionStatus.NEW;
    }

    /** Submits this transaction to the owning broker. */
    override commit() {
        return broker.commit(this);
    }

    /** Reads configuration data for {@code path} from the owning broker. */
    override readConfigurationData(P path) {
        return broker.readConfigurationData(path);
    }

    /** Reads operational data for {@code path} from the owning broker. */
    override readOperationalData(P path) {
        return broker.readOperationalData(path);
    }

    /**
     * Hash derived from the identifier only. A null identifier hashes to 0, matching
     * {@link #equals(Object)}, which accepts null identifiers.
     */
    override hashCode() {
        if (identifier === null) {
            return 0;
        }
        return identifier.hashCode;
    }

    override equals(Object obj) {
        if (this === obj) {
            return true;
        }
        if (obj === null || getClass() != obj.getClass()) {
            return false;
        }
        val other = (obj as AbstractDataTransaction<P,D>);
        // Xtend's == on objects is a null-safe equals(), covering the null/non-null cases.
        return broker == other.broker && identifier == other.identifier;
    }

    override TransactionStatus getStatus() {
        return status;
    }

    /** Hook invoked by {@link #changeStatus} after the status field has been updated. */
    protected abstract def void onStatusChange(TransactionStatus status);

    /** Stores the new status and then notifies {@link #onStatusChange}. */
    public def changeStatus(TransactionStatus status) {
        this.status = status;
        onStatusChange(status);
    }

}