BUG-956 deadlock by rpc invocation - phase2
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / sal / OFRpcTaskFactory.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core.sal;
9
10 import java.math.BigInteger;
11 import java.util.Collection;
12 import java.util.concurrent.Future;
13
14 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
15 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
16 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.GroupConvertor;
17 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.MeterConvertor;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdatedBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAdded;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdated;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInputBuilder;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48
49 import com.google.common.util.concurrent.JdkFutureAdapters;
50 import com.google.common.util.concurrent.ListenableFuture;
51 import com.google.common.util.concurrent.SettableFuture;
52
53 /**
54  *
55  */
56 public abstract class OFRpcTaskFactory {
57
58     /**
59      * @param taskContext 
60      * @param input 
61      * @param cookie 
62      * @return UpdateFlow task
63      */
64     public static OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> createAddFlowTask(
65             OFRpcTaskContext taskContext, AddFlowInput input, 
66             SwitchConnectionDistinguisher cookie) {
67         OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task = 
68                 new OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
69             
70             @Override
71             public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
72                 ListenableFuture<RpcResult<UpdateFlowOutput>> result = SettableFuture.create();
73                 
74                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
75                 if (!barrierErrors.isEmpty()) {
76                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
77                 } else {
78                     // Convert the AddFlowInput to FlowModInput
79                     FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(), 
80                             getVersion(), getSession().getFeatures().getDatapathId());
81                     final Long xId = getSession().getNextXid();
82                     ofFlowModInput.setXid(xId);
83                     
84                     Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = 
85                             getMessageService().flowMod(ofFlowModInput.build(), getCookie());
86                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
87                     
88                     OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
89                             createFlowAddedNotification(xId, getInput()));
90                 }
91
92                 return result;
93             }
94         };
95         
96         return task;
97     }
98
99     /**
100      * @param xId
101      * @return
102      */
103     protected static NotificationComposer<FlowAdded> createFlowAddedNotification(
104             final Long xId, final AddFlowInput input) {
105         return new NotificationComposer<FlowAdded>() {
106             @Override
107             public FlowAdded compose() {
108                 FlowAddedBuilder newFlow = new FlowAddedBuilder((Flow) input);
109                 newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
110                 newFlow.setFlowRef(input.getFlowRef());
111                 return newFlow.build();
112             }
113         };
114     }
115
116     /**
117      * @param taskContext 
118      * @param input 
119      * @param cookie 
120      * @return UpdateFlow task
121      */
122     public static OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> createUpdateFlowTask(
123             OFRpcTaskContext taskContext, UpdateFlowInput input, 
124             SwitchConnectionDistinguisher cookie) {
125         
126         OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task = 
127                 new OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
128             
129             @Override
130             public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
131                 ListenableFuture<RpcResult<UpdateFlowOutput>> result = null;
132                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
133                         getInput().getUpdatedFlow().isBarrier(), getCookie());
134                 if (!barrierErrors.isEmpty()) {
135                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
136                 } else {
137                     // Convert the AddFlowInput to FlowModInput
138                     FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(), 
139                             getVersion(), getSession().getFeatures().getDatapathId());
140                     Long xId = getSession().getNextXid();
141                     ofFlowModInput.setXid(xId);
142     
143                     Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = 
144                             getMessageService().flowMod(ofFlowModInput.build(), getCookie());
145                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
146                     
147                     OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
148                             createFlowUpdatedNotification(xId, getInput()));
149                 }
150                 return result;
151             }
152         };
153         return task;
154     }
155
156     /**
157      * @param xId
158      * @param input
159      * @return
160      */
161     protected static NotificationComposer<FlowUpdated> createFlowUpdatedNotification(
162             final Long xId, final UpdateFlowInput input) {
163         return new NotificationComposer<FlowUpdated>() {
164             @Override
165             public FlowUpdated compose() {
166                 FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(input.getUpdatedFlow());
167                 updFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
168                 updFlow.setFlowRef(input.getFlowRef());
169                 return updFlow.build();
170             }
171         };
172     }
173     
174     /**
175      * @param taskContext
176      * @param input
177      * @param cookie
178      * @return update group task
179      */
180     public static OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> createAddGroupTask(
181             final OFRpcTaskContext taskContext, AddGroupInput input, 
182             final SwitchConnectionDistinguisher cookie) {
183         OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> task = 
184                 new OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
185             
186             @Override
187             public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
188                 ListenableFuture<RpcResult<UpdateGroupOutput>> result = SettableFuture.create();
189                 
190                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
191                 if (!barrierErrors.isEmpty()) {
192                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
193                 } else {
194                     // Convert the AddGroupInput to GroupModInput
195                     GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(getInput(), 
196                             getVersion(), getSession().getFeatures().getDatapathId());
197                     final Long xId = getSession().getNextXid();
198                     ofGroupModInput.setXid(xId);
199                     
200                     Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = getMessageService()
201                             .groupMod(ofGroupModInput.build(), getCookie());
202                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
203                     
204                     OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
205                             createGroupAddedNotification(xId, getInput()));
206                 }
207
208                 return result;
209             }
210         };
211         
212         return task;
213     }
214     
215
216     /**
217      * @param xId
218      * @param input
219      * @return
220      */
221     protected static NotificationComposer<GroupAdded> createGroupAddedNotification(
222             final Long xId, final AddGroupInput input) {
223         return new NotificationComposer<GroupAdded>() {
224             @Override
225             public GroupAdded compose() {
226                 GroupAddedBuilder groupMod = new GroupAddedBuilder((Group) input);
227                 groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
228                 groupMod.setGroupRef(input.getGroupRef());
229                 return groupMod.build();
230             }
231         };
232     }
233
234     /**
235      * @param taskContext
236      * @param input
237      * @param cookie
238      * @return update meter task
239      */
240     public static OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> createAddMeterTask(
241             OFRpcTaskContext taskContext, AddMeterInput input,
242             SwitchConnectionDistinguisher cookie) {
243         OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> task = 
244                 new OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
245             
246             @Override
247             public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
248                 ListenableFuture<RpcResult<UpdateMeterOutput>> result = SettableFuture.create();
249                 
250                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
251                 if (!barrierErrors.isEmpty()) {
252                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
253                 } else {
254                     // Convert the AddGroupInput to GroupModInput
255                     MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(getInput(), getVersion());
256                     final Long xId = getSession().getNextXid();
257                     ofMeterModInput.setXid(xId);
258                     
259                     Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = getMessageService()
260                             .meterMod(ofMeterModInput.build(), getCookie());
261                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
262                     
263                     OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
264                             createMeterAddedNotification(xId, getInput()));
265                 }
266
267                 return result;
268             }
269         };
270         
271         return task;
272         
273     }
274
275     /**
276      * @param xId
277      * @param input
278      * @return
279      */
280     protected static NotificationComposer<MeterAdded> createMeterAddedNotification(
281             final Long xId, final AddMeterInput input) {
282         return new NotificationComposer<MeterAdded>() {
283             @Override
284             public MeterAdded compose() {
285                 MeterAddedBuilder meterMod = new MeterAddedBuilder((Meter) input);
286                 meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
287                 meterMod.setMeterRef(input.getMeterRef());
288                 return meterMod.build();
289             }
290         };
291     }
292     
293     /**
294      * @param taskContext 
295      * @param input 
296      * @param cookie 
297      * @return UpdateFlow task
298      */
299     public static OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> createUpdateGroupTask(
300             OFRpcTaskContext taskContext, UpdateGroupInput input, 
301             SwitchConnectionDistinguisher cookie) {
302         OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> task = 
303                 new OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
304             
305             @Override
306             public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
307                 ListenableFuture<RpcResult<UpdateGroupOutput>> result = null;
308                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
309                         getInput().getUpdatedGroup().isBarrier(), getCookie());
310                 if (!barrierErrors.isEmpty()) {
311                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
312                 } else {
313                     // Convert the UpdateGroupInput to GroupModInput
314                     GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(
315                             getInput().getUpdatedGroup(), getVersion(),
316                             getSession().getFeatures().getDatapathId());
317                     final Long xId = getSession().getNextXid();
318                     ofGroupModInput.setXid(xId);
319     
320                     Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = 
321                             getMessageService().groupMod(ofGroupModInput.build(), getCookie());
322                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
323                     
324                     OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
325                             createGroupUpdatedNotification(xId, getInput()));
326                 }
327                 return result;
328             }
329         };
330         return task;
331     }
332     
333     /**
334      * @param xId
335      * @param input
336      * @return
337      */
338     protected static NotificationComposer<GroupUpdated> createGroupUpdatedNotification(
339             final Long xId, final UpdateGroupInput input) {
340         return new NotificationComposer<GroupUpdated>() {
341             @Override
342             public GroupUpdated compose() {
343                 GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup());
344                 groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
345                 groupMod.setGroupRef(input.getGroupRef());
346                 return groupMod.build();
347             }
348         };
349     }
350
351     /**
352      * @param taskContext 
353      * @param input
354      * @param cookie
355      * @return update meter task 
356      */
357     public static OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> createUpdateMeterTask(
358             OFRpcTaskContext taskContext, UpdateMeterInput input,
359             SwitchConnectionDistinguisher cookie) {
360         OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> task = 
361                 new OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
362             
363             @Override
364             public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
365                 ListenableFuture<RpcResult<UpdateMeterOutput>> result = null;
366                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
367                         getInput().getUpdatedMeter().isBarrier(), getCookie());
368                 if (!barrierErrors.isEmpty()) {
369                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
370                 } else {
371                     // Convert the UpdateMeterInput to MeterModInput
372                     MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(
373                             getInput().getUpdatedMeter(), getVersion());
374                     final Long xId = getSession().getNextXid();
375                     ofMeterModInput.setXid(xId);
376     
377                     Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = 
378                             getMessageService().meterMod(ofMeterModInput.build(), getCookie());
379                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
380                     
381                     OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
382                             createMeterUpdatedNotification(xId, getInput()));
383                 }
384                 return result;
385             }
386         };
387         return task;
388     }
389     
390     /**
391      * @param xId
392      * @param input
393      * @return
394      */
395     protected static NotificationComposer<MeterUpdated> createMeterUpdatedNotification(
396             final Long xId, final UpdateMeterInput input) {
397         return new NotificationComposer<MeterUpdated>() {
398             @Override
399             public MeterUpdated compose() {
400                 MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter());
401                 meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
402                 meterMod.setMeterRef(input.getMeterRef());
403                 return meterMod.build();
404             }
405         };
406     }
407 }