import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.StampedLock;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
private final AtomicLong nextHistoryId = new AtomicLong(1);
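+ /*
+ * Locking protocol:
+ * - createLocalHistory() runs under the read lock and checks 'aborted' before it
+ * publishes a new history, hence many application threads can register concurrently;
+ * - abortOperations() and connectionUp()/finishReconnect() run under the write lock,
+ * giving them exclusive access for teardown and connection switchover.
+ */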
+ private final StampedLock lock = new StampedLock();
private final SingleClientHistory singleHistory;
private volatile Throwable aborted;
}
private void abortOperations(final Throwable cause) {
- // This acts as a barrier, application threads check this after they have added an entry in the maps,
- // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
- aborted = cause;
-
- for (ClientLocalHistory h : histories.values()) {
- h.localAbort(cause);
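+ // Write-lock out concurrent createLocalHistory() callers for the duration of the abort.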
+ final long stamp = lock.writeLock();
+ try {
+ // This acts as a barrier: createLocalHistory() checks 'aborted' under the read lock
+ // before publishing a new history, so once we clear the map below no new entries
+ // can appear.
+ aborted = cause;
+
+ for (ClientLocalHistory h : histories.values()) {
+ h.localAbort(cause);
+ }
+ histories.clear();
+ } finally {
+ lock.unlockWrite(stamp);
}
- histories.clear();
}
private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior<ShardBackendInfo> currentBehavior) {
*/
@Override
protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
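+ // Hold the write lock across the entire reconnect; it is only released in
+ // finishReconnect(), once the new connection has been fully switched over.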
+ final long stamp = lock.writeLock();
+
// Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
// further TransactionProxies can be created and we can safely traverse maps without risking
// missing an entry
startReconnect(h, newConn, cohorts);
}
- return previousEntries -> {
- try {
- // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
- // the non-throttling interface to the connection, hence we use a wrapper consumer
- for (HistoryReconnectCohort c : cohorts) {
- c.replaySuccessfulRequests(previousEntries);
- }
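+ // Defer the rest of the reconnect until the connection hands us the entries that
+ // were outstanding on the previous connection.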
+ return previousEntries -> finishReconnect(newConn, stamp, cohorts, previousEntries);
+ }
- // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
- // requests will be immediately sent to it and requests being sent concurrently will get
- // forwarded once they hit the new connection.
- return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
- } finally {
+ private ReconnectForwarder finishReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn,
+ final long stamp, final Collection<HistoryReconnectCohort> cohorts,
+ final Collection<ConnectionEntry> previousEntries) {
+ try {
+ // Step 2: Collect previous requests from the cohorts. We do not want to expose
+ // the non-throttling interface to the connection, hence we use a wrapper consumer
+ for (HistoryReconnectCohort c : cohorts) {
+ c.replayRequests(previousEntries);
+ }
+
+ // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
+ // requests will be immediately sent to it and requests being sent concurrently will get
+ // forwarded once they hit the new connection.
+ return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
+ } finally {
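+ // Nested try/finally: Step 4 must run even if the forwarder could not be installed,
+ // and the write lock must be released even if a cohort fails to close.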
+ try {
// Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
for (HistoryReconnectCohort c : cohorts) {
c.close();
}
+ } finally {
+ lock.unlockWrite(stamp);
}
- };
+ }
}
private static void startReconnect(final AbstractClientHistory history,
public final ClientLocalHistory createLocalHistory() {
final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
nextHistoryId.getAndIncrement());
- final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
- LOG.debug("{}: creating a new local history {}", persistenceId(), history);
- Verify.verify(histories.put(historyId, history) == null);
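+ // Multiple application threads may create histories concurrently: the read lock only
+ // excludes the write-locked paths, abortOperations() and connectionUp().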
+ final long stamp = lock.readLock();
+ try {
+ if (aborted != null) {
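+ // Replacement for the deprecated Throwables.propagate(): rethrow unchecked causes
+ // as-is and wrap checked ones in a RuntimeException.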
+ Throwables.throwIfUnchecked(aborted);
+ throw new RuntimeException(aborted);
+ }
+
+ final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
+ LOG.debug("{}: creating a new local history {}", persistenceId(), history);
- final Throwable a = aborted;
- if (a != null) {
- history.localAbort(a);
- histories.remove(historyId, history);
- throw Throwables.propagate(a);
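+ // The identifier is freshly allocated from nextHistoryId, so it cannot be present yet.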
+ Verify.verify(histories.put(historyId, history) == null);
+ return history;
+ } finally {
+ lock.unlockRead(stamp);
}
-
- return history;
}
@Override
}
@Override
- public final void close() {
+ public final ClientSnapshot createSnapshot() {
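+ // Snapshots are delegated to the single implicit history.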
+ return singleHistory.takeSnapshot();
+ }
+
+ @Override
+ public void close() {
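+ // Let the superclass tear down first, then run shutdown() on the client actor.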
+ super.close();
context().executeInActor(this::shutdown);
}
- abstract Long resolveShardForPath(final YangInstanceIdentifier path);
+ abstract Long resolveShardForPath(YangInstanceIdentifier path);
}