2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.common.impl.service;
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.List;
14 import java.util.Map.Entry;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
20 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
21 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
23 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
24 import org.opendaylight.controller.sal.common.util.Rpcs;
25 import org.opendaylight.yangtools.concepts.Path;
26 import org.opendaylight.yangtools.yang.common.RpcError;
27 import org.opendaylight.yangtools.yang.common.RpcResult;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 import com.google.common.base.Optional;
32 import com.google.common.base.Predicate;
33 import com.google.common.collect.ImmutableList;
34 import com.google.common.collect.ImmutableList.Builder;
35 import com.google.common.collect.ImmutableSet;
36 import com.google.common.collect.Sets;
38 public class TwoPhaseCommit<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>> implements
39 Callable<RpcResult<TransactionStatus>> {
40 private final static Logger log = LoggerFactory.getLogger(TwoPhaseCommit.class);
42 private final AbstractDataTransaction<P, D> transaction;
44 private final AbstractDataBroker<P, D, DCL> dataBroker;
46 public TwoPhaseCommit(final AbstractDataTransaction<P, D> transaction, final AbstractDataBroker<P, D, DCL> broker) {
47 this.transaction = transaction;
48 this.dataBroker = broker;
52 public RpcResult<TransactionStatus> call() throws Exception {
53 final Object transactionId = this.transaction.getIdentifier();
55 Set<P> changedPaths = ImmutableSet.<P> builder().addAll(transaction.getUpdatedConfigurationData().keySet())
56 .addAll(transaction.getCreatedConfigurationData().keySet())
57 .addAll(transaction.getRemovedConfigurationData())
58 .addAll(transaction.getUpdatedOperationalData().keySet())
59 .addAll(transaction.getCreatedOperationalData().keySet())
60 .addAll(transaction.getRemovedOperationalData()).build();
62 log.trace("Transaction: {} Affected Subtrees: {}", transactionId, changedPaths);
64 // The transaction has no effects, let's just shortcut it
65 if (changedPaths.isEmpty()) {
66 dataBroker.getFinishedTransactionsCount().getAndIncrement();
67 transaction.succeeded();
69 log.trace("Transaction: {} Finished successfully (no effects).", transactionId);
71 return Rpcs.<TransactionStatus> getRpcResult(true, TransactionStatus.COMMITED,
72 Collections.<RpcError> emptySet());
75 final ImmutableList.Builder<ListenerStateCapture<P, D, DCL>> listenersBuilder = ImmutableList.builder();
76 listenersBuilder.addAll(dataBroker.affectedListeners(changedPaths));
77 filterProbablyAffectedListeners(dataBroker.probablyAffectedListeners(changedPaths),listenersBuilder);
81 final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners = listenersBuilder.build();
82 final Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(changedPaths);
83 captureInitialState(listeners);
86 log.trace("Transaction: {} Starting Request Commit.",transactionId);
87 final List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList<>();
89 for (final DataCommitHandler<P, D> handler : commitHandlers) {
90 DataCommitTransaction<P, D> requestCommit = handler.requestCommit(this.transaction);
91 if (requestCommit != null) {
92 handlerTransactions.add(requestCommit);
94 log.debug("Transaction: {}, Handler {} is not participating in transaction.", transactionId,
98 } catch (Exception e) {
99 log.error("Transaction: {} Request Commit failed", transactionId, e);
100 dataBroker.getFailedTransactionsCount().getAndIncrement();
101 this.transaction.failed();
102 return this.rollback(handlerTransactions, e);
106 log.trace("Transaction: {} Starting Finish.",transactionId);
107 final List<RpcResult<Void>> results = new ArrayList<RpcResult<Void>>();
109 for (final DataCommitTransaction<P, D> subtransaction : handlerTransactions) {
110 results.add(subtransaction.finish());
112 } catch (Exception e) {
113 log.error("Transaction: {} Finish Commit failed", transactionId, e);
114 dataBroker.getFailedTransactionsCount().getAndIncrement();
115 transaction.failed();
116 return this.rollback(handlerTransactions, e);
120 dataBroker.getFinishedTransactionsCount().getAndIncrement();
121 transaction.succeeded();
123 log.trace("Transaction: {} Finished successfully.", transactionId);
125 captureFinalState(listeners);
127 log.trace("Transaction: {} Notifying listeners.", transactionId);
129 publishDataChangeEvent(listeners);
130 return Rpcs.<TransactionStatus> getRpcResult(true, TransactionStatus.COMMITED,
131 Collections.<RpcError> emptySet());
134 private void captureInitialState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
135 for (ListenerStateCapture<P, D, DCL> state : listeners) {
136 state.setInitialConfigurationState(dataBroker.readConfigurationData(state.getPath()));
137 state.setInitialOperationalState(dataBroker.readOperationalData(state.getPath()));
142 private void captureFinalState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
143 for (ListenerStateCapture<P, D, DCL> state : listeners) {
144 state.setFinalConfigurationState(dataBroker.readConfigurationData(state.getPath()));
145 state.setFinalOperationalState(dataBroker.readOperationalData(state.getPath()));
149 private void filterProbablyAffectedListeners(
150 ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners, Builder<ListenerStateCapture<P, D, DCL>> reallyAffected) {
152 for(ListenerStateCapture<P, D, DCL> listenerSet : probablyAffectedListeners) {
153 P affectedPath = listenerSet.getPath();
154 Optional<RootedChangeSet<P,D>> configChange = resolveConfigChange(affectedPath);
155 Optional<RootedChangeSet<P, D>> operChange = resolveOperChange(affectedPath);
157 if(configChange.isPresent() || operChange.isPresent()) {
158 reallyAffected.add(listenerSet);
159 if(configChange.isPresent()) {
160 listenerSet.setNormalizedConfigurationChanges(configChange.get());
163 if(operChange.isPresent()) {
164 listenerSet.setNormalizedOperationalChanges(operChange.get());
170 private Optional<RootedChangeSet<P, D>> resolveOperChange(P affectedPath) {
171 Map<P, D> originalOper = dataBroker.deepGetBySubpath(transaction.getOriginalOperationalData(),affectedPath);
172 Map<P, D> createdOper = dataBroker.deepGetBySubpath(transaction.getCreatedOperationalData(),affectedPath);
173 Map<P, D> updatedOper = dataBroker.deepGetBySubpath(transaction.getUpdatedOperationalData(),affectedPath);
174 Set<P> removedOper = Sets.filter(transaction.getRemovedOperationalData(), dataBroker.createIsContainedPredicate(affectedPath));
175 return resolveChanges(affectedPath,originalOper,createdOper,updatedOper,removedOper);
178 private Optional<RootedChangeSet<P, D>> resolveConfigChange(P affectedPath) {
179 Map<P, D> originalConfig = dataBroker.deepGetBySubpath(transaction.getOriginalConfigurationData(),affectedPath);
180 Map<P, D> createdConfig = dataBroker.deepGetBySubpath(transaction.getCreatedConfigurationData(),affectedPath);
181 Map<P, D> updatedConfig = dataBroker.deepGetBySubpath(transaction.getUpdatedConfigurationData(),affectedPath);
182 Set<P> removedConfig = Sets.filter(transaction.getRemovedConfigurationData(), dataBroker.createIsContainedPredicate(affectedPath));
183 return resolveChanges(affectedPath,originalConfig,createdConfig,updatedConfig,removedConfig);
186 private Optional<RootedChangeSet<P,D>> resolveChanges(P affectedPath, Map<P, D> originalConfig, Map<P, D> createdConfig, Map<P, D> updatedConfig,Set<P> potentialDeletions) {
187 Predicate<P> isContained = dataBroker.createIsContainedPredicate(affectedPath);
189 if(createdConfig.isEmpty() && updatedConfig.isEmpty() && potentialDeletions.isEmpty()) {
190 return Optional.absent();
192 RootedChangeSet<P, D> changeSet = new RootedChangeSet<P,D>(affectedPath,originalConfig);
193 changeSet.addCreated(createdConfig);
195 for(Entry<P, D> entry : updatedConfig.entrySet()) {
196 if(originalConfig.containsKey(entry.getKey())) {
197 changeSet.addUpdated(entry);
199 changeSet.addCreated(entry);
203 for(Entry<P,D> entry : originalConfig.entrySet()) {
204 for(P deletion : potentialDeletions) {
205 if(isContained.apply(deletion)) {
206 changeSet.addRemoval(entry.getKey());
211 if(changeSet.isChange()) {
212 return Optional.of(changeSet);
214 return Optional.absent();
219 public void publishDataChangeEvent(final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
220 ExecutorService executor = this.dataBroker.getExecutor();
221 final Runnable notifyTask = new Runnable() {
224 for (final ListenerStateCapture<P, D, DCL> listenerSet : listeners) {
225 DataChangeEvent<P, D> changeEvent = listenerSet.createEvent(transaction);
226 for (final DataChangeListenerRegistration<P, D, DCL> listener : listenerSet.getListeners()) {
228 listener.getInstance().onDataChanged(changeEvent);
229 } catch (Exception e) {
230 log.error("Unhandled exception when invoking listener {}", listener, e);
236 executor.submit(notifyTask);
239 public RpcResult<TransactionStatus> rollback(final List<DataCommitTransaction<P, D>> transactions, final Exception e) {
240 for (final DataCommitTransaction<P, D> transaction : transactions) {
241 transaction.rollback();
243 Set<RpcError> _emptySet = Collections.<RpcError> emptySet();
244 return Rpcs.<TransactionStatus> getRpcResult(false, TransactionStatus.FAILED, _emptySet);