2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.Consumer;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
26 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
27 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
28 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
29 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
33 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
34 import org.opendaylight.controller.cluster.access.concepts.Request;
35 import org.opendaylight.controller.cluster.access.concepts.RequestException;
36 import org.opendaylight.controller.cluster.access.concepts.Response;
37 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
38 import org.opendaylight.yangtools.concepts.Identifiable;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
45 * Per-connection representation of a local history. This class handles state replication across a single connection.
47 * @author Robert Varga
49 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
50 private abstract static class AbstractLocal extends ProxyHistory {
51 private final DataTree dataTree;
53 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
54 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
55 super(parent, connection, identifier);
56 this.dataTree = Preconditions.checkNotNull(dataTree);
59 final DataTreeSnapshot takeSnapshot() {
60 return dataTree.takeSnapshot();
64 private abstract static class AbstractRemote extends ProxyHistory {
65 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
66 final LocalHistoryIdentifier identifier) {
67 super(parent, connection, identifier);
// DataTree-backed proxy history that tracks the most recently opened and the most recently sealed read-write
// transaction, so that a new transaction can be based on the last sealed one.
// NOTE(review): this listing omits short lines (the embedded numbering has gaps), so closing braces and some
// statements/conditions are not visible; the comments below describe only what is shown.
71 private static final class Local extends AbstractLocal {
// Updater giving compare-and-set access to the volatile lastSealed field.
72 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
73 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
75 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
76 // the open one and attempts to create a new transaction again.
77 private LocalReadWriteProxyTransaction lastOpen;
// Volatile because it is read here while onTransactionCompleted() clears it through LAST_SEALED_UPDATER.
79 private volatile LocalReadWriteProxyTransaction lastSealed;
81 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
82 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
83 super(parent, connection, identifier, dataTree);
// Creates a local proxy transaction. The state check rejects the call while lastOpen is still set.
87 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
88 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
89 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
// NOTE(review): numbering jumps from 89 to 92; presumably the return below is guarded by isDone — confirm.
92 // Done transactions do not register on our radar and should not have any state associated.
93 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
94 : new LocalReadWriteProxyTransaction(this, txId);
97 // onTransactionCompleted() runs concurrently
// Read lastSealed once into a local so the null check and the use see the same value.
98 final LocalReadWriteProxyTransaction localSealed = lastSealed;
99 final DataTreeSnapshot baseSnapshot;
// Base the new transaction on the last sealed transaction's snapshot when there is one ...
100 if (localSealed != null) {
101 baseSnapshot = localSealed.getSnapshot();
// ... and presumably on a fresh data tree snapshot otherwise (the else line is not shown — confirm).
103 baseSnapshot = takeSnapshot();
// NOTE(review): presumably guarded by snapshotOnly (condition line 106 is not shown) — confirm.
107 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
// A read-write transaction is remembered as the currently open one.
110 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
111 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
// The successor is built via createClient() using this history's parent and identifier.
116 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
117 return createClient(parent(), connection, getIdentifier());
// Reacts to an abort of the currently open transaction.
// NOTE(review): the body of the conditional is not shown; presumably it clears lastOpen — confirm.
121 void onTransactionAborted(final AbstractProxyTransaction tx) {
122 if (tx.equals(lastOpen)) {
// Clears lastSealed, but only if it still refers to the completed transaction (compare-and-set).
128 void onTransactionCompleted(final AbstractProxyTransaction tx) {
129 Verify.verify(tx instanceof LocalProxyTransaction);
130 if (tx instanceof LocalReadWriteProxyTransaction
131 && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
132 LOG.debug("Completed last sealed transaction {}", tx);
// Promotes the open transaction to last-sealed; the sealed transaction must be the one tracked in lastOpen.
// NOTE(review): line 140 is not shown; presumably lastOpen is reset there — confirm.
137 void onTransactionSealed(final AbstractProxyTransaction tx) {
138 Preconditions.checkState(tx.equals(lastOpen));
139 lastSealed = lastOpen;
144 private static final class LocalSingle extends AbstractLocal {
145 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
146 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
147 super(parent, connection, identifier, dataTree);
151 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
152 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
153 final DataTreeSnapshot snapshot = takeSnapshot();
154 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
155 new LocalReadWriteProxyTransaction(this, txId, snapshot);
159 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
160 return createSingle(parent(), connection, getIdentifier());
164 private static final class Remote extends AbstractRemote {
165 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
166 final LocalHistoryIdentifier identifier) {
167 super(parent, connection, identifier);
171 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
172 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
173 return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
177 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
178 return createClient(parent(), connection, getIdentifier());
182 private static final class RemoteSingle extends AbstractRemote {
183 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
184 final LocalHistoryIdentifier identifier) {
185 super(parent, connection, identifier);
189 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
190 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
191 return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
195 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
196 return createSingle(parent(), connection, getIdentifier());
// RequestException raised when a request cannot be matched to a transaction proxy (see lookupProxy()).
200 private static final class RequestReplayException extends RequestException {
201 private static final long serialVersionUID = 1L;
// The message is built with String.format(), so the format string uses %s-style placeholders.
203 RequestReplayException(final String format, final Object... args) {
204 super(String.format(format, args));
// NOTE(review): the body is not shown in this listing, so the returned value cannot be stated here — confirm.
208 public boolean isRetriable() {
// Reconnect cohort bound to the enclosing ProxyHistory (instances are returned by startReconnect()). It replays
// or forwards queued requests towards the successor history.
// NOTE(review): this listing omits short lines (the embedded numbering has gaps), so closing braces, returns,
// lock handling and some statements are not visible; the comments below describe only what is shown.
213 private final class ReconnectCohort extends ProxyReconnectCohort {
// NOTE(review): body not shown; presumably returns the enclosing history's identifier — confirm.
215 public LocalHistoryIdentifier getIdentifier() {
// Replays entries of the previous connection in three passes: the history's create request, then the messages
// of every transaction proxy, then the history's destroy request.
221 void replayRequests(final Collection<ConnectionEntry> previousEntries) {
222 // First look for our Create message
223 Iterator<ConnectionEntry> it = previousEntries.iterator();
224 while (it.hasNext()) {
225 final ConnectionEntry e = it.next();
226 final Request<?, ?> req = e.getRequest();
227 if (identifier.equals(req.getTarget())) {
// A request targeting the history identifier itself is expected to be a history-level request.
228 Verify.verify(req instanceof LocalHistoryRequest);
229 if (req instanceof CreateLocalHistoryRequest) {
// Re-enqueue on the successor's connection, keeping the original callback and enqueue ticks.
230 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
// NOTE(review): lines 231-236 are not shown; whether the entry is removed and the scan stops is not visible.
237 for (AbstractProxyTransaction t : proxies.values()) {
238 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
239 t.replayMessages(successor, previousEntries);
242 // Now look for any finalizing messages
243 it = previousEntries.iterator();
244 while (it.hasNext()) {
245 final ConnectionEntry e = it.next();
246 final Request<?, ?> req = e.getRequest();
247 if (identifier.equals(req.getTarget())) {
248 Verify.verify(req instanceof LocalHistoryRequest);
249 if (req instanceof DestroyLocalHistoryRequest) {
250 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
// Completes the reconnect; verifies that a successor has been set.
// NOTE(review): the loop body and the return statement are not shown in this listing.
260 ProxyHistory finishReconnect() {
261 final ProxyHistory ret = Verify.verifyNotNull(successor);
263 for (AbstractProxyTransaction t : proxies.values()) {
267 LOG.debug("Finished reconnecting proxy history {}", this);
// Replays one entry: transaction requests are handed to the matching proxy, history-level requests are passed
// to replayTo unchanged.
273 void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
274 throws RequestException {
275 final Request<?, ?> request = entry.getRequest();
276 if (request instanceof TransactionRequest) {
277 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
278 entry.getEnqueuedTicks());
279 } else if (request instanceof LocalHistoryRequest) {
280 replayTo.accept(entry);
// Presumably the fallback branch for any other request type (the else line is not shown — confirm).
282 throw new IllegalArgumentException("Unhandled request " + request);
// Forwards one entry: same dispatch as replayEntry(), but uses forwardRequest() and does not pass enqueue ticks.
287 void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
288 throws RequestException {
289 final Request<?, ?> request = entry.getRequest();
290 if (request instanceof TransactionRequest) {
291 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
292 } else if (request instanceof LocalHistoryRequest) {
293 forwardTo.accept(entry);
// Presumably the fallback branch for any other request type (the else line is not shown — confirm).
295 throw new IllegalArgumentException("Unhandled request " + request);
// Looks up the proxy registered under the request's target in the enclosing history's proxies map.
// NOTE(review): lines 302-303 and 305-311 are not shown; presumably the lookup runs under the history lock and
// a found proxy is returned before the throw below — confirm.
299 private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
300 throws RequestReplayException {
301 final AbstractProxyTransaction proxy;
304 proxy = proxies.get(request.getTarget());
312 throw new RequestReplayException("Failed to find proxy for %s", request);
316 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
// Lock instance of this history. NOTE(review): the lock()/unlock() call sites are not visible in this listing;
// startReconnect() carries a suppression stating the lock is released asynchronously via the cohort.
318 private final Lock lock = new ReentrantLock();
// Identifier of this history; also used as the history part of proxy transaction identifiers.
319 private final LocalHistoryIdentifier identifier;
// Connection all requests of this history are sent through.
320 private final AbstractClientConnection<ShardBackendInfo> connection;
// Owning client history; notified via onProxyDestroyed() when the destroy request completes.
321 private final AbstractClientHistory parent;
// Transaction proxies created by this history, in insertion order (LinkedHashMap).
// NOTE(review): Map is used here, but no java.util.Map import is visible in this listing (the import numbering
// skips 17) — confirm the import exists in the full file.
324 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
// Set by startReconnect(); once non-null, new transaction proxies are created on the successor instead.
326 private ProxyHistory successor;
/**
 * Initializes the state shared by all proxy history variants.
 *
 * @param parent owning client history
 * @param connection connection used by this history
 * @param identifier identifier of this history
 * @throws NullPointerException if any argument is null
 */
private ProxyHistory(final AbstractClientHistory parent,
        final AbstractClientConnection<ShardBackendInfo> connection,
        final LocalHistoryIdentifier identifier) {
    // Validate in declaration order, then assign
    Preconditions.checkNotNull(parent);
    Preconditions.checkNotNull(connection);
    Preconditions.checkNotNull(identifier);

    this.parent = parent;
    this.connection = connection;
    this.identifier = identifier;
}
335 static ProxyHistory createClient(final AbstractClientHistory parent,
336 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
337 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
338 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
339 : new Remote(parent, connection, identifier);
342 static ProxyHistory createSingle(final AbstractClientHistory parent,
343 final AbstractClientConnection<ShardBackendInfo> connection,
344 final LocalHistoryIdentifier identifier) {
345 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
346 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
347 : new RemoteSingle(parent, connection, identifier);
// Accessor for this history's LocalHistoryIdentifier (the class implements Identifiable<LocalHistoryIdentifier>).
// NOTE(review): the body line is missing from this listing; presumably it returns the identifier field — confirm.
351 public LocalHistoryIdentifier getIdentifier() {
355 final ClientActorContext context() {
356 return connection.context();
359 final long currentTime() {
360 return connection.currentTime();
363 final ActorRef localActor() {
364 return connection.localActor();
// Accessor for the owning client history; subclasses pass its result to createClient()/createSingle().
// NOTE(review): the body line is missing from this listing; presumably it returns the parent field — confirm.
367 final AbstractClientHistory parent() {
371 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
372 final boolean snapshotOnly) {
373 return createTransactionProxy(txId, snapshotOnly, false);
// Creates and registers a transaction proxy for txId, or delegates to the successor once one exists.
// NOTE(review): lines 378-379 and 388-392 are not shown; presumably this runs under the history lock and ends
// by returning the new proxy — confirm.
376 AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
377 final boolean isDone) {
// After startReconnect() has set a successor, new proxies are created there.
380 if (successor != null) {
381 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
// The proxy identifier combines this history's identifier with the transaction id taken from txId.
384 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
385 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
// Remember the proxy so it can be replayed on reconnect and removed on purge.
386 proxies.put(proxyId, ret);
387 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
// Records that a transaction proxy was aborted: logs and invokes the onTransactionAborted() hook. The proxy is
// not removed from the proxies map here.
// NOTE(review): lines 395-396 and 400-403 are not shown; presumably the hook runs under the history lock — confirm.
394 final void abortTransaction(final AbstractProxyTransaction tx) {
397 // Removal will be completed once purge completes
398 LOG.debug("Proxy {} aborted transaction {}", this, tx);
399 onTransactionAborted(tx);
// Records that a transaction proxy is completing: logs and invokes the onTransactionCompleted() hook. The proxy
// is not removed from the proxies map here.
// NOTE(review): lines 406-407 and 411-414 are not shown; presumably the hook runs under the history lock — confirm.
405 final void completeTransaction(final AbstractProxyTransaction tx) {
408 // Removal will be completed once purge completes
409 LOG.debug("Proxy {} completing transaction {}", this, tx);
410 onTransactionCompleted(tx);
// Removes the proxy registered under the transaction's identifier from the proxies map and logs the purge.
// NOTE(review): lines 417-418 and 421-424 are not shown; presumably the removal runs under the history lock — confirm.
416 void purgeTransaction(final AbstractProxyTransaction tx) {
419 proxies.remove(tx.getIdentifier());
420 LOG.debug("Proxy {} purged transaction {}", this, tx);
// NOTE(review): the header of the method containing these lines is not shown in this listing (numbering jumps
// from 420 to 429), and neither is the body of the successor check below — confirm both against the full file.
429 if (successor != null) {
// Sends a DestroyLocalHistoryRequest (sequence 1) for this history; onDestroyComplete() receives the response.
434 LOG.debug("Proxy {} invoking destroy", this);
435 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
436 this::onDestroyComplete);
442 final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
443 final long enqueuedTicks) {
444 connection.enqueueRequest(request, callback, enqueuedTicks);
447 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
448 connection.sendRequest(request, callback);
// Factory hook implemented by each history variant to instantiate its transaction proxy type. It is called by
// createTransactionProxy() with this history's connection and the already-derived proxy identifier.
// The parameter named connection shadows the field of the same name, hence the checkstyle suppression.
452 @SuppressWarnings("checkstyle:hiddenField")
453 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
454 TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
// Factory hook implemented by each history variant to build the history that takes over on the given connection.
// It is called by startReconnect(). The visible implementations delegate to createClient() or createSingle().
// The parameter named connection shadows the field of the same name, hence the checkstyle suppression.
456 @SuppressWarnings("checkstyle:hiddenField")
457 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
// Begins a reconnect onto newConnection: creates the successor history and returns a ReconnectCohort for it.
// Throws IllegalStateException if a successor already exists.
// NOTE(review): lines 461, 463, 471-472 are not shown. Per the suppression justification the lock is released
// asynchronously via the cohort, so presumably it is acquired here and left held on success — confirm. The body
// of the loop over proxies is also not visible.
459 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
460 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
462 if (successor != null) {
464 throw new IllegalStateException("Proxy history " + this + " already has a successor");
467 successor = createSuccessor(newConnection);
468 LOG.debug("History {} instantiated successor {}", this, successor);
470 for (AbstractProxyTransaction t : proxies.values()) {
474 return new ReconnectCohort();
// Callback for the destroy request: notifies the parent history and follows up with a PurgeLocalHistoryRequest
// (sequence 2); onPurgeComplete() receives that response.
// NOTE(review): lines 479-481 and 485-488 are not shown; presumably this runs under the history lock — confirm.
477 private void onDestroyComplete(final Response<?, ?> response) {
478 LOG.debug("Proxy {} destroy completed with {}", this, response);
482 parent.onProxyDestroyed(this);
483 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
484 this::onPurgeComplete);
490 private void onPurgeComplete(final Response<?, ?> response) {
491 LOG.debug("Proxy {} purge completed with {}", this, response);
495 void onTransactionAborted(final AbstractProxyTransaction tx) {
496 // No-op for most implementations
500 void onTransactionCompleted(final AbstractProxyTransaction tx) {
501 // No-op for most implementations
// Hook for a transaction proxy being sealed. The base implementation does nothing; the Local variant overrides
// it to track the last sealed transaction. NOTE(review): the call site is not visible in this listing.
504 void onTransactionSealed(final AbstractProxyTransaction tx) {
505 // No-op on most implementations