2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static com.google.common.base.Preconditions.checkState;
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
15 import akka.actor.ActorRef;
16 import com.google.common.collect.ImmutableList;
17 import com.google.common.primitives.UnsignedLong;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Iterator;
22 import java.util.LinkedHashMap;
23 import java.util.List;
25 import java.util.Optional;
26 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29 import java.util.function.Consumer;
30 import org.checkerframework.checker.lock.qual.GuardedBy;
31 import org.checkerframework.checker.lock.qual.Holding;
32 import org.eclipse.jdt.annotation.NonNull;
33 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
34 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
35 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
36 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
37 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
38 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
39 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
40 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
41 import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest;
42 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
43 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
44 import org.opendaylight.controller.cluster.access.concepts.Request;
45 import org.opendaylight.controller.cluster.access.concepts.RequestException;
46 import org.opendaylight.controller.cluster.access.concepts.Response;
47 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
48 import org.opendaylight.yangtools.concepts.Identifiable;
49 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
50 import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * Per-connection representation of a local history. This class handles state replication across a single connection.
57 * @author Robert Varga
59 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
// Base class for proxy histories that hold a ReadOnlyDataTree and can take snapshots from it.
60 private abstract static class AbstractLocal extends ProxyHistory {
// Source of snapshots; never null (enforced by requireNonNull in the constructor).
61 private final ReadOnlyDataTree dataTree;
// Passes parent, connection and identifier to ProxyHistory and retains the data tree.
63 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
64 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
65 super(parent, connection, identifier);
66 this.dataTree = requireNonNull(dataTree);
// Returns whatever snapshot the backing data tree hands out at the time of the call.
69 final DataTreeSnapshot takeSnapshot() {
70 return dataTree.takeSnapshot();
// Base class for proxy histories that keep no data tree of their own; it only forwards its constructor
// arguments to ProxyHistory.
74 private abstract static class AbstractRemote extends ProxyHistory {
75 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
76 final LocalHistoryIdentifier identifier) {
77 super(parent, connection, identifier);
// Data-tree-backed proxy history that remembers the last open and the last sealed read-write transaction,
// so that a new transaction can be based on the snapshot of the last sealed one.
81 private static final class Local extends AbstractLocal {
// Atomic updater over the volatile lastSealed field; used for the compare-and-set in onTransactionCompleted().
82 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
83 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
85 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
86 // the open one and attempts to create a new transaction again.
87 private LocalReadWriteProxyTransaction lastOpen;
// volatile: read without further synchronization in doCreateTransactionProxy() and updated via CAS on completion.
89 private volatile LocalReadWriteProxyTransaction lastSealed;
91 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
92 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
93 super(parent, connection, identifier, dataTree);
// Creates the proxy for txId. Fails with IllegalStateException while another transaction is still recorded in
// lastOpen. NOTE(review): the condition guarding the "done transaction" return below is not visible in this
// listing -- presumably a check of isDone; confirm.
97 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
98 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
99 checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
102 // Done transactions do not register on our radar and should not have any state associated.
103 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
104 : new LocalReadWriteProxyTransaction(this, txId);
107 // onTransactionCompleted() runs concurrently
// Read the volatile field once so that the null check and getSnapshot() act on the same reference.
108 final LocalReadWriteProxyTransaction localSealed = lastSealed;
// Base snapshot: the last sealed transaction's snapshot when there is one, otherwise a fresh data tree snapshot.
109 final DataTreeSnapshot baseSnapshot;
110 if (localSealed != null) {
111 baseSnapshot = localSealed.getSnapshot();
113 baseSnapshot = takeSnapshot();
// NOTE(review): the branch selecting between the read-only return and the read-write path is not visible in
// this listing -- presumably keyed on snapshotOnly; confirm. Only the read-write proxy is recorded in lastOpen.
117 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
120 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
121 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
// The successor is produced by createClient(), which chooses Local or Remote based on the new connection.
126 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
127 return createClient(parent(), connection, getIdentifier());
// NOTE(review): the action taken when tx is the last open transaction is not visible in this listing --
// presumably lastOpen is cleared; confirm.
131 void onTransactionAborted(final AbstractProxyTransaction tx) {
132 if (tx.equals(lastOpen)) {
// Only local proxy transactions are expected here. For a read-write transaction, lastSealed is reset to null
// only if it still refers to tx; the compare-and-set leaves any other sealed transaction untouched.
138 void onTransactionCompleted(final AbstractProxyTransaction tx) {
139 verify(tx instanceof LocalProxyTransaction, "Unexpected transaction %s", tx);
140 if (tx instanceof LocalReadWriteProxyTransaction
141 && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
142 LOG.debug("Completed last sealed transaction {}", tx);
// Only the currently open transaction may be sealed; it becomes the new lastSealed.
// NOTE(review): whether lastOpen is cleared afterwards is not visible in this listing.
147 void onTransactionSealed(final AbstractProxyTransaction tx) {
148 checkState(tx.equals(lastOpen));
149 lastSealed = lastOpen;
// Data-tree-backed proxy history which keeps no per-transaction state: every transaction proxy is based on a
// fresh snapshot of the data tree.
154 private static final class LocalSingle extends AbstractLocal {
155 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
156 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
157 super(parent, connection, identifier, dataTree);
// Takes a new snapshot per call and wraps it in a read-only or read-write proxy depending on snapshotOnly.
// The connection and isDone arguments are not used by the lines visible here.
161 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
162 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
163 final DataTreeSnapshot snapshot = takeSnapshot();
164 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
165 new LocalReadWriteProxyTransaction(this, txId, snapshot);
// The successor is produced by createSingle(), i.e. LocalSingle or RemoteSingle depending on the new connection.
169 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
170 return createSingle(parent(), connection, getIdentifier());
// Proxy history without a data tree; all transactions are RemoteProxyTransaction instances.
174 private static final class Remote extends AbstractRemote {
175 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
176 final LocalHistoryIdentifier identifier) {
177 super(parent, connection, identifier);
// NOTE(review): the fourth RemoteProxyTransaction argument is true here and false in RemoteSingle; its exact
// meaning is defined by RemoteProxyTransaction and is not visible in this file -- confirm there.
181 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
182 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
183 return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
// The successor is produced by createClient(), i.e. Local or Remote depending on the new connection.
187 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
188 return createClient(parent(), connection, getIdentifier());
// Counterpart of Remote created by createSingle(); all transactions are RemoteProxyTransaction instances.
192 private static final class RemoteSingle extends AbstractRemote {
193 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
194 final LocalHistoryIdentifier identifier) {
195 super(parent, connection, identifier);
// NOTE(review): the fourth RemoteProxyTransaction argument is false here and true in Remote; its exact meaning
// is defined by RemoteProxyTransaction and is not visible in this file -- confirm there.
199 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
200 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
201 return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
// The successor is produced by createSingle(), i.e. LocalSingle or RemoteSingle depending on the new connection.
205 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
206 return createSingle(parent(), connection, getIdentifier());
// RequestException raised by ReconnectCohort.lookupProxy() when a proxy lookup for a request fails.
210 private static final class RequestReplayException extends RequestException {
211 private static final long serialVersionUID = 1L;
// The message is built eagerly with String.format(format, args), so format uses %s-style placeholders.
213 RequestReplayException(final String format, final Object... args) {
214 super(String.format(format, args));
// NOTE(review): the returned value is not visible in this listing -- confirm against the full source.
218 public boolean isRetriable() {
// Inner (non-static) cohort handed out by startReconnect(). It works directly on the enclosing history's
// identifier, proxies, successor and skippedTransactions.
223 private final class ReconnectCohort extends ProxyReconnectCohort {
// NOTE(review): body not visible in this listing -- presumably returns the enclosing history's identifier.
225 public LocalHistoryIdentifier getIdentifier() {
// Replays state towards the successor in order: (1) this history's CreateLocalHistoryRequest, (2) messages of
// every tracked transaction proxy, (3) pending skipped transactions, (4) this history's
// DestroyLocalHistoryRequest. Requests are re-enqueued on the successor's connection with their original
// callback and enqueue ticks.
// NOTE(review): iterator removal/break statements inside the two scan loops are not visible in this listing.
231 void replayRequests(final Collection<ConnectionEntry> previousEntries) {
232 // First look for our Create message
233 Iterator<ConnectionEntry> it = previousEntries.iterator();
234 while (it.hasNext()) {
235 final ConnectionEntry e = it.next();
236 final Request<?, ?> req = e.getRequest();
// Only requests targeting this history's identifier are considered; they must be LocalHistoryRequests.
237 if (identifier.equals(req.getTarget())) {
238 verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
239 if (req instanceof CreateLocalHistoryRequest) {
240 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
// proxies is a LinkedHashMap, so transaction proxies are visited in insertion order.
247 for (AbstractProxyTransaction t : proxies.values()) {
248 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
249 t.replayMessages(successor, previousEntries);
252 // Forward any skipped transactions
// NOTE(review): the guard around the hand-over is not visible here -- presumably a null check on local.
253 final var local = skippedTransactions;
255 LOG.debug("{} forwarding skipped transactions towards successor {}", identifier, successor);
256 successor.skipTransactions(local);
257 skippedTransactions = null;
260 // Now look for any finalizing messages
261 it = previousEntries.iterator();
262 while (it.hasNext()) {
263 final ConnectionEntry e = it.next();
264 final Request<?, ?> req = e.getRequest();
265 if (identifier.equals(req.getTarget())) {
266 verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
267 if (req instanceof DestroyLocalHistoryRequest) {
268 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
// Completes the reconnect; fails verification if no successor has been set.
// NOTE(review): the per-proxy work in the loop, any lock release and the return statement are not visible in
// this listing.
278 ProxyHistory finishReconnect() {
279 final ProxyHistory ret = verifyNotNull(successor);
281 for (AbstractProxyTransaction t : proxies.values()) {
285 LOG.debug("Finished reconnecting proxy history {}", this);
// Routes a single entry: TransactionRequests go to the matching proxy's replayRequest() with the original
// callback and enqueue ticks, LocalHistoryRequests are passed to replayTo, anything else is rejected with
// IllegalArgumentException.
291 void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
292 throws RequestException {
293 final Request<?, ?> request = entry.getRequest();
294 if (request instanceof TransactionRequest) {
295 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
296 entry.getEnqueuedTicks());
297 } else if (request instanceof LocalHistoryRequest) {
298 replayTo.accept(entry);
300 throw new IllegalArgumentException("Unhandled request " + request);
// Same routing as replayEntry(), but TransactionRequests go to the proxy's forwardRequest() (no enqueue ticks)
// and LocalHistoryRequests are passed to forwardTo.
305 void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
306 throws RequestException {
307 final Request<?, ?> request = entry.getRequest();
308 if (request instanceof TransactionRequest) {
309 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
310 } else if (request instanceof LocalHistoryRequest) {
311 forwardTo.accept(entry);
313 throw new IllegalArgumentException("Unhandled request " + request);
// Finds the transaction proxy registered under the request's target and reports a failed lookup through
// RequestReplayException.
// NOTE(review): locking around the map access and the condition guarding the throw are not visible here.
317 private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
318 throws RequestReplayException {
319 final AbstractProxyTransaction proxy;
322 proxy = proxies.get(request.getTarget());
330 throw new RequestReplayException("Failed to find proxy for %s", request);
334 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
// NOTE(review): lock()/unlock() call sites and any @GuardedBy annotations are not visible in this listing;
// presumably this lock guards proxies and successor -- confirm against the full source.
336 private final Lock lock = new ReentrantLock();
// The three references below are mandatory; the constructor rejects null for each of them.
337 private final @NonNull LocalHistoryIdentifier identifier;
338 private final @NonNull AbstractClientConnection<ShardBackendInfo> connection;
339 private final @NonNull AbstractClientHistory parent;
// Transaction proxies keyed by proxy identifier; LinkedHashMap keeps them in insertion order for iteration.
342 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
// Assigned in startReconnect(); once non-null, createTransactionProxy() and skipTransaction() delegate to it.
344 private ProxyHistory successor;
346 // List of transaction identifiers which were allocated by our parent history, but did not touch our shard. Each of
347 // these represents a hole in otherwise-contiguous allocation of transactionIds. These holes are problematic, as
348 // each of them prevents LeaderFrontendState.purgedHistories from coalescing, leading to a gradual heap exhaustion.
351 // We keep these in an ArrayList for fast insertion, as that happens when we are otherwise idle. We translate these
352 // into purge requests when:
353 // - we are about to allocate a new transaction
354 // - we get a successor proxy
355 // - the list grows unreasonably long
357 // TODO: we are tracking entire TransactionIdentifiers, but really only need to track the longs. Do that once we
358 // have a {@code List<long>}.
359 // FIXME: this is not tuneable, but perhaps should be
360 // FIXME: default value deserves some explanation -- this affects depth of an RB Tree on the receiving end.
// skipIfNeeded() flushes the skipped list once its size reaches this value.
361 private static final int PURGE_SKIPPED_TXID_THRESHOLD = 256;
// Pending skipped transactions; reset to null whenever the list is flushed or handed to the successor.
364 private volatile List<TransactionIdentifier> skippedTransactions;
// Private constructor, reachable only from the nested subclasses. Every argument is mandatory and a null
// value fails fast with NullPointerException from requireNonNull.
366 private ProxyHistory(final AbstractClientHistory parent,
367 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
368 this.parent = requireNonNull(parent);
369 this.connection = requireNonNull(connection);
370 this.identifier = requireNonNull(identifier);
// Factory: returns a Local history when the connection's backend info exposes a data tree, otherwise a Remote
// one. An absent backend info also results in Remote, because the Optional chain is then empty.
373 static ProxyHistory createClient(final AbstractClientHistory parent,
374 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
375 final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
376 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.orElseThrow())
377 : new Remote(parent, connection, identifier);
// Factory: returns a LocalSingle history when the connection's backend info exposes a data tree, otherwise a
// RemoteSingle one. An absent backend info also results in RemoteSingle.
380 static ProxyHistory createSingle(final AbstractClientHistory parent,
381 final AbstractClientConnection<ShardBackendInfo> connection,
382 final LocalHistoryIdentifier identifier) {
383 final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
384 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.orElseThrow())
385 : new RemoteSingle(parent, connection, identifier);
// Accessor for this history's LocalHistoryIdentifier.
// NOTE(review): body not visible in this listing -- presumably returns the identifier field; confirm.
389 // Non-final for mocking
390 public LocalHistoryIdentifier getIdentifier() {
// Returns the client actor context of the underlying connection.
394 final ClientActorContext context() {
395 return connection.context();
// Returns the current time as reported by the underlying connection; doSkipTransactions() passes the same
// connection.currentTime() value as the enqueue-ticks argument of enqueueRequest().
398 final long currentTime() {
399 return connection.currentTime();
// Returns the local actor reference of the underlying connection; it is passed to the requests built in
// this class.
402 final ActorRef localActor() {
403 return connection.localActor();
// Accessor for the owning client history.
// NOTE(review): body not visible in this listing -- presumably returns the parent field; confirm.
406 final AbstractClientHistory parent() {
// Convenience overload: creates a proxy for a transaction which is not done yet (isDone = false).
410 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
411 final boolean snapshotOnly) {
412 return createTransactionProxy(txId, snapshotOnly, false);
// Allocates a transaction proxy on this history, or on its successor once one exists. The proxy identifier
// pairs this history's identifier with the transaction number of txId; the new proxy is registered in proxies.
// NOTE(review): locking and the return statement are not visible in this listing.
415 // Non-final for mocking
416 AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
417 final boolean isDone) {
420 if (successor != null) {
421 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
424 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
425 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
426 proxies.put(proxyId, ret);
427 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
// Records txId as a transaction skipped on this history, delegating to the successor once one exists.
// The backing list is created lazily as an ArrayList.
// NOTE(review): locking, the statement adding txId to the list and any follow-up threshold check are not
// visible in this listing.
434 final void skipTransaction(final TransactionIdentifier txId) {
437 if (successor != null) {
438 successor.skipTransaction(txId);
442 var local = skippedTransactions;
444 skippedTransactions = local = new ArrayList<>();
447 LOG.debug("Recorded skipped transaction {}", txId);
// Flushes the given skipped-transaction list once it holds at least PURGE_SKIPPED_TXID_THRESHOLD entries.
// The field is cleared before the list is turned into a request.
455 private void skipIfNeeded(final List<TransactionIdentifier> current) {
456 if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) {
457 skippedTransactions = null;
458 doSkipTransactions(current);
// Bulk variant used when a predecessor hands over its pending skipped transactions (see
// ReconnectCohort.replayRequests()). Delegates to the successor once one exists; otherwise the entries are
// appended to the existing list or, when there is none, toSkip itself is adopted as the list (no copy).
// NOTE(review): locking and the branch structure between addAll() and the assignment are not visible here.
462 private void skipTransactions(final List<TransactionIdentifier> toSkip) {
465 if (successor != null) {
466 successor.skipTransactions(toSkip);
470 var local = skippedTransactions;
472 local.addAll(toSkip);
474 skippedTransactions = local = toSkip;
// Flushes all pending skipped transactions, provided there are any and no successor has taken over.
// The field is read twice: NOTE(review): presumably an unlocked pre-check followed by a re-read under the
// lock -- the lines in between are not visible in this listing; confirm.
482 private void skipTransactions() {
483 var local = skippedTransactions;
487 local = skippedTransactions;
488 if (local != null && successor == null) {
489 skippedTransactions = null;
490 doSkipTransactions(local);
// Turns the given identifiers into a single SkipTransactionsRequest and enqueues it on the connection.
// The transaction numbers are wrapped as UnsignedLong; the first one forms the request's target identifier
// and the remainder is passed as the list of further transactions. The response is only logged.
// Requires a non-empty toSkip: txIds.get(0) throws on an empty list.
// NOTE(review): stream stages between mapToLong() and mapToObj() are not visible in this listing.
499 private void doSkipTransactions(final List<TransactionIdentifier> toSkip) {
500 final var txIds = toSkip.stream()
501 .mapToLong(TransactionIdentifier::getTransactionId)
504 .mapToObj(UnsignedLong::fromLongBits)
505 .collect(ImmutableList.toImmutableList());
507 LOG.debug("Proxy {} skipping transactions {}", this, txIds);
508 connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier,
509 txIds.get(0).longValue()), 0, localActor(),txIds.subList(1, txIds.size())), resp -> {
510 LOG.debug("Proxy {} confirmed transaction skip", this);
511 }, connection.currentTime());
// Notifies the history that tx was aborted by invoking the onTransactionAborted() hook. The proxy stays in
// proxies until purgeTransaction() removes it.
// NOTE(review): locking around these statements is not visible in this listing.
514 final void abortTransaction(final AbstractProxyTransaction tx) {
517 // Removal will be completed once purge completes
518 LOG.debug("Proxy {} aborted transaction {}", this, tx);
519 onTransactionAborted(tx);
// Notifies the history that tx completed by invoking the onTransactionCompleted() hook. The proxy stays in
// proxies until purgeTransaction() removes it.
// NOTE(review): locking around these statements is not visible in this listing.
525 final void completeTransaction(final AbstractProxyTransaction tx) {
528 // Removal will be completed once purge completes
529 LOG.debug("Proxy {} completing transaction {}", this, tx);
530 onTransactionCompleted(tx);
// Drops the proxy registered under tx's identifier from proxies.
// NOTE(review): locking around the removal is not visible in this listing.
536 final void purgeTransaction(final AbstractProxyTransaction tx) {
539 proxies.remove(tx.getIdentifier());
540 LOG.debug("Proxy {} purged transaction {}", this, tx);
// NOTE(review): the method header preceding these lines is missing from this listing -- judging by the log
// message this is the history's destroy operation; confirm against the full source.
// What happens when a successor exists is not visible here. Otherwise a DestroyLocalHistoryRequest is sent
// and its response is handled by onDestroyComplete().
549 if (successor != null) {
554 LOG.debug("Proxy {} invoking destroy", this);
555 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
556 this::onDestroyComplete);
// Enqueues a transaction request on the underlying connection, passing through the callback and the
// caller-supplied enqueue ticks.
// NOTE(review): one line between the signature and this call is not visible in this listing.
562 final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
563 final long enqueuedTicks) {
565 connection.enqueueRequest(request, callback, enqueuedTicks);
// Sends a transaction request through the underlying connection with the given response callback.
// NOTE(review): one line between the signature and this call is not visible in this listing.
568 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
570 connection.sendRequest(request, callback);
// Subclass hook which instantiates the concrete proxy for txId. createTransactionProxy() calls it with this
// history's own connection and the already-derived proxy identifier. The connection parameter shadows the
// field of the same name, hence the checkstyle suppression.
574 @SuppressWarnings("checkstyle:hiddenField")
575 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
576 TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
// Subclass hook which creates the history that takes over on the given new connection; invoked from
// startReconnect(). The parameter shadows the connection field, hence the checkstyle suppression.
579 @SuppressWarnings("checkstyle:hiddenField")
580 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
// Begins migration to newConnection: creates the successor history and returns a cohort for driving the rest
// of the reconnect. Throws IllegalStateException if a successor already exists. Per the SuppressFBWarnings
// justification, the lock taken here is deliberately not released in this method but later via the cohort.
// NOTE(review): the lock acquisition and the per-proxy work inside the loop are not visible in this listing.
582 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
583 final ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
585 if (successor != null) {
587 throw new IllegalStateException("Proxy history " + this + " already has a successor");
590 successor = createSuccessor(newConnection);
591 LOG.debug("History {} instantiated successor {}", this, successor);
593 for (AbstractProxyTransaction t : proxies.values()) {
597 return new ReconnectCohort();
// Response callback of the DestroyLocalHistoryRequest: notifies the parent history that this proxy has been
// destroyed and follows up with a PurgeLocalHistoryRequest handled by onPurgeComplete().
// NOTE(review): the second request argument (2 here, 1 for the destroy request) is presumably a sequence
// number -- confirm. Lines between the log statement and the parent notification are not visible here.
600 private void onDestroyComplete(final Response<?, ?> response) {
601 LOG.debug("Proxy {} destroy completed with {}", this, response);
605 parent.onProxyDestroyed(this);
606 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
607 this::onPurgeComplete);
// Response callback of the PurgeLocalHistoryRequest. The only effect visible in this listing is the debug log.
613 private void onPurgeComplete(final Response<?, ?> response) {
614 LOG.debug("Proxy {} purge completed with {}", this, response);
// Hook invoked from abortTransaction(); overridden by Local, which compares tx against its last open
// transaction.
618 void onTransactionAborted(final AbstractProxyTransaction tx) {
619 // No-op for most implementations
// Hook invoked from completeTransaction(); overridden by Local, which clears its last sealed transaction if it
// is tx.
623 void onTransactionCompleted(final AbstractProxyTransaction tx) {
624 // No-op for most implementations
// Hook for a transaction being sealed; overridden by Local, which records tx as its last sealed transaction.
// The caller of this hook is not visible in this file.
628 void onTransactionSealed(final AbstractProxyTransaction tx) {
629 // No-op on most implementations