1 package org.opendaylight.controller.md.sal.common.impl.service
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
7 import org.opendaylight.yangtools.concepts.ListenerRegistration
8 import com.google.common.collect.Multimap
9 import static com.google.common.base.Preconditions.*;
11 import com.google.common.collect.HashMultimap
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Callable
14 import org.opendaylight.yangtools.yang.common.RpcResult
15 import java.util.Collections
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
17 import java.util.ArrayList
18 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
19 import java.util.Arrays
20 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
21 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
22 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
23 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
24 import org.opendaylight.controller.sal.common.util.Rpcs
25 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
26 import java.util.concurrent.Future
27 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
28 import org.opendaylight.yangtools.concepts.Path
29 import org.slf4j.LoggerFactory
30 import java.util.HashSet
31 import java.util.Map.Entry
32 import java.util.Iterator
33 import java.util.Collection
34 import com.google.common.collect.FluentIterable;
36 import com.google.common.collect.ImmutableList
/**
 * Abstract data broker, generic over the path type P, the data type D and the
 * data change listener type DCL.
 *
 * It keeps per-path registries of data change listeners and commit handlers,
 * delegates reads to a read router, and hands commits to an executor.
 *
 * NOTE(review): both registries are plain HashMultimap instances and are also
 * read by TwoPhaseCommit tasks submitted to the executor - confirm that access
 * is synchronized elsewhere.
 */
38 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
40 DataChangePublisher<P, D, DCL>, //
41 DataProvisionService<P, D> {
// Executor that commit tasks are submitted to (see commit).
44 var ExecutorService executor;
// Router that all configuration/operational reads are delegated to.
47 var AbstractDataReadRouter<P, D> dataReadRouter;
// Data change listener registrations, keyed by the path they were registered for.
49 Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
// Commit handler registrations, keyed by the path they were registered for.
50 Multimap<P, DataCommitHandlerRegistration<P, D>> commitHandlers = HashMultimap.create();
/**
 * Collects the commit handler instances registered under every path for which
 * isAffectedBy(registeredPath, paths) holds, and returns them as a list.
 *
 * The parameter declaration for `paths` is not visible in this view.
 * NOTE(review): the commented-out type in the signature mentions
 * DataChangeListenerRegistration and does not appear to match the list of
 * handler instances built below - confirm and correct it.
 */
55 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
57 return FluentIterable.from(commitHandlers.asMap.entrySet)
58 .filter[key.isAffectedBy(paths)] //
59 .transformAndConcat [value] //
60 .transform[instance].toList()
/**
 * Reads configuration data for the given path by delegating to the read router.
 *
 * @param path path to read
 * @return whatever dataReadRouter.readConfigurationData(path) returns
 */
63 override final readConfigurationData(P path) {
64 return dataReadRouter.readConfigurationData(path);
/**
 * Reads operational data for the given path by delegating to the read router.
 *
 * @param path path to read
 * @return whatever dataReadRouter.readOperationalData(path) returns
 */
67 override final readOperationalData(P path) {
68 return dataReadRouter.readOperationalData(path);
/**
 * Registers a commit handler for the given path: wraps it in a
 * DataCommitHandlerRegistration bound to this broker and stores it in the
 * commitHandlers registry under that path.
 *
 * The return statement is not visible in this view; presumably the
 * registration is returned - TODO confirm.
 *
 * @param path path the handler is registered for
 * @param commitHandler handler to register
 */
71 override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
72 val registration = new DataCommitHandlerRegistration(path, commitHandler, this);
73 commitHandlers.put(path, registration)
/**
 * Registers a data change listener for the given path: wraps it in a
 * DataChangeListenerRegistration bound to this broker and stores it in the
 * listeners registry under that path.
 *
 * The return statement is not visible in this view; presumably the
 * registration is returned - TODO confirm.
 *
 * @param path path the listener is registered for
 * @param listener listener to register
 */
77 override final def registerDataChangeListener(P path, DCL listener) {
78 val reg = new DataChangeListenerRegistration(path, listener, this);
79 listeners.put(path, reg);
/**
 * Registers the same reader with the read router twice for the given path,
 * once as a configuration reader and once as an operational reader.
 *
 * @param path path the reader serves
 * @param reader reader to register
 * @return a CompositeObjectRegistration built from the reader and both router
 *         registrations
 */
83 final def registerDataReader(P path, DataReader<P, D> reader) {
85 val confReg = dataReadRouter.registerConfigurationReader(path, reader);
86 val dataReg = dataReadRouter.registerOperationalReader(path, reader);
88 return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
/**
 * Removes a listener registration from the listeners registry, using the
 * registration's own path as the key. Called from
 * DataChangeListenerRegistration.removeRegistration().
 *
 * @param registration registration to remove
 */
91 protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
92 listeners.remove(registration.path, registration);
/**
 * Removes a commit handler registration from the commitHandlers registry,
 * using the registration's own path as the key. Called from
 * DataCommitHandlerRegistration.removeRegistration().
 *
 * @param registration registration to remove
 */
95 protected final def removeCommitHandler(DataCommitHandlerRegistration<P, D> registration) {
96 commitHandlers.remove(registration.path, registration);
/**
 * Returns all (path, registration) entries currently held in the
 * commitHandlers multimap.
 *
 * @return the result of commitHandlers.entries
 */
99 protected final def getActiveCommitHandlers() {
100 return commitHandlers.entries;
/**
 * For every registered listener path for which isAffectedBy(path, paths)
 * holds, reads the current operational and configuration data at that path
 * and packs them, together with the path and its listener registrations, into
 * a ListenerStateCapture.
 *
 * TwoPhaseCommit.call() invokes this before requesting commits, so the reads
 * capture the pre-commit state.
 *
 * The parameter declaration for `paths` and the end of the lambda are not
 * visible in this view. publishDataChangeEvent consumes the result as an
 * ImmutableList, so presumably the chain ends with toList() - TODO confirm.
 */
103 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
105 return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
106 val operationalState = readOperationalData(key)
107 val configurationState = readConfigurationData(key)
108 return new ListenerStateCapture(key, value, operationalState, configurationState)
/**
 * Decides whether a registered path `key` is affected by a set of changed
 * paths.
 *
 * Two checks are visible: exact membership of `key` in `paths`, and
 * key.contains(path) for some `path`.
 * The loop header that introduces `path` and all return statements are not
 * visible in this view. Presumably `path` iterates over `paths` and each
 * matching check returns true - TODO confirm.
 *
 * @param key registered path being tested
 * @param paths paths touched by a transaction
 */
112 protected def boolean isAffectedBy(P key, Set<P> paths) {
113 if (paths.contains(key)) {
117 if (key.contains(path)) {
/**
 * Starts an asynchronous commit of the given transaction.
 *
 * Rejects a null transaction via checkNotNull, switches the transaction status
 * to SUBMITED (which also fires its onStatusChange hook), wraps it in a
 * TwoPhaseCommit task and submits that task to the executor.
 *
 * @param transaction transaction to commit; must not be null
 * @return the Future returned by executor.submit for the commit task
 */
125 package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
126 checkNotNull(transaction);
127 transaction.changeStatus(TransactionStatus.SUBMITED);
128 val task = new TwoPhaseCommit(transaction, this);
129 return executor.submit(task);
/**
 * Holder pairing the listener registrations for one path with the operational
 * and configuration data read for that path before a commit.
 *
 * Instances are created in
 * AbstractDataBroker.affectedListenersWithInitialState with
 * (path, listeners, operationalState, configurationState). The `path` field
 * and the constructor are not visible in this view - presumably generated by
 * an annotation; TODO confirm.
 */
135 package class ListenerStateCapture<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> {
// Listener registrations stored under the captured path.
141 Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
// Operational data read for the path when the capture was created.
144 D initialOperationalState;
// Configuration data read for the path when the capture was created.
147 D initialConfigurationState;
/**
 * Registration handle for a data change listener.
 *
 * It remembers the broker that created it so that removing the registration
 * also removes it from the broker's listener registry.
 * The constructor body and the `path` member read by the broker are not
 * visible in this view.
 */
150 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
// Broker this registration belongs to; used to unregister on removal.
152 AbstractDataBroker<P, D, DCL> dataBroker;
157 new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
// Removal hook: drops this registration from the owning broker.
163 override protected removeRegistration() {
164 dataBroker.removeListener(this);
/**
 * Registration handle for a commit handler.
 *
 * It remembers the broker that created it so that removing the registration
 * also removes it from the broker's commit handler registry.
 * The constructor body and the `path` member read by the broker are not
 * visible in this view.
 */
170 package class DataCommitHandlerRegistration<P extends Path<P>, D> extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
// Broker this registration belongs to; used to unregister on removal.
172 AbstractDataBroker<P, D, ?> dataBroker;
177 new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
// Removal hook: drops this registration from the owning broker.
183 override protected removeRegistration() {
184 dataBroker.removeCommitHandler(this);
/**
 * Callable that commits one transaction against the broker's affected commit
 * handlers in two steps: requestCommit on every handler, then finish on every
 * returned handler transaction. It rolls back on failure.
 *
 * AbstractDataBroker.commit creates it and submits it to the broker's executor.
 */
190 package class TwoPhaseCommit<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
192 private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
// Transaction being committed.
194 val AbstractDataTransaction<P, D> transaction;
// Broker that supplies the affected handlers/listeners and post-commit reads.
195 val AbstractDataBroker<P, D, DCL> dataBroker;
/**
 * Creates a commit task for the given transaction.
 *
 * @param transaction transaction to commit
 * @param broker broker the commit is executed against
 */
197 new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
198 this.transaction = transaction;
199 this.dataBroker = broker;
/**
 * Runs the commit.
 *
 * 1. Builds the set of affected paths from the transaction's created, updated
 *    and removed configuration and operational data.
 * 2. Captures the affected listeners together with the data currently readable
 *    at their paths. This happens before any handler is asked to commit.
 * 3. Calls requestCommit on every affected commit handler and collects the
 *    returned handler transactions.
 * 4. Calls finish on every handler transaction, then publishes the data change
 *    event to the captured listeners.
 *
 * An exception in step 3 or 4 is logged and answered with rollback(...).
 * The `try` lines that open both guarded regions are not visible in this view.
 *
 * NOTE(review): a failure part-way through step 4 rolls back every handler
 * transaction, including those whose finish() already returned - confirm that
 * handlers tolerate this.
 * NOTE(review): `results` is filled but not read in the visible code.
 *
 * @return an RpcResult built with (true, COMMITED, empty set) on success,
 *         otherwise the result of rollback(...)
 */
202 override call() throws Exception {
204 // get affected paths
205 val affectedPaths = new HashSet<P>();
207 affectedPaths.addAll(transaction.createdConfigurationData.keySet);
208 affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
209 affectedPaths.addAll(transaction.removedConfigurationData);
211 affectedPaths.addAll(transaction.createdOperationalData.keySet);
212 affectedPaths.addAll(transaction.updatedOperationalData.keySet);
213 affectedPaths.addAll(transaction.removedOperationalData);
// Must run before requestCommit/finish so the captured state is the pre-commit state.
215 val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
217 // requesting commits
218 val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
219 val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
221 for (handler : commitHandlers) {
222 handlerTransactions.add(handler.requestCommit(transaction));
224 } catch (Exception e) {
225 log.error("Request Commit failded", e);
226 return rollback(handlerTransactions, e);
228 val List<RpcResult<Void>> results = new ArrayList();
230 for (subtransaction : handlerTransactions) {
231 results.add(subtransaction.finish());
// Extension-method call: publishDataChangeEvent(listeners).
233 listeners.publishDataChangeEvent();
234 } catch (Exception e) {
235 log.error("Finish Commit failed", e);
236 return rollback(handlerTransactions, e);
240 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
/**
 * Notifies listeners about the committed change.
 *
 * For every captured listener set it re-reads configuration and operational
 * data at the set's path, builds one DataChangeEventImpl from the transaction,
 * the captured initial state and the freshly read state, and passes that event
 * to onDataChanged of every listener in the set.
 *
 * The `try` line and the body of the catch block are not visible in this view,
 * so how a listener exception is handled cannot be stated here.
 *
 * NOTE(review): the event is built with (initial configuration, initial
 * operational, updated operational, updated configuration). The order flips
 * between the two pairs - confirm against the DataChangeEventImpl constructor.
 *
 * @param listeners listener sets with the state captured before the commit
 */
244 def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D,DCL>> listeners) {
245 for(listenerSet : listeners) {
246 val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
247 val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
249 val changeEvent = new DataChangeEventImpl(transaction,listenerSet.initialConfigurationState,listenerSet.initialOperationalState,updatedOperational,updatedConfiguration);
250 for(listener : listenerSet.listeners) {
252 listener.instance.onDataChanged(changeEvent);
254 } catch (Exception e) {
/**
 * Calls rollback() on every given handler transaction and reports failure.
 *
 * The exception `e` that triggered the rollback is accepted but not used in
 * the visible code (see the FIXME below), so the returned result carries no
 * error details.
 *
 * @param transactions handler transactions to roll back
 * @param e exception that caused the rollback
 * @return an RpcResult built with (false, FAILED, empty set)
 */
261 def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
262 for (transaction : transactions) {
263 transaction.rollback()
266 // FIXME return encountered error.
267 return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
/**
 * Base class for data modification transactions created by an
 * AbstractDataBroker.
 *
 * Reads and commits are delegated to the broker. Identity (equals/hashCode) is
 * based on a per-instance identifier object.
 */
270 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
// Per-instance identity token used by hashCode and equals.
273 private val Object identifier;
// Current lifecycle status; updated through changeStatus.
275 var TransactionStatus status;
// Broker that reads and commit are delegated to.
277 var AbstractDataBroker<P, D, ?> broker;
/**
 * Creates a transaction with a fresh identifier object and status NEW.
 *
 * Some constructor lines are not visible in this view; presumably `dataBroker`
 * is stored in `broker` there - TODO confirm.
 * `_identifier` is presumably the backing field generated for `identifier` by
 * a property annotation that is not visible here - TODO confirm.
 *
 * @param dataBroker broker that owns this transaction
 */
279 protected new(AbstractDataBroker<P, D, ?> dataBroker) {
281 _identifier = new Object();
283 status = TransactionStatus.NEW;
285 //listeners = new ListenerRegistry<>();
289 return broker.commit(this);
/**
 * Reads configuration data for the given path through the owning broker.
 *
 * @param path path to read
 * @return whatever broker.readConfigurationData(path) returns
 */
292 override readConfigurationData(P path) {
293 return broker.readConfigurationData(path);
/**
 * Reads operational data for the given path through the owning broker.
 *
 * @param path path to read
 * @return whatever broker.readOperationalData(path) returns
 */
296 override readOperationalData(P path) {
297 return broker.readOperationalData(path);
/**
 * Hash code derived solely from the per-instance identifier object.
 */
300 override hashCode() {
301 return identifier.hashCode;
/**
 * Equality check. The visible comparisons are, in order: runtime class, then
 * `broker`, then `identifier`. Each of the last two uses a null-safe pattern.
 *
 * The opening checks and every return statement are not visible in this view.
 * Presumably each failed comparison returns false and the method ends with
 * true - TODO confirm.
 *
 * @param obj object to compare with
 */
304 override equals(Object obj) {
309 if (getClass() != obj.getClass())
311 val other = (obj as AbstractDataTransaction<P,D>);
312 if (broker == null) {
313 if (other.broker != null)
315 } else if (!broker.equals(other.broker))
317 if (identifier == null) {
318 if (other.identifier != null)
320 } else if (!identifier.equals(other.identifier))
// Status accessor. The body is not visible in this view; presumably it returns the `status` field - TODO confirm.
325 override TransactionStatus getStatus() {
/**
 * Hook implemented by subclasses. changeStatus calls it with the new status
 * after the status field has been updated.
 *
 * @param status the status that was just set
 */
329 protected abstract def void onStatusChange(TransactionStatus status);
/**
 * Sets the transaction status and then notifies the subclass through
 * onStatusChange.
 *
 * @param status new status
 */
331 public def changeStatus(TransactionStatus status) {
332 this.status = status;
333 onStatusChange(status);