import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
* involved, as the messages need to be replayed to the individual proxies.
*/
@Override
- @GuardedBy("connectionsLock")
- protected final ConnectedClientConnection<ShardBackendInfo> connectionUp(
- final AbstractClientConnection<ShardBackendInfo> conn, final ShardBackendInfo backend) {
-
- // Step 0: create a new connected connection
- final ConnectedClientConnection<ShardBackendInfo> newConn = new ConnectedClientConnection<>(conn.context(),
- conn.cookie(), backend);
-
- LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn);
-
+ protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
+ // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
+ // further TransactionProxies can be created and we can safely traverse maps without risking
+ // missing an entry
final Collection<HistoryReconnectCohort> cohorts = new ArrayList<>();
- try {
- // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
- // further TransactionProxies can be created and we can safely traverse maps without risking
- // missing an entry
- startReconnect(singleHistory, newConn, cohorts);
- for (ClientLocalHistory h : histories.values()) {
- startReconnect(h, newConn, cohorts);
- }
-
- // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
- // the non-throttling interface to the connection, hence we use a wrapper consumer
- for (HistoryReconnectCohort c : cohorts) {
- c.replaySuccessfulRequests();
- }
-
- // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
- // requests will be immediately sent to it and requests being sent concurrently will get forwarded
- // once they hit the new connection.
- conn.setForwarder(BouncingReconnectForwarder.forCohorts(newConn, cohorts));
- } finally {
- // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
- for (HistoryReconnectCohort c : cohorts) {
- c.close();
- }
+ startReconnect(singleHistory, newConn, cohorts);
+ for (ClientLocalHistory h : histories.values()) {
+ startReconnect(h, newConn, cohorts);
}
- return newConn;
+ return previousEntries -> {
+ try {
+ // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
+ // the non-throttling interface to the connection, hence we use a wrapper consumer
+ for (HistoryReconnectCohort c : cohorts) {
+ c.replaySuccessfulRequests(previousEntries);
+ }
+
+ // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
+ // requests will be immediately sent to it and requests being sent concurrently will get
+ // forwarded once they hit the new connection.
+ return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
+ } finally {
+ // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
+ for (HistoryReconnectCohort c : cohorts) {
+ c.close();
+ }
+ }
+ };
}
private static void startReconnect(final AbstractClientHistory history,