827c19e526fcbd5f6b3cc1c33dfdc40698af11e9
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
15 import java.util.Map;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
26 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
27 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
28 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
29 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
31 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
32 import org.opendaylight.controller.cluster.access.concepts.Request;
33 import org.opendaylight.controller.cluster.access.concepts.RequestException;
34 import org.opendaylight.controller.cluster.access.concepts.Response;
35 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
36 import org.opendaylight.yangtools.concepts.Identifiable;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Per-connection representation of a local history. This class handles state replication across a single connection.
44  *
45  * @author Robert Varga
46  */
47 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
48     private abstract static class AbstractLocal extends ProxyHistory {
49         private final DataTree dataTree;
50
51         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
52             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
53             super(parent, connection, identifier);
54             this.dataTree = Preconditions.checkNotNull(dataTree);
55         }
56
57         final DataTreeSnapshot takeSnapshot() {
58             return dataTree.takeSnapshot();
59         }
60     }
61
    /**
     * Base class for proxy histories which do not hold a {@link DataTree}. It adds no state or behavior on top
     * of {@link ProxyHistory}; it only serves as the common parent of {@code Remote} and {@code RemoteSingle}.
     */
    private abstract static class AbstractRemote extends ProxyHistory {
        AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
            final LocalHistoryIdentifier identifier) {
            super(parent, connection, identifier);
        }
    }
68
69     private static final class Local extends AbstractLocal {
70         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
71                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
72
73         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
74         // the open one and attempts to create a new transaction again.
75         private LocalReadWriteProxyTransaction lastOpen;
76
77         private volatile LocalReadWriteProxyTransaction lastSealed;
78
79         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
80             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
81             super(parent, connection, identifier, dataTree);
82         }
83
84         @Override
85         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
86                 final TransactionIdentifier txId, final boolean snapshotOnly) {
87             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
88
89             // onTransactionCompleted() runs concurrently
90             final LocalReadWriteProxyTransaction localSealed = lastSealed;
91             final DataTreeSnapshot baseSnapshot;
92             if (localSealed != null) {
93                 baseSnapshot = localSealed.getSnapshot();
94             } else {
95                 baseSnapshot = takeSnapshot();
96             }
97
98             if (snapshotOnly) {
99                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
100             }
101
102             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
103             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
104             return lastOpen;
105         }
106
107         @Override
108         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
109             return createClient(parent(), connection, getIdentifier());
110         }
111
112         @Override
113         void onTransactionAborted(final AbstractProxyTransaction tx) {
114             if (tx.equals(lastOpen)) {
115                 lastOpen = null;
116             }
117         }
118
119         @Override
120         void onTransactionCompleted(final AbstractProxyTransaction tx) {
121             Verify.verify(tx instanceof LocalProxyTransaction);
122             if (tx instanceof LocalReadWriteProxyTransaction) {
123                 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
124                     LOG.debug("Completed last sealed transaction {}", tx);
125                 }
126             }
127         }
128
129         @Override
130         void onTransactionSealed(final AbstractProxyTransaction tx) {
131             Preconditions.checkState(tx.equals(lastOpen));
132             lastSealed = lastOpen;
133             lastOpen = null;
134         }
135     }
136
137     private static final class LocalSingle extends AbstractLocal {
138         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
139             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
140             super(parent, connection, identifier, dataTree);
141         }
142
143         @Override
144         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
145                 final TransactionIdentifier txId, final boolean snapshotOnly) {
146             final DataTreeSnapshot snapshot = takeSnapshot();
147             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
148                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
149         }
150
151         @Override
152         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
153             return createSingle(parent(), connection, getIdentifier());
154         }
155     }
156
157     private static final class Remote extends AbstractRemote {
158         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
159             final LocalHistoryIdentifier identifier) {
160             super(parent, connection, identifier);
161         }
162
163         @Override
164         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
165                 final TransactionIdentifier txId, final boolean snapshotOnly) {
166             return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
167         }
168
169         @Override
170         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
171             return createClient(parent(), connection, getIdentifier());
172         }
173     }
174
175     private static final class RemoteSingle extends AbstractRemote {
176         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
177             final LocalHistoryIdentifier identifier) {
178             super(parent, connection, identifier);
179         }
180
181         @Override
182         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
183                 final TransactionIdentifier txId, final boolean snapshotOnly) {
184             return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
185         }
186
187         @Override
188         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
189             return createSingle(parent(), connection, getIdentifier());
190         }
191     }
192
    /**
     * Non-retriable {@link RequestException} thrown when a request cannot be forwarded during reconnect because no
     * transaction proxy is tracked for its target.
     */
    private static final class RequestReplayException extends RequestException {
        private static final long serialVersionUID = 1L;

        /**
         * Construct the exception with a message produced by {@link String#format(String, Object...)}.
         *
         * @param format Message format string
         * @param args Format arguments
         */
        RequestReplayException(final String format, final Object... args) {
            super(String.format(format, args));
        }

        @Override
        public boolean isRetriable() {
            return false;
        }
    }
205
206     private final class ReconnectCohort extends ProxyReconnectCohort {
207         @Override
208         public LocalHistoryIdentifier getIdentifier() {
209             return identifier;
210         }
211
212         @GuardedBy("lock")
213         @Override
214         void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
215             // First look for our Create message
216             for (ConnectionEntry e : previousEntries) {
217                 final Request<?, ?> req = e.getRequest();
218                 if (identifier.equals(req.getTarget())) {
219                     Verify.verify(req instanceof LocalHistoryRequest);
220                     if (req instanceof CreateLocalHistoryRequest) {
221                         successor.connection.sendRequest(req, e.getCallback());
222                         break;
223                     }
224                 }
225             }
226
227             for (AbstractProxyTransaction t : proxies.values()) {
228                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
229                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
230                     t.isSnapshotOnly());
231                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
232                 t.replayMessages(newProxy, previousEntries);
233             }
234
235             // Now look for any finalizing messages
236             for (ConnectionEntry e : previousEntries) {
237                 final Request<?, ?> req = e.getRequest();
238                 if (identifier.equals(req.getTarget())) {
239                     Verify.verify(req instanceof LocalHistoryRequest);
240                     successor.connection.sendRequest(req, e.getCallback());
241                 }
242             }
243         }
244
245         @GuardedBy("lock")
246         @Override
247         ProxyHistory finishReconnect() {
248             final ProxyHistory ret = Verify.verifyNotNull(successor);
249
250             for (AbstractProxyTransaction t : proxies.values()) {
251                 t.finishReconnect();
252             }
253
254             LOG.debug("Finished reconnecting proxy history {}", this);
255             lock.unlock();
256             return ret;
257         }
258
259         @Override
260         void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
261                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
262             if (request instanceof TransactionRequest) {
263                 forwardTransactionRequest((TransactionRequest<?>) request, callback);
264             } else if (request instanceof LocalHistoryRequest) {
265                 forwardTo.accept(request, callback);
266             } else {
267                 throw new IllegalArgumentException("Unhandled request " + request);
268             }
269         }
270
271         private void forwardTransactionRequest(final TransactionRequest<?> request,
272                 final Consumer<Response<?, ?>> callback) throws RequestException {
273
274             final AbstractProxyTransaction proxy;
275             lock.lock();
276             try {
277                 proxy = proxies.get(request.getTarget());
278             } finally {
279                 lock.unlock();
280             }
281             if (proxy == null) {
282                 throw new RequestReplayException("Failed to find proxy for %s", request);
283             }
284
285             proxy.forwardRequest(request, callback);
286         }
287     }
288
    private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);

    // Protects 'proxies' and 'successor'. startReconnect() returns with this lock held, it is then released by
    // ReconnectCohort.finishReconnect().
    private final Lock lock = new ReentrantLock();
    private final LocalHistoryIdentifier identifier;
    // Connection through which this history sends its requests
    private final AbstractClientConnection<ShardBackendInfo> connection;
    private final AbstractClientHistory parent;

    // Tracked transaction proxies keyed by proxy transaction identifier, iterated in insertion order
    @GuardedBy("lock")
    private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
    // Non-null once startReconnect() has been invoked; transaction creation and close() are then delegated to it
    @GuardedBy("lock")
    private ProxyHistory successor;
300
301     private ProxyHistory(final AbstractClientHistory parent,
302             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
303         this.parent = Preconditions.checkNotNull(parent);
304         this.connection = Preconditions.checkNotNull(connection);
305         this.identifier = Preconditions.checkNotNull(identifier);
306     }
307
308     static ProxyHistory createClient(final AbstractClientHistory parent,
309             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
310         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
311         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
312              : new Remote(parent, connection, identifier);
313     }
314
315     static ProxyHistory createSingle(final AbstractClientHistory parent,
316             final AbstractClientConnection<ShardBackendInfo> connection,
317             final LocalHistoryIdentifier identifier) {
318         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
319         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
320              : new RemoteSingle(parent, connection, identifier);
321     }
322
323     @Override
324     public LocalHistoryIdentifier getIdentifier() {
325         return identifier;
326     }
327
328     final ActorRef localActor() {
329         return connection.localActor();
330     }
331
332     final AbstractClientHistory parent() {
333         return parent;
334     }
335
336     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
337             final boolean snapshotOnly) {
338         lock.lock();
339         try {
340             if (successor != null) {
341                 return successor.createTransactionProxy(txId, snapshotOnly);
342             }
343
344             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
345             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
346             proxies.put(proxyId, ret);
347             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
348             return ret;
349         } finally {
350             lock.unlock();
351         }
352     }
353
354     final void abortTransaction(final AbstractProxyTransaction tx) {
355         lock.lock();
356         try {
357             proxies.remove(tx.getIdentifier());
358             LOG.debug("Proxy {} aborting transaction {}", this, tx);
359             onTransactionAborted(tx);
360         } finally {
361             lock.unlock();
362         }
363     }
364
365     final void completeTransaction(final AbstractProxyTransaction tx) {
366         lock.lock();
367         try {
368             proxies.remove(tx.getIdentifier());
369             LOG.debug("Proxy {} completing transaction {}", this, tx);
370             onTransactionCompleted(tx);
371         } finally {
372             lock.unlock();
373         }
374     }
375
376     final void close() {
377         lock.lock();
378         try {
379             if (successor != null) {
380                 successor.close();
381                 return;
382             }
383
384             LOG.debug("Proxy {} invoking destroy", this);
385             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
386                 this::onDestroyComplete);
387         } finally {
388             lock.unlock();
389         }
390     }
391
392     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
393         connection.sendRequest(request, callback);
394     }
395
    /**
     * Instantiate an implementation-specific transaction proxy. This is invoked from
     * {@link #createTransactionProxy(TransactionIdentifier, boolean)} with the lock held and only when this history
     * has no successor.
     *
     * @param connection This history's connection
     * @param txId Proxy transaction identifier, composed of this history's identifier and the transaction number
     * @param snapshotOnly True if a snapshot-only transaction is requested
     * @return A new transaction proxy
     */
    @GuardedBy("lock")
    abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
            TransactionIdentifier txId, boolean snapshotOnly);
399
    /**
     * Create the history which takes over from this one on a new connection. This is invoked from
     * {@link #startReconnect(ConnectedClientConnection)} while the lock is held.
     *
     * @param connection The new connection
     * @return Successor history
     */
    abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
401
402     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
403     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
404         lock.lock();
405         if (successor != null) {
406             lock.unlock();
407             throw new IllegalStateException("Proxy history " + this + " already has a successor");
408         }
409
410         successor = createSuccessor(newConnection);
411         LOG.debug("History {} instantiated successor {}", this, successor);
412
413         for (AbstractProxyTransaction t : proxies.values()) {
414             t.startReconnect();
415         }
416
417         return new ReconnectCohort();
418     }
419
420     private void onDestroyComplete(final Response<?, ?> response) {
421         LOG.debug("Proxy {} destroy completed with {}", this, response);
422
423         lock.lock();
424         try {
425             parent.onProxyDestroyed(this);
426             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
427                 this::onPurgeComplete);
428         } finally {
429             lock.unlock();
430         }
431     }
432
    /**
     * Callback for the purge request issued by {@code onDestroyComplete}. It only logs the response.
     *
     * @param response Response to the purge request
     */
    private void onPurgeComplete(final Response<?, ?> response) {
        LOG.debug("Proxy {} purge completed with {}", this, response);
    }
436
    /**
     * Hook invoked from {@link #abortTransaction(AbstractProxyTransaction)} with the lock held, after the
     * transaction has been removed from the tracked proxies.
     *
     * @param tx Transaction which is being aborted
     */
    @GuardedBy("lock")
    void onTransactionAborted(final AbstractProxyTransaction tx) {
        // No-op for most implementations
    }
441
    /**
     * Hook invoked from {@link #completeTransaction(AbstractProxyTransaction)} with the lock held, after the
     * transaction has been removed from the tracked proxies.
     *
     * @param tx Transaction which has completed
     */
    @GuardedBy("lock")
    void onTransactionCompleted(final AbstractProxyTransaction tx) {
        // No-op for most implementations
    }
446
    /**
     * Hook for notification that a transaction has been sealed. It is not invoked from within this class and,
     * unlike the other hooks, carries no lock annotation.
     * NOTE(review): presumably called by the transaction proxy itself when it is sealed - confirm against callers.
     *
     * @param tx Transaction which has been sealed
     */
    void onTransactionSealed(final AbstractProxyTransaction tx) {
        // No-op on most implementations
    }
450 }