Fix thread safety issues in netconf client
[controller.git] / opendaylight / netconf / netconf-client / src / main / java / org / opendaylight / controller / netconf / client / NetconfClientSessionListener.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.client;
10
11 import io.netty.util.concurrent.Future;
12 import io.netty.util.concurrent.GlobalEventExecutor;
13 import io.netty.util.concurrent.Promise;
14
15 import java.util.ArrayDeque;
16 import java.util.Queue;
17
18 import javax.annotation.concurrent.GuardedBy;
19
20 import org.opendaylight.controller.netconf.api.NetconfMessage;
21 import org.opendaylight.controller.netconf.api.NetconfSessionListener;
22 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import com.google.common.base.Preconditions;
27
28 public class NetconfClientSessionListener implements NetconfSessionListener<NetconfClientSession> {
29     private static final class RequestEntry {
30         final Promise<NetconfMessage> promise;
31         final NetconfMessage request;
32
33         public RequestEntry(Promise<NetconfMessage> future, NetconfMessage request) {
34             this.promise = Preconditions.checkNotNull(future);
35             this.request = Preconditions.checkNotNull(request);
36         }
37     }
38
39     private static final Logger logger = LoggerFactory.getLogger(NetconfClientSessionListener.class);
40
41     @GuardedBy("this")
42     private final Queue<RequestEntry> requests = new ArrayDeque<>();
43
44     @GuardedBy("this")
45     private NetconfClientSession clientSession;
46
47     @GuardedBy("this")
48     private void dispatchRequest() {
49         while (!requests.isEmpty()) {
50             final RequestEntry e = requests.peek();
51             if (e.promise.setUncancellable()) {
52                 logger.debug("Sending message {}", e.request);
53                 clientSession.sendMessage(e.request);
54                 break;
55             }
56
57             logger.debug("Message {} has been cancelled, skipping it", e.request);
58             requests.poll();
59         }
60     }
61
62     @Override
63     public final synchronized void onSessionUp(NetconfClientSession clientSession) {
64         this.clientSession = Preconditions.checkNotNull(clientSession);
65         logger.debug("Client session {} went up", clientSession);
66         dispatchRequest();
67     }
68
69     private synchronized void tearDown(final Exception cause) {
70         final RequestEntry e = requests.poll();
71         if (e != null) {
72             e.promise.setFailure(cause);
73         }
74
75         this.clientSession = null;
76     }
77
78     @Override
79     public final void onSessionDown(NetconfClientSession clientSession, Exception e) {
80         logger.debug("Client Session {} went down unexpectedly", clientSession, e);
81         tearDown(e);
82     }
83
84     @Override
85     public final void onSessionTerminated(NetconfClientSession clientSession,
86             NetconfTerminationReason netconfTerminationReason) {
87         logger.debug("Client Session {} terminated, reason: {}", clientSession,
88                 netconfTerminationReason.getErrorMessage());
89         tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage()));
90     }
91
92     @Override
93     public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) {
94         logger.debug("New message arrived: {}", message);
95
96         final RequestEntry e = requests.poll();
97         if (e != null) {
98             e.promise.setSuccess(message);
99             dispatchRequest();
100         } else {
101             logger.info("Ignoring unsolicited message {}", message);
102         }
103     }
104
105     final synchronized Future<NetconfMessage> sendRequest(NetconfMessage message) {
106         final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.<NetconfMessage>newPromise(), message);
107
108         requests.add(req);
109         if (clientSession != null) {
110             dispatchRequest();
111         }
112
113         return req.promise;
114     }
115 }