Added DELETE support for Bridge and Port resources
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / DataBrokerImpl.xtend
1 package org.opendaylight.controller.sal.binding.impl
2
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener
5 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
6 import org.opendaylight.yangtools.yang.binding.DataObject
7 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
8 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction
9 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction.DataTransactionListener
10 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
11 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
12 import org.opendaylight.controller.md.sal.common.api.data.DataReader
13 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
14 import org.opendaylight.yangtools.concepts.ListenerRegistration
15 import static extension org.opendaylight.controller.sal.binding.impl.util.MapUtils.*;
16 import java.util.Collection
17 import java.util.Map.Entry
18 import java.util.HashSet
19 import java.util.Set
20 import com.google.common.collect.Multimap
21 import static com.google.common.base.Preconditions.*;
22 import java.util.List
23 import java.util.LinkedList
24 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider
25 import com.google.common.collect.HashMultimap
26 import java.util.concurrent.ExecutorService
27 import java.util.concurrent.Callable
28 import org.opendaylight.yangtools.yang.common.RpcResult
29 import org.opendaylight.controller.sal.common.util.Rpcs
30 import java.util.Collections
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
32 import java.util.ArrayList
33 import org.opendaylight.controller.sal.common.util.RpcErrors
34
/**
 * Binding-aware data broker.
 *
 * Keeps registries of data readers, data change listeners and commit handlers keyed by
 * instance identifier, serves configuration / operational reads from the registered readers
 * and submits transactions to the executor as {@code TwoPhaseCommit} tasks.
 *
 * NOTE(review): the registries are plain {@code HashMultimap}s and commit tasks run on the
 * executor; no synchronization is visible in this file - confirm the intended threading model.
 */
class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderService {

    /** Executor to which commit tasks are submitted; must be set before {@code commit} is used. */
    @Property
    var ExecutorService executor;

    Multimap<InstanceIdentifier, DataReaderRegistration> configReaders = HashMultimap.create();
    Multimap<InstanceIdentifier, DataReaderRegistration> operationalReaders = HashMultimap.create();
    Multimap<InstanceIdentifier, DataChangeListenerRegistration> listeners = HashMultimap.create();
    Multimap<InstanceIdentifier, DataCommitHandlerRegistration> commitHandlers = HashMultimap.create();

    /** Creates a new transaction bound to this broker. */
    override beginTransaction() {
        return new DataTransactionImpl(this);
    }

    /**
     * Reads configuration data for the path from the registered configuration readers.
     *
     * Readers are selected with the {@code getAllChildren} extension from MapUtils
     * (presumably those registered for the path or related paths - verify against MapUtils).
     *
     * @return the merged result, or null when no selected reader returned data
     */
    override readConfigurationData(InstanceIdentifier<? extends DataObject> path) {
        val readers = configReaders.getAllChildren(path);
        return readers.readConfiguration(path);
    }

    /**
     * Reads operational data for the path from the registered operational readers.
     *
     * @return the merged result, or null when no selected reader returned data
     */
    override readOperationalData(InstanceIdentifier<? extends DataObject> path) {
        val readers = operationalReaders.getAllChildren(path);
        return readers.readOperational(path);
    }

    /**
     * Registers a commit handler under the path.
     *
     * @return registration object that removes the handler from this broker again
     */
    override registerCommitHandler(InstanceIdentifier<? extends DataObject> path,
        DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
            val registration = new DataCommitHandlerRegistration(path,commitHandler,this);
            commitHandlers.put(path,registration)
            return registration;
    }

    /**
     * Registers a data change listener under the path.
     *
     * @return registration object that removes the listener from this broker again
     */
    override registerDataChangeListener(InstanceIdentifier<? extends DataObject> path, DataChangeListener listener) {
        val reg = new DataChangeListenerRegistration(path, listener, this);
        listeners.put(path, reg);
        return reg;
    }

    /**
     * Registers a data reader under the path for both configuration and operational reads.
     *
     * @return registration object that removes the reader from this broker again
     */
    override registerDataReader(InstanceIdentifier<? extends DataObject> path,
        DataReader<InstanceIdentifier<? extends DataObject>, DataObject> provider) {
        val ret = new DataReaderRegistration(provider, this);
        ret.paths.add(path);
        configReaders.put(path, ret);
        operationalReaders.put(path, ret);
        return ret;
    }

    /** Removes the reader registration from both reader registries for every path it recorded. */
    protected def removeReader(DataReaderRegistration reader) {
        for (path : reader.paths) {
            operationalReaders.remove(path, reader);
            configReaders.remove(path, reader);
        }
    }

    /** Removes the listener registration stored under its path. */
    protected def removeListener(DataChangeListenerRegistration registration) {
        listeners.remove(registration.path, registration);
    }

    /** Removes the commit handler registration stored under its path. */
    protected def removeCommitHandler(DataCommitHandlerRegistration registration) {
        commitHandlers.remove(registration.path, registration);
    }

    /**
     * Asks every reader in the entries for configuration data at the path and merges the answers.
     *
     * @param entries reader registrations to query
     * @param path    path handed to each reader
     * @return the merged result, or null when no reader returned data
     */
    protected def DataObject readConfiguration(
        Collection<Entry<? extends InstanceIdentifier, ? extends DataReaderRegistration>> entries,
        InstanceIdentifier<? extends DataObject> path) {

        val List<DataObject> partialResults = new LinkedList();
        for (entry : entries) {
            val partial = entry.value.instance.readConfigurationData(path);
            // A reader with nothing to report must not mask data returned by another reader.
            if (partial != null) {
                partialResults.add(partial);
            }
        }
        return merge(path, partialResults);
    }

    /**
     * Asks every reader in the entries for operational data at the path and merges the answers.
     *
     * @param entries reader registrations to query
     * @param path    path handed to each reader
     * @return the merged result, or null when no reader returned data
     */
    protected def DataObject readOperational(
        Collection<Entry<? extends InstanceIdentifier, ? extends DataReaderRegistration>> entries,
        InstanceIdentifier<? extends DataObject> path) {

        val List<DataObject> partialResults = new LinkedList();
        for (entry : entries) {
            val partial = entry.value.instance.readOperationalData(path);
            // A reader with nothing to report must not mask data returned by another reader.
            if (partial != null) {
                partialResults.add(partial);
            }
        }
        return merge(path, partialResults);
    }

    /**
     * Combines partial read results into one object.
     *
     * Currently only picks the first element of the list.
     *
     * @return the first element, or null when the list is empty
     */
    protected def DataObject merge(InstanceIdentifier<? extends DataObject> identifier, List<DataObject> objects) {

        // FIXME: implement real merge
        if (objects.size > 0) {
            return objects.get(0);
        }
        return null;
    }

    /** Returns the distinct commit handler instances of all current registrations. */
    protected def getActiveCommitHandlers() {

        return commitHandlers.entries.map[ value.instance].toSet
    }

    /**
     * Marks the transaction as submitted and hands a two-phase commit task to the executor.
     *
     * @return the future returned by the executor for the commit task
     * @throws NullPointerException if the transaction is null
     */
    protected def commit(DataTransactionImpl transaction) {
        checkNotNull(transaction);
        transaction.changeStatus(TransactionStatus.SUBMITED);
        val task = new TwoPhaseCommit(transaction, this);
        return executor.submit(task);
    }

}
139
/**
 * Registration of a data reader with a {@code DataBrokerImpl}.
 *
 * Records the paths the reader was registered under so the broker can remove it
 * from its reader registries again.
 */
package class DataReaderRegistration extends //
AbstractObjectRegistration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> {

    // Broker holding this registration; cleared once the registration has been removed.
    DataBrokerImpl dataBroker;

    /** Paths under which the reader has been registered. */
    @Property
    val Set<InstanceIdentifier<? extends DataObject>> paths;

    new(DataReader<InstanceIdentifier<? extends DataObject>, DataObject> instance, DataBrokerImpl broker) {
        super(instance)
        dataBroker = broker;
        _paths = new HashSet();
    }

    /**
     * Removes the reader from the broker and drops the broker reference, matching the
     * behaviour of the listener and commit handler registrations in this file.
     * Further invocations are no-ops.
     */
    override protected removeRegistration() {
        if (dataBroker != null) {
            dataBroker.removeReader(this);
            dataBroker = null;
        }
    }

}
159
/**
 * Registration of a data change listener with a {@code DataBrokerImpl}.
 */
package class DataChangeListenerRegistration extends AbstractObjectRegistration<DataChangeListener> implements ListenerRegistration<DataChangeListener> {

    // Broker holding this registration; cleared once the registration has been removed.
    DataBrokerImpl dataBroker;

    /** Path under which the listener was registered. */
    @Property
    val InstanceIdentifier<?> path;

    new(InstanceIdentifier<?> path, DataChangeListener instance, DataBrokerImpl broker) {
        super(instance)
        dataBroker = broker;
        _path = path;
    }

    /**
     * Removes the listener from the broker and drops the broker reference.
     *
     * The null guard makes a repeated invocation a no-op instead of dereferencing
     * the already cleared broker reference.
     */
    override protected removeRegistration() {
        if (dataBroker != null) {
            dataBroker.removeListener(this);
            dataBroker = null;
        }
    }

}
179
/**
 * Registration of a commit handler with a {@code DataBrokerImpl}.
 */
package class DataCommitHandlerRegistration //
extends AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {

    // Broker holding this registration; cleared once the registration has been removed.
    DataBrokerImpl dataBroker;

    /** Path under which the commit handler was registered. */
    @Property
    val InstanceIdentifier<?> path;

    new(InstanceIdentifier<?> path, DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> instance,
        DataBrokerImpl broker) {
        super(instance)
        dataBroker = broker;
        _path = path;
    }

    /**
     * Removes the commit handler from the broker and drops the broker reference.
     *
     * The null guard makes a repeated invocation a no-op instead of dereferencing
     * the already cleared broker reference.
     */
    override protected removeRegistration() {
        if (dataBroker != null) {
            dataBroker.removeCommitHandler(this);
            dataBroker = null;
        }
    }

}
201
/**
 * Commit task for a single transaction.
 *
 * Phase one asks every active commit handler of the broker for a handler transaction
 * via {@code requestCommit}; phase two calls {@code finish} on each of them. An exception
 * in either phase triggers a rollback of all handler transactions obtained so far.
 */
package class TwoPhaseCommit implements Callable<RpcResult<TransactionStatus>> {

    val DataTransactionImpl transaction;
    val DataBrokerImpl dataBroker;

    new(DataTransactionImpl transaction, DataBrokerImpl broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs both commit phases.
     *
     * @return a successful result carrying {@code COMMITED} when no phase threw, otherwise
     *         the failed result produced by {@code rollback}
     */
    override call() throws Exception {

        val Iterable<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlers = dataBroker.activeCommitHandlers;

        // requesting commits
        val List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            return rollback(handlerTransactions,e);
        }

        // finishing commits
        // NOTE(review): the collected results are never inspected, so a handler that reports
        // an unsuccessful result without throwing does not fail the commit - confirm intent.
        val List<RpcResult<Void>> results = new ArrayList();
        try {
            for (subtransaction : handlerTransactions) {
                results.add(subtransaction.finish());
            }
        } catch (Exception e) {
            return rollback(handlerTransactions,e);
        }

        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
    }

    /**
     * Rolls back every handler transaction in the list and reports the commit as failed.
     *
     * Every rollback is attempted even when an earlier one throws; the first rollback
     * failure is rethrown once all handler transactions have been visited.
     *
     * @param transactions handler transactions obtained so far
     * @param e            exception that caused the rollback (not yet part of the result)
     * @return an unsuccessful result carrying {@code FAILED}
     */
    def rollback(List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> transactions,Exception e) {
        var Exception firstRollbackFailure = null;
        for (handlerTransaction : transactions) {
            try {
                handlerTransaction.rollback()
            } catch (Exception rollbackFailure) {
                // Keep going so the remaining handlers still get rolled back.
                if (firstRollbackFailure == null) {
                    firstRollbackFailure = rollbackFailure;
                }
            }
        }
        if (firstRollbackFailure != null) {
            throw firstRollbackFailure;
        }
        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}