Merge "Global cleanup of restconf client dependencies"
[yangtools.git] / restconf / restconf-client-impl / src / main / java / org / opendaylight / yangtools / restconf / client / RestListenableEventStreamContext.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.restconf.client;
9
10 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
11
12 import java.io.UnsupportedEncodingException;
13 import java.lang.reflect.InvocationTargetException;
14 import java.lang.reflect.Method;
15 import java.net.URI;
16 import java.net.URLEncoder;
17 import java.util.Date;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
21
22 import javax.ws.rs.core.MediaType;
23
24 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
25 import org.opendaylight.yangtools.concepts.ListenerRegistration;
26 import org.opendaylight.yangtools.restconf.client.api.event.EventStreamInfo;
27 import org.opendaylight.yangtools.restconf.client.api.event.EventStreamReplay;
28 import org.opendaylight.yangtools.restconf.client.api.event.ListenableEventStreamContext;
29 import org.opendaylight.yangtools.restconf.client.to.RestRpcResult;
30 import org.opendaylight.yangtools.restconf.common.ResourceUri;
31 import org.opendaylight.yangtools.websocket.client.WebSocketIClient;
32 import org.opendaylight.yangtools.websocket.client.callback.ClientMessageCallback;
33 import org.opendaylight.yangtools.yang.binding.NotificationListener;
34 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.google.common.base.Charsets;
40 import com.google.common.base.Function;
41 import com.google.common.base.Optional;
42 import com.google.common.util.concurrent.ListenableFuture;
43 import com.google.common.util.concurrent.ListeningExecutorService;
44 import com.google.common.util.concurrent.MoreExecutors;
45 import com.sun.jersey.api.client.ClientResponse;
46
47
48
49
50 public class RestListenableEventStreamContext<L extends NotificationListener> implements ListenableEventStreamContext,ClientMessageCallback {
51
52     private static final Logger logger = LoggerFactory.getLogger(RestListenableEventStreamContext.class.toString());
53     private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
54     private WebSocketIClient wsClient;
55     private Method listenerCallbackMethod;
56     private final RestconfClientImpl restconfClient;
57     private final EventStreamInfo streamInfo;
58
59     public RestListenableEventStreamContext(EventStreamInfo streamInfo,RestconfClientImpl restconfClient){
60         this.restconfClient = restconfClient;
61         this.streamInfo = streamInfo;
62     }
63     @Override
64     public <L extends NotificationListener> ListenerRegistration<L> registerNotificationListener(L listener) {
65
66         for (Method m:listener.getClass().getDeclaredMethods()){
67             if (BindingReflections.isNotificationCallback(m)){
68                 this.listenerCallbackMethod = m;
69                 break;
70             }
71         }
72         return new AbstractListenerRegistration<L>(listener) {
73             @Override
74             protected void removeRegistration() {
75                 stopListening();
76             }
77         };
78     }
79
80     @Override
81     public ListenableFuture<RpcResult<Void>> startListening() {
82
83
84         ClientResponse response = null;
85         try {
86             response = extractWebSocketUriFromRpc(this.streamInfo.getIdentifier());
87         } catch (ExecutionException e) {
88             logger.trace("Execution exception while extracting stream name {}",e);
89             throw new IllegalStateException(e);
90         } catch (InterruptedException e) {
91             logger.trace("InterruptedException while extracting stream name {}",e);
92             throw new IllegalStateException(e);
93         } catch (UnsupportedEncodingException e) {
94             logger.trace("UnsupportedEncodingException while extracting stream name {}",e);
95             throw new IllegalStateException(e);
96         }
97         boolean success = true;
98         if (response.getStatus() != 200) {
99             success = false;
100         }
101
102         final RestRpcResult rpcResult = new RestRpcResult(success,response.getLocation());
103         createWebsocketClient(response.getLocation());
104
105         ListenableFuture<RpcResult<Void>> future = pool.submit(new Callable<RpcResult<Void>>() {
106             @Override
107             public RpcResult<Void> call() throws Exception {
108                 return rpcResult;
109             }
110         });
111
112         return future;
113     }
114
115     @Override
116     public ListenableFuture<RpcResult<Void>> startListeningWithReplay(Optional<Date> startTime, Optional<Date> endTime) {
117         //TODO RESTCONF doesn't provide this functionality
118         return null;
119     }
120
121     @Override
122     public void stopListening() {
123         this.wsClient.writeAndFlush(new CloseWebSocketFrame(42,this.streamInfo.getIdentifier()));
124     }
125
126     @Override
127     public ListenableFuture<Optional<EventStreamReplay>> getReplay(Optional<Date> startTime, Optional<Date> endTime) {
128         //TODO RESTCONF doesn't provide this functionality
129         return null;
130     }
131
132     @Override
133     public void close() {
134         this.stopListening();
135     }
136
137     private ClientResponse extractWebSocketUriFromRpc(String methodName) throws ExecutionException, InterruptedException, UnsupportedEncodingException {
138         ListenableFuture<ClientResponse> clientFuture = restconfClient.get(ResourceUri.STREAM.getPath()+"/"+encodeUri(this.streamInfo.getIdentifier()),MediaType.APPLICATION_XML,new Function<ClientResponse, ClientResponse>(){
139
140             @Override
141             public ClientResponse apply(ClientResponse clientResponse) {
142                 return clientResponse;
143             }
144         });
145         while (!clientFuture.isDone()){
146             //noop
147         }
148         return clientFuture.get();
149     }
150     private void createWebsocketClient(URI websocketServerUri){
151         this.wsClient = new WebSocketIClient(websocketServerUri,this);
152     }
153     private String encodeUri(String encodedPart) throws UnsupportedEncodingException {
154         return URI.create(URLEncoder.encode(encodedPart, Charsets.US_ASCII.name()).toString()).toASCIIString();
155     }
156
157     @Override
158     public void onMessageReceived(Object message) {
159         if (null == this.listenerCallbackMethod){
160             throw new IllegalStateException("No listener method to invoke.");
161         }
162         try {
163             this.listenerCallbackMethod.invoke(message);
164         } catch (IllegalAccessException e) {
165             throw new IllegalStateException(e.getMessage());
166         } catch (InvocationTargetException e) {
167             throw new IllegalStateException(e.getMessage());
168         }
169     }
170
171 }