BUG-5280: add AbstractClientConnection
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / ClientActorBehavior.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import com.google.common.annotations.Beta;
11 import com.google.common.base.Preconditions;
12 import java.util.Map;
13 import java.util.concurrent.ConcurrentHashMap;
14 import javax.annotation.Nonnull;
15 import javax.annotation.Nullable;
16 import javax.annotation.concurrent.GuardedBy;
17 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
18 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
19 import org.opendaylight.controller.cluster.access.concepts.RequestException;
20 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
21 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
22 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
23 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.yangtools.concepts.Identifiable;
26 import org.opendaylight.yangtools.concepts.WritableIdentifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * A behavior, which handles messages sent to a {@link AbstractClientActor}.
32  *
33  * @author Robert Varga
34  */
35 @Beta
36 public abstract class ClientActorBehavior<T extends BackendInfo> extends
37         RecoveredClientActorBehavior<ClientActorContext> implements Identifiable<ClientIdentifier> {
38     private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
39
40     /**
41      * Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations
42      * involved in connection transitions it is protected by a {@link InversibleLock}. Write-side of the lock is taken
43      * during connection transitions. Optimistic read-side of the lock is taken when new connections are introduced
44      * into the map.
45      *
46      * <p>
47      * The lock detects potential AB/BA deadlock scenarios and will force the reader side out by throwing
48      * a {@link InversibleLockException} -- which must be propagated up, releasing locks as it propagates. The initial
49      * entry point causing the the conflicting lookup must then call {@link InversibleLockException#awaitResolution()}
50      * before retrying the operation.
51      */
52     // TODO: it should be possible to move these two into ClientActorContext
53     private final Map<Long, AbstractClientConnection<T>> connections = new ConcurrentHashMap<>();
54     private final InversibleLock connectionsLock = new InversibleLock();
55     private final BackendInfoResolver<T> resolver;
56
57     protected ClientActorBehavior(@Nonnull final ClientActorContext context,
58             @Nonnull final BackendInfoResolver<T> resolver) {
59         super(context);
60         this.resolver = Preconditions.checkNotNull(resolver);
61     }
62
63     @Override
64     @Nonnull
65     public final ClientIdentifier getIdentifier() {
66         return context().getIdentifier();
67     }
68
69     /**
70      * Get a connection to a shard.
71      *
72      * @param shard Shard cookie
73      * @return Connection to a shard
74      * @throws InversibleLockException if the shard is being reconnected
75      */
76     public final AbstractClientConnection<T> getConnection(final Long shard) {
77         while (true) {
78             final long stamp = connectionsLock.optimisticRead();
79             final AbstractClientConnection<T> conn = connections.computeIfAbsent(shard, this::createConnection);
80             if (connectionsLock.validate(stamp)) {
81                 // No write-lock in-between, return success
82                 return conn;
83             }
84         }
85     }
86
87     @SuppressWarnings("unchecked")
88     @Override
89     final ClientActorBehavior<T> onReceiveCommand(final Object command) {
90         if (command instanceof InternalCommand) {
91             return ((InternalCommand<T>) command).execute(this);
92         }
93         if (command instanceof SuccessEnvelope) {
94             return onRequestSuccess((SuccessEnvelope) command);
95         }
96         if (command instanceof FailureEnvelope) {
97             return internalOnRequestFailure((FailureEnvelope) command);
98         }
99
100         return onCommand(command);
101     }
102
103     private void onResponse(final ResponseEnvelope<?> response) {
104         final WritableIdentifier id = response.getMessage().getTarget();
105
106         // FIXME: this will need to be updated for other Request/Response types to extract cookie
107         Preconditions.checkArgument(id instanceof TransactionIdentifier);
108         final TransactionIdentifier txId = (TransactionIdentifier) id;
109
110         final AbstractClientConnection<T> connection = connections.get(txId.getHistoryId().getCookie());
111         if (connection != null) {
112             connection.receiveResponse(response);
113         } else {
114             LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
115         }
116     }
117
118     private ClientActorBehavior<T> onRequestSuccess(final SuccessEnvelope success) {
119         onResponse(success);
120         return this;
121     }
122
123     private ClientActorBehavior<T> onRequestFailure(final FailureEnvelope failure) {
124         onResponse(failure);
125         return this;
126     }
127
128     private ClientActorBehavior<T> internalOnRequestFailure(final FailureEnvelope command) {
129         final RequestFailure<?, ?> failure = command.getMessage();
130         final RequestException cause = failure.getCause();
131         if (cause instanceof RetiredGenerationException) {
132             LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
133             haltClient(cause);
134             poison(cause);
135             return null;
136         }
137
138         return onRequestFailure(command);
139     }
140
141     private void poison(final RequestException cause) {
142         final long stamp = connectionsLock.writeLock();
143         try {
144             for (AbstractClientConnection<T> q : connections.values()) {
145                 q.poison(cause);
146             }
147
148             connections.clear();
149         } finally {
150             connectionsLock.unlockWrite(stamp);
151         }
152     }
153
154     /**
155      * Halt And Catch Fire. Halt processing on this client. Implementations need to ensure they initiate state flush
156      * procedures. No attempt to use this instance should be made after this method returns. Any such use may result
157      * in undefined behavior.
158      *
159      * @param cause Failure cause
160      */
161     protected abstract void haltClient(@Nonnull Throwable cause);
162
163     /**
164      * Override this method to handle any command which is not handled by the base behavior.
165      *
166      * @param command the command to process
167      * @return Next behavior to use, null if this actor should shut down.
168      */
169     @Nullable
170     protected abstract ClientActorBehavior<T> onCommand(@Nonnull Object command);
171
172     /**
173      * Override this method to provide a backend resolver instance.
174      *
175      * @return a backend resolver instance
176      */
177     protected final @Nonnull BackendInfoResolver<T> resolver() {
178         return resolver;
179     }
180
181     /**
182      * Callback invoked when a new connection has been established.
183      *
184      * @param conn Old connection
185      * @param backend New backend
186      * @return Newly-connected connection.
187      */
188     @GuardedBy("connectionsLock")
189     protected abstract @Nonnull ConnectedClientConnection<T> connectionUp(
190             final @Nonnull AbstractClientConnection<T> conn, final @Nonnull T backend);
191
192     private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> conn,
193             final T backend, final Throwable failure) {
194         if (failure != null) {
195             LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure);
196             return;
197         }
198
199         final long stamp = connectionsLock.writeLock();
200         try {
201             // Bring the connection up
202             final ConnectedClientConnection<T> newConn = connectionUp(conn, backend);
203
204             // Make sure new lookups pick up the new connection
205             connections.replace(shard, conn, newConn);
206             LOG.debug("{}: replaced connection {} with {}", persistenceId(), conn, newConn);
207         } finally {
208             connectionsLock.unlockWrite(stamp);
209         }
210     }
211
212     void removeConnection(final AbstractClientConnection<?> conn) {
213         connections.remove(conn.cookie(), conn);
214         LOG.debug("{}: removed connection {}", persistenceId(), conn);
215     }
216
217     @SuppressWarnings("unchecked")
218     void reconnectConnection(final ConnectedClientConnection<?> oldConn,
219             final ReconnectingClientConnection<?> newConn) {
220         final ReconnectingClientConnection<T> conn = (ReconnectingClientConnection<T>)newConn;
221         connections.replace(oldConn.cookie(), (AbstractClientConnection<T>)oldConn, conn);
222         LOG.debug("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn);
223
224         final Long shard = oldConn.cookie();
225         resolver().refreshBackendInfo(shard, conn.getBackendInfo().get()).whenComplete(
226             (backend, failure) -> context().executeInActor(behavior -> {
227                 backendConnectFinished(shard, conn, backend, failure);
228                 return behavior;
229             }));
230     }
231
232     private ConnectingClientConnection<T> createConnection(final Long shard) {
233         final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard);
234
235         resolver().getBackendInfo(shard).whenComplete((backend, failure) -> context().executeInActor(behavior -> {
236             backendConnectFinished(shard, conn, backend, failure);
237             return behavior;
238         }));
239
240         return conn;
241     }
242 }