Untangle the XML/Hello message decoders
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / TwoPhaseCommit.java
1 /**
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.common.impl.service;
9
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.Map.Entry;
15 import java.util.Set;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutorService;
18
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
20 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
21 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
23 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
24 import org.opendaylight.controller.sal.common.util.Rpcs;
25 import org.opendaylight.yangtools.concepts.Path;
26 import org.opendaylight.yangtools.yang.common.RpcError;
27 import org.opendaylight.yangtools.yang.common.RpcResult;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import com.google.common.base.Optional;
32 import com.google.common.base.Predicate;
33 import com.google.common.collect.ImmutableList;
34 import com.google.common.collect.ImmutableList.Builder;
35 import com.google.common.collect.ImmutableSet;
36 import com.google.common.collect.Sets;
37
38 public class TwoPhaseCommit<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>> implements
39         Callable<RpcResult<TransactionStatus>> {
40     private final static Logger log = LoggerFactory.getLogger(TwoPhaseCommit.class);
41
42     private final AbstractDataTransaction<P, D> transaction;
43
44     private final AbstractDataBroker<P, D, DCL> dataBroker;
45
46     public TwoPhaseCommit(final AbstractDataTransaction<P, D> transaction, final AbstractDataBroker<P, D, DCL> broker) {
47         this.transaction = transaction;
48         this.dataBroker = broker;
49     }
50
51     @Override
52     public RpcResult<TransactionStatus> call() throws Exception {
53         final Object transactionId = this.transaction.getIdentifier();
54
55         Set<P> changedPaths = ImmutableSet.<P> builder().addAll(transaction.getUpdatedConfigurationData().keySet())
56                 .addAll(transaction.getCreatedConfigurationData().keySet())
57                 .addAll(transaction.getRemovedConfigurationData())
58                 .addAll(transaction.getUpdatedOperationalData().keySet())
59                 .addAll(transaction.getCreatedOperationalData().keySet())
60                 .addAll(transaction.getRemovedOperationalData()).build();
61
62         log.trace("Transaction: {} Affected Subtrees:", transactionId, changedPaths);
63
64         final ImmutableList.Builder<ListenerStateCapture<P, D, DCL>> listenersBuilder = ImmutableList.builder();
65         listenersBuilder.addAll(dataBroker.affectedListeners(changedPaths));
66         filterProbablyAffectedListeners(dataBroker.probablyAffectedListeners(changedPaths),listenersBuilder);
67
68
69
70         final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners = listenersBuilder.build();
71         final Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(changedPaths);
72         captureInitialState(listeners);
73
74
75         log.trace("Transaction: {} Starting Request Commit.",transactionId);
76         final List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList<>();
77         try {
78             for (final DataCommitHandler<P, D> handler : commitHandlers) {
79                 DataCommitTransaction<P, D> requestCommit = handler.requestCommit(this.transaction);
80                 if (requestCommit != null) {
81                     handlerTransactions.add(requestCommit);
82                 } else {
83                     log.debug("Transaction: {}, Handler {}  is not participating in transaction.", transactionId,
84                             handler);
85                 }
86             }
87         } catch (Exception e) {
88             log.error("Transaction: {} Request Commit failed", transactionId, e);
89             dataBroker.getFailedTransactionsCount().getAndIncrement();
90             this.transaction.changeStatus(TransactionStatus.FAILED);
91             return this.rollback(handlerTransactions, e);
92
93         }
94
95         log.trace("Transaction: {} Starting Finish.",transactionId);
96         final List<RpcResult<Void>> results = new ArrayList<RpcResult<Void>>();
97         try {
98             for (final DataCommitTransaction<P, D> subtransaction : handlerTransactions) {
99                 results.add(subtransaction.finish());
100             }
101         } catch (Exception e) {
102             log.error("Transaction: {} Finish Commit failed", transactionId, e);
103             dataBroker.getFailedTransactionsCount().getAndIncrement();
104             transaction.changeStatus(TransactionStatus.FAILED);
105             return this.rollback(handlerTransactions, e);
106         }
107
108
109         dataBroker.getFinishedTransactionsCount().getAndIncrement();
110         transaction.changeStatus(TransactionStatus.COMMITED);
111
112         log.trace("Transaction: {} Finished successfully.", transactionId);
113
114         captureFinalState(listeners);
115
116         log.trace("Transaction: {} Notifying listeners.");
117
118         publishDataChangeEvent(listeners);
119         return Rpcs.<TransactionStatus> getRpcResult(true, TransactionStatus.COMMITED,
120                 Collections.<RpcError> emptySet());
121     }
122
123     private void captureInitialState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
124         for (ListenerStateCapture<P, D, DCL> state : listeners) {
125             state.setInitialConfigurationState(dataBroker.readConfigurationData(state.getPath()));
126             state.setInitialOperationalState(dataBroker.readOperationalData(state.getPath()));
127         }
128     }
129
130
131     private void captureFinalState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
132         for (ListenerStateCapture<P, D, DCL> state : listeners) {
133             state.setFinalConfigurationState(dataBroker.readConfigurationData(state.getPath()));
134             state.setFinalOperationalState(dataBroker.readOperationalData(state.getPath()));
135         }
136     }
137
138     private void filterProbablyAffectedListeners(
139             ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners, Builder<ListenerStateCapture<P, D, DCL>> reallyAffected) {
140
141         for(ListenerStateCapture<P, D, DCL> listenerSet : probablyAffectedListeners) {
142             P affectedPath = listenerSet.getPath();
143             Optional<RootedChangeSet<P,D>> configChange = resolveConfigChange(affectedPath);
144             Optional<RootedChangeSet<P, D>> operChange = resolveOperChange(affectedPath);
145
146             if(configChange.isPresent() || operChange.isPresent()) {
147                 reallyAffected.add(listenerSet);
148                 if(configChange.isPresent()) {
149                     listenerSet.setNormalizedConfigurationChanges(configChange.get());
150                 }
151
152                 if(operChange.isPresent()) {
153                     listenerSet.setNormalizedOperationalChanges(operChange.get());
154                 }
155             }
156         }
157     }
158
159     private Optional<RootedChangeSet<P, D>> resolveOperChange(P affectedPath) {
160         Map<P, D> originalOper = dataBroker.deepGetBySubpath(transaction.getOriginalOperationalData(),affectedPath);
161         Map<P, D> createdOper = dataBroker.deepGetBySubpath(transaction.getCreatedOperationalData(),affectedPath);
162         Map<P, D> updatedOper = dataBroker.deepGetBySubpath(transaction.getUpdatedOperationalData(),affectedPath);
163         Set<P> removedOper = Sets.filter(transaction.getRemovedOperationalData(), dataBroker.createIsContainedPredicate(affectedPath));
164         return resolveChanges(affectedPath,originalOper,createdOper,updatedOper,removedOper);
165     }
166
167     private Optional<RootedChangeSet<P, D>> resolveConfigChange(P affectedPath) {
168         Map<P, D> originalConfig = dataBroker.deepGetBySubpath(transaction.getOriginalConfigurationData(),affectedPath);
169         Map<P, D> createdConfig = dataBroker.deepGetBySubpath(transaction.getCreatedConfigurationData(),affectedPath);
170         Map<P, D> updatedConfig = dataBroker.deepGetBySubpath(transaction.getUpdatedConfigurationData(),affectedPath);
171         Set<P> removedConfig = Sets.filter(transaction.getRemovedConfigurationData(), dataBroker.createIsContainedPredicate(affectedPath));
172         return resolveChanges(affectedPath,originalConfig,createdConfig,updatedConfig,removedConfig);
173     }
174
175     private Optional<RootedChangeSet<P,D>> resolveChanges(P affectedPath, Map<P, D> originalConfig, Map<P, D> createdConfig, Map<P, D> updatedConfig,Set<P> potentialDeletions) {
176         Predicate<P> isContained = dataBroker.createIsContainedPredicate(affectedPath);
177
178         if(createdConfig.isEmpty() && updatedConfig.isEmpty() && potentialDeletions.isEmpty()) {
179             return Optional.absent();
180         }
181         RootedChangeSet<P, D> changeSet = new RootedChangeSet<P,D>(affectedPath,originalConfig);
182         changeSet.addCreated(createdConfig);
183
184         for(Entry<P, D> entry : updatedConfig.entrySet()) {
185             if(originalConfig.containsKey(entry.getKey())) {
186                 changeSet.addUpdated(entry);
187             } else {
188                 changeSet.addCreated(entry);
189             }
190         }
191
192         for(Entry<P,D> entry : originalConfig.entrySet()) {
193             for(P deletion : potentialDeletions) {
194                 if(isContained.apply(deletion)) {
195                     changeSet.addRemoval(entry.getKey());
196                 }
197             }
198         }
199
200         if(changeSet.isChange()) {
201             return Optional.of(changeSet);
202         } else {
203             return Optional.absent();
204         }
205
206     }
207
208     public void publishDataChangeEvent(final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
209         ExecutorService executor = this.dataBroker.getExecutor();
210         final Runnable notifyTask = new Runnable() {
211             @Override
212             public void run() {
213                 for (final ListenerStateCapture<P, D, DCL> listenerSet : listeners) {
214                     {
215                         DataChangeEvent<P, D> changeEvent = listenerSet.createEvent(transaction);
216                         for (final DataChangeListenerRegistration<P, D, DCL> listener : listenerSet.getListeners()) {
217                             try {
218                                 listener.getInstance().onDataChanged(changeEvent);
219                             } catch (Exception e) {
220                                 log.error("Unhandled exception when invoking listener {}", listener);
221                             }
222                         }
223                     }
224                 }
225             }
226         };
227         executor.submit(notifyTask);
228     }
229
230     public RpcResult<TransactionStatus> rollback(final List<DataCommitTransaction<P, D>> transactions, final Exception e) {
231         for (final DataCommitTransaction<P, D> transaction : transactions) {
232             transaction.rollback();
233         }
234         Set<RpcError> _emptySet = Collections.<RpcError> emptySet();
235         return Rpcs.<TransactionStatus> getRpcResult(false, TransactionStatus.FAILED, _emptySet);
236     }
237 }