85169406cac456a2916f2c04ec1485731b51028d
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / sal / OFRpcTaskFactory.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.openflow.md.core.sal;
9
10 import com.google.common.util.concurrent.JdkFutureAdapters;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.SettableFuture;
13 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
14 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
15 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.GroupConvertor;
16 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.MeterConvertor;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdatedBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAdded;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdated;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInputBuilder;
47 import org.opendaylight.yangtools.yang.common.RpcError;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49
50 import java.math.BigInteger;
51 import java.util.Collection;
52 import java.util.concurrent.Future;
53
54 /**
55  *
56  */
57 public abstract class OFRpcTaskFactory {
58
59     /**
60      * @param taskContext 
61      * @param input 
62      * @param cookie 
63      * @return UpdateFlow task
64      */
65     public static OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> createAddFlowTask(
66             OFRpcTaskContext taskContext, AddFlowInput input, 
67             SwitchConnectionDistinguisher cookie) {
68         OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task = 
69                 new OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
70             
71             @Override
72             public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
73                 ListenableFuture<RpcResult<UpdateFlowOutput>> result = SettableFuture.create();
74                 
75                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
76                 if (!barrierErrors.isEmpty()) {
77                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
78                 } else {
79                     // Convert the AddFlowInput to FlowModInput
80                     FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(), 
81                             getVersion(), getSession().getFeatures().getDatapathId());
82                     final Long xId = getSession().getNextXid();
83                     ofFlowModInput.setXid(xId);
84                     
85                     Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = 
86                             getMessageService().flowMod(ofFlowModInput.build(), getCookie());
87                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
88                     
89                     OFRpcTaskUtil.hookFutureNotification(this, result, 
90                             getRpcNotificationProviderService(), createFlowAddedNotification(xId, getInput()));
91                 }
92
93                 return result;
94             }
95         };
96         
97         return task;
98     }
99
100     /**
101      * @param xId
102      * @return
103      */
104     protected static NotificationComposer<FlowAdded> createFlowAddedNotification(
105             final Long xId, final AddFlowInput input) {
106         return new NotificationComposer<FlowAdded>() {
107             @Override
108             public FlowAdded compose() {
109                 FlowAddedBuilder newFlow = new FlowAddedBuilder((Flow) input);
110                 newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
111                 newFlow.setFlowRef(input.getFlowRef());
112                 return newFlow.build();
113             }
114         };
115     }
116
117     /**
118      * @param taskContext 
119      * @param input 
120      * @param cookie 
121      * @return UpdateFlow task
122      */
123     public static OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> createUpdateFlowTask(
124             OFRpcTaskContext taskContext, UpdateFlowInput input, 
125             SwitchConnectionDistinguisher cookie) {
126         
127         OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task = 
128                 new OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
129             
130             @Override
131             public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
132                 ListenableFuture<RpcResult<UpdateFlowOutput>> result = null;
133                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
134                         getInput().getUpdatedFlow().isBarrier(), getCookie());
135                 if (!barrierErrors.isEmpty()) {
136                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
137
138                 } else {
139                     Flow flow = null;
140                     Long xId = getSession().getNextXid();
141                     boolean updatedFlow = (getInput().getUpdatedFlow().getMatch().equals(getInput().getOriginalFlow().getMatch())) &&
142                             (getInput().getUpdatedFlow().getPriority().equals(getInput().getOriginalFlow().getPriority()));
143
144
145                     if (updatedFlow == false) {
146                         // if neither match nor priority matches, then we would need to remove the flow and add it
147                         //remove flow
148                         RemoveFlowInputBuilder removeflow = new RemoveFlowInputBuilder(getInput().getOriginalFlow());
149                         FlowModInputBuilder ofFlowRemoveInput = FlowConvertor.toFlowModInput(removeflow.build(),
150                                 getVersion(),getSession().getFeatures().getDatapathId());
151                         ofFlowRemoveInput.setXid(xId);
152                         Future<RpcResult<UpdateFlowOutput>> resultFromOFLibRemove = getMessageService().
153                                 flowMod(ofFlowRemoveInput.build(), getCookie());
154                         //add flow
155                         AddFlowInputBuilder addFlow = new AddFlowInputBuilder(getInput().getUpdatedFlow());
156                         flow = addFlow.build();
157                     } else {
158                         //update flow
159                         flow = getInput().getUpdatedFlow();
160                     }
161
162                     FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(flow, getVersion(),
163                             getSession().getFeatures().getDatapathId());
164
165                     ofFlowModInput.setXid(xId);
166
167                     Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
168                             getMessageService().flowMod(ofFlowModInput.build(), getCookie());
169                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
170
171                     OFRpcTaskUtil.hookFutureNotification(this, result,
172                             getRpcNotificationProviderService(), createFlowUpdatedNotification(xId, getInput()));
173                 }
174                 return result;
175             }
176         };
177         return task;
178     }
179
180     /**
181      * @param xId
182      * @param input
183      * @return
184      */
185     protected static NotificationComposer<FlowUpdated> createFlowUpdatedNotification(
186             final Long xId, final UpdateFlowInput input) {
187         return new NotificationComposer<FlowUpdated>() {
188             @Override
189             public FlowUpdated compose() {
190                 FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(input.getUpdatedFlow());
191                 updFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
192                 updFlow.setFlowRef(input.getFlowRef());
193                 return updFlow.build();
194             }
195         };
196     }
197     
198     /**
199      * @param taskContext
200      * @param input
201      * @param cookie
202      * @return update group task
203      */
204     public static OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> createAddGroupTask(
205             final OFRpcTaskContext taskContext, AddGroupInput input, 
206             final SwitchConnectionDistinguisher cookie) {
207         OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> task = 
208                 new OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
209             
210             @Override
211             public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
212                 ListenableFuture<RpcResult<UpdateGroupOutput>> result = SettableFuture.create();
213                 
214                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
215                 if (!barrierErrors.isEmpty()) {
216                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
217                 } else {
218                     // Convert the AddGroupInput to GroupModInput
219                     GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(getInput(), 
220                             getVersion(), getSession().getFeatures().getDatapathId());
221                     final Long xId = getSession().getNextXid();
222                     ofGroupModInput.setXid(xId);
223                     
224                     Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = getMessageService()
225                             .groupMod(ofGroupModInput.build(), getCookie());
226                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
227                     
228                     OFRpcTaskUtil.hookFutureNotification(this, result, 
229                             getRpcNotificationProviderService(), createGroupAddedNotification(xId, getInput()));
230                 }
231
232                 return result;
233             }
234         };
235         
236         return task;
237     }
238     
239
240     /**
241      * @param xId
242      * @param input
243      * @return
244      */
245     protected static NotificationComposer<GroupAdded> createGroupAddedNotification(
246             final Long xId, final AddGroupInput input) {
247         return new NotificationComposer<GroupAdded>() {
248             @Override
249             public GroupAdded compose() {
250                 GroupAddedBuilder groupMod = new GroupAddedBuilder((Group) input);
251                 groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
252                 groupMod.setGroupRef(input.getGroupRef());
253                 return groupMod.build();
254             }
255         };
256     }
257
258     /**
259      * @param taskContext
260      * @param input
261      * @param cookie
262      * @return update meter task
263      */
264     public static OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> createAddMeterTask(
265             OFRpcTaskContext taskContext, AddMeterInput input,
266             SwitchConnectionDistinguisher cookie) {
267         OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> task = 
268                 new OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
269             
270             @Override
271             public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
272                 ListenableFuture<RpcResult<UpdateMeterOutput>> result = SettableFuture.create();
273                 
274                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
275                 if (!barrierErrors.isEmpty()) {
276                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
277                 } else {
278                     // Convert the AddGroupInput to GroupModInput
279                     MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(getInput(), getVersion());
280                     final Long xId = getSession().getNextXid();
281                     ofMeterModInput.setXid(xId);
282                     
283                     Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = getMessageService()
284                             .meterMod(ofMeterModInput.build(), getCookie());
285                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
286                     
287                     OFRpcTaskUtil.hookFutureNotification(this, result, 
288                             getRpcNotificationProviderService(), createMeterAddedNotification(xId, getInput()));
289                 }
290
291                 return result;
292             }
293         };
294         
295         return task;
296         
297     }
298
299     /**
300      * @param xId
301      * @param input
302      * @return
303      */
304     protected static NotificationComposer<MeterAdded> createMeterAddedNotification(
305             final Long xId, final AddMeterInput input) {
306         return new NotificationComposer<MeterAdded>() {
307             @Override
308             public MeterAdded compose() {
309                 MeterAddedBuilder meterMod = new MeterAddedBuilder((Meter) input);
310                 meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
311                 meterMod.setMeterRef(input.getMeterRef());
312                 return meterMod.build();
313             }
314         };
315     }
316     
317     /**
318      * @param taskContext 
319      * @param input 
320      * @param cookie 
321      * @return UpdateFlow task
322      */
323     public static OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> createUpdateGroupTask(
324             OFRpcTaskContext taskContext, UpdateGroupInput input, 
325             SwitchConnectionDistinguisher cookie) {
326         OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> task = 
327                 new OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
328             
329             @Override
330             public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
331                 ListenableFuture<RpcResult<UpdateGroupOutput>> result = null;
332                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
333                         getInput().getUpdatedGroup().isBarrier(), getCookie());
334                 if (!barrierErrors.isEmpty()) {
335                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
336                 } else {
337                     // Convert the UpdateGroupInput to GroupModInput
338                     GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(
339                             getInput().getUpdatedGroup(), getVersion(),
340                             getSession().getFeatures().getDatapathId());
341                     final Long xId = getSession().getNextXid();
342                     ofGroupModInput.setXid(xId);
343     
344                     Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = 
345                             getMessageService().groupMod(ofGroupModInput.build(), getCookie());
346                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
347                     
348                     OFRpcTaskUtil.hookFutureNotification(this, result, 
349                             getRpcNotificationProviderService(), createGroupUpdatedNotification(xId, getInput()));
350                 }
351                 return result;
352             }
353         };
354         return task;
355     }
356     
357     /**
358      * @param xId
359      * @param input
360      * @return
361      */
362     protected static NotificationComposer<GroupUpdated> createGroupUpdatedNotification(
363             final Long xId, final UpdateGroupInput input) {
364         return new NotificationComposer<GroupUpdated>() {
365             @Override
366             public GroupUpdated compose() {
367                 GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup());
368                 groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
369                 groupMod.setGroupRef(input.getGroupRef());
370                 return groupMod.build();
371             }
372         };
373     }
374
375     /**
376      * @param taskContext 
377      * @param input
378      * @param cookie
379      * @return update meter task 
380      */
381     public static OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> createUpdateMeterTask(
382             OFRpcTaskContext taskContext, UpdateMeterInput input,
383             SwitchConnectionDistinguisher cookie) {
384         OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> task = 
385                 new OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
386             
387             @Override
388             public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
389                 ListenableFuture<RpcResult<UpdateMeterOutput>> result = null;
390                 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
391                         getInput().getUpdatedMeter().isBarrier(), getCookie());
392                 if (!barrierErrors.isEmpty()) {
393                     OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
394                 } else {
395                     // Convert the UpdateMeterInput to MeterModInput
396                     MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(
397                             getInput().getUpdatedMeter(), getVersion());
398                     final Long xId = getSession().getNextXid();
399                     ofMeterModInput.setXid(xId);
400     
401                     Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = 
402                             getMessageService().meterMod(ofMeterModInput.build(), getCookie());
403                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
404                     
405                     OFRpcTaskUtil.hookFutureNotification(this, result,
406                             getRpcNotificationProviderService(), createMeterUpdatedNotification(xId, getInput()));
407                 }
408                 return result;
409             }
410         };
411         return task;
412     }
413     
414     /**
415      * @param xId
416      * @param input
417      * @return
418      */
419     protected static NotificationComposer<MeterUpdated> createMeterUpdatedNotification(
420             final Long xId, final UpdateMeterInput input) {
421         return new NotificationComposer<MeterUpdated>() {
422             @Override
423             public MeterUpdated compose() {
424                 MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter());
425                 meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
426                 meterMod.setMeterRef(input.getMeterRef());
427                 return meterMod.build();
428             }
429         };
430     }
431 }