Merge "Bug 509: Fixed incorrect merging of Data Store Writes / Events"
[controller.git] / opendaylight / netconf / netconf-client / src / main / java / org / opendaylight / controller / netconf / client / SimpleNetconfClientSessionListener.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.client;
10
11 import io.netty.util.concurrent.Future;
12 import io.netty.util.concurrent.GlobalEventExecutor;
13 import io.netty.util.concurrent.Promise;
14
15 import java.util.ArrayDeque;
16 import java.util.Queue;
17
18 import javax.annotation.concurrent.GuardedBy;
19
20 import org.opendaylight.controller.netconf.api.NetconfMessage;
21 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import com.google.common.base.Preconditions;
26
27 public class SimpleNetconfClientSessionListener implements NetconfClientSessionListener {
28     private static final class RequestEntry {
29         final Promise<NetconfMessage> promise;
30         final NetconfMessage request;
31
32         public RequestEntry(Promise<NetconfMessage> future, NetconfMessage request) {
33             this.promise = Preconditions.checkNotNull(future);
34             this.request = Preconditions.checkNotNull(request);
35         }
36     }
37
38     private static final Logger logger = LoggerFactory.getLogger(SimpleNetconfClientSessionListener.class);
39
40     @GuardedBy("this")
41     private final Queue<RequestEntry> requests = new ArrayDeque<>();
42
43     @GuardedBy("this")
44     private NetconfClientSession clientSession;
45
46     @GuardedBy("this")
47     private void dispatchRequest() {
48         while (!requests.isEmpty()) {
49             final RequestEntry e = requests.peek();
50             if (e.promise.setUncancellable()) {
51                 logger.debug("Sending message {}", e.request);
52                 clientSession.sendMessage(e.request);
53                 break;
54             }
55
56             logger.debug("Message {} has been cancelled, skipping it", e.request);
57             requests.poll();
58         }
59     }
60
61     @Override
62     public final synchronized void onSessionUp(NetconfClientSession clientSession) {
63         this.clientSession = Preconditions.checkNotNull(clientSession);
64         logger.debug("Client session {} went up", clientSession);
65         dispatchRequest();
66     }
67
68     private synchronized void tearDown(final Exception cause) {
69         final RequestEntry e = requests.poll();
70         if (e != null) {
71             e.promise.setFailure(cause);
72         }
73
74         this.clientSession = null;
75     }
76
77     @Override
78     public final void onSessionDown(NetconfClientSession clientSession, Exception e) {
79         logger.debug("Client Session {} went down unexpectedly", clientSession, e);
80         tearDown(e);
81     }
82
83     @Override
84     public final void onSessionTerminated(NetconfClientSession clientSession,
85             NetconfTerminationReason netconfTerminationReason) {
86         logger.debug("Client Session {} terminated, reason: {}", clientSession,
87                 netconfTerminationReason.getErrorMessage());
88         tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage()));
89     }
90
91     @Override
92     public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) {
93         logger.debug("New message arrived: {}", message);
94
95         final RequestEntry e = requests.poll();
96         if (e != null) {
97             e.promise.setSuccess(message);
98             dispatchRequest();
99         } else {
100             logger.info("Ignoring unsolicited message {}", message);
101         }
102     }
103
104     public final synchronized Future<NetconfMessage> sendRequest(NetconfMessage message) {
105         final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.<NetconfMessage>newPromise(), message);
106
107         requests.add(req);
108         if (clientSession != null) {
109             dispatchRequest();
110         }
111
112         return req.promise;
113     }
114 }