Merge "Json to composite node test"
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 package org.opendaylight.controller.md.sal.common.impl.service
2
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
7 import org.opendaylight.yangtools.concepts.ListenerRegistration
8 import com.google.common.collect.Multimap
9 import static com.google.common.base.Preconditions.*;
10 import java.util.List
11 import com.google.common.collect.HashMultimap
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Callable
14 import org.opendaylight.yangtools.yang.common.RpcResult
15 import java.util.Collections
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
17 import java.util.ArrayList
18 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
19 import java.util.Arrays
20 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
21 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
22 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
23 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
24 import org.opendaylight.controller.sal.common.util.Rpcs
25 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
26 import java.util.concurrent.Future
27 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
28 import org.opendaylight.yangtools.concepts.Path
29 import org.slf4j.LoggerFactory
30 import java.util.HashSet
31 import java.util.Map.Entry
32 import java.util.Iterator
33 import java.util.Collection
34 import com.google.common.collect.FluentIterable;
35 import java.util.Set
36 import com.google.common.collect.ImmutableList
37
38 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
39 DataReader<P, D>, //
40 DataChangePublisher<P, D, DCL>, //
41 DataProvisionService<P, D> {
42
43     @Property
44     var ExecutorService executor;
45
46     @Property
47     var AbstractDataReadRouter<P, D> dataReadRouter;
48
49     Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
50     Multimap<P, DataCommitHandlerRegistration<P, D>> commitHandlers = HashMultimap.create();
51
52     public new() {
53     }
54
55     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
56         HashSet<P> paths) {
57         return FluentIterable.from(commitHandlers.asMap.entrySet)
58             .filter[key.isAffectedBy(paths)] //
59             .transformAndConcat [value] //
60             .transform[instance].toList()
61     }
62
63     override final readConfigurationData(P path) {
64         return dataReadRouter.readConfigurationData(path);
65     }
66
67     override final readOperationalData(P path) {
68         return dataReadRouter.readOperationalData(path);
69     }
70
71     override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
72         val registration = new DataCommitHandlerRegistration(path, commitHandler, this);
73         commitHandlers.put(path, registration)
74         return registration;
75     }
76
77     override final def registerDataChangeListener(P path, DCL listener) {
78         val reg = new DataChangeListenerRegistration(path, listener, this);
79         listeners.put(path, reg);
80         return reg;
81     }
82
83     final def registerDataReader(P path, DataReader<P, D> reader) {
84
85         val confReg = dataReadRouter.registerConfigurationReader(path, reader);
86         val dataReg = dataReadRouter.registerOperationalReader(path, reader);
87
88         return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
89     }
90
91     protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
92         listeners.remove(registration.path, registration);
93     }
94
95     protected final def removeCommitHandler(DataCommitHandlerRegistration<P, D> registration) {
96         commitHandlers.remove(registration.path, registration);
97     }
98
99     protected final def getActiveCommitHandlers() {
100         return commitHandlers.entries;
101     }
102
103     protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
104         HashSet<P> paths) {
105         return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
106             val operationalState = readOperationalData(key)
107             val configurationState = readConfigurationData(key)
108             return new ListenerStateCapture(key, value, operationalState, configurationState)
109         ].toList()
110     }
111
112     protected def boolean isAffectedBy(P key, Set<P> paths) {
113         if (paths.contains(key)) {
114             return true;
115         }
116         for (path : paths) {
117             if (key.contains(path)) {
118                 return true;
119             }
120         }
121
122         return false;
123     }
124
125     package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
126         checkNotNull(transaction);
127         transaction.changeStatus(TransactionStatus.SUBMITED);
128         val task = new TwoPhaseCommit(transaction, this);
129         return executor.submit(task);
130     }
131
132 }
133
134 @Data
135 package class ListenerStateCapture<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> {
136
137     @Property
138     P path;
139
140     @Property
141     Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
142
143     @Property
144     D initialOperationalState;
145
146     @Property
147     D initialConfigurationState;
148 }
149
150 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
151
152     AbstractDataBroker<P, D, DCL> dataBroker;
153
154     @Property
155     val P path;
156
157     new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
158         super(instance)
159         dataBroker = broker;
160         _path = path;
161     }
162
163     override protected removeRegistration() {
164         dataBroker.removeListener(this);
165         dataBroker = null;
166     }
167
168 }
169
170 package class DataCommitHandlerRegistration<P extends Path<P>, D> extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
171
172     AbstractDataBroker<P, D, ?> dataBroker;
173
174     @Property
175     val P path;
176
177     new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
178         super(instance)
179         dataBroker = broker;
180         _path = path;
181     }
182
183     override protected removeRegistration() {
184         dataBroker.removeCommitHandler(this);
185         dataBroker = null;
186     }
187
188 }
189
190 package class TwoPhaseCommit<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
191
192     private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
193
194     val AbstractDataTransaction<P, D> transaction;
195     val AbstractDataBroker<P, D, DCL> dataBroker;
196
197     new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
198         this.transaction = transaction;
199         this.dataBroker = broker;
200     }
201
202     override call() throws Exception {
203
204         // get affected paths
205         val affectedPaths = new HashSet<P>();
206
207         affectedPaths.addAll(transaction.createdConfigurationData.keySet);
208         affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
209         affectedPaths.addAll(transaction.removedConfigurationData);
210
211         affectedPaths.addAll(transaction.createdOperationalData.keySet);
212         affectedPaths.addAll(transaction.updatedOperationalData.keySet);
213         affectedPaths.addAll(transaction.removedOperationalData);
214
215         val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
216
217         // requesting commits
218         val Iterable<DataCommitHandler<P, D>> commitHandlers =   dataBroker.affectedCommitHandlers(affectedPaths);
219         val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
220         try {
221             for (handler : commitHandlers) {
222                 handlerTransactions.add(handler.requestCommit(transaction));
223             }
224         } catch (Exception e) {
225             log.error("Request Commit failded", e);
226             return rollback(handlerTransactions, e);
227         }
228         val List<RpcResult<Void>> results = new ArrayList();
229         try {
230             for (subtransaction : handlerTransactions) {
231                 results.add(subtransaction.finish());
232             }
233             listeners.publishDataChangeEvent();
234         } catch (Exception e) {
235             log.error("Finish Commit failed", e);
236             return rollback(handlerTransactions, e);
237         }
238
239         
240         return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
241
242     }
243     
244     def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D,DCL>> listeners) {
245         for(listenerSet : listeners) {
246             val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
247             val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
248             
249             val changeEvent = new DataChangeEventImpl(transaction,listenerSet.initialConfigurationState,listenerSet.initialOperationalState,updatedOperational,updatedConfiguration);
250             for(listener : listenerSet.listeners) {
251                 try {
252                     listener.instance.onDataChanged(changeEvent);
253                     
254                 } catch (Exception e) {
255                     e.printStackTrace();
256                 }
257             }
258         }
259     }
260
261     def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
262         for (transaction : transactions) {
263             transaction.rollback()
264         }
265
266         // FIXME return encountered error.
267         return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
268     }
269 }
270 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
271
272     @Property
273     private val Object identifier;
274
275     var TransactionStatus status;
276
277     var AbstractDataBroker<P, D, ?> broker;
278
279     protected new(AbstractDataBroker<P, D, ?> dataBroker) {
280         super(dataBroker);
281         _identifier = new Object();
282         broker = dataBroker;
283         status = TransactionStatus.NEW;
284
285     //listeners = new ListenerRegistry<>();
286     }
287
288     override commit() {
289         return broker.commit(this);
290     }
291
292     override readConfigurationData(P path) {
293         return broker.readConfigurationData(path);
294     }
295
296     override readOperationalData(P path) {
297         return broker.readOperationalData(path);
298     }
299
300     override hashCode() {
301         return identifier.hashCode;
302     }
303
304     override equals(Object obj) {
305         if (this === obj)
306             return true;
307         if (obj == null)
308             return false;
309         if (getClass() != obj.getClass())
310             return false;
311         val other = (obj as AbstractDataTransaction<P,D>);
312         if (broker == null) {
313             if (other.broker != null)
314                 return false;
315         } else if (!broker.equals(other.broker))
316             return false;
317         if (identifier == null) {
318             if (other.identifier != null)
319                 return false;
320         } else if (!identifier.equals(other.identifier))
321             return false;
322         return true;
323     }
324
325     override TransactionStatus getStatus() {
326         return status;
327     }
328
329     protected abstract def void onStatusChange(TransactionStatus status);
330
331     public def changeStatus(TransactionStatus status) {
332         this.status = status;
333         onStatusChange(status);
334     }
335
336 }