2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.BiConsumer;
23 import java.util.function.Consumer;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
26 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
27 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
28 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
29 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
34 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
35 import org.opendaylight.controller.cluster.access.concepts.Request;
36 import org.opendaylight.controller.cluster.access.concepts.RequestException;
37 import org.opendaylight.controller.cluster.access.concepts.Response;
38 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
39 import org.opendaylight.yangtools.concepts.Identifiable;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Per-connection representation of a local history. This class handles state replication across a single connection.
48 * @author Robert Varga
50 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
51 private abstract static class AbstractLocal extends ProxyHistory {
52 private final DataTree dataTree;
54 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
55 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
56 super(parent, connection, identifier);
57 this.dataTree = Preconditions.checkNotNull(dataTree);
60 final DataTreeSnapshot takeSnapshot() {
61 return dataTree.takeSnapshot();
65 private abstract static class AbstractRemote extends ProxyHistory {
66 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
67 final LocalHistoryIdentifier identifier) {
68 super(parent, connection, identifier);
72 private static final class Local extends AbstractLocal {
73 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
74 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
76 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77 // the open one and attempts to create a new transaction again.
78 private LocalReadWriteProxyTransaction lastOpen;
80 private volatile LocalReadWriteProxyTransaction lastSealed;
82 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
83 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
84 super(parent, connection, identifier, dataTree);
88 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89 final TransactionIdentifier txId, final boolean snapshotOnly) {
90 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
92 // onTransactionCompleted() runs concurrently
93 final LocalReadWriteProxyTransaction localSealed = lastSealed;
94 final DataTreeSnapshot baseSnapshot;
95 if (localSealed != null) {
96 baseSnapshot = localSealed.getSnapshot();
98 baseSnapshot = takeSnapshot();
102 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
105 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
106 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
111 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
112 return createClient(parent(), connection, getIdentifier());
116 void onTransactionAborted(final AbstractProxyTransaction tx) {
117 if (tx.equals(lastOpen)) {
123 void onTransactionCompleted(final AbstractProxyTransaction tx) {
124 Verify.verify(tx instanceof LocalProxyTransaction);
125 if (tx instanceof LocalReadWriteProxyTransaction) {
126 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
127 LOG.debug("Completed last sealed transaction {}", tx);
133 void onTransactionSealed(final AbstractProxyTransaction tx) {
134 Preconditions.checkState(tx.equals(lastOpen));
135 lastSealed = lastOpen;
140 private static final class LocalSingle extends AbstractLocal {
141 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
142 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
143 super(parent, connection, identifier, dataTree);
147 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
148 final TransactionIdentifier txId, final boolean snapshotOnly) {
149 final DataTreeSnapshot snapshot = takeSnapshot();
150 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
151 new LocalReadWriteProxyTransaction(this, txId, snapshot);
155 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
156 return createSingle(parent(), connection, getIdentifier());
160 private static final class Remote extends AbstractRemote {
161 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
162 final LocalHistoryIdentifier identifier) {
163 super(parent, connection, identifier);
167 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
168 final TransactionIdentifier txId, final boolean snapshotOnly) {
169 return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
173 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
174 return createClient(parent(), connection, getIdentifier());
178 private static final class RemoteSingle extends AbstractRemote {
179 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
180 final LocalHistoryIdentifier identifier) {
181 super(parent, connection, identifier);
185 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
186 final TransactionIdentifier txId, final boolean snapshotOnly) {
187 return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
191 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
192 return createSingle(parent(), connection, getIdentifier());
196 private static final class RequestReplayException extends RequestException {
197 private static final long serialVersionUID = 1L;
199 RequestReplayException(final String format, final Object... args) {
200 super(String.format(format, args));
204 public boolean isRetriable() {
209 private final class ReconnectCohort extends ProxyReconnectCohort {
211 public LocalHistoryIdentifier getIdentifier() {
217 void replayRequests(final Collection<ConnectionEntry> previousEntries) {
218 // First look for our Create message
219 Iterator<ConnectionEntry> it = previousEntries.iterator();
220 while (it.hasNext()) {
221 final ConnectionEntry e = it.next();
222 final Request<?, ?> req = e.getRequest();
223 if (identifier.equals(req.getTarget())) {
224 Verify.verify(req instanceof LocalHistoryRequest);
225 if (req instanceof CreateLocalHistoryRequest) {
226 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
233 for (AbstractProxyTransaction t : proxies.values()) {
234 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
235 t.replayMessages(successor, previousEntries);
238 // Now look for any finalizing messages
239 it = previousEntries.iterator();
240 while (it.hasNext()) {
241 final ConnectionEntry e = it.next();
242 final Request<?, ?> req = e.getRequest();
243 if (identifier.equals(req.getTarget())) {
244 Verify.verify(req instanceof LocalHistoryRequest);
245 if (req instanceof DestroyLocalHistoryRequest) {
246 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
256 ProxyHistory finishReconnect() {
257 final ProxyHistory ret = Verify.verifyNotNull(successor);
259 for (AbstractProxyTransaction t : proxies.values()) {
263 LOG.debug("Finished reconnecting proxy history {}", this);
269 void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
270 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
271 // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
272 // period required to get into the queue.
273 if (request instanceof TransactionRequest) {
274 forwardTransactionRequest((TransactionRequest<?>) request, callback);
275 } else if (request instanceof LocalHistoryRequest) {
276 forwardTo.accept(request, callback);
278 throw new IllegalArgumentException("Unhandled request " + request);
282 private void forwardTransactionRequest(final TransactionRequest<?> request,
283 final Consumer<Response<?, ?>> callback) throws RequestException {
285 final AbstractProxyTransaction proxy;
288 proxy = proxies.get(request.getTarget());
293 throw new RequestReplayException("Failed to find proxy for %s", request);
296 proxy.forwardRequest(request, callback);
300 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
302 private final Lock lock = new ReentrantLock();
303 private final LocalHistoryIdentifier identifier;
304 private final AbstractClientConnection<ShardBackendInfo> connection;
305 private final AbstractClientHistory parent;
308 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
310 private ProxyHistory successor;
312 private ProxyHistory(final AbstractClientHistory parent,
313 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
314 this.parent = Preconditions.checkNotNull(parent);
315 this.connection = Preconditions.checkNotNull(connection);
316 this.identifier = Preconditions.checkNotNull(identifier);
319 static ProxyHistory createClient(final AbstractClientHistory parent,
320 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
321 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
322 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
323 : new Remote(parent, connection, identifier);
326 static ProxyHistory createSingle(final AbstractClientHistory parent,
327 final AbstractClientConnection<ShardBackendInfo> connection,
328 final LocalHistoryIdentifier identifier) {
329 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
330 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
331 : new RemoteSingle(parent, connection, identifier);
335 public LocalHistoryIdentifier getIdentifier() {
339 final ClientActorContext context() {
340 return connection.context();
343 final long currentTime() {
344 return connection.currentTime();
347 final ActorRef localActor() {
348 return connection.localActor();
351 final AbstractClientHistory parent() {
355 AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
356 final boolean snapshotOnly) {
359 if (successor != null) {
360 return successor.createTransactionProxy(txId, snapshotOnly);
363 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
364 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
365 proxies.put(proxyId, ret);
366 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
373 final void abortTransaction(final AbstractProxyTransaction tx) {
376 // Removal will be completed once purge completes
377 LOG.debug("Proxy {} aborted transaction {}", this, tx);
378 onTransactionAborted(tx);
384 final void completeTransaction(final AbstractProxyTransaction tx) {
387 // Removal will be completed once purge completes
388 LOG.debug("Proxy {} completing transaction {}", this, tx);
389 onTransactionCompleted(tx);
395 void purgeTransaction(final AbstractProxyTransaction tx) {
398 proxies.remove(tx.getIdentifier());
399 LOG.debug("Proxy {} purged transaction {}", this, tx);
408 if (successor != null) {
413 LOG.debug("Proxy {} invoking destroy", this);
414 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
415 this::onDestroyComplete);
421 final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
422 final long enqueuedTicks) {
423 connection.enqueueRequest(request, callback, enqueuedTicks);
426 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
427 connection.sendRequest(request, callback);
431 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
432 TransactionIdentifier txId, boolean snapshotOnly);
434 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
436 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
437 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
439 if (successor != null) {
441 throw new IllegalStateException("Proxy history " + this + " already has a successor");
444 successor = createSuccessor(newConnection);
445 LOG.debug("History {} instantiated successor {}", this, successor);
447 for (AbstractProxyTransaction t : proxies.values()) {
451 return new ReconnectCohort();
454 private void onDestroyComplete(final Response<?, ?> response) {
455 LOG.debug("Proxy {} destroy completed with {}", this, response);
459 parent.onProxyDestroyed(this);
460 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
461 this::onPurgeComplete);
467 private void onPurgeComplete(final Response<?, ?> response) {
468 LOG.debug("Proxy {} purge completed with {}", this, response);
472 void onTransactionAborted(final AbstractProxyTransaction tx) {
473 // No-op for most implementations
477 void onTransactionCompleted(final AbstractProxyTransaction tx) {
478 // No-op for most implementations
481 void onTransactionSealed(final AbstractProxyTransaction tx) {
482 // No-op on most implementations