SalFlowServiceImpl - implementing methods
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / CommonService.java
index 9fe7f829f58cb449e94c96563db1cd292d2a20e6..303f3e4a56ae4025d6009a49012dd42b3a3ceab7 100644 (file)
  */
 package org.opendaylight.openflowplugin.impl.services;
 
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.openflowplugin.api.OFConstants;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
-import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
-import org.opendaylight.openflowplugin.api.openflow.md.core.sal.NotificationComposer;
-import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService;
+import java.math.BigInteger;
+import java.util.concurrent.Future;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemovedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdatedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionAware;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
-import org.opendaylight.yangtools.yang.binding.DataContainer;
-import org.opendaylight.yangtools.yang.binding.Notification;
-import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Future;
 
 public class CommonService {
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CommonService.class);
+    private static final long WAIT_TIME = 2000;
+    protected final static Future<RpcResult<Void>> ERROR_RPC_RESULT = Futures.immediateFuture(RpcResultBuilder
+            .<Void> failed().withError(ErrorType.APPLICATION, "", "Request quota exceeded.").build());
+    protected static final BigInteger PRIMARY_CONNECTION = new BigInteger("0");
+
     // protected OFRpcTaskContext rpcTaskContext;
     protected short version;
     protected BigInteger datapathId;
     protected RpcContext rpcContext;
-    protected SwitchConnectionDistinguisher cookie;
-    // TODO should come from deviceContext
-    protected IMessageDispatchService messageService;
-    protected Xid xid;
-    protected Boolean isBarrier;
-
-    protected NotificationProviderService notificationProviderService;
-
-    protected final static Future<RpcResult<Void>> errorRpcResult = Futures.immediateFuture(RpcResultBuilder
-            .<Void>failed().withError(ErrorType.APPLICATION, "", "Request quota exceeded.").build());
-
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CommonService.class);
+    protected DeviceContext deviceContext;
+    private ConnectionAdapter primaryConnectionAdapter;
 
     public CommonService() {
-
     }
 
-    /**
-     * @param xid
-     */
-    public CommonService(final RpcContext rpcContext, final short version, final BigInteger datapathId,
-                         final IMessageDispatchService service, final Xid xid, final SwitchConnectionDistinguisher cookie) {
+    public CommonService(final RpcContext rpcContext) {
         this.rpcContext = rpcContext;
-        this.version = version;
-        this.datapathId = datapathId;
-        this.messageService = service;
-        this.xid = xid;
-        this.cookie = cookie;
-    }
-
-    /**
-     * @param originalResult
-     * @param notificationProviderService
-     * @param notificationComposer        lazy notification composer
-     */
-    protected <R extends RpcResult<? extends TransactionAware>, N extends Notification, I extends DataContainer> void hookFutureNotification(
-            final ListenableFuture<R> originalResult, final NotificationProviderService notificationProviderService,
-            final NotificationComposer<N> notificationComposer) {
 
-        class FutureCallbackImpl implements FutureCallback<R> {
-            @Override
-            public void onSuccess(final R result) {
-                if (null == notificationProviderService) {
-                    LOG.warn("onSuccess(): notificationServiceProvider is null, could not publish result {}", result);
-                } else if (notificationComposer == null) {
-                    LOG.warn("onSuccess(): notificationComposer is null, could not publish result {}", result);
-                } else if (result == null) {
-                    LOG.warn("onSuccess(): result is null, could not publish result {}", result);
-                } else if (result.getResult() == null) {
-                    LOG.warn("onSuccess(): result.getResult() is null, could not publish result {}", result);
-                } else if (result.getResult().getTransactionId() == null) {
-                    LOG.warn("onSuccess(): result.getResult().getTransactionId() is null, could not publish result {}",
-                            result);
-                } else {
-                    notificationProviderService.publish(notificationComposer.compose(result.getResult()
-                            .getTransactionId()));
-                    // TODO: solve without task
-                    // task.getTaskContext().getMessageSpy().spyMessage(
-                    // task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_SUCCESS);
-                }
-            }
-
-            @Override
-            public void onFailure(final Throwable t) {
-                // TODO: good place to notify MD-SAL about errors
-                // TODO: solve without task
-                // task.getTaskContext().getMessageSpy().spyMessage(
-                // task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_FAILURE);
-            }
-        }
-
-        Futures.addCallback(originalResult, new FutureCallbackImpl());
-    }
-
-    /**
-     * @param input
-     * @return
-     */
-    protected NotificationComposer<FlowAdded> createFlowAddedNotification(final AddFlowInput input) {
-        return new NotificationComposer<FlowAdded>() {
-            @Override
-            public FlowAdded compose(final TransactionId tXid) {
-                final FlowAddedBuilder newFlow = new FlowAddedBuilder((Flow) input);
-                newFlow.setTransactionId(tXid);
-                newFlow.setFlowRef(input.getFlowRef());
-                return newFlow.build();
-            }
-        };
-    }
-
-    protected NotificationComposer<FlowUpdated> createFlowUpdatedNotification(final UpdateFlowInput input) {
-        return new NotificationComposer<FlowUpdated>() {
-            @Override
-            public FlowUpdated compose(final TransactionId tXid) {
-                final FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(input.getUpdatedFlow());
-                updFlow.setTransactionId(tXid);
-                updFlow.setFlowRef(input.getFlowRef());
-                return updFlow.build();
-            }
-        };
+        this.deviceContext = rpcContext.getDeviceContext();
+        final FeaturesReply features = this.deviceContext.getPrimaryConnectionContext().getFeatures();
+        this.datapathId = features.getDatapathId();
+        this.version = features.getVersion();
+        this.primaryConnectionAdapter = deviceContext.getPrimaryConnectionContext().getConnectionAdapter();
     }
 
-    protected static NotificationComposer<FlowRemoved> createFlowRemovedNotification(final RemoveFlowInput input) {
-        return new NotificationComposer<FlowRemoved>() {
-            @Override
-            public FlowRemoved compose(final TransactionId tXid) {
-                final FlowRemovedBuilder removedFlow = new FlowRemovedBuilder((Flow) input);
-                removedFlow.setTransactionId(tXid);
-                removedFlow.setFlowRef(input.getFlowRef());
-                return removedFlow.build();
-            }
-        };
+    protected long getWaitTime() {
+        return WAIT_TIME;
     }
 
-    /**
-     * @param originalInput
-     * @return
-     */
-    protected static <T extends TransactionAware> Function<RpcResult<BarrierOutput>, RpcResult<T>> transformBarrierToTransactionAware(
-            final RpcResult<T> originalInput, final BarrierInput barrierInput) {
-
-        class FunctionImpl implements Function<RpcResult<BarrierOutput>, RpcResult<T>> {
-
-            @Override
-            public RpcResult<T> apply(final RpcResult<BarrierOutput> barrierResult) {
-                RpcResultBuilder<T> rpcBuilder = null;
-                if (barrierResult.isSuccessful()) {
-                    rpcBuilder = RpcResultBuilder.<T>success();
-                } else {
-                    rpcBuilder = RpcResultBuilder.<T>failed();
-                    final RpcError rpcError = RpcResultBuilder
-                            .newWarning(
-                                    ErrorType.RPC,
-                                    OFConstants.ERROR_TAG_TIMEOUT,
-                                    "barrier sending failed",
-                                    OFConstants.APPLICATION_TAG,
-                                    "switch failed to respond on barrier request, barrier.xid = "
-                                            + barrierInput.getXid(), null);
-                    final List<RpcError> chainedErrors = new ArrayList<>();
-                    chainedErrors.add(rpcError);
-                    chainedErrors.addAll(barrierResult.getErrors());
-                    rpcBuilder.withRpcErrors(chainedErrors);
-                }
-
-                rpcBuilder.withResult(originalInput.getResult());
+    protected ConnectionAdapter provideConnectionAdapter(final BigInteger connectionID) {
+        if (connectionID == null) {
+            return primaryConnectionAdapter;
+        }
+        if (connectionID.equals(PRIMARY_CONNECTION)) {
+            return primaryConnectionAdapter;
+        }
 
-                return rpcBuilder.build();
-            }
+        // TODO uncomment when getAuxiali.... will be merged to APIs
+        // final ConnectionContext auxiliaryConnectionContext =
+        // deviceContext.getAuxiliaryConnectionContext(connectionID);
+        final ConnectionContext auxiliaryConnectionContext = null;
+        if (auxiliaryConnectionContext != null) {
+            return auxiliaryConnectionContext.getConnectionAdapter();
         }
 
-        return new FunctionImpl();
+        return primaryConnectionAdapter;
     }
 
 }