2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.restconf.client;
10 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
12 import java.io.UnsupportedEncodingException;
13 import java.lang.reflect.InvocationTargetException;
14 import java.lang.reflect.Method;
16 import java.net.URLEncoder;
17 import java.util.Date;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Executors;
22 import javax.ws.rs.core.MediaType;
24 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
25 import org.opendaylight.yangtools.concepts.ListenerRegistration;
26 import org.opendaylight.yangtools.restconf.client.api.event.EventStreamInfo;
27 import org.opendaylight.yangtools.restconf.client.api.event.EventStreamReplay;
28 import org.opendaylight.yangtools.restconf.client.api.event.ListenableEventStreamContext;
29 import org.opendaylight.yangtools.restconf.client.to.RestRpcResult;
30 import org.opendaylight.yangtools.restconf.common.ResourceUri;
31 import org.opendaylight.yangtools.websocket.client.WebSocketIClient;
32 import org.opendaylight.yangtools.websocket.client.callback.ClientMessageCallback;
33 import org.opendaylight.yangtools.yang.binding.NotificationListener;
34 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
35 import org.opendaylight.yangtools.yang.common.RpcResult;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 import com.google.common.base.Charsets;
40 import com.google.common.base.Function;
41 import com.google.common.base.Optional;
42 import com.google.common.util.concurrent.ListenableFuture;
43 import com.google.common.util.concurrent.ListeningExecutorService;
44 import com.google.common.util.concurrent.MoreExecutors;
45 import com.sun.jersey.api.client.ClientResponse;
50 public class RestListenableEventStreamContext<L extends NotificationListener> implements ListenableEventStreamContext,ClientMessageCallback {
52 private static final Logger logger = LoggerFactory.getLogger(RestListenableEventStreamContext.class.toString());
53 private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
54 private WebSocketIClient wsClient;
55 private Method listenerCallbackMethod;
56 private final RestconfClientImpl restconfClient;
57 private final EventStreamInfo streamInfo;
58 private static final int STATUS_OK = 200;
60 public RestListenableEventStreamContext(final EventStreamInfo streamInfo,final RestconfClientImpl restconfClient){
61 this.restconfClient = restconfClient;
62 this.streamInfo = streamInfo;
66 public <T extends NotificationListener> ListenerRegistration<T> registerNotificationListener(final T listener) {
68 for (Method m:listener.getClass().getDeclaredMethods()){
69 if (BindingReflections.isNotificationCallback(m)){
70 this.listenerCallbackMethod = m;
74 return new AbstractListenerRegistration<T>(listener) {
76 protected void removeRegistration() {
83 public ListenableFuture<RpcResult<Void>> startListening() {
86 ClientResponse response = null;
88 response = extractWebSocketUriFromRpc(this.streamInfo.getIdentifier());
89 } catch (ExecutionException e) {
90 logger.trace("Execution exception while extracting stream name {}",e);
91 throw new IllegalStateException(e);
92 } catch (InterruptedException e) {
93 logger.trace("InterruptedException while extracting stream name {}",e);
94 throw new IllegalStateException(e);
95 } catch (UnsupportedEncodingException e) {
96 logger.trace("UnsupportedEncodingException while extracting stream name {}",e);
97 throw new IllegalStateException(e);
99 boolean success = true;
100 if (response.getStatus() != STATUS_OK) {
104 final RestRpcResult rpcResult = new RestRpcResult(success,response.getLocation());
105 createWebsocketClient(response.getLocation());
107 ListenableFuture<RpcResult<Void>> future = pool.submit(new Callable<RpcResult<Void>>() {
109 public RpcResult<Void> call() {
118 public ListenableFuture<RpcResult<Void>> startListeningWithReplay(final Optional<Date> startTime, final Optional<Date> endTime) {
119 //TODO RESTCONF doesn't provide this functionality
124 public void stopListening() {
125 this.wsClient.writeAndFlush(new CloseWebSocketFrame(42,this.streamInfo.getIdentifier()));
129 public ListenableFuture<Optional<EventStreamReplay>> getReplay(final Optional<Date> startTime, final Optional<Date> endTime) {
130 //TODO RESTCONF doesn't provide this functionality
135 public void close() {
136 this.stopListening();
139 private ClientResponse extractWebSocketUriFromRpc(final String methodName) throws ExecutionException, InterruptedException, UnsupportedEncodingException {
140 ListenableFuture<ClientResponse> clientFuture = restconfClient.get(ResourceUri.STREAM.getPath()+"/"+encodeUri(this.streamInfo.getIdentifier()),MediaType.APPLICATION_XML,new Function<ClientResponse, ClientResponse>(){
143 public ClientResponse apply(final ClientResponse clientResponse) {
144 return clientResponse;
148 return clientFuture.get();
150 private void createWebsocketClient(final URI websocketServerUri){
151 this.wsClient = new WebSocketIClient(websocketServerUri,this);
153 private String encodeUri(final String encodedPart) throws UnsupportedEncodingException {
154 return URI.create(URLEncoder.encode(encodedPart, Charsets.US_ASCII.name()).toString()).toASCIIString();
158 public void onMessageReceived(final Object message) {
159 if (null == this.listenerCallbackMethod){
160 throw new IllegalStateException("No listener method to invoke.");
163 this.listenerCallbackMethod.invoke(message);
164 } catch (IllegalAccessException | InvocationTargetException e) {
165 throw new IllegalStateException("Failed to invoke callback", e);