1 package org.opendaylight.controller.md.sal.common.impl.service
\r
3 import com.google.common.collect.FluentIterable
\r
4 import com.google.common.collect.HashMultimap
\r
5 import com.google.common.collect.ImmutableList
\r
6 import com.google.common.collect.Multimap
\r
7 import java.util.ArrayList
\r
8 import java.util.Arrays
\r
9 import java.util.Collection
\r
10 import java.util.Collections
\r
11 import java.util.HashSet
\r
12 import java.util.List
\r
13 import java.util.Set
\r
14 import java.util.concurrent.Callable
\r
15 import java.util.concurrent.ExecutorService
\r
16 import java.util.concurrent.Future
\r
17 import java.util.concurrent.atomic.AtomicLong
\r
18 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
\r
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
\r
20 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
\r
21 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
\r
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
\r
23 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
\r
24 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
\r
25 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
\r
26 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataReader
\r
28 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
\r
29 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
\r
30 import org.opendaylight.controller.sal.common.util.Rpcs
\r
31 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
\r
32 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
\r
33 import org.opendaylight.yangtools.concepts.ListenerRegistration
\r
34 import org.opendaylight.yangtools.concepts.Path
\r
35 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
\r
36 import org.opendaylight.yangtools.yang.common.RpcResult
\r
37 import org.slf4j.LoggerFactory
\r
39 import static com.google.common.base.Preconditions.*
\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
/**
 * Abstract broker that implements the transaction-factory, data-reader,
 * change-publisher and provision-service interfaces listed in its header.
 *
 * Reads are delegated to {@code dataReadRouter}; commits are wrapped in a
 * {@code TwoPhaseCommit} task and submitted to {@code executor}.
 *
 * NOTE(review): several lines of this class (annotations, constructor, closing
 * braces) are not visible in this view of the file.
 */
41 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
\r
42 DataReader<P, D>, //
\r
43 DataChangePublisher<P, D, DCL>, //
\r
44 DataProvisionService<P, D> {
\r
// Logger shared by all broker instances.
46 private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
\r
// Executor to which commit tasks are submitted by commit().
49 var ExecutorService executor;
\r
// Router that serves reads and accepts reader registrations.
52 var AbstractDataReadRouter<P, D> dataReadRouter;
\r
// Incremented once for every transaction handed to commit().
55 private val AtomicLong submittedTransactionsCount = new AtomicLong;
\r
// Incremented by TwoPhaseCommit when the request or finish phase throws.
58 private val AtomicLong failedTransactionsCount = new AtomicLong
\r
// Incremented by TwoPhaseCommit after a commit completes without exception.
61 private val AtomicLong finishedTransactionsCount = new AtomicLong
\r
// Data change listener registrations, keyed by the path they were registered under.
63 Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
\r
// Commit handler registrations, keyed by the path they were registered under.
64 Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
\r
// Listeners notified whenever a commit handler is registered or unregistered.
66 val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
\r
/**
 * Builds a list of commit handler instances: keeps the map entries whose key
 * satisfies isAffectedBy(key, paths), flattens their registrations and maps each
 * registration to its handler instance.
 *
 * NOTE(review): the parameter declaration line is not visible here; `paths` is
 * presumably the set of paths touched by a transaction - confirm.
 */
70 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
\r
72 return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
\r
73 .transformAndConcat[value] //
\r
74 .transform[instance].toList()
\r
/**
 * Reads configuration data for the given path by delegating to the data read router.
 *
 * @param path path of the data to read
 * @return whatever the router returns for that path
 */
override final readConfigurationData(P path) {
    dataReadRouter.readConfigurationData(path)
}
\r
/**
 * Reads operational data for the given path by delegating to the data read router.
 *
 * @param path path of the data to read
 * @return whatever the router returns for that path
 */
override final readOperationalData(P path) {
    dataReadRouter.readOperationalData(path)
}
\r
/**
 * Registers a commit handler under the given path and announces the new
 * registration to every commit-handler registration listener.
 *
 * An exception thrown by one listener is logged and does not prevent the
 * remaining listeners from being notified.
 *
 * @param path path the handler is registered under
 * @param commitHandler handler to register
 * @return the registration that was created
 */
override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
    val handlerRegistration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
    commitHandlers.put(path, handlerRegistration)
    LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);

    for (registrationListener : commitHandlerRegistrationListeners) {
        try {
            registrationListener.instance.onRegister(handlerRegistration);
        } catch (Exception e) {
            LOG.error("Unexpected exception in listener {} during invoking onRegister",registrationListener.instance,e);
        }
    }
    return handlerRegistration;
}
\r
/**
 * Registers a data change listener under the given path and immediately calls
 * it with an initial event built from the current configuration and operational
 * data at that path.
 *
 * The listener is invoked directly inside this method, after it has already been
 * added to the listener map.
 *
 * NOTE(review): the end of this method (presumably returning `reg`) is not
 * visible in this view - confirm.
 */
99 override final def registerDataChangeListener(P path, DCL listener) {
\r
100 val reg = new DataChangeListenerRegistration(path, listener, this);
\r
101 listeners.put(path, reg);
\r
// Snapshot of the data currently readable at the path, used for the initial event.
102 val initialConfig = dataReadRouter.readConfigurationData(path);
\r
103 val initialOperational = dataReadRouter.readOperationalData(path);
\r
104 val event = createInitialListenerEvent(path,initialConfig,initialOperational);
\r
105 listener.onDataChanged(event);
\r
/**
 * Registers the reader with the data read router for both configuration and
 * operational data at the given path.
 *
 * @param path path the reader serves
 * @param reader reader to register
 * @return a composite registration wrapping the configuration and operational registrations
 */
final def registerDataReader(P path, DataReader<P, D> reader) {
    val configurationRegistration = dataReadRouter.registerConfigurationReader(path, reader);
    val operationalRegistration = dataReadRouter.registerOperationalReader(path, reader);
    val memberRegistrations = Arrays.asList(configurationRegistration, operationalRegistration);

    new CompositeObjectRegistration(reader, memberRegistrations)
}
\r
/**
 * Adds a listener to the registry of commit-handler registration listeners.
 *
 * NOTE(review): the remainder of this method (presumably returning `ret`) is
 * not visible in this view - confirm.
 */
117 override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
\r
118 val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
\r
/**
 * Creates the event delivered to a listener right after it is registered.
 *
 * This implementation does not use {@code path}; the event is built from the
 * two data values only.
 *
 * @param path path the listener was registered under
 * @param initialConfig configuration data read at registration time
 * @param initialOperational operational data read at registration time
 * @return a new InitialDataChangeEventImpl holding both values
 */
protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
    new InitialDataChangeEventImpl<P, D>(initialConfig, initialOperational)
}
\r
/**
 * Removes a data change listener registration from the listener map.
 *
 * @param registration registration to remove; looked up under its own path
 * @return the result of the underlying multimap removal
 */
protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
    val registeredPath = registration.path;
    listeners.remove(registeredPath, registration)
}
\r
/**
 * Removes a commit handler registration and announces the removal to every
 * commit-handler registration listener.
 *
 * An exception thrown by one listener is logged and does not prevent the
 * remaining listeners from being notified.
 *
 * @param registration registration to remove; looked up under its own path
 */
protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
    commitHandlers.remove(registration.path, registration);
    LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);

    for (registrationListener : commitHandlerRegistrationListeners) {
        try {
            registrationListener.instance.onUnregister(registration);
        } catch (Exception e) {
            LOG.error("Unexpected exception in listener {} during invoking onUnregister",registrationListener.instance,e);
        }
    }
}
\r
/**
 * Exposes all current (path, commit handler registration) entries.
 *
 * @return the entries view of the commit handler multimap
 */
protected final def getActiveCommitHandlers() {
    commitHandlers.entries
}
\r
/**
 * For every listener-map entry whose key satisfies isAffectedBy(key, paths),
 * reads the current operational and configuration data at that key and wraps
 * them, together with the key and its registrations, in a ListenerStateCapture.
 *
 * NOTE(review): the end of the transform lambda and of the call chain is not
 * visible in this view, so the exact returned collection type cannot be
 * confirmed from here.
 */
149 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
\r
150 HashSet<P> paths) {
\r
151 return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
\r
// Data is read here, i.e. when this transformation is evaluated.
152 val operationalState = readOperationalData(key)
\r
153 val configurationState = readConfigurationData(key)
\r
154 return new ListenerStateCapture(key, value, operationalState, configurationState)
\r
/**
 * Decides whether a registration key is affected by a set of paths.
 *
 * Two conditions are tested: the set contains the key itself, or the key
 * contains one of the paths in the set.
 *
 * NOTE(review): the bodies of both branches and the final return are not
 * visible in this view; presumably each branch returns true and the method
 * otherwise returns false - confirm.
 */
158 protected def boolean isAffectedBy(P key, Set<P> paths) {
\r
// Exact match on the key.
159 if (paths.contains(key)) {
\r
// Otherwise look for a path that the key contains.
162 for (path : paths) {
\r
163 if (key.contains(path)) {
\r
/**
 * Submits a transaction for asynchronous commit.
 *
 * The transaction is moved to SUBMITED, wrapped in a TwoPhaseCommit task, counted
 * as submitted and handed to the executor.
 *
 * @param transaction transaction to commit; must not be null
 * @return future completing with the result produced by the commit task
 */
package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
    checkNotNull(transaction);
    transaction.changeStatus(TransactionStatus.SUBMITED);

    val commitTask = new TwoPhaseCommit(transaction, this);
    submittedTransactionsCount.getAndIncrement();
    executor.submit(commitTask)
}
\r
/**
 * Holder pairing a group of listener registrations with the operational and
 * configuration data that was read for them.
 *
 * NOTE(review): some members are not visible in this view; other code in this
 * file reads a `path` member and constructs this class with four arguments -
 * confirm against the full source.
 */
182 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
\r
// Listener registrations this capture applies to.
188 Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
\r
// Operational data captured for these listeners.
191 D initialOperationalState;
\r
// Configuration data captured for these listeners.
194 D initialConfigurationState;
\r
/**
 * Registration handle for a data change listener. Removing the registration
 * asks the owning broker to drop it from its listener map.
 *
 * NOTE(review): parts of this class (other fields and the start of the
 * constructor body) are not visible in this view.
 */
197 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
\r
// Broker that created this registration.
199 AbstractDataBroker<P, D, DCL> dataBroker;
\r
204 new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
\r
206 dataBroker = broker;
\r
// Delegates removal to the broker.
210 override protected removeRegistration() {
\r
211 dataBroker.removeListener(this);
\r
/**
 * Registration handle for a commit handler. Removing the registration asks the
 * owning broker to drop it and notify registration listeners.
 *
 * NOTE(review): parts of this class (other fields and the start of the
 * constructor body) are not visible in this view.
 */
217 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
\r
218 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
\r
219 implements DataCommitHandlerRegistration<P, D> {
\r
// Broker that created this registration.
221 AbstractDataBroker<P, D, ?> dataBroker;
\r
226 new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
\r
228 dataBroker = broker;
\r
// Delegates removal to the broker.
232 override protected removeRegistration() {
\r
233 dataBroker.removeCommitHandler(this);
\r
/**
 * Callable that commits one transaction against the commit handlers of a broker
 * and produces an RpcResult carrying the resulting TransactionStatus.
 */
238 package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
\r
// Logger used for commit progress and failures.
240 private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
\r
// Transaction being committed.
242 val AbstractDataTransaction<P, D> transaction;
\r
// Broker supplying commit handlers, listeners and counters.
243 val AbstractDataBroker<P, D, DCL> dataBroker;
\r
// Stores both references; performs no other work.
245 new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
\r
246 this.transaction = transaction;
\r
247 this.dataBroker = broker;
\r
/**
 * Runs the commit in two phases.
 *
 * 1. Collects every path created, updated or removed by the transaction, in
 *    both configuration and operational data.
 * 2. Captures the affected listeners, then asks each affected commit handler
 *    for a sub-transaction via requestCommit.
 * 3. Finishes every sub-transaction and publishes data change events.
 *
 * If either phase throws, the failure is logged, the failed counter is
 * incremented and the result of rollback(...) is returned. Otherwise the
 * finished counter is incremented and a successful COMMITED result is returned.
 *
 * NOTE(review): the `try {` lines and closing braces of this method are not
 * visible in this view.
 */
250 override call() throws Exception {
\r
252 // get affected paths
\r
253 val affectedPaths = new HashSet<P>();
\r
255 affectedPaths.addAll(transaction.createdConfigurationData.keySet);
\r
256 affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
\r
257 affectedPaths.addAll(transaction.removedConfigurationData);
\r
259 affectedPaths.addAll(transaction.createdOperationalData.keySet);
\r
260 affectedPaths.addAll(transaction.updatedOperationalData.keySet);
\r
261 affectedPaths.addAll(transaction.removedOperationalData);
\r
// Listeners are captured before any commit handler is asked to commit.
263 val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
\r
265 val transactionId = transaction.identifier;
\r
267 log.trace("Transaction: {} Started.",transactionId);
\r
268 // requesting commits
\r
269 val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
\r
270 val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
\r
272 for (handler : commitHandlers) {
\r
273 handlerTransactions.add(handler.requestCommit(transaction));
\r
275 } catch (Exception e) {
\r
276 log.error("Transaction: {} Request Commit failed", transactionId,e);
\r
277 dataBroker.failedTransactionsCount.andIncrement
\r
// Only the sub-transactions obtained so far are rolled back.
278 return rollback(handlerTransactions, e);
\r
// NOTE(review): `results` is filled below but not read anywhere in the visible code.
280 val List<RpcResult<Void>> results = new ArrayList();
\r
282 for (subtransaction : handlerTransactions) {
\r
283 results.add(subtransaction.finish());
\r
285 listeners.publishDataChangeEvent();
\r
286 } catch (Exception e) {
\r
287 log.error("Transaction: {} Finish Commit failed",transactionId, e);
\r
288 dataBroker.failedTransactionsCount.andIncrement
\r
289 return rollback(handlerTransactions, e);
\r
291 log.trace("Transaction: {} Finished successfully.",transactionId);
\r
292 dataBroker.finishedTransactionsCount.andIncrement;
\r
293 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
\r
/**
 * Delivers a data change event to every listener in the supplied captures.
 *
 * For each capture the current configuration and operational data are re-read
 * from the broker and combined with the captured initial state into one event
 * shared by all listeners of that capture.
 *
 * An exception thrown by a listener is reported through the class logger and
 * does not stop delivery to the remaining listeners.
 *
 * @param listeners captures of listener registrations and their initial data
 */
def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
    for (listenerSet : listeners) {
        val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
        val updatedOperational = dataBroker.readOperationalData(listenerSet.path);

        // Argument order is kept exactly as in the original call (updated
        // operational before updated configuration).
        // NOTE(review): confirm against the DataChangeEventImpl constructor.
        val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
            listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
        for (listener : listenerSet.listeners) {
            try {
                listener.instance.onDataChanged(changeEvent);
            } catch (Exception e) {
                // Use the logger rather than stderr so the failure is recorded with
                // the listener that caused it.
                log.error("Unexpected exception in listener {} during invoking onDataChanged", listener.instance, e);
            }
        }
    }
}
\r
/**
 * Rolls back every handler sub-transaction and reports the commit as failed.
 *
 * Each rollback is attempted independently: if one handler's rollback throws,
 * the failure is logged and the remaining sub-transactions are still rolled
 * back.
 *
 * @param transactions sub-transactions to roll back
 * @param e exception that triggered the rollback; it is not included in the
 *          returned result (see FIXME below)
 * @return an unsuccessful result with TransactionStatus.FAILED and an empty error set
 */
def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
    for (handlerTransaction : transactions) {
        try {
            handlerTransaction.rollback()
        } catch (Exception rollbackFailure) {
            // Keep going so that one misbehaving handler cannot leave the others
            // un-rolled-back.
            log.error("Transaction: {} Rollback failed", transaction.identifier, rollbackFailure);
        }
    }

    // FIXME return encountered error.
    return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
}
\r
/**
 * Base class for transactions created by an AbstractDataBroker. It keeps an
 * identifier, the current TransactionStatus and a reference to the broker that
 * commits it and serves reads not answered locally.
 *
 * NOTE(review): annotations and closing braces of this class are not visible
 * in this view.
 */
325 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
\r
// Logger shared by all transactions.
327 private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
// Identifier of this transaction; used by hashCode and equals.
330 private val Object identifier;
\r
// Current lifecycle status; set to NEW by the constructor.
332 var TransactionStatus status;
\r
// Broker this transaction commits to and reads through.
334 var AbstractDataBroker<P, D, ?> broker;
\r
// Stores the identifier and broker, sets status NEW and logs the allocation.
336 protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
\r
338 _identifier = identifier;
\r
339 broker = dataBroker;
\r
340 status = TransactionStatus.NEW;
\r
341 LOG.debug("Transaction {} Allocated.", identifier);
343 //listeners = new ListenerRegistry<>();
\r
/**
 * Commits this transaction by handing it to the owning broker.
 *
 * @return whatever the broker's commit returns for this transaction
 */
override commit() {
    broker.commit(this)
}
\r
/**
 * Reads configuration data for a path, consulting this transaction's updated
 * configuration data first and the broker otherwise.
 *
 * NOTE(review): the body of the `local != null` branch is not visible in this
 * view; presumably it returns `local` - confirm.
 */
350 override readConfigurationData(P path) {
\r
// Value written within this transaction, if any.
351 val local = this.updatedConfigurationData.get(path);
\r
352 if(local != null) {
\r
// Fall back to the broker's view of the data.
356 return broker.readConfigurationData(path);
\r
/**
 * Reads operational data for a path, consulting this transaction's updated
 * operational data first and the broker otherwise.
 *
 * NOTE(review): the body of the `local != null` branch is not visible in this
 * view; presumably it returns `local` - confirm.
 */
359 override readOperationalData(P path) {
\r
// Value written within this transaction, if any.
360 val local = this.updatedOperationalData.get(path);
\r
361 if(local != null) {
\r
// Fall back to the broker's view of the data.
364 return broker.readOperationalData(path);
\r
/**
 * Hash code derived from the transaction identifier only.
 *
 * equals in this class explicitly tolerates a null identifier, so a null
 * identifier hashes to 0 here instead of throwing NullPointerException.
 *
 * @return the identifier's hash code, or 0 when the identifier is null
 */
override hashCode() {
    return if (identifier == null) 0 else identifier.hashCode;
}
\r
/**
 * Compares this transaction with another object. The visible checks require the
 * same runtime class, then compare `broker` and `identifier`, each in a
 * null-tolerant way.
 *
 * NOTE(review): the return statements and any checks before the class
 * comparison are not visible in this view.
 */
371 override equals(Object obj) {
\r
// Objects of a different runtime class are handled by this check.
376 if (getClass() != obj.getClass())
\r
378 val other = (obj as AbstractDataTransaction<P,D>);
\r
// Broker comparison, tolerating a null broker on this side.
379 if (broker == null) {
\r
380 if (other.broker != null)
\r
382 } else if (!broker.equals(other.broker))
\r
// Identifier comparison, tolerating a null identifier on this side.
384 if (identifier == null) {
\r
385 if (other.identifier != null)
\r
387 } else if (!identifier.equals(other.identifier))
\r
/**
 * Accessor for the transaction status.
 *
 * NOTE(review): the body is not visible in this view; presumably it returns
 * the `status` field - confirm.
 */
392 override TransactionStatus getStatus() {
\r
/**
 * Hook called by changeStatus after the status field has been updated.
 *
 * @param status the status that was just set
 */
396 protected abstract def void onStatusChange(TransactionStatus status);
\r
/**
 * Moves the transaction to a new status.
 *
 * The transition is logged at debug level with the old and new status, the
 * field is updated, and the onStatusChange hook is then invoked with the new
 * status.
 *
 * @param status status to switch to
 */
public def changeStatus(TransactionStatus status) {
    val previousStatus = this.status;
    LOG.debug("Transaction {} transitioned from {} to {}", identifier, previousStatus, status);

    this.status = status;
    onStatusChange(status);
}