Merge "Activate enforcement of Path type"
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / TwoPhaseCommit.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.common.impl.service;
9
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.Map.Entry;
15 import java.util.Set;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
18
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
20 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
21 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
23 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
24 import org.opendaylight.controller.sal.common.util.Rpcs;
25 import org.opendaylight.yangtools.concepts.Path;
26 import org.opendaylight.yangtools.yang.common.RpcError;
27 import org.opendaylight.yangtools.yang.common.RpcResult;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import com.google.common.base.Optional;
32 import com.google.common.base.Predicate;
33 import com.google.common.collect.ImmutableList;
34 import com.google.common.collect.ImmutableList.Builder;
35 import com.google.common.collect.ImmutableSet;
36 import com.google.common.collect.Sets;
37
38 public class TwoPhaseCommit<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>> implements
39         Callable<RpcResult<TransactionStatus>> {
40     private final static Logger log = LoggerFactory.getLogger(TwoPhaseCommit.class);
41
42     private final AbstractDataTransaction<P, D> transaction;
43
44     private final AbstractDataBroker<P, D, DCL> dataBroker;
45
46     public TwoPhaseCommit(final AbstractDataTransaction<P, D> transaction, final AbstractDataBroker<P, D, DCL> broker) {
47         this.transaction = transaction;
48         this.dataBroker = broker;
49     }
50
51     @Override
52     public RpcResult<TransactionStatus> call() throws Exception {
53         final Object transactionId = this.transaction.getIdentifier();
54
55         Set<P> changedPaths = ImmutableSet.<P> builder().addAll(transaction.getUpdatedConfigurationData().keySet())
56                 .addAll(transaction.getCreatedConfigurationData().keySet())
57                 .addAll(transaction.getRemovedConfigurationData())
58                 .addAll(transaction.getUpdatedOperationalData().keySet())
59                 .addAll(transaction.getCreatedOperationalData().keySet())
60                 .addAll(transaction.getRemovedOperationalData()).build();
61
62         log.trace("Transaction: {} Affected Subtrees:", transactionId, changedPaths);
63
64         // The transaction has no effects, let's just shortcut it
65         if (changedPaths.isEmpty()) {
66             dataBroker.getFinishedTransactionsCount().getAndIncrement();
67             transaction.changeStatus(TransactionStatus.COMMITED);
68
69             log.trace("Transaction: {} Finished successfully (no effects).", transactionId);
70
71             return Rpcs.<TransactionStatus> getRpcResult(true, TransactionStatus.COMMITED,
72                     Collections.<RpcError> emptySet());
73         }
74
75         final ImmutableList.Builder<ListenerStateCapture<P, D, DCL>> listenersBuilder = ImmutableList.builder();
76         listenersBuilder.addAll(dataBroker.affectedListeners(changedPaths));
77         filterProbablyAffectedListeners(dataBroker.probablyAffectedListeners(changedPaths),listenersBuilder);
78
79
80
81         final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners = listenersBuilder.build();
82         final Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(changedPaths);
83         captureInitialState(listeners);
84
85
86         log.trace("Transaction: {} Starting Request Commit.",transactionId);
87         final List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList<>();
88         try {
89             for (final DataCommitHandler<P, D> handler : commitHandlers) {
90                 DataCommitTransaction<P, D> requestCommit = handler.requestCommit(this.transaction);
91                 if (requestCommit != null) {
92                     handlerTransactions.add(requestCommit);
93                 } else {
94                     log.debug("Transaction: {}, Handler {}  is not participating in transaction.", transactionId,
95                             handler);
96                 }
97             }
98         } catch (Exception e) {
99             log.error("Transaction: {} Request Commit failed", transactionId, e);
100             dataBroker.getFailedTransactionsCount().getAndIncrement();
101             this.transaction.changeStatus(TransactionStatus.FAILED);
102             return this.rollback(handlerTransactions, e);
103
104         }
105
106         log.trace("Transaction: {} Starting Finish.",transactionId);
107         final List<RpcResult<Void>> results = new ArrayList<RpcResult<Void>>();
108         try {
109             for (final DataCommitTransaction<P, D> subtransaction : handlerTransactions) {
110                 results.add(subtransaction.finish());
111             }
112         } catch (Exception e) {
113             log.error("Transaction: {} Finish Commit failed", transactionId, e);
114             dataBroker.getFailedTransactionsCount().getAndIncrement();
115             transaction.changeStatus(TransactionStatus.FAILED);
116             return this.rollback(handlerTransactions, e);
117         }
118
119
120         dataBroker.getFinishedTransactionsCount().getAndIncrement();
121         transaction.changeStatus(TransactionStatus.COMMITED);
122
123         log.trace("Transaction: {} Finished successfully.", transactionId);
124
125         captureFinalState(listeners);
126
127         log.trace("Transaction: {} Notifying listeners.");
128
129         publishDataChangeEvent(listeners);
130         return Rpcs.<TransactionStatus> getRpcResult(true, TransactionStatus.COMMITED,
131                 Collections.<RpcError> emptySet());
132     }
133
134     private void captureInitialState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
135         for (ListenerStateCapture<P, D, DCL> state : listeners) {
136             state.setInitialConfigurationState(dataBroker.readConfigurationData(state.getPath()));
137             state.setInitialOperationalState(dataBroker.readOperationalData(state.getPath()));
138         }
139     }
140
141
142     private void captureFinalState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
143         for (ListenerStateCapture<P, D, DCL> state : listeners) {
144             state.setFinalConfigurationState(dataBroker.readConfigurationData(state.getPath()));
145             state.setFinalOperationalState(dataBroker.readOperationalData(state.getPath()));
146         }
147     }
148
149     private void filterProbablyAffectedListeners(
150             ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners, Builder<ListenerStateCapture<P, D, DCL>> reallyAffected) {
151
152         for(ListenerStateCapture<P, D, DCL> listenerSet : probablyAffectedListeners) {
153             P affectedPath = listenerSet.getPath();
154             Optional<RootedChangeSet<P,D>> configChange = resolveConfigChange(affectedPath);
155             Optional<RootedChangeSet<P, D>> operChange = resolveOperChange(affectedPath);
156
157             if(configChange.isPresent() || operChange.isPresent()) {
158                 reallyAffected.add(listenerSet);
159                 if(configChange.isPresent()) {
160                     listenerSet.setNormalizedConfigurationChanges(configChange.get());
161                 }
162
163                 if(operChange.isPresent()) {
164                     listenerSet.setNormalizedOperationalChanges(operChange.get());
165                 }
166             }
167         }
168     }
169
170     private Optional<RootedChangeSet<P, D>> resolveOperChange(P affectedPath) {
171         Map<P, D> originalOper = dataBroker.deepGetBySubpath(transaction.getOriginalOperationalData(),affectedPath);
172         Map<P, D> createdOper = dataBroker.deepGetBySubpath(transaction.getCreatedOperationalData(),affectedPath);
173         Map<P, D> updatedOper = dataBroker.deepGetBySubpath(transaction.getUpdatedOperationalData(),affectedPath);
174         Set<P> removedOper = Sets.filter(transaction.getRemovedOperationalData(), dataBroker.createIsContainedPredicate(affectedPath));
175         return resolveChanges(affectedPath,originalOper,createdOper,updatedOper,removedOper);
176     }
177
178     private Optional<RootedChangeSet<P, D>> resolveConfigChange(P affectedPath) {
179         Map<P, D> originalConfig = dataBroker.deepGetBySubpath(transaction.getOriginalConfigurationData(),affectedPath);
180         Map<P, D> createdConfig = dataBroker.deepGetBySubpath(transaction.getCreatedConfigurationData(),affectedPath);
181         Map<P, D> updatedConfig = dataBroker.deepGetBySubpath(transaction.getUpdatedConfigurationData(),affectedPath);
182         Set<P> removedConfig = Sets.filter(transaction.getRemovedConfigurationData(), dataBroker.createIsContainedPredicate(affectedPath));
183         return resolveChanges(affectedPath,originalConfig,createdConfig,updatedConfig,removedConfig);
184     }
185
186     private Optional<RootedChangeSet<P,D>> resolveChanges(P affectedPath, Map<P, D> originalConfig, Map<P, D> createdConfig, Map<P, D> updatedConfig,Set<P> potentialDeletions) {
187         Predicate<P> isContained = dataBroker.createIsContainedPredicate(affectedPath);
188
189         if(createdConfig.isEmpty() && updatedConfig.isEmpty() && potentialDeletions.isEmpty()) {
190             return Optional.absent();
191         }
192         RootedChangeSet<P, D> changeSet = new RootedChangeSet<P,D>(affectedPath,originalConfig);
193         changeSet.addCreated(createdConfig);
194
195         for(Entry<P, D> entry : updatedConfig.entrySet()) {
196             if(originalConfig.containsKey(entry.getKey())) {
197                 changeSet.addUpdated(entry);
198             } else {
199                 changeSet.addCreated(entry);
200             }
201         }
202
203         for(Entry<P,D> entry : originalConfig.entrySet()) {
204             for(P deletion : potentialDeletions) {
205                 if(isContained.apply(deletion)) {
206                     changeSet.addRemoval(entry.getKey());
207                 }
208             }
209         }
210
211         if(changeSet.isChange()) {
212             return Optional.of(changeSet);
213         } else {
214             return Optional.absent();
215         }
216
217     }
218
219     public void publishDataChangeEvent(final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
220         ExecutorService executor = this.dataBroker.getExecutor();
221         final Runnable notifyTask = new Runnable() {
222             @Override
223             public void run() {
224                 for (final ListenerStateCapture<P, D, DCL> listenerSet : listeners) {
225                     DataChangeEvent<P, D> changeEvent = listenerSet.createEvent(transaction);
226                     for (final DataChangeListenerRegistration<P, D, DCL> listener : listenerSet.getListeners()) {
227                         try {
228                             listener.getInstance().onDataChanged(changeEvent);
229                         } catch (Exception e) {
230                             log.error("Unhandled exception when invoking listener {}", listener, e);
231                         }
232                     }
233                 }
234             }
235         };
236         executor.submit(notifyTask);
237     }
238
239     public RpcResult<TransactionStatus> rollback(final List<DataCommitTransaction<P, D>> transactions, final Exception e) {
240         for (final DataCommitTransaction<P, D> transaction : transactions) {
241             transaction.rollback();
242         }
243         Set<RpcError> _emptySet = Collections.<RpcError> emptySet();
244         return Rpcs.<TransactionStatus> getRpcResult(false, TransactionStatus.FAILED, _emptySet);
245     }
246 }