Merge "API Usability: Introduced type capture for Transaction Factory"
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / TwoPhaseCommit.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.common.impl.service;
9
10 import java.util.ArrayList;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Map.Entry;
14 import java.util.Set;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.ExecutorService;
17
18 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
19 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
20 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
23 import org.opendaylight.yangtools.concepts.Path;
24 import org.opendaylight.yangtools.yang.common.RpcResult;
25 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import com.google.common.base.Optional;
30 import com.google.common.base.Predicate;
31 import com.google.common.collect.ImmutableList;
32 import com.google.common.collect.ImmutableList.Builder;
33 import com.google.common.collect.ImmutableSet;
34 import com.google.common.collect.Sets;
35
36 public class TwoPhaseCommit<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>> implements
37         Callable<RpcResult<TransactionStatus>> {
38     private final static Logger log = LoggerFactory.getLogger(TwoPhaseCommit.class);
39
40     private final AbstractDataTransaction<P, D> transaction;
41
42     private final AbstractDataBroker<P, D, DCL> dataBroker;
43
44     public TwoPhaseCommit(final AbstractDataTransaction<P, D> transaction, final AbstractDataBroker<P, D, DCL> broker) {
45         this.transaction = transaction;
46         this.dataBroker = broker;
47     }
48
49     @Override
50     public RpcResult<TransactionStatus> call() throws Exception {
51         final Object transactionId = this.transaction.getIdentifier();
52
53         Set<P> changedPaths = ImmutableSet.<P> builder().addAll(transaction.getUpdatedConfigurationData().keySet())
54                 .addAll(transaction.getCreatedConfigurationData().keySet())
55                 .addAll(transaction.getRemovedConfigurationData())
56                 .addAll(transaction.getUpdatedOperationalData().keySet())
57                 .addAll(transaction.getCreatedOperationalData().keySet())
58                 .addAll(transaction.getRemovedOperationalData()).build();
59
60         log.trace("Transaction: {} Affected Subtrees: {}", transactionId, changedPaths);
61
62         // The transaction has no effects, let's just shortcut it
63         if (changedPaths.isEmpty()) {
64             dataBroker.getFinishedTransactionsCount().getAndIncrement();
65             transaction.succeeded();
66
67             log.trace("Transaction: {} Finished successfully (no effects).", transactionId);
68
69             return RpcResultBuilder.<TransactionStatus> success( TransactionStatus.COMMITED ).build();
70         }
71
72         final ImmutableList.Builder<ListenerStateCapture<P, D, DCL>> listenersBuilder = ImmutableList.builder();
73         listenersBuilder.addAll(dataBroker.affectedListeners(changedPaths));
74         filterProbablyAffectedListeners(dataBroker.probablyAffectedListeners(changedPaths),listenersBuilder);
75
76
77
78         final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners = listenersBuilder.build();
79         final Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(changedPaths);
80         captureInitialState(listeners);
81
82
83         log.trace("Transaction: {} Starting Request Commit.",transactionId);
84         final List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList<>();
85         try {
86             for (final DataCommitHandler<P, D> handler : commitHandlers) {
87                 DataCommitTransaction<P, D> requestCommit = handler.requestCommit(this.transaction);
88                 if (requestCommit != null) {
89                     handlerTransactions.add(requestCommit);
90                 } else {
91                     log.debug("Transaction: {}, Handler {}  is not participating in transaction.", transactionId,
92                             handler);
93                 }
94             }
95         } catch (Exception e) {
96             log.error("Transaction: {} Request Commit failed", transactionId, e);
97             dataBroker.getFailedTransactionsCount().getAndIncrement();
98             this.transaction.failed();
99             return this.rollback(handlerTransactions, e);
100
101         }
102
103         log.trace("Transaction: {} Starting Finish.",transactionId);
104         final List<RpcResult<Void>> results = new ArrayList<RpcResult<Void>>();
105         try {
106             for (final DataCommitTransaction<P, D> subtransaction : handlerTransactions) {
107                 results.add(subtransaction.finish());
108             }
109         } catch (Exception e) {
110             log.error("Transaction: {} Finish Commit failed", transactionId, e);
111             dataBroker.getFailedTransactionsCount().getAndIncrement();
112             transaction.failed();
113             return this.rollback(handlerTransactions, e);
114         }
115
116
117         dataBroker.getFinishedTransactionsCount().getAndIncrement();
118         transaction.succeeded();
119
120         log.trace("Transaction: {} Finished successfully.", transactionId);
121
122         captureFinalState(listeners);
123
124         log.trace("Transaction: {} Notifying listeners.", transactionId);
125
126         publishDataChangeEvent(listeners);
127         return RpcResultBuilder.<TransactionStatus> success(TransactionStatus.COMMITED).build();
128     }
129
130     private void captureInitialState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
131         for (ListenerStateCapture<P, D, DCL> state : listeners) {
132             state.setInitialConfigurationState(dataBroker.readConfigurationData(state.getPath()));
133             state.setInitialOperationalState(dataBroker.readOperationalData(state.getPath()));
134         }
135     }
136
137
138     private void captureFinalState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
139         for (ListenerStateCapture<P, D, DCL> state : listeners) {
140             state.setFinalConfigurationState(dataBroker.readConfigurationData(state.getPath()));
141             state.setFinalOperationalState(dataBroker.readOperationalData(state.getPath()));
142         }
143     }
144
145     private void filterProbablyAffectedListeners(
146             ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners, Builder<ListenerStateCapture<P, D, DCL>> reallyAffected) {
147
148         for(ListenerStateCapture<P, D, DCL> listenerSet : probablyAffectedListeners) {
149             P affectedPath = listenerSet.getPath();
150             Optional<RootedChangeSet<P,D>> configChange = resolveConfigChange(affectedPath);
151             Optional<RootedChangeSet<P, D>> operChange = resolveOperChange(affectedPath);
152
153             if(configChange.isPresent() || operChange.isPresent()) {
154                 reallyAffected.add(listenerSet);
155                 if(configChange.isPresent()) {
156                     listenerSet.setNormalizedConfigurationChanges(configChange.get());
157                 }
158
159                 if(operChange.isPresent()) {
160                     listenerSet.setNormalizedOperationalChanges(operChange.get());
161                 }
162             }
163         }
164     }
165
166     private Optional<RootedChangeSet<P, D>> resolveOperChange(P affectedPath) {
167         Map<P, D> originalOper = dataBroker.deepGetBySubpath(transaction.getOriginalOperationalData(),affectedPath);
168         Map<P, D> createdOper = dataBroker.deepGetBySubpath(transaction.getCreatedOperationalData(),affectedPath);
169         Map<P, D> updatedOper = dataBroker.deepGetBySubpath(transaction.getUpdatedOperationalData(),affectedPath);
170         Set<P> removedOper = Sets.filter(transaction.getRemovedOperationalData(), dataBroker.createIsContainedPredicate(affectedPath));
171         return resolveChanges(affectedPath,originalOper,createdOper,updatedOper,removedOper);
172     }
173
174     private Optional<RootedChangeSet<P, D>> resolveConfigChange(P affectedPath) {
175         Map<P, D> originalConfig = dataBroker.deepGetBySubpath(transaction.getOriginalConfigurationData(),affectedPath);
176         Map<P, D> createdConfig = dataBroker.deepGetBySubpath(transaction.getCreatedConfigurationData(),affectedPath);
177         Map<P, D> updatedConfig = dataBroker.deepGetBySubpath(transaction.getUpdatedConfigurationData(),affectedPath);
178         Set<P> removedConfig = Sets.filter(transaction.getRemovedConfigurationData(), dataBroker.createIsContainedPredicate(affectedPath));
179         return resolveChanges(affectedPath,originalConfig,createdConfig,updatedConfig,removedConfig);
180     }
181
182     private Optional<RootedChangeSet<P,D>> resolveChanges(P affectedPath, Map<P, D> originalConfig, Map<P, D> createdConfig, Map<P, D> updatedConfig,Set<P> potentialDeletions) {
183         Predicate<P> isContained = dataBroker.createIsContainedPredicate(affectedPath);
184
185         if(createdConfig.isEmpty() && updatedConfig.isEmpty() && potentialDeletions.isEmpty()) {
186             return Optional.absent();
187         }
188         RootedChangeSet<P, D> changeSet = new RootedChangeSet<P,D>(affectedPath,originalConfig);
189         changeSet.addCreated(createdConfig);
190
191         for(Entry<P, D> entry : updatedConfig.entrySet()) {
192             if(originalConfig.containsKey(entry.getKey())) {
193                 changeSet.addUpdated(entry);
194             } else {
195                 changeSet.addCreated(entry);
196             }
197         }
198
199         for(Entry<P,D> entry : originalConfig.entrySet()) {
200             for(P deletion : potentialDeletions) {
201                 if(isContained.apply(deletion)) {
202                     changeSet.addRemoval(entry.getKey());
203                 }
204             }
205         }
206
207         if(changeSet.isChange()) {
208             return Optional.of(changeSet);
209         } else {
210             return Optional.absent();
211         }
212
213     }
214
215     public void publishDataChangeEvent(final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
216         ExecutorService executor = this.dataBroker.getExecutor();
217         final Runnable notifyTask = new Runnable() {
218             @Override
219             public void run() {
220                 for (final ListenerStateCapture<P, D, DCL> listenerSet : listeners) {
221                     DataChangeEvent<P, D> changeEvent = listenerSet.createEvent(transaction);
222                     for (final DataChangeListenerRegistration<P, D, DCL> listener : listenerSet.getListeners()) {
223                         try {
224                             listener.getInstance().onDataChanged(changeEvent);
225                         } catch (Exception e) {
226                             log.error("Unhandled exception when invoking listener {}", listener, e);
227                         }
228                     }
229                 }
230             }
231         };
232         executor.submit(notifyTask);
233     }
234
235     public RpcResult<TransactionStatus> rollback(final List<DataCommitTransaction<P, D>> transactions, final Exception e) {
236         for (final DataCommitTransaction<P, D> transaction : transactions) {
237             transaction.rollback();
238         }
239         return RpcResultBuilder.<TransactionStatus> failed().withResult(TransactionStatus.FAILED).build();
240     }
241 }