X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractDataStoreClientBehavior.java;h=ef5590fe84354bda48b7a3a300881825dfc56e3a;hb=1d7e8fd9d781f630dee9dfb1b509067dd7fb9caa;hp=5a34b3b77e2afd8419ae100929960e9b6ff02be8;hpb=32b322afd58f120a78208c939a01422aa224d0cf;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java index 5a34b3b77e..ef5590fe84 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java @@ -16,8 +16,7 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import java.util.concurrent.locks.StampedLock; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.client.ClientActorBehavior; import org.opendaylight.controller.cluster.access.client.ClientActorContext; @@ -64,6 +63,7 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior histories = new ConcurrentHashMap<>(); private final AtomicLong nextHistoryId = new AtomicLong(1); + private final StampedLock lock = new StampedLock(); private final SingleClientHistory singleHistory; private volatile Throwable aborted; @@ -91,14 +91,19 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior currentBehavior) { @@ -122,44 +127,41 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior connectionUp( - final AbstractClientConnection conn, final ShardBackendInfo backend) { - - // Step 0: create a new connected connection - final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), - conn.cookie(), backend); - - LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn); + protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection newConn) { + final long stamp = lock.writeLock(); + // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no + // further TransactionProxies can be created and we can safely traverse maps without risking + // missing an entry final Collection cohorts = new ArrayList<>(); - try { - // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no - // further TransactionProxies can be created and we can safely traverse maps without risking - // missing an entry - startReconnect(singleHistory, newConn, cohorts); - for (ClientLocalHistory h : histories.values()) { - startReconnect(h, newConn, cohorts); - } - - // Step 2: Collect previous successful requests from the cohorts. We do not want to expose - // the non-throttling interface to the connection, hence we use a wrapper consumer - for (HistoryReconnectCohort c : cohorts) { - c.replaySuccessfulRequests(); - } - - // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding - // requests will be immediately sent to it and requests being sent concurrently will get forwarded - // once they hit the new connection. - conn.setForwarder(BouncingReconnectForwarder.forCohorts(newConn, cohorts)); - } finally { - // Step 4: Complete switchover of the connection. The cohorts can resume normal operations. - for (HistoryReconnectCohort c : cohorts) { - c.close(); - } + startReconnect(singleHistory, newConn, cohorts); + for (ClientLocalHistory h : histories.values()) { + startReconnect(h, newConn, cohorts); } - return newConn; + return previousEntries -> { + try { + // Step 2: Collect previous successful requests from the cohorts. We do not want to expose + // the non-throttling interface to the connection, hence we use a wrapper consumer + for (HistoryReconnectCohort c : cohorts) { + c.replaySuccessfulRequests(previousEntries); + } + + // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding + // requests will be immediately sent to it and requests being sent concurrently will get + // forwarded once they hit the new connection. + return BouncingReconnectForwarder.forCohorts(newConn, cohorts); + } finally { + try { + // Step 4: Complete switchover of the connection. The cohorts can resume normal operations. + for (HistoryReconnectCohort c : cohorts) { + c.close(); + } + } finally { + lock.unlockWrite(stamp); + } + } + }; } private static void startReconnect(final AbstractClientHistory history, @@ -181,19 +183,21 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior