Global cleanup of restconf client dependencies
[yangtools.git] / restconf / restconf-client-impl / src / main / java / org / opendaylight / yangtools / restconf / client / RestListenableEventStreamContext.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.restconf.client;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.sun.jersey.api.client.ClientResponse;
17 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
18 import java.io.UnsupportedEncodingException;
19 import java.lang.reflect.InvocationTargetException;
20 import java.lang.reflect.Method;
21 import java.net.URI;
22 import java.net.URLEncoder;
23 import java.util.Date;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Executors;
27 import javax.ws.rs.core.MediaType;
28 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.restconf.client.api.event.EventStreamInfo;
31 import org.opendaylight.yangtools.restconf.client.api.event.EventStreamReplay;
32 import org.opendaylight.yangtools.restconf.client.api.event.ListenableEventStreamContext;
33 import org.opendaylight.yangtools.restconf.client.to.RestRpcResult;
34 import org.opendaylight.yangtools.restconf.common.ResourceUri;
35 import org.opendaylight.yangtools.websocket.client.WebSocketIClient;
36 import org.opendaylight.yangtools.websocket.client.callback.ClientMessageCallback;
37 import org.opendaylight.yangtools.yang.binding.NotificationListener;
38 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
39 import org.opendaylight.yangtools.yang.common.RpcResult;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43
44
45
46 public class RestListenableEventStreamContext<L extends NotificationListener> implements ListenableEventStreamContext,ClientMessageCallback {
47
48     private static final Logger logger = LoggerFactory.getLogger(RestListenableEventStreamContext.class.toString());
49     private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
50     private WebSocketIClient wsClient;
51     private Method listenerCallbackMethod;
52     private final RestconfClientImpl restconfClient;
53     private final EventStreamInfo streamInfo;
54
55     public RestListenableEventStreamContext(EventStreamInfo streamInfo,RestconfClientImpl restconfClient){
56         this.restconfClient = restconfClient;
57         this.streamInfo = streamInfo;
58     }
59     @Override
60     public <L extends NotificationListener> ListenerRegistration<L> registerNotificationListener(L listener) {
61
62         for (Method m:listener.getClass().getDeclaredMethods()){
63             if (BindingReflections.isNotificationCallback(m)){
64                 this.listenerCallbackMethod = m;
65                 break;
66             }
67         }
68         return new AbstractListenerRegistration<L>(listener) {
69             @Override
70             protected void removeRegistration() {
71                 stopListening();
72             }
73         };
74     }
75
76     @Override
77     public ListenableFuture<RpcResult<Void>> startListening() {
78
79
80         ClientResponse response = null;
81         try {
82             response = extractWebSocketUriFromRpc(this.streamInfo.getIdentifier());
83         } catch (ExecutionException e) {
84             logger.trace("Execution exception while extracting stream name {}",e);
85             throw new IllegalStateException(e);
86         } catch (InterruptedException e) {
87             logger.trace("InterruptedException while extracting stream name {}",e);
88             throw new IllegalStateException(e);
89         } catch (UnsupportedEncodingException e) {
90             logger.trace("UnsupportedEncodingException while extracting stream name {}",e);
91             throw new IllegalStateException(e);
92         }
93         boolean success = true;
94         if (response.getStatus() != 200) {
95             success = false;
96         }
97
98         final RestRpcResult rpcResult = new RestRpcResult(success,response.getLocation());
99         createWebsocketClient(response.getLocation());
100
101         ListenableFuture<RpcResult<Void>> future = pool.submit(new Callable<RpcResult<Void>>() {
102             @Override
103             public RpcResult<Void> call() throws Exception {
104                 return rpcResult;
105             }
106         });
107
108         return future;
109     }
110
111     @Override
112     public ListenableFuture<RpcResult<Void>> startListeningWithReplay(Optional<Date> startTime, Optional<Date> endTime) {
113         //TODO RESTCONF doesn't provide this functionality
114         return null;
115     }
116
117     @Override
118     public void stopListening() {
119         this.wsClient.writeAndFlush(new CloseWebSocketFrame(42,this.streamInfo.getIdentifier()));
120     }
121
122     @Override
123     public ListenableFuture<Optional<EventStreamReplay>> getReplay(Optional<Date> startTime, Optional<Date> endTime) {
124         //TODO RESTCONF doesn't provide this functionality
125         return null;
126     }
127
128     @Override
129     public void close() {
130         this.stopListening();
131     }
132
133     private ClientResponse extractWebSocketUriFromRpc(String methodName) throws ExecutionException, InterruptedException, UnsupportedEncodingException {
134         ListenableFuture<ClientResponse> clientFuture = restconfClient.get(ResourceUri.STREAM.getPath()+"/"+encodeUri(this.streamInfo.getIdentifier()),MediaType.APPLICATION_XML,new Function<ClientResponse, ClientResponse>(){
135
136             @Override
137             public ClientResponse apply(ClientResponse clientResponse) {
138                 return clientResponse;
139             }
140         });
141         while (!clientFuture.isDone()){
142             //noop
143         }
144         return clientFuture.get();
145     }
146     private void createWebsocketClient(URI websocketServerUri){
147         this.wsClient = new WebSocketIClient(websocketServerUri,this);
148     }
149     private String encodeUri(String encodedPart) throws UnsupportedEncodingException {
150         return URI.create(URLEncoder.encode(encodedPart, Charsets.US_ASCII.name()).toString()).toASCIIString();
151     }
152
153     @Override
154     public void onMessageReceived(Object message) {
155         if (null == this.listenerCallbackMethod){
156             throw new IllegalStateException("No listener method to invoke.");
157         }
158         try {
159             this.listenerCallbackMethod.invoke(message);
160         } catch (IllegalAccessException e) {
161             throw new IllegalStateException(e.getMessage());
162         } catch (InvocationTargetException e) {
163             throw new IllegalStateException(e.getMessage());
164         }
165     }
166
167 }