X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractDataStoreClientBehavior.java;h=7187f83a1ac060d41341c110181877b9535b2985;hp=a84715c84360b84596f17118714e51e936afc4b9;hb=99f80f27bee37bb23e345420bf14bb7bb4793c28;hpb=320a4e5cd2d9d80468a3f82798744f2035488218
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java
index a84715c843..7187f83a1a 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java
@@ -16,13 +16,15 @@ import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
-import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
+import java.util.concurrent.locks.StampedLock;
+import java.util.stream.Stream;
import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,10 +35,10 @@ import org.slf4j.LoggerFactory;
*
*
* This class is not visible outside of this package because it breaks the actor containment. Services provided to
- * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
+ * Java world outside of actor containment are captured in {@link DataStoreClient}.
*
*
- * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
+ * IMPORTANT: this class breaks actor containment via methods implementing {@link DataStoreClient} contract.
* When touching internal state, be mindful of the execution context from which execution context, Actor
* or POJO, is the state being accessed or modified.
*
@@ -59,17 +61,18 @@ import org.slf4j.LoggerFactory;
* @author Robert Varga
*/
abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior
- implements DistributedDataStoreClient {
+ implements DataStoreClient {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class);
private final Map histories = new ConcurrentHashMap<>();
private final AtomicLong nextHistoryId = new AtomicLong(1);
+ private final StampedLock lock = new StampedLock();
private final SingleClientHistory singleHistory;
private volatile Throwable aborted;
AbstractDataStoreClientBehavior(final ClientActorContext context,
- final BackendInfoResolver resolver) {
+ final AbstractShardBackendResolver resolver) {
super(context, resolver);
singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
}
@@ -91,14 +94,19 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior currentBehavior) {
@@ -122,44 +130,45 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior connectionUp(
- final AbstractClientConnection conn, final ShardBackendInfo backend) {
+ protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection newConn) {
+ final long stamp = lock.writeLock();
- // Step 0: create a new connected connection
- final ConnectedClientConnection newConn = new ConnectedClientConnection<>(conn.context(),
- conn.cookie(), backend);
+ // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
+ // further TransactionProxies can be created and we can safely traverse maps without risking
+ // missing an entry
+ final Collection cohorts = new ArrayList<>();
+ startReconnect(singleHistory, newConn, cohorts);
+ for (ClientLocalHistory h : histories.values()) {
+ startReconnect(h, newConn, cohorts);
+ }
- LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn);
+ return previousEntries -> finishReconnect(newConn, stamp, cohorts, previousEntries);
+ }
- final Collection cohorts = new ArrayList<>();
+ private ReconnectForwarder finishReconnect(final ConnectedClientConnection newConn,
+ final long stamp, final Collection cohorts,
+ final Collection previousEntries) {
try {
- // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
- // further TransactionProxies can be created and we can safely traverse maps without risking
- // missing an entry
- startReconnect(singleHistory, newConn, cohorts);
- for (ClientLocalHistory h : histories.values()) {
- startReconnect(h, newConn, cohorts);
- }
-
// Step 2: Collect previous successful requests from the cohorts. We do not want to expose
// the non-throttling interface to the connection, hence we use a wrapper consumer
for (HistoryReconnectCohort c : cohorts) {
- c.replaySuccessfulRequests();
+ c.replayRequests(previousEntries);
}
// Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
- // requests will be immediately sent to it and requests being sent concurrently will get forwarded
- // once they hit the new connection.
- conn.setForwarder(BouncingReconnectForwarder.forCohorts(newConn, cohorts));
+ // requests will be immediately sent to it and requests being sent concurrently will get
+ // forwarded once they hit the new connection.
+ return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
} finally {
- // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
- for (HistoryReconnectCohort c : cohorts) {
- c.close();
+ try {
+ // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
+ for (HistoryReconnectCohort c : cohorts) {
+ c.close();
+ }
+ } finally {
+ lock.unlockWrite(stamp);
}
}
-
- return newConn;
}
private static void startReconnect(final AbstractClientHistory history,
@@ -181,19 +190,22 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior resolveAllShards();
+
+ final ActorUtils actorUtils() {
+ return ((AbstractShardBackendResolver) resolver()).actorUtils();
+ }
}