unifying statistics manager api packages
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / RpcListeningExecutorService.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * 
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core;
9
10 import java.util.Collection;
11 import java.util.List;
12 import java.util.concurrent.Callable;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.TimeoutException;
17
18 import org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTask;
19 import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
20 import org.opendaylight.yangtools.yang.binding.DataContainer;
21
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.ListeningExecutorService;
24
25 /**
26  * 
27  */
28 public class RpcListeningExecutorService implements ListeningExecutorService {
29     
30     private MessageSpy<DataContainer> messageSpy;
31     private ListeningExecutorService executorServiceDelegate;
32     private DataContainer notSupportedTask = new NoDataContainerTask();
33     
34     /**
35      * @param executorService
36      */
37     public RpcListeningExecutorService(ListeningExecutorService executorService) {
38         this.executorServiceDelegate = executorService;
39     }
40     
41     /**
42      * @param messageSpy the messageSpy to set
43      */
44     public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
45         this.messageSpy = messageSpy;
46     }
47     
48     @Override
49     public void shutdown() {
50         executorServiceDelegate.shutdown();
51     }
52
53     @Override
54     public <T> ListenableFuture<T> submit(Callable<T> task) {
55         ListenableFuture<T> resultFuture = executorServiceDelegate.submit(task);
56         
57         boolean covered = false;
58         if (task instanceof OFRpcTask<?, ?>) {
59             if (((OFRpcTask<?, ?>) task).getInput() instanceof DataContainer) {
60                 messageSpy.spyMessage((DataContainer) ((OFRpcTask<?, ?>) task).getInput(), 
61                         MessageSpy.STATISTIC_GROUP.TO_SWITCH_ENQUEUED_SUCCESS);
62                 covered = true;
63             }
64         } 
65         
66         if (! covered) {
67             messageSpy.spyMessage(notSupportedTask, MessageSpy.STATISTIC_GROUP.TO_SWITCH_ENQUEUED_FAILED);
68         }
69         
70         return resultFuture;
71     }
72
73     @Override
74     public ListenableFuture<?> submit(Runnable task) {
75         throw new IllegalAccessError("not supported");
76     }
77
78     @Override
79     public <T> ListenableFuture<T> submit(Runnable task, T result) {
80         throw new IllegalAccessError("not supported");
81     }
82
83     @Override
84     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
85             throws InterruptedException {
86         return executorServiceDelegate.invokeAll(tasks);
87     }
88
89     @Override
90     public <T> List<Future<T>> invokeAll(
91             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
92             throws InterruptedException {
93         return executorServiceDelegate.invokeAll(tasks, timeout, unit);
94     }
95
96     @Override
97     public void execute(Runnable command) {
98         throw new IllegalAccessError("not supported");
99     }
100
101     @Override
102     public List<Runnable> shutdownNow() {
103         return executorServiceDelegate.shutdownNow();
104     }
105
106     @Override
107     public boolean isShutdown() {
108         return executorServiceDelegate.isShutdown();
109     }
110
111     @Override
112     public boolean isTerminated() {
113         return executorServiceDelegate.isTerminated();
114     }
115
116     @Override
117     public boolean awaitTermination(long timeout, TimeUnit unit)
118             throws InterruptedException {
119         return executorServiceDelegate.awaitTermination(timeout, unit);
120     }
121
122     @Override
123     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
124             throws InterruptedException, ExecutionException {
125         throw new IllegalAccessError("not supported");
126     }
127
128     @Override
129     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
130             long timeout, TimeUnit unit) throws InterruptedException,
131             ExecutionException, TimeoutException {
132         throw new IllegalAccessError("not supported");
133     }
134     
135     protected static class NoDataContainerTask implements DataContainer {
136         @Override
137         public Class<? extends DataContainer> getImplementedInterface() {
138             return null;
139         }
140     }
141
142 }