ae5537915556cfa9c3018ef6b27f072c7d2e2003
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
15 import java.util.Map;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
26 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
27 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
28 import org.opendaylight.controller.cluster.access.concepts.Request;
29 import org.opendaylight.controller.cluster.access.concepts.RequestException;
30 import org.opendaylight.controller.cluster.access.concepts.Response;
31 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Per-connection representation of a local history. This class handles state replication across a single connection.
41  *
42  * @author Robert Varga
43  */
44 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
45     private abstract static class AbstractLocal extends ProxyHistory {
46         private final DataTree dataTree;
47
48         AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
49             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
50             super(connection, identifier);
51             this.dataTree = Preconditions.checkNotNull(dataTree);
52         }
53
54         final DataTreeSnapshot takeSnapshot() {
55             return dataTree.takeSnapshot();
56         }
57     }
58
59     private abstract static class AbstractRemote extends ProxyHistory {
60         AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
61             final LocalHistoryIdentifier identifier) {
62             super(connection, identifier);
63         }
64
65         @Override
66         final AbstractProxyTransaction doCreateTransactionProxy(
67                 final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
68             return new RemoteProxyTransaction(this, txId);
69         }
70     }
71
72     private static final class Local extends AbstractLocal {
73         private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
74                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
75
76         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77         // the open one and attempts to create a new transaction again.
78         private LocalProxyTransaction lastOpen;
79
80         private volatile LocalProxyTransaction lastSealed;
81
82         Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
83             final DataTree dataTree) {
84             super(connection, identifier, dataTree);
85         }
86
87         @Override
88         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89                 final TransactionIdentifier txId) {
90             Preconditions.checkState(lastOpen == null, "Proxy {} is currently open", lastOpen);
91
92             // onTransactionCompleted() runs concurrently
93             final LocalProxyTransaction localSealed = lastSealed;
94             final DataTreeSnapshot baseSnapshot;
95             if (localSealed != null) {
96                 baseSnapshot = localSealed.getSnapshot();
97             } else {
98                 baseSnapshot = takeSnapshot();
99             }
100
101             lastOpen = new LocalProxyTransaction(this, txId,
102                 (CursorAwareDataTreeModification) baseSnapshot.newModification());
103             return lastOpen;
104         }
105
106         @Override
107         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
108             return createClient(connection, getIdentifier());
109         }
110
111         @Override
112         void onTransactionAborted(final AbstractProxyTransaction tx) {
113             Preconditions.checkState(tx.equals(lastOpen));
114             lastOpen = null;
115         }
116
117         @Override
118         void onTransactionCompleted(final AbstractProxyTransaction tx) {
119             Verify.verify(tx instanceof LocalProxyTransaction);
120
121             if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
122                 LOG.debug("Completed last sealed transaction {}", tx);
123             }
124         }
125
126         @Override
127         void onTransactionSealed(final AbstractProxyTransaction tx) {
128             Preconditions.checkState(tx.equals(lastOpen));
129             lastSealed = lastOpen;
130             lastOpen = null;
131         }
132     }
133
134     private static final class LocalSingle extends AbstractLocal {
135         LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
136             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
137             super(connection, identifier, dataTree);
138         }
139
140         @Override
141         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
142                 final TransactionIdentifier txId) {
143             return new LocalProxyTransaction(this, txId,
144                 (CursorAwareDataTreeModification) takeSnapshot().newModification());
145         }
146
147         @Override
148         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
149             return createSingle(connection, getIdentifier());
150         }
151     }
152
153     private static final class Remote extends AbstractRemote {
154         Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
155             super(connection, identifier);
156         }
157
158         @Override
159         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
160             return createClient(connection, getIdentifier());
161         }
162     }
163
164     private static final class RemoteSingle extends AbstractRemote {
165         RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
166             final LocalHistoryIdentifier identifier) {
167             super(connection, identifier);
168         }
169
170         @Override
171         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
172             return createSingle(connection, getIdentifier());
173         }
174     }
175
176     private static final class RequestReplayException extends RequestException {
177         private static final long serialVersionUID = 1L;
178
179         RequestReplayException(final String format, final Object... args) {
180             super(String.format(format, args));
181         }
182
183         @Override
184         public boolean isRetriable() {
185             return false;
186         }
187     }
188
189     private final class ReconnectCohort extends ProxyReconnectCohort {
190         @Override
191         public LocalHistoryIdentifier getIdentifier() {
192             return identifier;
193         }
194
195         @GuardedBy("lock")
196         @Override
197         void replaySuccessfulRequests() {
198             for (AbstractProxyTransaction t : proxies.values()) {
199                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
200                 LOG.debug("{} created successor transaction proxy {} for {}", identifier, newProxy, t);
201                 t.replaySuccessfulRequests(newProxy);
202             }
203         }
204
205         @GuardedBy("lock")
206         @Override
207         ProxyHistory finishReconnect() {
208             final ProxyHistory ret = Verify.verifyNotNull(successor);
209             LOG.debug("Finished reconnecting proxy history {}", this);
210             lock.unlock();
211             return ret;
212         }
213
214         @Override
215         void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
216                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
217             if (request instanceof TransactionRequest) {
218                 replayTransactionRequest((TransactionRequest<?>) request, callback);
219             } else if (request instanceof LocalHistoryRequest) {
220                 replayTo.accept(request, callback);
221             } else {
222                 throw new IllegalArgumentException("Unhandled request " + request);
223             }
224         }
225
226         private void replayTransactionRequest(final TransactionRequest<?> request,
227                 final Consumer<Response<?, ?>> callback) throws RequestException {
228
229             final AbstractProxyTransaction proxy;
230             lock.lock();
231             try {
232                 proxy = proxies.get(request.getTarget());
233             } finally {
234                 lock.unlock();
235             }
236             if (proxy == null) {
237                 throw new RequestReplayException("Failed to find proxy for %s", request);
238             }
239
240             proxy.replayRequest(request, callback);
241         }
242     }
243
244     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
245
246     private final Lock lock = new ReentrantLock();
247     private final LocalHistoryIdentifier identifier;
248     private final AbstractClientConnection<ShardBackendInfo> connection;
249
250     @GuardedBy("lock")
251     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
252     @GuardedBy("lock")
253     private ProxyHistory successor;
254
255     private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
256             final LocalHistoryIdentifier identifier) {
257         this.connection = Preconditions.checkNotNull(connection);
258         this.identifier = Preconditions.checkNotNull(identifier);
259     }
260
261     static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
262             final LocalHistoryIdentifier identifier) {
263         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
264         return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
265              : new Remote(connection, identifier);
266     }
267
268     static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
269             final LocalHistoryIdentifier identifier) {
270         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
271         return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
272              : new RemoteSingle(connection, identifier);
273     }
274
275     @Override
276     public LocalHistoryIdentifier getIdentifier() {
277         return identifier;
278     }
279
280     final ActorRef localActor() {
281         return connection.localActor();
282     }
283
284     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
285         final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
286
287         lock.lock();
288         try {
289             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
290             proxies.put(proxyId, ret);
291             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
292             return ret;
293         } finally {
294             lock.unlock();
295         }
296     }
297
298     final void abortTransaction(final AbstractProxyTransaction tx) {
299         lock.lock();
300         try {
301             proxies.remove(tx.getIdentifier());
302         } finally {
303             lock.unlock();
304         }
305     }
306
307     final void completeTransaction(final AbstractProxyTransaction tx) {
308         lock.lock();
309         try {
310             proxies.remove(tx.getIdentifier());
311         } finally {
312             lock.unlock();
313         }
314     }
315
316     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
317         connection.sendRequest(request, callback);
318     }
319
320     @GuardedBy("lock")
321     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
322             TransactionIdentifier txId);
323
324     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
325
326     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
327     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
328         lock.lock();
329         if (successor != null) {
330             lock.unlock();
331             throw new IllegalStateException("Proxy history " + this + " already has a successor");
332         }
333
334         successor = createSuccessor(newConnection);
335         return new ReconnectCohort();
336     }
337
338     @GuardedBy("lock")
339     void onTransactionAborted(final AbstractProxyTransaction tx) {
340         // No-op for most implementations
341     }
342
343     @GuardedBy("lock")
344     void onTransactionCompleted(final AbstractProxyTransaction tx) {
345         // No-op for most implementations
346     }
347
348     void onTransactionSealed(final AbstractProxyTransaction tx) {
349         // No-op on most implementations
350     }
351 }