BUG-5280: fix problems identified by integration tests
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / ClientActorBehavior.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import com.google.common.annotations.Beta;
11 import com.google.common.base.Preconditions;
12 import java.util.Map;
13 import java.util.concurrent.ConcurrentHashMap;
14 import javax.annotation.Nonnull;
15 import javax.annotation.Nullable;
16 import javax.annotation.concurrent.GuardedBy;
17 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
18 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
19 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
20 import org.opendaylight.controller.cluster.access.concepts.RequestException;
21 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
22 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
23 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
24 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.yangtools.concepts.Identifiable;
27 import org.opendaylight.yangtools.concepts.WritableIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * A behavior, which handles messages sent to a {@link AbstractClientActor}.
33  *
34  * @author Robert Varga
35  */
36 @Beta
37 public abstract class ClientActorBehavior<T extends BackendInfo> extends
38         RecoveredClientActorBehavior<ClientActorContext> implements Identifiable<ClientIdentifier> {
39     private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
40
41     /**
42      * Map of connections to the backend. This map is concurrent to allow lookups, but given complex operations
43      * involved in connection transitions it is protected by a {@link InversibleLock}. Write-side of the lock is taken
44      * during connection transitions. Optimistic read-side of the lock is taken when new connections are introduced
45      * into the map.
46      *
47      * <p>
48      * The lock detects potential AB/BA deadlock scenarios and will force the reader side out by throwing
49      * a {@link InversibleLockException} -- which must be propagated up, releasing locks as it propagates. The initial
50      * entry point causing the the conflicting lookup must then call {@link InversibleLockException#awaitResolution()}
51      * before retrying the operation.
52      */
53     // TODO: it should be possible to move these two into ClientActorContext
54     private final Map<Long, AbstractClientConnection<T>> connections = new ConcurrentHashMap<>();
55     private final InversibleLock connectionsLock = new InversibleLock();
56     private final BackendInfoResolver<T> resolver;
57
58     protected ClientActorBehavior(@Nonnull final ClientActorContext context,
59             @Nonnull final BackendInfoResolver<T> resolver) {
60         super(context);
61         this.resolver = Preconditions.checkNotNull(resolver);
62     }
63
64     @Override
65     @Nonnull
66     public final ClientIdentifier getIdentifier() {
67         return context().getIdentifier();
68     }
69
70     /**
71      * Get a connection to a shard.
72      *
73      * @param shard Shard cookie
74      * @return Connection to a shard
75      * @throws InversibleLockException if the shard is being reconnected
76      */
77     public final AbstractClientConnection<T> getConnection(final Long shard) {
78         while (true) {
79             final long stamp = connectionsLock.optimisticRead();
80             final AbstractClientConnection<T> conn = connections.computeIfAbsent(shard, this::createConnection);
81             if (connectionsLock.validate(stamp)) {
82                 // No write-lock in-between, return success
83                 return conn;
84             }
85         }
86     }
87
88     @SuppressWarnings("unchecked")
89     @Override
90     final ClientActorBehavior<T> onReceiveCommand(final Object command) {
91         if (command instanceof InternalCommand) {
92             return ((InternalCommand<T>) command).execute(this);
93         }
94         if (command instanceof SuccessEnvelope) {
95             return onRequestSuccess((SuccessEnvelope) command);
96         }
97         if (command instanceof FailureEnvelope) {
98             return internalOnRequestFailure((FailureEnvelope) command);
99         }
100
101         return onCommand(command);
102     }
103
104     private static long extractCookie(final WritableIdentifier id) {
105         if (id instanceof TransactionIdentifier) {
106             return ((TransactionIdentifier) id).getHistoryId().getCookie();
107         } else if (id instanceof LocalHistoryIdentifier) {
108             return ((LocalHistoryIdentifier) id).getCookie();
109         } else {
110             throw new IllegalArgumentException("Unhandled identifier " + id);
111         }
112     }
113
114     private void onResponse(final ResponseEnvelope<?> response) {
115         final long cookie = extractCookie(response.getMessage().getTarget());
116         final AbstractClientConnection<T> connection = connections.get(cookie);
117         if (connection != null) {
118             connection.receiveResponse(response);
119         } else {
120             LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
121         }
122     }
123
124     private ClientActorBehavior<T> onRequestSuccess(final SuccessEnvelope success) {
125         onResponse(success);
126         return this;
127     }
128
129     private ClientActorBehavior<T> onRequestFailure(final FailureEnvelope failure) {
130         onResponse(failure);
131         return this;
132     }
133
134     private ClientActorBehavior<T> internalOnRequestFailure(final FailureEnvelope command) {
135         final RequestFailure<?, ?> failure = command.getMessage();
136         final RequestException cause = failure.getCause();
137         if (cause instanceof RetiredGenerationException) {
138             LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
139             haltClient(cause);
140             poison(cause);
141             return null;
142         }
143
144         return onRequestFailure(command);
145     }
146
147     private void poison(final RequestException cause) {
148         final long stamp = connectionsLock.writeLock();
149         try {
150             for (AbstractClientConnection<T> q : connections.values()) {
151                 q.poison(cause);
152             }
153
154             connections.clear();
155         } finally {
156             connectionsLock.unlockWrite(stamp);
157         }
158     }
159
160     /**
161      * Halt And Catch Fire. Halt processing on this client. Implementations need to ensure they initiate state flush
162      * procedures. No attempt to use this instance should be made after this method returns. Any such use may result
163      * in undefined behavior.
164      *
165      * @param cause Failure cause
166      */
167     protected abstract void haltClient(@Nonnull Throwable cause);
168
169     /**
170      * Override this method to handle any command which is not handled by the base behavior.
171      *
172      * @param command the command to process
173      * @return Next behavior to use, null if this actor should shut down.
174      */
175     @Nullable
176     protected abstract ClientActorBehavior<T> onCommand(@Nonnull Object command);
177
178     /**
179      * Override this method to provide a backend resolver instance.
180      *
181      * @return a backend resolver instance
182      */
183     protected final @Nonnull BackendInfoResolver<T> resolver() {
184         return resolver;
185     }
186
187     /**
188      * Callback invoked when a new connection has been established.
189      *
190      * @param conn Old connection
191      * @param backend New backend
192      * @return Newly-connected connection.
193      */
194     @GuardedBy("connectionsLock")
195     protected abstract @Nonnull ConnectedClientConnection<T> connectionUp(
196             final @Nonnull AbstractClientConnection<T> conn, final @Nonnull T backend);
197
198     private void backendConnectFinished(final Long shard, final AbstractClientConnection<T> conn,
199             final T backend, final Throwable failure) {
200         if (failure != null) {
201             LOG.error("{}: failed to resolve shard {}", persistenceId(), shard, failure);
202             return;
203         }
204
205         LOG.debug("{}: resolved shard {} to {}", persistenceId(), shard, backend);
206         final long stamp = connectionsLock.writeLock();
207         try {
208             // Bring the connection up
209             final ConnectedClientConnection<T> newConn = connectionUp(conn, backend);
210
211             // Make sure new lookups pick up the new connection
212             connections.replace(shard, conn, newConn);
213             LOG.debug("{}: replaced connection {} with {}", persistenceId(), conn, newConn);
214         } finally {
215             connectionsLock.unlockWrite(stamp);
216         }
217     }
218
219     void removeConnection(final AbstractClientConnection<?> conn) {
220         connections.remove(conn.cookie(), conn);
221         LOG.debug("{}: removed connection {}", persistenceId(), conn);
222     }
223
224     @SuppressWarnings("unchecked")
225     void reconnectConnection(final ConnectedClientConnection<?> oldConn,
226             final ReconnectingClientConnection<?> newConn) {
227         final ReconnectingClientConnection<T> conn = (ReconnectingClientConnection<T>)newConn;
228         connections.replace(oldConn.cookie(), (AbstractClientConnection<T>)oldConn, conn);
229         LOG.debug("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn);
230
231         final Long shard = oldConn.cookie();
232         resolver().refreshBackendInfo(shard, conn.getBackendInfo().get()).whenComplete(
233             (backend, failure) -> context().executeInActor(behavior -> {
234                 backendConnectFinished(shard, conn, backend, failure);
235                 return behavior;
236             }));
237     }
238
239     private ConnectingClientConnection<T> createConnection(final Long shard) {
240         final ConnectingClientConnection<T> conn = new ConnectingClientConnection<>(context(), shard);
241
242         resolver().getBackendInfo(shard).whenComplete((backend, failure) -> context().executeInActor(behavior -> {
243             backendConnectFinished(shard, conn, backend, failure);
244             return behavior;
245         }));
246
247         return conn;
248     }
249 }