1 package org.opendaylight.controller.md.sal.common.impl.service
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
7 import org.opendaylight.yangtools.concepts.ListenerRegistration
8 import com.google.common.collect.Multimap
9 import static com.google.common.base.Preconditions.*;
11 import com.google.common.collect.HashMultimap
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Callable
14 import org.opendaylight.yangtools.yang.common.RpcResult
15 import java.util.Collections
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
17 import java.util.ArrayList
18 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
19 import java.util.Arrays
20 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
21 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
22 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
23 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
24 import org.opendaylight.controller.sal.common.util.Rpcs
25 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
26 import java.util.concurrent.Future
27 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
28 import org.opendaylight.yangtools.concepts.Path
29 import org.slf4j.LoggerFactory
30 import java.util.HashSet
31 import java.util.Map.Entry
32 import java.util.Iterator
33 import java.util.Collection
34 import com.google.common.collect.FluentIterable;
36 import com.google.common.collect.ImmutableList
37 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
38 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
39 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
40 import java.util.concurrent.atomic.AtomicLong
41 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
43 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
45 DataChangePublisher<P, D, DCL>, //
46 DataProvisionService<P, D> {
48 private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
51 var ExecutorService executor;
54 var AbstractDataReadRouter<P, D> dataReadRouter;
56 Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
57 Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
59 val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
63 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
65 return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
66 .transformAndConcat[value] //
67 .transform[instance].toList()
70 override final readConfigurationData(P path) {
71 return dataReadRouter.readConfigurationData(path);
74 override final readOperationalData(P path) {
75 return dataReadRouter.readOperationalData(path);
78 override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
79 val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
80 commitHandlers.put(path, registration)
81 LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
82 for(listener : commitHandlerRegistrationListeners) {
84 listener.instance.onRegister(registration);
85 } catch (Exception e) {
86 LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
92 override final def registerDataChangeListener(P path, DCL listener) {
93 val reg = new DataChangeListenerRegistration(path, listener, this);
94 listeners.put(path, reg);
95 val initialConfig = dataReadRouter.readConfigurationData(path);
96 val initialOperational = dataReadRouter.readOperationalData(path);
97 val event = createInitialListenerEvent(path,initialConfig,initialOperational);
98 listener.onDataChanged(event);
102 final def registerDataReader(P path, DataReader<P, D> reader) {
104 val confReg = dataReadRouter.registerConfigurationReader(path, reader);
105 val dataReg = dataReadRouter.registerOperationalReader(path, reader);
107 return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
110 override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
111 val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
116 protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
117 return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
121 protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
122 listeners.remove(registration.path, registration);
125 protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
126 commitHandlers.remove(registration.path, registration);
128 LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
129 for(listener : commitHandlerRegistrationListeners) {
131 listener.instance.onUnregister(registration);
132 } catch (Exception e) {
133 LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
138 protected final def getActiveCommitHandlers() {
139 return commitHandlers.entries;
142 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
144 return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
145 val operationalState = readOperationalData(key)
146 val configurationState = readConfigurationData(key)
147 return new ListenerStateCapture(key, value, operationalState, configurationState)
151 protected def boolean isAffectedBy(P key, Set<P> paths) {
152 if (paths.contains(key)) {
156 if (key.contains(path)) {
164 package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
165 checkNotNull(transaction);
166 transaction.changeStatus(TransactionStatus.SUBMITED);
167 val task = new TwoPhaseCommit(transaction, this);
168 return executor.submit(task);
174 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
180 Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
183 D initialOperationalState;
186 D initialConfigurationState;
189 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
191 AbstractDataBroker<P, D, DCL> dataBroker;
196 new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
202 override protected removeRegistration() {
203 dataBroker.removeListener(this);
209 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
210 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
211 implements DataCommitHandlerRegistration<P, D> {
213 AbstractDataBroker<P, D, ?> dataBroker;
218 new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
224 override protected removeRegistration() {
225 dataBroker.removeCommitHandler(this);
230 package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
232 private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
234 val AbstractDataTransaction<P, D> transaction;
235 val AbstractDataBroker<P, D, DCL> dataBroker;
237 new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
238 this.transaction = transaction;
239 this.dataBroker = broker;
242 override call() throws Exception {
244 // get affected paths
245 val affectedPaths = new HashSet<P>();
247 affectedPaths.addAll(transaction.createdConfigurationData.keySet);
248 affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
249 affectedPaths.addAll(transaction.removedConfigurationData);
251 affectedPaths.addAll(transaction.createdOperationalData.keySet);
252 affectedPaths.addAll(transaction.updatedOperationalData.keySet);
253 affectedPaths.addAll(transaction.removedOperationalData);
255 val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
257 val transactionId = transaction.identifier;
259 log.info("Transaction: {} Started.",transactionId);
260 // requesting commits
261 val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
262 val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
264 for (handler : commitHandlers) {
265 handlerTransactions.add(handler.requestCommit(transaction));
267 } catch (Exception e) {
268 log.error("Transaction: {} Request Commit failed", transactionId,e);
269 return rollback(handlerTransactions, e);
271 val List<RpcResult<Void>> results = new ArrayList();
273 for (subtransaction : handlerTransactions) {
274 results.add(subtransaction.finish());
276 listeners.publishDataChangeEvent();
277 } catch (Exception e) {
278 log.error("Transaction: {} Finish Commit failed",transactionId, e);
279 return rollback(handlerTransactions, e);
281 log.info("Transaction: {} Finished succesfully.",transactionId);
282 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
286 def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
287 for (listenerSet : listeners) {
288 val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
289 val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
291 val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
292 listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
293 for (listener : listenerSet.listeners) {
295 listener.instance.onDataChanged(changeEvent);
297 } catch (Exception e) {
304 def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
305 for (transaction : transactions) {
306 transaction.rollback()
309 // FIXME return encountered error.
310 return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
314 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
317 private val Object identifier;
319 var TransactionStatus status;
321 var AbstractDataBroker<P, D, ?> broker;
323 protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
325 _identifier = identifier;
327 status = TransactionStatus.NEW;
329 //listeners = new ListenerRegistry<>();
333 return broker.commit(this);
336 override readConfigurationData(P path) {
337 return broker.readConfigurationData(path);
340 override readOperationalData(P path) {
341 return broker.readOperationalData(path);
344 override hashCode() {
345 return identifier.hashCode;
348 override equals(Object obj) {
353 if (getClass() != obj.getClass())
355 val other = (obj as AbstractDataTransaction<P,D>);
356 if (broker == null) {
357 if (other.broker != null)
359 } else if (!broker.equals(other.broker))
361 if (identifier == null) {
362 if (other.identifier != null)
364 } else if (!identifier.equals(other.identifier))
369 override TransactionStatus getStatus() {
373 protected abstract def void onStatusChange(TransactionStatus status);
375 public def changeStatus(TransactionStatus status) {
376 this.status = status;
377 onStatusChange(status);