2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.common.impl.service
\r
10 import com.google.common.collect.FluentIterable
\r
11 import com.google.common.collect.HashMultimap
\r
12 import com.google.common.collect.ImmutableList
\r
13 import com.google.common.collect.Multimap
\r
14 import java.util.ArrayList
\r
15 import java.util.Arrays
\r
16 import java.util.Collection
\r
17 import java.util.Collections
\r
18 import java.util.HashSet
\r
19 import java.util.List
\r
20 import java.util.Set
\r
21 import java.util.concurrent.Callable
\r
22 import java.util.concurrent.ExecutorService
\r
23 import java.util.concurrent.Future
\r
24 import java.util.concurrent.atomic.AtomicLong
\r
25 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
\r
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
\r
28 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
\r
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
\r
30 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
\r
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
\r
32 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
\r
33 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
\r
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader
\r
35 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
\r
36 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
\r
37 import org.opendaylight.controller.sal.common.util.Rpcs
\r
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
\r
39 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
\r
40 import org.opendaylight.yangtools.concepts.ListenerRegistration
\r
41 import org.opendaylight.yangtools.concepts.Path
\r
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
\r
43 import org.opendaylight.yangtools.yang.common.RpcResult
\r
44 import org.slf4j.LoggerFactory
\r
46 import static com.google.common.base.Preconditions.*
\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
48 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
\r
49 DataReader<P, D>, //
\r
50 DataChangePublisher<P, D, DCL>, //
\r
51 DataProvisionService<P, D> {
\r
53 private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
\r
56 var ExecutorService executor;
\r
59 var AbstractDataReadRouter<P, D> dataReadRouter;
\r
62 private val AtomicLong submittedTransactionsCount = new AtomicLong;
\r
65 private val AtomicLong failedTransactionsCount = new AtomicLong
\r
68 private val AtomicLong finishedTransactionsCount = new AtomicLong
\r
70 Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
\r
71 Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
\r
73 val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
\r
77 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
\r
79 return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
\r
80 .transformAndConcat[value] //
\r
81 .transform[instance].toList()
\r
84 override final readConfigurationData(P path) {
\r
85 return dataReadRouter.readConfigurationData(path);
\r
88 override final readOperationalData(P path) {
\r
89 return dataReadRouter.readOperationalData(path);
\r
92 override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
\r
93 val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
\r
94 commitHandlers.put(path, registration)
\r
95 LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
\r
96 for(listener : commitHandlerRegistrationListeners) {
\r
98 listener.instance.onRegister(registration);
\r
99 } catch (Exception e) {
\r
100 LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
\r
103 return registration;
\r
106 override final def registerDataChangeListener(P path, DCL listener) {
\r
107 val reg = new DataChangeListenerRegistration(path, listener, this);
\r
108 listeners.put(path, reg);
\r
109 val initialConfig = dataReadRouter.readConfigurationData(path);
\r
110 val initialOperational = dataReadRouter.readOperationalData(path);
\r
111 val event = createInitialListenerEvent(path,initialConfig,initialOperational);
\r
112 listener.onDataChanged(event);
\r
116 final def registerDataReader(P path, DataReader<P, D> reader) {
\r
118 val confReg = dataReadRouter.registerConfigurationReader(path, reader);
\r
119 val dataReg = dataReadRouter.registerOperationalReader(path, reader);
\r
121 return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
\r
124 override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
\r
125 val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
\r
130 protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
\r
131 return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
\r
135 protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
\r
136 listeners.remove(registration.path, registration);
\r
139 protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
\r
140 commitHandlers.remove(registration.path, registration);
\r
142 LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
\r
143 for(listener : commitHandlerRegistrationListeners) {
\r
145 listener.instance.onUnregister(registration);
\r
146 } catch (Exception e) {
\r
147 LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
\r
152 protected final def getActiveCommitHandlers() {
\r
153 return commitHandlers.entries;
\r
156 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
\r
157 HashSet<P> paths) {
\r
158 return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
\r
159 val operationalState = readOperationalData(key)
\r
160 val configurationState = readConfigurationData(key)
\r
161 return new ListenerStateCapture(key, value, operationalState, configurationState)
\r
165 protected def boolean isAffectedBy(P key, Set<P> paths) {
\r
166 if (paths.contains(key)) {
\r
169 for (path : paths) {
\r
170 if (key.contains(path)) {
\r
178 package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
\r
179 checkNotNull(transaction);
\r
180 transaction.changeStatus(TransactionStatus.SUBMITED);
\r
181 val task = new TwoPhaseCommit(transaction, this);
\r
182 submittedTransactionsCount.andIncrement;
\r
183 return executor.submit(task);
\r
189 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
\r
195 Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
\r
198 D initialOperationalState;
\r
201 D initialConfigurationState;
\r
204 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
\r
206 AbstractDataBroker<P, D, DCL> dataBroker;
\r
211 new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
\r
213 dataBroker = broker;
\r
217 override protected removeRegistration() {
\r
218 dataBroker.removeListener(this);
\r
224 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
\r
225 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
\r
226 implements DataCommitHandlerRegistration<P, D> {
\r
228 AbstractDataBroker<P, D, ?> dataBroker;
\r
233 new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
\r
235 dataBroker = broker;
\r
239 override protected removeRegistration() {
\r
240 dataBroker.removeCommitHandler(this);
\r
245 package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
\r
247 private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
\r
249 val AbstractDataTransaction<P, D> transaction;
\r
250 val AbstractDataBroker<P, D, DCL> dataBroker;
\r
252 new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
\r
253 this.transaction = transaction;
\r
254 this.dataBroker = broker;
\r
257 override call() throws Exception {
\r
259 // get affected paths
\r
260 val affectedPaths = new HashSet<P>();
\r
262 affectedPaths.addAll(transaction.createdConfigurationData.keySet);
\r
263 affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
\r
264 affectedPaths.addAll(transaction.removedConfigurationData);
\r
266 affectedPaths.addAll(transaction.createdOperationalData.keySet);
\r
267 affectedPaths.addAll(transaction.updatedOperationalData.keySet);
\r
268 affectedPaths.addAll(transaction.removedOperationalData);
\r
270 val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
\r
272 val transactionId = transaction.identifier;
\r
274 log.trace("Transaction: {} Started.",transactionId);
\r
275 // requesting commits
\r
276 val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
\r
277 val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
\r
279 for (handler : commitHandlers) {
\r
280 handlerTransactions.add(handler.requestCommit(transaction));
\r
282 } catch (Exception e) {
\r
283 log.error("Transaction: {} Request Commit failed", transactionId,e);
\r
284 dataBroker.failedTransactionsCount.andIncrement
\r
285 return rollback(handlerTransactions, e);
\r
287 val List<RpcResult<Void>> results = new ArrayList();
\r
289 for (subtransaction : handlerTransactions) {
\r
290 results.add(subtransaction.finish());
\r
292 listeners.publishDataChangeEvent();
\r
293 } catch (Exception e) {
\r
294 log.error("Transaction: {} Finish Commit failed",transactionId, e);
\r
295 dataBroker.failedTransactionsCount.andIncrement
\r
296 return rollback(handlerTransactions, e);
\r
298 log.trace("Transaction: {} Finished successfully.",transactionId);
\r
299 dataBroker.finishedTransactionsCount.andIncrement;
\r
300 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
\r
304 def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
\r
305 for (listenerSet : listeners) {
\r
306 val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
\r
307 val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
\r
309 val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
\r
310 listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
\r
311 for (listener : listenerSet.listeners) {
\r
313 listener.instance.onDataChanged(changeEvent);
\r
315 } catch (Exception e) {
\r
316 e.printStackTrace();
\r
322 def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
\r
323 for (transaction : transactions) {
\r
324 transaction.rollback()
\r
327 // FIXME return encountered error.
\r
328 return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
\r
332 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
\r
334 private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
337 private val Object identifier;
\r
339 var TransactionStatus status;
\r
341 var AbstractDataBroker<P, D, ?> broker;
\r
343 protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
\r
345 _identifier = identifier;
\r
346 broker = dataBroker;
\r
347 status = TransactionStatus.NEW;
\r
348 LOG.debug("Transaction {} Allocated.", identifier);
350 //listeners = new ListenerRegistry<>();
\r
353 override commit() {
\r
354 return broker.commit(this);
\r
357 override readConfigurationData(P path) {
\r
358 val local = this.updatedConfigurationData.get(path);
\r
359 if(local != null) {
\r
363 return broker.readConfigurationData(path);
\r
366 override readOperationalData(P path) {
\r
367 val local = this.updatedOperationalData.get(path);
\r
368 if(local != null) {
\r
371 return broker.readOperationalData(path);
\r
374 override hashCode() {
\r
375 return identifier.hashCode;
\r
378 override equals(Object obj) {
\r
383 if (getClass() != obj.getClass())
\r
385 val other = (obj as AbstractDataTransaction<P,D>);
\r
386 if (broker == null) {
\r
387 if (other.broker != null)
\r
389 } else if (!broker.equals(other.broker))
\r
391 if (identifier == null) {
\r
392 if (other.identifier != null)
\r
394 } else if (!identifier.equals(other.identifier))
\r
399 override TransactionStatus getStatus() {
\r
403 protected abstract def void onStatusChange(TransactionStatus status);
\r
405 public def changeStatus(TransactionStatus status) {
\r
406 LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
407 this.status = status;
\r
408 onStatusChange(status);
\r