BUG-5280: add basic concept of ClientSnapshot
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
15 import java.util.Map;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
26 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
27 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
28 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
29 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.Request;
31 import org.opendaylight.controller.cluster.access.concepts.RequestException;
32 import org.opendaylight.controller.cluster.access.concepts.Response;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.yangtools.concepts.Identifiable;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Per-connection representation of a local history. This class handles state replication across a single connection.
42  *
43  * @author Robert Varga
44  */
45 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
46     private abstract static class AbstractLocal extends ProxyHistory {
47         private final DataTree dataTree;
48
49         AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
50             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
51             super(connection, identifier);
52             this.dataTree = Preconditions.checkNotNull(dataTree);
53         }
54
55         final DataTreeSnapshot takeSnapshot() {
56             return dataTree.takeSnapshot();
57         }
58     }
59
60     private abstract static class AbstractRemote extends ProxyHistory {
61         AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
62             final LocalHistoryIdentifier identifier) {
63             super(connection, identifier);
64         }
65
66         @Override
67         final AbstractProxyTransaction doCreateTransactionProxy(
68                 final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId,
69                 final boolean snapshotOnly) {
70             return new RemoteProxyTransaction(this, txId, snapshotOnly);
71         }
72     }
73
74     private static final class Local extends AbstractLocal {
75         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
76                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
77
78         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
79         // the open one and attempts to create a new transaction again.
80         private LocalReadWriteProxyTransaction lastOpen;
81
82         private volatile LocalReadWriteProxyTransaction lastSealed;
83
84         Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
85             final DataTree dataTree) {
86             super(connection, identifier, dataTree);
87         }
88
89         @Override
90         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
91                 final TransactionIdentifier txId, final boolean snapshotOnly) {
92             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
93
94             // onTransactionCompleted() runs concurrently
95             final LocalReadWriteProxyTransaction localSealed = lastSealed;
96             final DataTreeSnapshot baseSnapshot;
97             if (localSealed != null) {
98                 baseSnapshot = localSealed.getSnapshot();
99             } else {
100                 baseSnapshot = takeSnapshot();
101             }
102
103             if (snapshotOnly) {
104                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
105             }
106
107             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
108             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
109             return lastOpen;
110         }
111
112         @Override
113         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
114             return createClient(connection, getIdentifier());
115         }
116
117         @Override
118         void onTransactionAborted(final AbstractProxyTransaction tx) {
119             if (tx.equals(lastOpen)) {
120                 lastOpen = null;
121             }
122         }
123
124         @Override
125         void onTransactionCompleted(final AbstractProxyTransaction tx) {
126             Verify.verify(tx instanceof LocalProxyTransaction);
127             if (tx instanceof LocalReadWriteProxyTransaction) {
128                 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
129                     LOG.debug("Completed last sealed transaction {}", tx);
130                 }
131             }
132         }
133
134         @Override
135         void onTransactionSealed(final AbstractProxyTransaction tx) {
136             Preconditions.checkState(tx.equals(lastOpen));
137             lastSealed = lastOpen;
138             lastOpen = null;
139         }
140     }
141
142     private static final class LocalSingle extends AbstractLocal {
143         LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
144             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
145             super(connection, identifier, dataTree);
146         }
147
148         @Override
149         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
150                 final TransactionIdentifier txId, final boolean snapshotOnly) {
151             final DataTreeSnapshot snapshot = takeSnapshot();
152             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
153                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
154         }
155
156         @Override
157         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
158             return createSingle(connection, getIdentifier());
159         }
160     }
161
162     private static final class Remote extends AbstractRemote {
163         Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
164             super(connection, identifier);
165         }
166
167         @Override
168         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
169             return createClient(connection, getIdentifier());
170         }
171     }
172
173     private static final class RemoteSingle extends AbstractRemote {
174         RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
175             final LocalHistoryIdentifier identifier) {
176             super(connection, identifier);
177         }
178
179         @Override
180         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
181             return createSingle(connection, getIdentifier());
182         }
183     }
184
185     private static final class RequestReplayException extends RequestException {
186         private static final long serialVersionUID = 1L;
187
188         RequestReplayException(final String format, final Object... args) {
189             super(String.format(format, args));
190         }
191
192         @Override
193         public boolean isRetriable() {
194             return false;
195         }
196     }
197
198     private final class ReconnectCohort extends ProxyReconnectCohort {
199         @Override
200         public LocalHistoryIdentifier getIdentifier() {
201             return identifier;
202         }
203
204         @GuardedBy("lock")
205         @Override
206         void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
207             // First look for our Create message
208             for (ConnectionEntry e : previousEntries) {
209                 final Request<?, ?> req = e.getRequest();
210                 if (identifier.equals(req.getTarget())) {
211                     Verify.verify(req instanceof LocalHistoryRequest);
212                     if (req instanceof CreateLocalHistoryRequest) {
213                         successor.connection.sendRequest(req, e.getCallback());
214                         break;
215                     }
216                 }
217             }
218
219             for (AbstractProxyTransaction t : proxies.values()) {
220                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
221                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
222                     t.isSnapshotOnly());
223                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
224                 t.replayMessages(newProxy, previousEntries);
225             }
226
227             // Now look for any finalizing messages
228             for (ConnectionEntry e : previousEntries) {
229                 final Request<?, ?> req = e.getRequest();
230                 if (identifier.equals(req.getTarget())) {
231                     Verify.verify(req instanceof LocalHistoryRequest);
232                     successor.connection.sendRequest(req, e.getCallback());
233                 }
234             }
235         }
236
237         @GuardedBy("lock")
238         @Override
239         ProxyHistory finishReconnect() {
240             final ProxyHistory ret = Verify.verifyNotNull(successor);
241
242             for (AbstractProxyTransaction t : proxies.values()) {
243                 t.finishReconnect();
244             }
245
246             LOG.debug("Finished reconnecting proxy history {}", this);
247             lock.unlock();
248             return ret;
249         }
250
251         @Override
252         void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
253                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
254             if (request instanceof TransactionRequest) {
255                 replayTransactionRequest((TransactionRequest<?>) request, callback);
256             } else if (request instanceof LocalHistoryRequest) {
257                 replayTo.accept(request, callback);
258             } else {
259                 throw new IllegalArgumentException("Unhandled request " + request);
260             }
261         }
262
263         private void replayTransactionRequest(final TransactionRequest<?> request,
264                 final Consumer<Response<?, ?>> callback) throws RequestException {
265
266             final AbstractProxyTransaction proxy;
267             lock.lock();
268             try {
269                 proxy = proxies.get(request.getTarget());
270             } finally {
271                 lock.unlock();
272             }
273             if (proxy == null) {
274                 throw new RequestReplayException("Failed to find proxy for %s", request);
275             }
276
277             proxy.replayRequest(request, callback);
278         }
279     }
280
281     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
282
283     private final Lock lock = new ReentrantLock();
284     private final LocalHistoryIdentifier identifier;
285     private final AbstractClientConnection<ShardBackendInfo> connection;
286
287     @GuardedBy("lock")
288     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
289     @GuardedBy("lock")
290     private ProxyHistory successor;
291
292     private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
293             final LocalHistoryIdentifier identifier) {
294         this.connection = Preconditions.checkNotNull(connection);
295         this.identifier = Preconditions.checkNotNull(identifier);
296     }
297
298     static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
299             final LocalHistoryIdentifier identifier) {
300         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
301         return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
302              : new Remote(connection, identifier);
303     }
304
305     static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
306             final LocalHistoryIdentifier identifier) {
307         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
308         return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
309              : new RemoteSingle(connection, identifier);
310     }
311
312     @Override
313     public LocalHistoryIdentifier getIdentifier() {
314         return identifier;
315     }
316
317     final ActorRef localActor() {
318         return connection.localActor();
319     }
320
321     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
322             final boolean snapshotOnly) {
323         lock.lock();
324         try {
325             if (successor != null) {
326                 return successor.createTransactionProxy(txId, snapshotOnly);
327             }
328
329             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
330             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
331             proxies.put(proxyId, ret);
332             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
333             return ret;
334         } finally {
335             lock.unlock();
336         }
337     }
338
339     final void abortTransaction(final AbstractProxyTransaction tx) {
340         lock.lock();
341         try {
342             proxies.remove(tx.getIdentifier());
343             LOG.debug("Proxy {} aborting transaction {}", this, tx);
344             onTransactionAborted(tx);
345         } finally {
346             lock.unlock();
347         }
348     }
349
350     final void completeTransaction(final AbstractProxyTransaction tx) {
351         lock.lock();
352         try {
353             proxies.remove(tx.getIdentifier());
354             LOG.debug("Proxy {} completing transaction {}", this, tx);
355             onTransactionCompleted(tx);
356         } finally {
357             lock.unlock();
358         }
359     }
360
361     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
362         connection.sendRequest(request, callback);
363     }
364
365     @GuardedBy("lock")
366     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
367             TransactionIdentifier txId, boolean snapshotOnly);
368
369     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
370
371     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
372     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
373         lock.lock();
374         if (successor != null) {
375             lock.unlock();
376             throw new IllegalStateException("Proxy history " + this + " already has a successor");
377         }
378
379         successor = createSuccessor(newConnection);
380         LOG.debug("History {} instantiated successor {}", this, successor);
381
382         for (AbstractProxyTransaction t : proxies.values()) {
383             t.startReconnect();
384         }
385
386         return new ReconnectCohort();
387     }
388
389     @GuardedBy("lock")
390     void onTransactionAborted(final AbstractProxyTransaction tx) {
391         // No-op for most implementations
392     }
393
394     @GuardedBy("lock")
395     void onTransactionCompleted(final AbstractProxyTransaction tx) {
396         // No-op for most implementations
397     }
398
399     void onTransactionSealed(final AbstractProxyTransaction tx) {
400         // No-op on most implementations
401     }
402 }