1 package org.opendaylight.controller.md.sal.common.impl.service
\r
3 import com.google.common.collect.FluentIterable
\r
4 import com.google.common.collect.HashMultimap
\r
5 import com.google.common.collect.ImmutableList
\r
6 import com.google.common.collect.Multimap
\r
7 import java.util.ArrayList
\r
8 import java.util.Arrays
\r
9 import java.util.Collection
\r
10 import java.util.Collections
\r
11 import java.util.HashSet
\r
12 import java.util.List
\r
13 import java.util.Set
\r
14 import java.util.concurrent.Callable
\r
15 import java.util.concurrent.ExecutorService
\r
16 import java.util.concurrent.Future
\r
17 import java.util.concurrent.atomic.AtomicLong
\r
18 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
\r
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
\r
20 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
\r
21 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
\r
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
\r
23 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
\r
24 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
\r
25 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
\r
26 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataReader
\r
28 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
\r
29 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
\r
30 import org.opendaylight.controller.sal.common.util.Rpcs
\r
31 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
\r
32 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
\r
33 import org.opendaylight.yangtools.concepts.ListenerRegistration
\r
34 import org.opendaylight.yangtools.concepts.Path
\r
35 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
\r
36 import org.opendaylight.yangtools.yang.common.RpcResult
\r
37 import org.slf4j.LoggerFactory
\r
39 import static com.google.common.base.Preconditions.*
\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
40 import com.google.common.collect.Multimaps
/**
 * Abstract base for data brokers, generic over the path type P, the data
 * type D and the data change listener type DCL. It is declared to implement
 * the transaction factory, data reader, change publisher and provision
 * service contracts listed in the implements clause.
 */
42 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
\r
43 DataReader<P, D>, //
\r
44 DataChangePublisher<P, D, DCL>, //
\r
45 DataProvisionService<P, D> {
\r
// Logger for broker-level messages.
47 private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
\r
// Executor that commit tasks are submitted to.
50 var ExecutorService executor;
\r
// Router that read requests are delegated to and readers are registered with.
53 var AbstractDataReadRouter<P, D> dataReadRouter;
\r
// Counter incremented each time a transaction is submitted for commit.
56 private val AtomicLong submittedTransactionsCount = new AtomicLong;
\r
// Counter incremented when a commit task fails in its request or finish phase.
59 private val AtomicLong failedTransactionsCount = new AtomicLong
\r
// Counter incremented when a commit task finishes successfully.
62 private val AtomicLong finishedTransactionsCount = new AtomicLong
\r
// Data change listener registrations keyed by the path they were registered for.
64 Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());
\r
// Commit handler registrations keyed by the path they were registered for.
65 Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());
\r
// Listeners that are told when a commit handler is registered or unregistered.
67 val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
\r
/**
 * Selects the commit handlers whose registration path is affected by the
 * given paths (as decided by isAffectedBy) and returns the handler
 * instances as a list.
 */
// NOTE(review): Guava documents that iterating views of a synchronized
// multimap requires manually synchronizing on the multimap; no such lock is
// visible around this traversal of asMap.entrySet - confirm whether needed.
71 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
\r
// Keep only entries whose key (registration path) is affected.
73 return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
\r
// Flatten the per-path registration collections into one sequence.
74 .transformAndConcat[value] //
\r
// Unwrap each registration to the handler instance it holds.
75 .transform[instance].toList()
\r
/**
 * Reads configuration data for the given path by delegating to
 * dataReadRouter.
 *
 * @param path path to read
 * @return whatever dataReadRouter.readConfigurationData returns for the path
 */
78 override final readConfigurationData(P path) {
\r
79 return dataReadRouter.readConfigurationData(path);
\r
/**
 * Reads operational data for the given path by delegating to
 * dataReadRouter.
 *
 * @param path path to read
 * @return whatever dataReadRouter.readOperationalData returns for the path
 */
82 override final readOperationalData(P path) {
\r
83 return dataReadRouter.readOperationalData(path);
\r
/**
 * Registers a commit handler for a path: wraps it in a
 * DataCommitHandlerRegistrationImpl, stores it in commitHandlers and
 * notifies every commit handler registration listener via onRegister.
 *
 * @param path path the handler is registered for
 * @param commitHandler handler to register
 * @return the newly created registration
 */
86 override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
\r
87 val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
\r
88 commitHandlers.put(path, registration)
\r
89 LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
\r
// Tell each registration listener about the new handler.
90 for(listener : commitHandlerRegistrationListeners) {
\r
92 listener.instance.onRegister(registration);
\r
// A listener that throws is logged; the exception is not rethrown here.
93 } catch (Exception e) {
\r
94 LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
\r
97 return registration;
\r
/**
 * Registers a data change listener for a path and immediately delivers an
 * initial event built from the data currently readable at that path.
 *
 * @param path path the listener is registered for
 * @param listener listener to register
 */
100 override final def registerDataChangeListener(P path, DCL listener) {
\r
101 val reg = new DataChangeListenerRegistration(path, listener, this);
\r
102 listeners.put(path, reg);
\r
// Read the current state so the listener starts with a snapshot.
103 val initialConfig = dataReadRouter.readConfigurationData(path);
\r
104 val initialOperational = dataReadRouter.readOperationalData(path);
\r
105 val event = createInitialListenerEvent(path,initialConfig,initialOperational);
\r
// The initial event is delivered on the calling thread.
106 listener.onDataChanged(event);
\r
/**
 * Registers the reader with dataReadRouter as both a configuration reader
 * and an operational reader for the path.
 *
 * @param path path the reader serves
 * @param reader reader to register
 * @return a CompositeObjectRegistration built from the reader and both
 *         router registrations
 */
110 final def registerDataReader(P path, DataReader<P, D> reader) {
\r
112 val confReg = dataReadRouter.registerConfigurationReader(path, reader);
\r
113 val dataReg = dataReadRouter.registerOperationalReader(path, reader);
\r
115 return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
\r
/**
 * Adds a listener to commitHandlerRegistrationListeners; such listeners
 * receive onRegister/onUnregister calls when commit handlers are added or
 * removed.
 *
 * @param commitHandlerListener listener to add
 */
118 override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
\r
// ret holds the registration produced by the listener registry.
119 val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
\r
/**
 * Builds the event handed to a listener at registration time.
 *
 * @param path path the listener was registered for; not passed on to the
 *        event constructor here
 * @param initialConfig configuration data read for the path
 * @param initialOperational operational data read for the path
 * @return a new InitialDataChangeEventImpl built from both data values
 */
124 protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
\r
125 return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
\r
/**
 * Removes a data change listener registration from the listeners multimap,
 * using the registration's own path as the key.
 *
 * @param registration registration to remove
 */
129 protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
\r
130 listeners.remove(registration.path, registration);
\r
/**
 * Removes a commit handler registration from commitHandlers and notifies
 * every commit handler registration listener via onUnregister.
 *
 * @param registration registration to remove
 */
133 protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
\r
134 commitHandlers.remove(registration.path, registration);
\r
136 LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
\r
// Tell each registration listener that the handler is gone.
137 for(listener : commitHandlerRegistrationListeners) {
\r
139 listener.instance.onUnregister(registration);
\r
// A listener that throws is logged; the exception is not rethrown here.
140 } catch (Exception e) {
\r
141 LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
\r
/**
 * Exposes the currently registered commit handlers.
 *
 * @return the entries view of the commitHandlers multimap
 *         (path / registration pairs)
 */
146 protected final def getActiveCommitHandlers() {
\r
147 return commitHandlers.entries;
\r
/**
 * For every listener path affected by the given paths (as decided by
 * isAffectedBy), reads the current operational and configuration data and
 * records them, together with the listeners registered for that path, in a
 * ListenerStateCapture.
 *
 * @param paths paths touched by a transaction
 */
// NOTE(review): Guava documents that iterating views of a synchronized
// multimap requires manually synchronizing on the multimap; no such lock is
// visible around this traversal of asMap.entrySet - confirm whether needed.
150 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
\r
151 HashSet<P> paths) {
\r
152 return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
\r
// key is the registration path, value the registrations for that path.
153 val operationalState = readOperationalData(key)
\r
154 val configurationState = readConfigurationData(key)
\r
155 return new ListenerStateCapture(key, value, operationalState, configurationState)
\r
/**
 * Decides whether a registration path is affected by a set of changed
 * paths. Two checks are made: direct membership of key in paths, and
 * key.contains(path) for each individual path.
 *
 * @param key registration path being tested
 * @param paths changed paths
 */
159 protected def boolean isAffectedBy(P key, Set<P> paths) {
\r
// Exact match: the key itself is one of the changed paths.
160 if (paths.contains(key)) {
\r
// Otherwise test each changed path against the key individually.
163 for (path : paths) {
\r
164 if (key.contains(path)) {
\r
/**
 * Submits a transaction for commit: marks it SUBMITED, wraps it in a
 * TwoPhaseCommit task, increments the submitted counter and hands the task
 * to the executor.
 *
 * @param transaction transaction to commit; checked with checkNotNull
 * @return the Future returned by executor.submit for the commit task
 */
172 package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
\r
173 checkNotNull(transaction);
\r
174 transaction.changeStatus(TransactionStatus.SUBMITED);
\r
175 val task = new TwoPhaseCommit(transaction, this);
\r
176 submittedTransactionsCount.andIncrement;
\r
177 return executor.submit(task);
\r
/**
 * Value holder pairing a group of data change listener registrations with
 * the operational and configuration data that were read before a commit.
 */
183 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
\r
// Registrations that should receive the change event.
189 Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
\r
// Operational data as read when the capture was created.
192 D initialOperationalState;
\r
// Configuration data as read when the capture was created.
195 D initialConfigurationState;
\r
/**
 * Registration handle for a data change listener. It remembers the broker
 * it belongs to so that removing the registration also removes it from the
 * broker.
 */
198 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
\r
// Broker that owns this registration.
200 AbstractDataBroker<P, D, DCL> dataBroker;
\r
205 new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
\r
207 dataBroker = broker;
\r
// Delegates removal to the owning broker.
211 override protected removeRegistration() {
\r
212 dataBroker.removeListener(this);
\r
/**
 * Registration handle for a commit handler. It remembers the broker it
 * belongs to so that removing the registration also removes it from the
 * broker.
 */
218 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
\r
219 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
\r
220 implements DataCommitHandlerRegistration<P, D> {
\r
// Broker that owns this registration.
222 AbstractDataBroker<P, D, ?> dataBroker;
\r
227 new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
\r
229 dataBroker = broker;
\r
// Delegates removal to the owning broker.
233 override protected removeRegistration() {
\r
234 dataBroker.removeCommitHandler(this);
\r
/**
 * Callable that carries out the commit of one transaction against the
 * broker's commit handlers and yields an RpcResult with the resulting
 * TransactionStatus.
 */
239 package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
\r
241 private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
\r
// Transaction being committed.
243 val AbstractDataTransaction<P, D> transaction;
\r
// Broker supplying the handlers, listeners and counters used by the commit.
244 val AbstractDataBroker<P, D, DCL> dataBroker;
\r
246 new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
\r
247 this.transaction = transaction;
\r
248 this.dataBroker = broker;
\r
/**
 * Runs the commit. It gathers every path the transaction touches, captures
 * the pre-commit state for affected listeners, asks each affected commit
 * handler for a handler transaction, finishes those handler transactions
 * and publishes the data change event.
 *
 * If requesting or finishing fails with an exception, the failure is
 * logged, the failed counter is incremented and the result of rollback is
 * returned.
 *
 * @return a successful result with status COMMITED, or the rollback result
 */
251 override call() throws Exception {
\r
253 // get affected paths
\r
254 val affectedPaths = new HashSet<P>();
\r
// Configuration side: created, updated and removed paths.
256 affectedPaths.addAll(transaction.createdConfigurationData.keySet);
\r
257 affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
\r
258 affectedPaths.addAll(transaction.removedConfigurationData);
\r
// Operational side: created, updated and removed paths.
260 affectedPaths.addAll(transaction.createdOperationalData.keySet);
\r
261 affectedPaths.addAll(transaction.updatedOperationalData.keySet);
\r
262 affectedPaths.addAll(transaction.removedOperationalData);
\r
// Capture listener state before any handler is asked to commit.
264 val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
\r
266 val transactionId = transaction.identifier;
\r
268 log.trace("Transaction: {} Started.",transactionId);
\r
269 // requesting commits
\r
270 val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
\r
271 val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
\r
// First phase: every affected handler is asked for a handler transaction.
273 for (handler : commitHandlers) {
\r
274 handlerTransactions.add(handler.requestCommit(transaction));
\r
276 } catch (Exception e) {
\r
277 log.error("Transaction: {} Request Commit failed", transactionId,e);
\r
278 dataBroker.failedTransactionsCount.andIncrement
\r
// Roll back the handler transactions collected so far.
279 return rollback(handlerTransactions, e);
\r
281 val List<RpcResult<Void>> results = new ArrayList();
\r
// Second phase: finish each handler transaction in request order.
283 for (subtransaction : handlerTransactions) {
\r
284 results.add(subtransaction.finish());
\r
// Listeners are notified after the handler transactions are finished.
286 listeners.publishDataChangeEvent();
\r
287 } catch (Exception e) {
\r
288 log.error("Transaction: {} Finish Commit failed",transactionId, e);
\r
289 dataBroker.failedTransactionsCount.andIncrement
\r
290 return rollback(handlerTransactions, e);
\r
292 log.trace("Transaction: {} Finished successfully.",transactionId);
\r
293 dataBroker.finishedTransactionsCount.andIncrement;
\r
294 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
\r
/**
 * Notifies data change listeners after a commit. For each captured
 * listener group it re-reads configuration and operational data for the
 * group's path, builds one DataChangeEventImpl and delivers it to every
 * listener in the group.
 *
 * @param listeners listener groups with their pre-commit state
 */
298 def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
\r
299 for (listenerSet : listeners) {
\r
// Post-commit state, read through the broker.
300 val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
\r
301 val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
\r
// NOTE(review): the initial state is passed as (configuration, operational)
// but the updated state as (operational, configuration) - confirm this
// matches the DataChangeEventImpl constructor's parameter order.
303 val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
\r
304 listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
\r
305 for (listener : listenerSet.listeners) {
\r
307 listener.instance.onDataChanged(changeEvent);
\r
// NOTE(review): listener failures are printed with printStackTrace rather
// than going through the class logger used elsewhere in this class.
309 } catch (Exception e) {
\r
310 e.printStackTrace();
\r
/**
 * Rolls back every given handler transaction and reports failure.
 *
 * @param transactions handler transactions to roll back
 * @param e exception that triggered the rollback; it is not included in the
 *        returned result (see the FIXME below)
 * @return an unsuccessful result with status FAILED and an empty error set
 */
316 def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
\r
317 for (transaction : transactions) {
\r
318 transaction.rollback()
\r
321 // FIXME return encountered error.
\r
322 return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
\r
/**
 * Base class for data transactions handled by an AbstractDataBroker. It
 * extends AbstractDataModification and carries an identifier, a status and
 * a reference to the owning broker.
 */
326 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
\r
328 private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
// Identifier used in log messages and as the basis of hashCode and equals.
331 private val Object identifier;
\r
// Current status; starts as NEW and is updated through changeStatus.
333 var TransactionStatus status;
\r
// Broker that reads and commits are delegated to.
335 var AbstractDataBroker<P, D, ?> broker;
\r
// Creates a transaction in status NEW that is bound to the given broker.
337 protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
\r
339 _identifier = identifier;
\r
340 broker = dataBroker;
\r
341 status = TransactionStatus.NEW;
\r
342 LOG.debug("Transaction {} Allocated.", identifier);
344 //listeners = new ListenerRegistry<>();
\r
/**
 * Commits this transaction by handing it to the owning broker.
 *
 * @return whatever broker.commit returns for this transaction
 */
347 override commit() {
\r
348 return broker.commit(this);
\r
/**
 * Reads configuration data for a path, checking this transaction's own
 * updatedConfigurationData before the broker.
 *
 * @param path path to read
 */
351 override readConfigurationData(P path) {
\r
// Value written within this transaction, if any.
352 val local = this.updatedConfigurationData.get(path);
\r
// Presumably the local value is returned when present - TODO confirm.
353 if(local != null) {
\r
// Otherwise the broker's view is used.
357 return broker.readConfigurationData(path);
\r
/**
 * Reads operational data for a path, checking this transaction's own
 * updatedOperationalData before the broker.
 *
 * @param path path to read
 */
360 override readOperationalData(P path) {
\r
// Value written within this transaction, if any.
361 val local = this.updatedOperationalData.get(path);
\r
// Presumably the local value is returned when present - TODO confirm.
362 if(local != null) {
\r
// Otherwise the broker's view is used.
365 return broker.readOperationalData(path);
\r
/**
 * Hash code derived solely from the transaction identifier.
 */
// NOTE(review): equals has a null check on identifier, but this call would
// fail on a null identifier - confirm identifier is never null.
368 override hashCode() {
\r
369 return identifier.hashCode;
\r
/**
 * Equality check that compares the runtime class, then the broker
 * (null-aware), then the identifier (null-aware).
 *
 * @param obj object to compare with
 */
372 override equals(Object obj) {
\r
// Objects of a different runtime class are treated separately.
377 if (getClass() != obj.getClass())
\r
379 val other = (obj as AbstractDataTransaction<P,D>);
\r
// Broker comparison, tolerating a null broker on this side.
380 if (broker == null) {
\r
381 if (other.broker != null)
\r
383 } else if (!broker.equals(other.broker))
\r
// Identifier comparison, tolerating a null identifier on this side.
385 if (identifier == null) {
\r
386 if (other.identifier != null)
\r
388 } else if (!identifier.equals(other.identifier))
\r
/**
 * Status accessor, declared to return a TransactionStatus.
 */
393 override TransactionStatus getStatus() {
\r
/**
 * Hook for subclasses; changeStatus calls it with the new status after the
 * status field has been updated.
 *
 * @param status the status the transaction has just moved to
 */
397 protected abstract def void onStatusChange(TransactionStatus status);
\r
/**
 * Moves the transaction to a new status: logs the transition, stores the
 * new status and then calls onStatusChange.
 *
 * @param status status to move to
 */
399 public def changeStatus(TransactionStatus status) {
\r
// Logged before the assignment so the old status is still in this.status.
400 LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
401 this.status = status;
\r
402 onStatusChange(status);
\r