2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.common.impl.service
\r
10 import com.google.common.collect.FluentIterable
\r
11 import com.google.common.collect.HashMultimap
\r
12 import com.google.common.collect.ImmutableList
\r
13 import com.google.common.collect.Multimap
\r
14 import java.util.ArrayList
\r
15 import java.util.Arrays
\r
16 import java.util.Collection
\r
17 import java.util.Collections
\r
18 import java.util.HashSet
\r
19 import java.util.List
\r
20 import java.util.Set
\r
21 import java.util.concurrent.Callable
\r
22 import java.util.concurrent.ExecutorService
\r
23 import java.util.concurrent.Future
\r
24 import java.util.concurrent.atomic.AtomicLong
\r
25 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
\r
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
\r
28 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
\r
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
\r
30 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
\r
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
\r
32 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
\r
33 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
\r
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader
\r
35 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
\r
36 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
\r
37 import org.opendaylight.controller.sal.common.util.Rpcs
\r
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
\r
39 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
\r
40 import org.opendaylight.yangtools.concepts.ListenerRegistration
\r
41 import org.opendaylight.yangtools.concepts.Path
\r
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
\r
43 import org.opendaylight.yangtools.yang.common.RpcResult
\r
44 import org.slf4j.LoggerFactory
\r
46 import static com.google.common.base.Preconditions.*
\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
47 import com.google.common.collect.Multimaps
48 import java.util.concurrent.locks.Lock
49 import java.util.concurrent.locks.ReentrantLock
/**
 * Abstract data broker parameterized by path type P, data type D and data
 * change listener type DCL.
 *
 * Keeps data change listener registrations and commit handler registrations
 * in synchronized set multimaps keyed by path, delegates reads to the
 * configured dataReadRouter and hands submitted transactions to the executor
 * as TwoPhaseCommit tasks.
 *
 * NOTE(review): several lines of this class are not present in this listing
 * (closing braces, some parameter lists and return statements, the body of
 * withLock). Comments below describe only the statements that are visible.
 */
51 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
\r
52 DataReader<P, D>, //
\r
53 DataChangePublisher<P, D, DCL>, //
\r
54 DataProvisionService<P, D> {
\r
56 private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
\r
// Executor to which commit() submits TwoPhaseCommit tasks.
59 var ExecutorService executor;
\r
// Router that serves all configuration/operational reads and reader registrations.
62 var AbstractDataReadRouter<P, D> dataReadRouter;
\r
// Incremented in commit() for every submitted transaction.
65 private val AtomicLong submittedTransactionsCount = new AtomicLong;
\r
// Incremented by TwoPhaseCommit when the request-commit or finish phase throws.
68 private val AtomicLong failedTransactionsCount = new AtomicLong
\r
// Incremented by TwoPhaseCommit after a transaction finished successfully.
71 private val AtomicLong finishedTransactionsCount = new AtomicLong
\r
// Data change listener registrations, keyed by the path they were registered for.
73 Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());
\r
// Commit handler registrations, keyed by the path they were registered for.
74 Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());
\r
// Lock passed to withLock by the registration, removal and lookup methods below.
76 private val Lock registrationLock = new ReentrantLock;
// Listeners notified via onRegister/onUnregister when a commit handler is added or removed.
78 val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
\r
/**
 * Collects the commit handler instances registered under every key of
 * commitHandlers for which isAffectedBy(key, paths) holds, and returns them
 * as a list.
 *
 * NOTE(review): the parameter list is not visible here; the body uses a
 * variable named paths — presumably the set of affected paths; confirm.
 */
82 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
\r
84 return withLock(registrationLock) [|
\r
85 return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
\r
86 .transformAndConcat[value] //
\r
87 .transform[instance].toList()
/**
 * Reads configuration data for the given path by delegating to dataReadRouter.
 */
91 override final readConfigurationData(P path) {
\r
92 return dataReadRouter.readConfigurationData(path);
\r
/**
 * Reads operational data for the given path by delegating to dataReadRouter.
 */
95 override final readOperationalData(P path) {
\r
96 return dataReadRouter.readOperationalData(path);
\r
// NOTE(review): the body of withLock is not present in this listing —
// presumably it invokes `method` while holding `lock` and returns its result; confirm.
99 private static def <T> withLock(Lock lock,Callable<T> method) {
/**
 * Registers a commit handler for the given path.
 *
 * Creates a DataCommitHandlerRegistrationImpl, stores it in commitHandlers
 * and calls onRegister on every commit handler registration listener.
 * An exception thrown by such a listener is logged and does not stop the
 * notification of the remaining listeners (the catch is inside the loop).
 */
108 override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
109 return withLock(registrationLock) [|
\r
110 val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
\r
111 commitHandlers.put(path, registration)
\r
112 LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
\r
113 for(listener : commitHandlerRegistrationListeners) {
\r
115 listener.instance.onRegister(registration);
\r
116 } catch (Exception e) {
\r
117 LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
\r
/**
 * Registers a data change listener for the given path.
 *
 * After storing the registration, the current configuration and operational
 * data for the path are read and delivered to the listener as an initial
 * event created by createInitialListenerEvent.
 *
 * NOTE(review): listener.onDataChanged is called inside the closure passed to
 * withLock, i.e. presumably while registrationLock is held — confirm this is
 * intended, since the listener is external code.
 */
124 override final def registerDataChangeListener(P path, DCL listener) {
\r
125 return withLock(registrationLock) [|
126 val reg = new DataChangeListenerRegistration(path, listener, this);
\r
127 listeners.put(path, reg);
\r
128 val initialConfig = dataReadRouter.readConfigurationData(path);
\r
129 val initialOperational = dataReadRouter.readOperationalData(path);
\r
130 val event = createInitialListenerEvent(path,initialConfig,initialOperational);
\r
131 listener.onDataChanged(event);
\r
/**
 * Registers the reader with dataReadRouter as both configuration reader and
 * operational reader for the path, and returns a CompositeObjectRegistration
 * wrapping both router registrations.
 */
136 final def registerDataReader(P path, DataReader<P, D> reader) {
\r
137 return withLock(registrationLock) [|
\r
138 val confReg = dataReadRouter.registerConfigurationReader(path, reader);
\r
139 val dataReg = dataReadRouter.registerOperationalReader(path, reader);
\r
141 return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
/**
 * Adds a listener that is notified about commit handler registrations.
 * NOTE(review): the return statement is not visible here — presumably `ret` is returned; confirm.
 */
145 override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
\r
146 val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
\r
/**
 * Creates the event delivered to a newly registered data change listener.
 * The visible implementation builds an InitialDataChangeEventImpl from the
 * two data snapshots; the path argument is not used by it.
 */
150 protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
\r
151 return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
\r
/**
 * Removes the given data change listener registration from the listeners multimap.
 */
155 protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
156 return withLock(registrationLock) [|
\r
157 listeners.remove(registration.path, registration);
/**
 * Removes the given commit handler registration and calls onUnregister on
 * every commit handler registration listener; exceptions thrown by a
 * listener are logged.
 */
161 protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
\r
162 return withLock(registrationLock) [|
163 commitHandlers.remove(registration.path, registration);
\r
164 LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
\r
165 for(listener : commitHandlerRegistrationListeners) {
\r
167 listener.instance.onUnregister(registration);
\r
168 } catch (Exception e) {
\r
169 LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
\r
/**
 * Returns the entries view of the commitHandlers multimap (not a copy).
 */
176 protected final def getActiveCommitHandlers() {
\r
177 return commitHandlers.entries;
\r
/**
 * For every key of listeners for which isAffectedBy(key, paths) holds, reads
 * the current operational and configuration data for that key and captures
 * them, together with the listeners registered under the key, in a
 * ListenerStateCapture.
 *
 * NOTE(review): the parameter list and the end of the expression are not
 * visible here; TwoPhaseCommit passes the result to a method taking an
 * ImmutableList, so presumably the result is materialized with toList — confirm.
 */
180 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
\r
182 return withLock(registrationLock) [|
\r
183 return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
\r
184 val operationalState = readOperationalData(key)
\r
185 val configurationState = readConfigurationData(key)
\r
186 return new ListenerStateCapture(key, value, operationalState, configurationState)
\r
/**
 * Decides whether a registration key is affected by a set of changed paths.
 * The visible checks are: the set contains the key itself, or the key
 * contains one of the paths.
 * NOTE(review): the return statements are not visible in this listing.
 */
191 protected def boolean isAffectedBy(P key, Set<P> paths) {
\r
192 if (paths.contains(key)) {
\r
195 for (path : paths) {
\r
196 if (key.contains(path)) {
\r
/**
 * Submits the transaction for commit.
 *
 * Rejects null, moves the transaction to SUBMITED, wraps it in a
 * TwoPhaseCommit task, increments the submitted counter and returns the
 * Future obtained from executor.submit.
 */
204 package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
\r
205 checkNotNull(transaction);
\r
206 transaction.changeStatus(TransactionStatus.SUBMITED);
\r
207 val task = new TwoPhaseCommit(transaction, this);
\r
208 submittedTransactionsCount.andIncrement;
\r
209 return executor.submit(task);
\r
/**
 * Snapshot taken before a commit: the listener registrations for one path
 * together with the operational and configuration data read for that path.
 *
 * Instances are created in AbstractDataBroker.affectedListenersWithInitialState
 * with the arguments (key, value, operationalState, configurationState).
 *
 * NOTE(review): TwoPhaseCommit also reads a `path` member that is not visible
 * in this listing, and no constructor is visible — presumably both are
 * generated or declared on lines missing here; confirm.
 */
215 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
\r
// Registrations whose listeners are notified after the commit.
221 Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
\r
// Operational data read before the commit was applied.
224 D initialOperationalState;
\r
// Configuration data read before the commit was applied.
227 D initialConfigurationState;
\r
/**
 * Registration handle for a data change listener.
 *
 * Remembers the broker it was created by; when the registration is removed
 * it asks that broker to drop it via removeListener.
 *
 * NOTE(review): the broker reads `registration.path`, but no path member or
 * assignment is visible in this listing — presumably on lines missing here; confirm.
 */
230 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
\r
// Broker that owns this registration.
232 AbstractDataBroker<P, D, DCL> dataBroker;
\r
// Creates the registration; the visible statement stores the owning broker.
237 new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
\r
239 dataBroker = broker;
\r
// Called when the registration is removed: unregisters this object from the broker.
243 override protected removeRegistration() {
\r
244 dataBroker.removeListener(this);
\r
/**
 * Registration handle for a commit handler.
 *
 * Remembers the broker it was created by; when the registration is removed
 * it asks that broker to drop it via removeCommitHandler.
 *
 * NOTE(review): the broker reads `registration.path`, but no path member or
 * assignment is visible in this listing — presumably on lines missing here; confirm.
 */
250 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
\r
251 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
\r
252 implements DataCommitHandlerRegistration<P, D> {
\r
// Broker that owns this registration (listener type is irrelevant here, hence the wildcard).
254 AbstractDataBroker<P, D, ?> dataBroker;
\r
// Creates the registration; the visible statement stores the owning broker.
259 new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
\r
261 dataBroker = broker;
\r
// Called when the registration is removed: unregisters this object from the broker.
265 override protected removeRegistration() {
\r
266 dataBroker.removeCommitHandler(this);
\r
/**
 * Task that commits one transaction through every affected commit handler.
 *
 * Phase one asks each affected handler for a handler transaction via
 * requestCommit; phase two calls finish on each of them and then publishes
 * data change events to the affected listeners. If either phase throws, the
 * failure counter of the broker is incremented, the transaction is marked
 * FAILED and every handler transaction obtained so far is rolled back.
 */
package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {

    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);

    // Transaction being committed.
    val AbstractDataTransaction<P, D> transaction;

    // Broker that supplies affected handlers/listeners and owns the counters.
    val AbstractDataBroker<P, D, DCL> dataBroker;

    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs the commit.
     *
     * @return a successful result carrying COMMITED, or the failed result
     *         produced by rollback when one of the phases threw
     */
    override call() throws Exception {

        // Every path the transaction created, updated or removed in either data store.
        val affectedPaths = new HashSet<P>();

        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
        affectedPaths.addAll(transaction.removedConfigurationData);

        affectedPaths.addAll(transaction.createdOperationalData.keySet);
        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
        affectedPaths.addAll(transaction.removedOperationalData);

        // Captured before any handler runs so the listeners' "before" state is the pre-commit state.
        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);

        val transactionId = transaction.identifier;

        log.trace("Transaction: {} Started.",transactionId);
        // The second placeholder is required; without it the affected paths are never printed.
        log.trace("Transaction: {} Affected Subtrees: {}",transactionId,affectedPaths);

        // Phase 1: request commit from every affected handler.
        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            log.error("Transaction: {} Request Commit failed", transactionId,e);
            dataBroker.failedTransactionsCount.andIncrement
            transaction.changeStatus(TransactionStatus.FAILED)
            return rollback(handlerTransactions, e);
        }

        // Phase 2: finish every handler transaction, then notify listeners.
        // NOTE(review): the RpcResult returned by finish() is not inspected (it never was);
        // confirm whether an unsuccessful result should fail the commit.
        try {
            for (subtransaction : handlerTransactions) {
                subtransaction.finish();
            }
            listeners.publishDataChangeEvent();
        } catch (Exception e) {
            log.error("Transaction: {} Finish Commit failed",transactionId, e);
            dataBroker.failedTransactionsCount.andIncrement
            transaction.changeStatus(TransactionStatus.FAILED)
            return rollback(handlerTransactions, e);
        }
        log.trace("Transaction: {} Finished successfully.",transactionId);
        dataBroker.finishedTransactionsCount.andIncrement;
        transaction.changeStatus(TransactionStatus.COMMITED)
        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
    }

    /**
     * Delivers a change event to every captured listener.
     *
     * For each capture the current configuration and operational data are
     * re-read from the broker and combined with the captured initial state.
     * A listener that throws is logged and skipped so the remaining
     * listeners are still notified.
     */
    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
        for (listenerSet : listeners) {
            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);

            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
            for (listener : listenerSet.listeners) {
                try {
                    listener.instance.onDataChanged(changeEvent);
                } catch (Exception e) {
                    // Report through the logger instead of stderr so the failure reaches the configured log.
                    log.error("Unexpected exception in listener {} during invoking onDataChanged",listener.instance,e);
                }
            }
        }
    }

    /**
     * Rolls back every given handler transaction and returns a failed result.
     *
     * A handler whose rollback throws is logged and skipped, so the handlers
     * after it are still rolled back.
     *
     * @param transactions handler transactions obtained during phase one
     * @param e the exception that caused the rollback (currently not part of the result)
     */
    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
        for (handlerTransaction : transactions) {
            try {
                handlerTransaction.rollback()
            } catch (Exception rollbackFailure) {
                log.error("Transaction: {} Rollback failed", transaction.identifier, rollbackFailure);
            }
        }

        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}
\r
/**
 * Base class for a data modification transaction bound to a broker.
 *
 * Carries an identifier and a TransactionStatus, reads data from its own
 * pending updates before falling back to the broker, and commits through
 * the broker.
 *
 * NOTE(review): several lines of this class are not present in this listing
 * (closing braces and a number of return statements); comments describe
 * only the statements that are visible.
 */
362 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
\r
364 private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
// Identifier of this transaction; used by hashCode, equals and log messages.
367 private val Object identifier;
\r
// Current lifecycle state; set to NEW by the constructor and updated by changeStatus.
369 var TransactionStatus status;
\r
// Broker used for fallback reads and for commit.
371 var AbstractDataBroker<P, D, ?> broker;
\r
/**
 * Creates a transaction in state NEW for the given broker and logs its allocation.
 */
373 protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
\r
375 _identifier = identifier;
\r
376 broker = dataBroker;
\r
377 status = TransactionStatus.NEW;
\r
378 LOG.debug("Transaction {} Allocated.", identifier);
380 //listeners = new ListenerRegistry<>();
\r
/**
 * Commits this transaction by handing it to the broker.
 */
383 override commit() {
\r
384 return broker.commit(this);
\r
/**
 * Reads configuration data for the path, looking first at the data updated
 * in this transaction and otherwise asking the broker.
 * NOTE(review): the statement inside the `local != null` branch is not
 * visible — presumably it returns `local`; confirm.
 */
387 override readConfigurationData(P path) {
\r
388 val local = this.updatedConfigurationData.get(path);
\r
389 if(local != null) {
\r
393 return broker.readConfigurationData(path);
\r
/**
 * Reads operational data for the path, looking first at the data updated
 * in this transaction and otherwise asking the broker.
 * NOTE(review): the statement inside the `local != null` branch is not
 * visible — presumably it returns `local`; confirm.
 */
396 override readOperationalData(P path) {
\r
397 val local = this.updatedOperationalData.get(path);
\r
398 if(local != null) {
\r
401 return broker.readOperationalData(path);
\r
/**
 * Hash code derived from the identifier only.
 * NOTE(review): unlike equals, this does not guard against a null identifier.
 */
404 override hashCode() {
\r
405 return identifier.hashCode;
\r
/**
 * Equality check; the visible conditions compare the runtime class, the
 * broker and the identifier, treating null on both sides as equal.
 * NOTE(review): the return statements are not visible in this listing.
 */
408 override equals(Object obj) {
\r
413 if (getClass() != obj.getClass())
\r
415 val other = (obj as AbstractDataTransaction<P,D>);
\r
416 if (broker == null) {
\r
417 if (other.broker != null)
\r
419 } else if (!broker.equals(other.broker))
\r
421 if (identifier == null) {
\r
422 if (other.identifier != null)
\r
424 } else if (!identifier.equals(other.identifier))
\r
// NOTE(review): body not visible — presumably returns the status field; confirm.
429 override TransactionStatus getStatus() {
\r
/**
 * Hook invoked by changeStatus after the status field has been updated.
 */
433 protected abstract def void onStatusChange(TransactionStatus status);
\r
/**
 * Logs the transition, stores the new status and notifies onStatusChange.
 */
435 public def changeStatus(TransactionStatus status) {
\r
436 LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
437 this.status = status;
\r
438 onStatusChange(status);
\r