2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.openflow.md.core.sal;
10 import com.google.common.util.concurrent.JdkFutureAdapters;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.SettableFuture;
13 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
14 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
15 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.GroupConvertor;
16 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.MeterConvertor;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdatedBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAdded;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdated;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInputBuilder;
47 import org.opendaylight.yangtools.yang.common.RpcError;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import java.math.BigInteger;
51 import java.util.Collection;
52 import java.util.concurrent.Future;
57 public abstract class OFRpcTaskFactory {
63 * @return UpdateFlow task
65 public static OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> createAddFlowTask(
66 OFRpcTaskContext taskContext, AddFlowInput input,
67 SwitchConnectionDistinguisher cookie) {
68 OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task =
69 new OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
72 public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
73 ListenableFuture<RpcResult<UpdateFlowOutput>> result = SettableFuture.create();
75 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
76 if (!barrierErrors.isEmpty()) {
77 OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
79 // Convert the AddFlowInput to FlowModInput
80 FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(),
81 getVersion(), getSession().getFeatures().getDatapathId());
82 final Long xId = getSession().getNextXid();
83 ofFlowModInput.setXid(xId);
85 Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
86 getMessageService().flowMod(ofFlowModInput.build(), getCookie());
87 result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
89 OFRpcTaskUtil.hookFutureNotification(this, result,
90 getRpcNotificationProviderService(), createFlowAddedNotification(xId, getInput()));
104 protected static NotificationComposer<FlowAdded> createFlowAddedNotification(
105 final Long xId, final AddFlowInput input) {
106 return new NotificationComposer<FlowAdded>() {
108 public FlowAdded compose() {
109 FlowAddedBuilder newFlow = new FlowAddedBuilder((Flow) input);
110 newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
111 newFlow.setFlowRef(input.getFlowRef());
112 return newFlow.build();
121 * @return UpdateFlow task
123 public static OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> createUpdateFlowTask(
124 OFRpcTaskContext taskContext, UpdateFlowInput input,
125 SwitchConnectionDistinguisher cookie) {
127 OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task =
128 new OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
131 public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
132 ListenableFuture<RpcResult<UpdateFlowOutput>> result = null;
133 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(),
134 getInput().getUpdatedFlow().isBarrier(), getCookie());
135 if (!barrierErrors.isEmpty()) {
136 OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
140 Long xId = getSession().getNextXid();
141 boolean updatedFlow = (getInput().getUpdatedFlow().getMatch().equals(getInput().getOriginalFlow().getMatch())) &&
142 (getInput().getUpdatedFlow().getPriority().equals(getInput().getOriginalFlow().getPriority()));
145 if (updatedFlow == false) {
146 // if neither match nor priority matches, then we would need to remove the flow and add it
148 RemoveFlowInputBuilder removeflow = new RemoveFlowInputBuilder(getInput().getOriginalFlow());
149 FlowModInputBuilder ofFlowRemoveInput = FlowConvertor.toFlowModInput(removeflow.build(),
150 getVersion(),getSession().getFeatures().getDatapathId());
151 ofFlowRemoveInput.setXid(xId);
152 Future<RpcResult<UpdateFlowOutput>> resultFromOFLibRemove = getMessageService().
153 flowMod(ofFlowRemoveInput.build(), getCookie());
155 AddFlowInputBuilder addFlow = new AddFlowInputBuilder(getInput().getUpdatedFlow());
156 flow = addFlow.build();
159 flow = getInput().getUpdatedFlow();
162 FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(flow, getVersion(),
163 getSession().getFeatures().getDatapathId());
165 ofFlowModInput.setXid(xId);
167 Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
168 getMessageService().flowMod(ofFlowModInput.build(), getCookie());
169 result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
171 OFRpcTaskUtil.hookFutureNotification(this, result,
172 getRpcNotificationProviderService(), createFlowUpdatedNotification(xId, getInput()));
185 protected static NotificationComposer<FlowUpdated> createFlowUpdatedNotification(
186 final Long xId, final UpdateFlowInput input) {
187 return new NotificationComposer<FlowUpdated>() {
189 public FlowUpdated compose() {
190 FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(input.getUpdatedFlow());
191 updFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
192 updFlow.setFlowRef(input.getFlowRef());
193 return updFlow.build();
202 * @return update group task
204 public static OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> createAddGroupTask(
205 final OFRpcTaskContext taskContext, AddGroupInput input,
206 final SwitchConnectionDistinguisher cookie) {
207 OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> task =
208 new OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
211 public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
212 ListenableFuture<RpcResult<UpdateGroupOutput>> result = SettableFuture.create();
214 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
215 if (!barrierErrors.isEmpty()) {
216 OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
218 // Convert the AddGroupInput to GroupModInput
219 GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(getInput(),
220 getVersion(), getSession().getFeatures().getDatapathId());
221 final Long xId = getSession().getNextXid();
222 ofGroupModInput.setXid(xId);
224 Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = getMessageService()
225 .groupMod(ofGroupModInput.build(), getCookie());
226 result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
228 OFRpcTaskUtil.hookFutureNotification(this, result,
229 getRpcNotificationProviderService(), createGroupAddedNotification(xId, getInput()));
245 protected static NotificationComposer<GroupAdded> createGroupAddedNotification(
246 final Long xId, final AddGroupInput input) {
247 return new NotificationComposer<GroupAdded>() {
249 public GroupAdded compose() {
250 GroupAddedBuilder groupMod = new GroupAddedBuilder((Group) input);
251 groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
252 groupMod.setGroupRef(input.getGroupRef());
253 return groupMod.build();
262 * @return update meter task
264 public static OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> createAddMeterTask(
265 OFRpcTaskContext taskContext, AddMeterInput input,
266 SwitchConnectionDistinguisher cookie) {
267 OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> task =
268 new OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
271 public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
272 ListenableFuture<RpcResult<UpdateMeterOutput>> result = SettableFuture.create();
274 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
275 if (!barrierErrors.isEmpty()) {
276 OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
278 // Convert the AddGroupInput to GroupModInput
279 MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(getInput(), getVersion());
280 final Long xId = getSession().getNextXid();
281 ofMeterModInput.setXid(xId);
283 Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = getMessageService()
284 .meterMod(ofMeterModInput.build(), getCookie());
285 result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
287 OFRpcTaskUtil.hookFutureNotification(this, result,
288 getRpcNotificationProviderService(), createMeterAddedNotification(xId, getInput()));
304 protected static NotificationComposer<MeterAdded> createMeterAddedNotification(
305 final Long xId, final AddMeterInput input) {
306 return new NotificationComposer<MeterAdded>() {
308 public MeterAdded compose() {
309 MeterAddedBuilder meterMod = new MeterAddedBuilder((Meter) input);
310 meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
311 meterMod.setMeterRef(input.getMeterRef());
312 return meterMod.build();
321 * @return UpdateFlow task
323 public static OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> createUpdateGroupTask(
324 OFRpcTaskContext taskContext, UpdateGroupInput input,
325 SwitchConnectionDistinguisher cookie) {
326 OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> task =
327 new OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
330 public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
331 ListenableFuture<RpcResult<UpdateGroupOutput>> result = null;
332 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(),
333 getInput().getUpdatedGroup().isBarrier(), getCookie());
334 if (!barrierErrors.isEmpty()) {
335 OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
337 // Convert the UpdateGroupInput to GroupModInput
338 GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(
339 getInput().getUpdatedGroup(), getVersion(),
340 getSession().getFeatures().getDatapathId());
341 final Long xId = getSession().getNextXid();
342 ofGroupModInput.setXid(xId);
344 Future<RpcResult<UpdateGroupOutput>> resultFromOFLib =
345 getMessageService().groupMod(ofGroupModInput.build(), getCookie());
346 result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
348 OFRpcTaskUtil.hookFutureNotification(this, result,
349 getRpcNotificationProviderService(), createGroupUpdatedNotification(xId, getInput()));
362 protected static NotificationComposer<GroupUpdated> createGroupUpdatedNotification(
363 final Long xId, final UpdateGroupInput input) {
364 return new NotificationComposer<GroupUpdated>() {
366 public GroupUpdated compose() {
367 GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup());
368 groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
369 groupMod.setGroupRef(input.getGroupRef());
370 return groupMod.build();
379 * @return update meter task
381 public static OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> createUpdateMeterTask(
382 OFRpcTaskContext taskContext, UpdateMeterInput input,
383 SwitchConnectionDistinguisher cookie) {
384 OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> task =
385 new OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
388 public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
389 ListenableFuture<RpcResult<UpdateMeterOutput>> result = null;
390 Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(),
391 getInput().getUpdatedMeter().isBarrier(), getCookie());
392 if (!barrierErrors.isEmpty()) {
393 OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
395 // Convert the UpdateMeterInput to MeterModInput
396 MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(
397 getInput().getUpdatedMeter(), getVersion());
398 final Long xId = getSession().getNextXid();
399 ofMeterModInput.setXid(xId);
401 Future<RpcResult<UpdateMeterOutput>> resultFromOFLib =
402 getMessageService().meterMod(ofMeterModInput.build(), getCookie());
403 result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
405 OFRpcTaskUtil.hookFutureNotification(this, result,
406 getRpcNotificationProviderService(), createMeterUpdatedNotification(xId, getInput()));
419 protected static NotificationComposer<MeterUpdated> createMeterUpdatedNotification(
420 final Long xId, final UpdateMeterInput input) {
421 return new NotificationComposer<MeterUpdated>() {
423 public MeterUpdated compose() {
424 MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter());
425 meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
426 meterMod.setMeterRef(input.getMeterRef());
427 return meterMod.build();