import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
-import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
+import java.util.stream.Stream;
import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private volatile Throwable aborted;
AbstractDataStoreClientBehavior(final ClientActorContext context,
- final BackendInfoResolver<ShardBackendInfo> resolver) {
+ final AbstractShardBackendResolver resolver) {
super(context, resolver);
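+ // The single history (identifier 0) backs free-standing operations such as createSnapshot()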
singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
}
startReconnect(h, newConn, cohorts);
}
- return previousEntries -> {
+ return previousEntries -> finishReconnect(newConn, stamp, cohorts, previousEntries);
+ }
+
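+ // Steps 2-4 of the reconnect sequence; step 1 runs before this method is invoked. Replays
+ // outstanding requests to the cohorts, installs a forwarder towards the new connection, and
+ // always closes the cohorts and releases the write stamp, even when replay fails.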
+ private ReconnectForwarder finishReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn,
+ final long stamp, final Collection<HistoryReconnectCohort> cohorts,
+ final Collection<ConnectionEntry> previousEntries) {
+ try {
+ // Step 2: Replay previous requests to the cohorts. We do not want to expose
+ // the non-throttling interface to the connection, hence we use a wrapper consumer
+ for (HistoryReconnectCohort c : cohorts) {
+ c.replayRequests(previousEntries);
+ }
+
+ // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
+ // requests will be immediately sent to it and requests being sent concurrently will get
+ // forwarded once they hit the new connection.
+ return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
+ } finally {
try {
- // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
- // the non-throttling interface to the connection, hence we use a wrapper consumer
+ // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
for (HistoryReconnectCohort c : cohorts) {
- c.replaySuccessfulRequests(previousEntries);
+ c.close();
}
-
- // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
- // requests will be immediately sent to it and requests being sent concurrently will get
- // forwarded once they hit the new connection.
- return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
} finally {
- try {
- // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
- for (HistoryReconnectCohort c : cohorts) {
- c.close();
- }
- } finally {
- lock.unlockWrite(stamp);
- }
+ lock.unlockWrite(stamp);
}
- };
+ }
}
private static void startReconnect(final AbstractClientHistory history,
final long stamp = lock.readLock();
try {
if (aborted != null) {
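+ // Re-throw the recorded failure as-is when it is unchecked, otherwise wrap it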
- throw Throwables.propagate(aborted);
+ Throwables.throwIfUnchecked(aborted);
+ throw new IllegalStateException(aborted);
}
final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
@Override
public final ClientSnapshot createSnapshot() {
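+ // Free-standing snapshots are taken through the shared single history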
- return singleHistory.doCreateSnapshot();
+ return singleHistory.takeSnapshot();
}
@Override
- public final void close() {
+ public void close() {
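+ // Let the superclass release its state first, then run the shutdown inside the actor context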
+ super.close();
context().executeInActor(this::shutdown);
}
abstract Long resolveShardForPath(YangInstanceIdentifier path);
+
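+ // Enumerates the shard cookies known to this behavior, the bulk counterpart of resolveShardForPath()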
+ abstract Stream<Long> resolveAllShards();
+
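+ // The constructor accepts only AbstractShardBackendResolver, hence this cast is safe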
+ final ActorUtils actorUtils() {
+ return ((AbstractShardBackendResolver) resolver()).actorUtils();
+ }
}