X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractDataStoreClientBehavior.java;h=7187f83a1ac060d41341c110181877b9535b2985;hp=a84715c84360b84596f17118714e51e936afc4b9;hb=99f80f27bee37bb23e345420bf14bb7bb4793c28;hpb=320a4e5cd2d9d80468a3f82798744f2035488218 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java index a84715c843..7187f83a1a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java @@ -16,13 +16,15 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; -import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; +import java.util.concurrent.locks.StampedLock; +import java.util.stream.Stream; import org.opendaylight.controller.cluster.access.client.ClientActorBehavior; import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.client.ConnectionEntry; +import org.opendaylight.controller.cluster.access.client.ReconnectForwarder; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,10 +35,10 @@ import org.slf4j.LoggerFactory; * *

* This class is not visible outside of this package because it breaks the actor containment. Services provided to - * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}. + * Java world outside of actor containment are captured in {@link DataStoreClient}. * *

- * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract. + * IMPORTANT: this class breaks actor containment via methods implementing {@link DataStoreClient} contract. * When touching internal state, be mindful of the execution context from which execution context, Actor * or POJO, is the state being accessed or modified. * @@ -59,17 +61,18 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior - implements DistributedDataStoreClient { + implements DataStoreClient { private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class); private final Map histories = new ConcurrentHashMap<>(); private final AtomicLong nextHistoryId = new AtomicLong(1); + private final StampedLock lock = new StampedLock(); private final SingleClientHistory singleHistory; private volatile Throwable aborted; AbstractDataStoreClientBehavior(final ClientActorContext context, - final BackendInfoResolver resolver) { + final AbstractShardBackendResolver resolver) { super(context, resolver); singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0)); } @@ -91,14 +94,19 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior currentBehavior) { @@ -122,44 +130,45 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior connectionUp( - final AbstractClientConnection conn, final ShardBackendInfo backend) { + protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection newConn) { + final long stamp = lock.writeLock(); - // Step 0: create a new connected connection - final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(), - conn.cookie(), backend); + // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no + // further TransactionProxies can be created and we can safely traverse maps without risking + // missing an entry + final Collection cohorts = new ArrayList<>(); + startReconnect(singleHistory, newConn, cohorts); + for (ClientLocalHistory h : histories.values()) { + startReconnect(h, newConn, cohorts); + } - LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn); + return previousEntries -> finishReconnect(newConn, stamp, cohorts, previousEntries); + } - final Collection cohorts = new ArrayList<>(); + private ReconnectForwarder finishReconnect(final ConnectedClientConnection newConn, + final long stamp, final Collection cohorts, + final Collection previousEntries) { try { - // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no - // further TransactionProxies can be created and we can safely traverse maps without risking - // missing an entry - startReconnect(singleHistory, newConn, cohorts); - for (ClientLocalHistory h : histories.values()) { - startReconnect(h, newConn, cohorts); - } - // Step 2: Collect previous successful requests from the cohorts. We do not want to expose // the non-throttling interface to the connection, hence we use a wrapper consumer for (HistoryReconnectCohort c : cohorts) { - c.replaySuccessfulRequests(); + c.replayRequests(previousEntries); } // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding - // requests will be immediately sent to it and requests being sent concurrently will get forwarded - // once they hit the new connection. - conn.setForwarder(BouncingReconnectForwarder.forCohorts(newConn, cohorts)); + // requests will be immediately sent to it and requests being sent concurrently will get + // forwarded once they hit the new connection. + return BouncingReconnectForwarder.forCohorts(newConn, cohorts); } finally { - // Step 4: Complete switchover of the connection. The cohorts can resume normal operations. - for (HistoryReconnectCohort c : cohorts) { - c.close(); + try { + // Step 4: Complete switchover of the connection. The cohorts can resume normal operations. + for (HistoryReconnectCohort c : cohorts) { + c.close(); + } + } finally { + lock.unlockWrite(stamp); } } - - return newConn; } private static void startReconnect(final AbstractClientHistory history, @@ -181,19 +190,22 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior resolveAllShards(); + + final ActorUtils actorUtils() { + return ((AbstractShardBackendResolver) resolver()).actorUtils(); + } }