Refactor of PDP Agent code. 33/19733/2
authorSteven Pisarski <s.pisarski@cablelabs.com>
Wed, 6 May 2015 17:58:57 +0000 (11:58 -0600)
committerThomas Kee <xsited@yahoo.com>
Fri, 8 May 2015 19:52:03 +0000 (12:52 -0700)
Moved shared functionality up the the super COPSPdpAgent. Clarified purpose of the class by removing the extension of the Thread class as the agent is responsible for creating the persistent connection via the COPSPdpConnection class not itself. It appears that the class was originally meant handle COPS messages at one point but the run() method or extension of Thread was never removed after adding the connection object.

Change-Id: I5fd9b5dd914cf902c8726821253491f316e7d93f
Signed-off-by: Steven Pisarski <s.pisarski@cablelabs.com>
packetcable-driver/src/main/java/org/pcmm/PCMMPdpAgent.java
packetcable-driver/src/main/java/org/umu/cops/ospdp/COPSPdpOSAgent.java
packetcable-driver/src/main/java/org/umu/cops/prpdp/COPSPdpAgent.java
packetcable-policy-server/src/main/java/org/opendaylight/controller/packetcable/provider/PCMMService.java

index 8d840266c6dfdce4a16c131a448b8ccc6eb76aa9..1cbe6d244a8a39e6f9488075087bc2a08a635126 100644 (file)
@@ -4,17 +4,13 @@
 
 package org.pcmm;
 
-import org.pcmm.objects.MMVersionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.umu.cops.prpdp.COPSPdpAgent;
+import org.umu.cops.prpdp.COPSPdpConnection;
 import org.umu.cops.prpdp.COPSPdpException;
-import org.umu.cops.stack.*;
-import org.umu.cops.stack.COPSError.ErrorTypes;
-import org.umu.cops.stack.COPSHeader.OPCode;
+import org.umu.cops.stack.COPSHandle;
 
-import java.io.IOException;
-import java.net.InetAddress;
 import java.net.Socket;
 
 /**
@@ -27,27 +23,10 @@ public class PCMMPdpAgent extends COPSPdpAgent {
     /** Well-known port for PCMM */
     public static final int WELL_KNOWN_PDP_PORT = 3918;
 
-    /**
-     * PEP host name
-     */
-    private final String psHost;
-
-    /**
-     * PEP port
-     */
-    private final int psPort;
-
     /**
      * Policy data processing object
      */
-    private final PCMMPdpDataProcess _process;
-
-    // Next two attributes are initialized when connected
-    /**
-     * The Socket connection to the PEP
-     */
-    private transient Socket socket;
-    private transient COPSHandle _handle;
+    private final PCMMPdpDataProcess _thisProcess;
 
     /**
      * Creates a PDP Agent
@@ -57,178 +36,18 @@ public class PCMMPdpAgent extends COPSPdpAgent {
      * @param psPort - Port to connect to
      * @param process - Object to perform policy data processing
      */
-    public PCMMPdpAgent(final short clientType, final String psHost, final int psPort,
+    public PCMMPdpAgent(final String psHost, final int psPort, final short clientType,
                         final PCMMPdpDataProcess process) {
-        super(psPort, clientType, null);
-        this._process = process;
-        this.psHost = psHost;
-        this.psPort = psPort;
+        super(psHost, psPort, clientType, process);
+        _thisProcess = process;
     }
 
-    /**
-     * XXX -tek- This is the retooled connect. Not sure if the while forever
-     * loop is needed. Socket accept --> handleClientOpenMsg --> pdpConn.run()
-     *
-     * Below is new Thread(pdpConn).start(); Does that do it?
-     *
-     */
-    /**
-     * Connects to a PDP
-     *
-     * @return <tt>true</tt> if PDP accepts the connection; <tt>false</tt>
-     *         otherwise
-     * @throws java.net.UnknownHostException
-     * @throws java.io.IOException
-     * @throws COPSException
-     */
-    public boolean connect() throws IOException, COPSException {
-        // Create Socket and send OPN
-        final InetAddress addr = InetAddress.getByName(psHost);
-        socket = new Socket(addr, psPort);
-        logger.debug("{} {}", getClass().getName(), "PDP Socket Opened");
-
-        // We're waiting for an message
-        try {
-            logger.debug("Waiting to receiveMsg");
-            final COPSMsg msg = COPSTransceiver.receiveMsg(socket);
-            logger.debug("Message received of type - " + msg.getHeader().getOpCode());
-            if (msg.getHeader().getOpCode().equals(OPCode.OPN)) {
-                handleClientOpenMsg(socket, msg);
-            } else {
-                try {
-                    socket.close();
-                } catch (Exception ex) {
-                    logger.error("Unexpected error closing socket", ex);
-                }
-            }
-        } catch (Exception e) {
-            logger.error("Unexpected error handing client open message", e);
-            try {
-                socket.close();
-            } catch (Exception ex) {
-                logger.error("Unexpected error closing socket", ex);
-            }
-            return true;
-        }
-
-        return false;
-    }
-
-    // TODO - remove and let super handle after DataProcess & PdpConnection classes are properly refactored
     @Override
-    public void disconnect (final String pepID, final COPSError error) throws COPSException, IOException {
-        final PCMMPdpConnection pdpConn = (PCMMPdpConnection) _connectionMap.get(pepID);
-        if (pdpConn != null) {
-            final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(getClientType(), error, null, null);
-            closeMsg.writeData(pdpConn.getSocket());
-            pdpConn.close();
-        }
-
-        final Thread thread = threadMap.remove(pepID);
-        if (thread != null) thread.interrupt();
-    }
-
-    // TODO - this method should be broken apart into smaller pieces.
-    @Override
-    protected void handleClientOpenMsg(final Socket conn, final COPSMsg msg) throws COPSException, IOException {
-        logger.info("Processing client open message");
-        final COPSClientOpenMsg cMsg = (COPSClientOpenMsg) msg;
-        _pepId = cMsg.getPepId();
-
-        // Validate Client Type
-        if (msg.getHeader().getClientType() != getClientType()) {
-            // Unsupported client type
-            final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(getClientType(),
-                    new COPSError(ErrorTypes.UNSUPPORTED_CLIENT_TYPE, ErrorTypes.NA), null, null);
-            try {
-                closeMsg.writeData(conn);
-            } catch (IOException unae) {
-                logger.error("Unexpected error writing data", unae);
-            }
-
-            throw new COPSException("Unsupported client type");
-        }
-
-        // PEPId is mandatory
-        if (_pepId == null) {
-            // Mandatory COPS object missing
-            final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(getClientType(),
-                    new COPSError(ErrorTypes.MANDATORY_OBJECT_MISSING, ErrorTypes.NA), null, null);
-            try {
-                closeMsg.writeData(conn);
-            } catch (IOException unae) {
-                logger.error("Unexpected error closing socket", unae);
-            }
-
-            throw new COPSException("Mandatory COPS object missing (PEPId)");
-        }
-
-        // Support
-        if ((cMsg.getClientSI() != null) ) {
-            final MMVersionInfo _mminfo = new MMVersionInfo(cMsg.getClientSI().getData().getData());
-            logger.debug("CMTS sent MMVersion info : major:" + _mminfo.getMajorVersionNB() + "  minor:" +
-                    _mminfo.getMinorVersionNB());
-
-        } else {
-            // Unsupported objects
-            final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(getClientType(),
-                    new COPSError(ErrorTypes.UNKNOWN_OBJECT, ErrorTypes.NA), null, null);
-            try {
-                closeMsg.writeData(conn);
-            } catch (IOException unae) {
-                logger.error("Unexpected error writing data", unae);
-            }
-
-            throw new COPSException("Unsupported objects (PdpAddress, Integrity)");
-        }
-        /*
-        */
-
-        // Connection accepted
-        final COPSClientAcceptMsg acceptMsg;
-        if (getAcctTimer() != 0)
-            acceptMsg = new COPSClientAcceptMsg(getClientType(), new COPSKATimer(getKaTimer()),
-                    new COPSAcctTimer(getAcctTimer()), null);
-        else
-            acceptMsg = new COPSClientAcceptMsg(getClientType(), new COPSKATimer(getKaTimer()), null, null);
-        acceptMsg.writeData(conn);
-        // XXX - handleRequestMsg
-        try {
-            logger.debug("handleClientOpenMsg() - Waiting to receive message");
-            final COPSMsg rmsg = COPSTransceiver.receiveMsg(socket);
-            logger.debug("Received message of type - " + rmsg.getHeader().getOpCode());
-            // Client-Close
-            if (rmsg.getHeader().getOpCode().equals(OPCode.CC)) {
-                System.out.println(((COPSClientCloseMsg) rmsg)
-                        .getError().getDescription());
-                // close the socket
-                final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(getClientType(),
-                        new COPSError(ErrorTypes.UNKNOWN_OBJECT, ErrorTypes.NA), null, null);
-                try {
-                    closeMsg.writeData(conn);
-                } catch (IOException unae) {
-                    logger.error("Unexpected error writing data", unae);
-                }
-                throw new COPSException("CMTS requetsed Client-Close");
-            } else {
-                // Request
-                if (rmsg.getHeader().getOpCode().equals(OPCode.REQ)) {
-                    COPSReqMsg rMsg = (COPSReqMsg) rmsg;
-                    _handle = rMsg.getClientHandle();
-                } else
-                    throw new COPSException("Can't understand request");
-            }
-        } catch (Exception e) { // COPSException, IOException
-            throw new COPSException("Error COPSTransceiver.receiveMsg", e);
-        }
-
+    protected COPSPdpConnection setputPdpConnection(final Socket conn, final COPSHandle handle) {
         logger.debug("PDPCOPSConnection");
-        final PCMMPdpConnection pdpConn = new PCMMPdpConnection(_pepId, conn, _process, getKaTimer(), getAcctTimer());
-
-        // XXX - handleRequestMsg
-        // XXX - check handle is valid
-        final PCMMPdpReqStateMan man = new PCMMPdpReqStateMan(getClientType(), _handle, _process);
-        pdpConn.addStateMan(_handle, man);
+        final PCMMPdpConnection pdpConn = new PCMMPdpConnection(_pepId, conn, _thisProcess, _kaTimer, _acctTimer);
+        final PCMMPdpReqStateMan man = new PCMMPdpReqStateMan(_clientType, handle, _thisProcess);
+        pdpConn.addStateMan(handle, man);
         try {
             man.initRequestState(conn);
         } catch (COPSPdpException unae) {
@@ -236,25 +55,8 @@ public class PCMMPdpAgent extends COPSPdpAgent {
         }
         // XXX - End handleRequestMsg
 
-        logger.info("Starting PDP connection thread to - " + psHost);
-
-        // TODO - store the thread reference so it is possible to manage.
-        final Thread thread = new Thread(pdpConn, "Agent for - " + psHost);
-        thread.start();
-        threadMap.put(_pepId.getData().str(), thread);
-        _connectionMap.put(_pepId.getData().str(), pdpConn);
-    }
-
-    public Socket getSocket() {
-        return socket;
-    }
-
-    public COPSHandle getClientHandle() {
-        return _handle;
-    }
-
-    public String getPepIdString() {
-        return _pepId.getData().str();
+        logger.info("Starting PDP connection thread to - " + _host);
+        return pdpConn;
     }
 
 }
index f66a2318e574702630de7a207015263371650d6a..95134b3178f7485b4c01766bc3a18fcaed30dfba 100644 (file)
@@ -2,76 +2,20 @@ package org.umu.cops.ospdp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.umu.cops.stack.*;
-import org.umu.cops.stack.COPSError.ErrorTypes;
-import org.umu.cops.stack.COPSHeader.OPCode;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import org.umu.cops.prpdp.COPSPdpAgent;
 
 /**
  * Core PDP agent for outsourcing.
+ * TODO - Implement me
  */
-public class COPSPdpOSAgent extends Thread {
+public class COPSPdpOSAgent extends COPSPdpAgent {
 
     public final static Logger logger = LoggerFactory.getLogger(COPSPdpOSAgent.class);
 
-    /** Well-known port for COPS */
-    public static final int WELL_KNOWN_PDP_PORT = 3288;
-    /** Default keep-alive timer value (secs) */
-    public static final short KA_TIMER_VALUE = 30;
-    /** Default accounting timer value (secs) */
-    public static final short ACCT_TIMER_VALUE = 0;
-
-    /**
-        PDP host port
-     */
-    private int _serverPort;
-
-    /**
-        Client-type of connecting PEP
-     */
-    private short _clientType;
-
-    /**
-        Accounting timer (secs)
-     */
-    private short _acctTimer;
-
-    /**
-        Keep-alive timer (secs)
-     */
-    private short _kaTimer;
-
-    /**
-        Maps a PEP-ID to a connection
-     */
-    private final Map<String, COPSPdpOSConnection> _connectionMap;
-    // map < String(PEPID), COPSPdpOSConnection > ConnectionMap;
-
     /**
      *  Policy data processing object
      */
-    private COPSPdpOSDataProcess _process;
-
-    /**
-     * Creates a PDP Agent
-     *
-     * @param clientType    COPS Client-type
-     * @param process       Object to perform policy data processing
-     */
-    public COPSPdpOSAgent(final short clientType, final COPSPdpOSDataProcess process) {
-        _serverPort = WELL_KNOWN_PDP_PORT;
-        _kaTimer = KA_TIMER_VALUE;
-        _acctTimer = ACCT_TIMER_VALUE;
-
-        _clientType = clientType;
-        _connectionMap = new ConcurrentHashMap<>();
-        _process = process;
-    }
+    private COPSPdpOSDataProcess _thisProcess;
 
     /**
      * Creates a PDP Agent
@@ -80,205 +24,10 @@ public class COPSPdpOSAgent extends Thread {
      * @param clientType    COPS Client-type
      * @param process   Object to perform policy data processing
      */
-    public COPSPdpOSAgent(final int port, final short clientType, final COPSPdpOSDataProcess process) {
-        _serverPort = port;
-
-        _kaTimer = KA_TIMER_VALUE;
-        _acctTimer = ACCT_TIMER_VALUE;
-
-        _clientType = clientType;
-        _connectionMap = new ConcurrentHashMap<>();
-        _process = process;
-    }
-
-    /**
-     * Sets the keep-alive timer value
-     * @param    kaTimer    Keep alive timer value (secs)
-     */
-    public void setKaTimer (short kaTimer) {
-        _kaTimer = kaTimer;
-    }
-
-    /**
-     * Sets the accounting timer value
-     * @param    acctTimer  Accounting timer value (secs)
-     */
-    public void setAcctTimer (short acctTimer) {
-        _acctTimer = acctTimer;
+    public COPSPdpOSAgent(final String host, final int port, final short clientType,
+                          final COPSPdpOSDataProcess process) {
+        super(host, port, clientType, process);
+        this._thisProcess = process;
     }
 
-    /**
-     * Gets the value of the keep-alive timer
-     * @return   Keep-alive timer value (secs)
-     */
-    public short getKaTimer () {
-        return _kaTimer;
-    }
-
-    /**
-     * Gets the accounting timer value
-     * @return   Accounting timer value (secs)
-     */
-    public short getAcctTimer () {
-        return _acctTimer;
-    }
-
-    /**
-     * Gets the client-type
-     * @return   The client-type
-     */
-    public int getClientType() {
-        return _clientType;
-    }
-
-    /**
-     * Disconnects a PEP
-     * @param pepID PEP-ID of the PEP to be disconnected
-     * @param error COPS Error to be reported as a reason
-     * @throws COPSException
-     * @throws IOException
-     */
-    public void disconnect(final String pepID, final COPSError error) throws COPSException, IOException {
-        final COPSPdpOSConnection pdpConn = _connectionMap.get(pepID);
-        final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(_clientType, error, null, null);
-        closeMsg.writeData(pdpConn.getSocket());
-        pdpConn.close();
-    }
-
-    /**
-     * Requests a COPS sync for a PEP
-     * @param pepID PEP-ID of the PEP to be synced
-     * @throws COPSException
-     * @throws COPSPdpException
-     */
-    public void sync(String pepID) throws COPSException, COPSPdpException {
-        COPSPdpOSConnection pdpConn = _connectionMap.get(pepID);
-        pdpConn.syncAllRequestState();
-    }
-
-    /**
-     * Removes a PEP from the connection map
-     * @param pepID PEP-ID of the PEP to be removed
-     */
-    public void delete (String pepID) {
-        _connectionMap.remove(pepID);
-    }
-
-    /**
-     * Runs the PDP process
-     */
-    public void run() {
-        try {
-            final ServerSocket serverSocket = new ServerSocket (_serverPort);
-
-            //Loop through for Incoming messages
-
-            // server infinite loop
-            while (true) {
-                // Wait for an incoming connection from a PEP
-                Socket socket = serverSocket.accept();
-
-                // COPSDebug.out(getClass().getName(),"New connection accepted " +
-                //           socket.getInetAddress() +
-                //           ":" + socket.getPort());
-
-                // We're waiting for an OPN message
-                try {
-                    COPSMsg msg = COPSTransceiver.receiveMsg(socket);
-                    if (msg.getHeader().getOpCode().equals(OPCode.OPN)) {
-                        handleClientOpenMsg(socket, msg);
-                    } else {
-                        // COPSDebug.err(getClass().getName(), COPSDebug.ERROR_NOEXPECTEDMSG);
-                        try {
-                            socket.close();
-                        } catch (Exception ex) {
-                            logger.error("Error closing socket", ex);
-                        }
-                    }
-                } catch (Exception e) { // COPSException, IOException
-                    // COPSDebug.err(getClass().getName(), COPSDebug.ERROR_EXCEPTION,
-                    //    "(" + socket.getInetAddress() + ":" + socket.getPort() + ")", e);
-                    try {
-                        socket.close();
-                    } catch (Exception ex) {
-                        logger.error("Error closing socket", ex);
-                    }
-                }
-            }
-        } catch (IOException e) {
-            logger.error("Error processing socket messages", e);
-        }
-    }
-
-    /**
-      * Handles a COPS client-open message
-      * @param    conn Socket to the PEP
-      * @param    msg <tt>COPSMsg</tt> holding the client-open message
-      * @throws COPSException
-      * @throws IOException
-      */
-    private void handleClientOpenMsg(Socket conn, COPSMsg msg) throws COPSException, IOException {
-        COPSClientOpenMsg cMsg = (COPSClientOpenMsg) msg;
-        COPSPepId pepId = cMsg.getPepId();
-
-        // Validate Client Type
-        if (msg.getHeader().getClientType() != _clientType) {
-            // Unsupported client type
-            final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(_clientType,
-                    new COPSError(ErrorTypes.UNSUPPORTED_CLIENT_TYPE, ErrorTypes.NA), null, null);
-            try {
-                closeMsg.writeData(conn);
-            } catch (IOException unae) {
-                logger.error("Error writing data", unae);
-            }
-
-            throw new COPSException("Unsupported client type");
-        }
-
-        // PEPId is mandatory
-        if (pepId == null) {
-            // Mandatory COPS object missing
-            final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(_clientType,
-                    new COPSError(ErrorTypes.MANDATORY_OBJECT_MISSING, ErrorTypes.NA), null, null);
-            try {
-                closeMsg.writeData(conn);
-            } catch (IOException unae) {
-                logger.error("Error writing data", unae);
-            }
-
-            throw new COPSException("Mandatory COPS object missing (PEPId)");
-        }
-
-        // Support
-        if ( (cMsg.getClientSI() != null) || (cMsg.getPdpAddress() != null) || (cMsg.getIntegrity() != null)) {
-
-            // Unsupported objects
-            final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(_clientType,
-                    new COPSError(ErrorTypes.UNKNOWN_OBJECT, ErrorTypes.NA), null, null);
-            try {
-                closeMsg.writeData(conn);
-            } catch (IOException unae) {
-                logger.error("Error writing data", unae);
-            }
-
-            throw new COPSException("Unsupported objects (ClientSI, PdpAddress, Integrity)");
-        }
-
-        // Connection accepted
-        final COPSKATimer katimer = new COPSKATimer(_kaTimer);
-        final COPSClientAcceptMsg acceptMsg;
-        if (_acctTimer != 0)
-            acceptMsg = new COPSClientAcceptMsg(msg.getHeader().getClientType(), katimer, new COPSAcctTimer(_acctTimer),
-                    null);
-        else
-            acceptMsg = new COPSClientAcceptMsg(msg.getHeader().getClientType(), katimer, null, null);
-
-        acceptMsg.writeData(conn);
-
-        final COPSPdpOSConnection pdpConn = new COPSPdpOSConnection(pepId, conn, _process);
-        pdpConn.setKaTimer(_kaTimer);
-        if (_acctTimer != 0) pdpConn.setAcctTimer(_acctTimer);
-        new Thread(pdpConn).start();
-        _connectionMap.put(pepId.getData().str(),pdpConn);
-    }
 }
index eed74c90e3021818b75c1fc4974f0f63ed7037a3..cc66cacb54b998c306f5ecf3ead5cee00567c70d 100644 (file)
@@ -6,6 +6,7 @@
 
 package org.umu.cops.prpdp;
 
+import org.pcmm.objects.MMVersionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.umu.cops.stack.*;
@@ -13,258 +14,324 @@ import org.umu.cops.stack.COPSError.ErrorTypes;
 import org.umu.cops.stack.COPSHeader.OPCode;
 
 import java.io.IOException;
-import java.net.ServerSocket;
+import java.net.InetAddress;
 import java.net.Socket;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Core PDP agent for provisioning
+ * Core PDP agent for managing the connection to one PDP.
  */
-public class COPSPdpAgent extends Thread {
+public class COPSPdpAgent {
 
     private static final Logger logger = LoggerFactory.getLogger(COPSPdpAgent.class);
 
     /** Well-known port for COPS */
-    //    public static final int WELL_KNOWN_PDP_PORT = 3288;
+    public static final int WELL_KNOWN_PDP_PORT = 3288;
     /** Default keep-alive timer value (secs) */
     public static final short KA_TIMER_VALUE = 30;
     /** Default accounting timer value (secs) */
     public static final short ACCT_TIMER_VALUE = 0;
 
+    /**
+     * PDP host address
+     */
+    protected final String _host;
+
     /**
      * PDP host port
      */
-    private int _serverPort;
+    protected final int _serverPort;
 
     /**
      * Client-type of connecting PEP
      */
-    private short _clientType;
+    protected final short _clientType;
 
     /**
      * Accounting timer (secs)
      */
-    private short _acctTimer;
+    protected final short _acctTimer;
 
     /**
      * Keep-alive timer (secs)
      */
-    private short _kaTimer;
+    protected final short _kaTimer;
 
     /**
-     * The PEP ID
+     *  Policy data processing object
      */
-    protected transient COPSPepId _pepId;
+    protected final COPSPdpDataProcess _process;
 
+    // Next two attributes are initialized when connected
     /**
-     *   Maps a PEP-ID to a connection
-     *   TODO - Refactor COPSPdpConnection to extend PCMMPdpConnection. Until then, the value must remain an Object
+     * The Socket connection to the PEP
      */
-    protected Map<String, Object> _connectionMap;
-    // map < String(PEPID), COPSPdpConnection > ConnectionMap;
+    protected transient Socket _socket;
 
     /**
-     *  Policy data processing object
+     * The PEP handle
      */
-    private COPSPdpDataProcess _process;
+    protected transient COPSHandle _handle;
+
+    // Next three attributes are initialized after the client accepts
+    /**
+     * Holds the last PEP ID processed
+     */
+    protected transient COPSPepId _pepId;
 
     /**
-     * Holds all of the threads to manage by PEP ID
+     * the PDP connection connection
      */
-    protected final Map<String, Thread> threadMap;
+    protected transient COPSPdpConnection _pdpConn;
+
+    /**
+     * The handle to the tread accepting messages from the PDP
+     */
+    protected transient Thread _thread;
 
     /**
      * Creates a PDP Agent
      *
+     * @param host  PDP agent host name
      * @param port  Port to listen to
      * @param clientType    COPS Client-type
      * @param process   Object to perform policy data processing
      */
-    public COPSPdpAgent(final int port, final short clientType, final COPSPdpDataProcess process) {
-        _serverPort = port;
+    public COPSPdpAgent(final String host, final int port, final short clientType, final COPSPdpDataProcess process) {
+        this._host = host;
+        this._serverPort = port;
 
-        _kaTimer = KA_TIMER_VALUE;
-        _acctTimer = ACCT_TIMER_VALUE;
+        this._kaTimer = KA_TIMER_VALUE;
+        this._acctTimer = ACCT_TIMER_VALUE;
 
-        _clientType = clientType;
-        _connectionMap = new ConcurrentHashMap<>();
-        _process = process;
-        this.threadMap = new ConcurrentHashMap<>();
+        this._clientType = clientType;
+        this._process = process;
     }
 
     /**
-     * Gets the value of the keep-alive timer
-     * @return   Keep-alive timer value (secs)
+     * Returns handle after connect() has successfully been executed
+     * @return - the handle
      */
-    public short getKaTimer () {
-        return _kaTimer;
+    public COPSHandle getClientHandle() {
+        return _handle;
     }
 
     /**
-     * Gets the accounting timer value
-     * @return   Accounting timer value (secs)
+     * Returns handle after connect() has successfully been executed
+     * @return - the handle
      */
-    public short getAcctTimer () {
-        return _acctTimer;
+    public Socket getSocket() {
+        return _socket;
     }
 
     /**
-     * Gets the client-type
-     * @return   The client-type
+     * Connects to a PDP
+     * @throws java.net.UnknownHostException
+     * @throws java.io.IOException
+     * @throws COPSException
      */
-    public short getClientType() {
-        return _clientType;
+    public void connect() throws IOException, COPSException {
+        // Create Socket and send OPN
+        final InetAddress addr = InetAddress.getByName(_host);
+        _socket = new Socket(addr, _serverPort);
+        logger.info("PDP Socket Opened. Waiting to receive client-open message");
+        final COPSMsg msg = COPSTransceiver.receiveMsg(_socket);
+        logger.debug("Message received of type - " + msg.getHeader().getOpCode());
+        if (msg.getHeader().getOpCode().equals(OPCode.OPN)) {
+            handleClientOpenMsg(_socket, msg);
+        } else {
+            try {
+                _socket.close();
+            } catch (Exception ex) {
+                logger.error("Unexpected error closing socket", ex);
+            }
+        }
     }
 
     /**
-     * Disconnects a PEP
-     * @param pepID PEP-ID of the PEP to be disconnected
+     * Disconnects a PEP and stops the listener thread
      * @param error COPS Error to be reported as a reason
      * @throws COPSException
      * @throws IOException
      */
-    public void disconnect(final String pepID, final COPSError error) throws COPSException, IOException {
-        final COPSPdpConnection pdpConn = (COPSPdpConnection)_connectionMap.get(pepID);
-        final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(_clientType, error, null, null);
-        closeMsg.writeData(pdpConn.getSocket());
-        pdpConn.close();
+    public void disconnect(final COPSError error) throws COPSException, IOException {
+        if (_pdpConn != null) {
+            sendCloseMessage(_socket, error.getErrCode(), error.getErrSubCode(), "Disconnecting from PDP requested");
+            _pdpConn.close();
+        } else {
+            logger.warn("Unable to locate PDP connection. Cannot close");
+        }
+        if (_thread != null) _thread.interrupt();
+        else logger.warn("Unable to locate PDP connection thread. Cannot stop it.");
+
+        if (_socket.isConnected()) _socket.close();
+
+        _pepId = null;
+        _pdpConn = null;
+        _thread = null;
     }
 
     /**
      * Requests a COPS sync for a PEP
-     * @param pepID PEP-ID of the PEP to be synced
      * @throws COPSException
      * @throws COPSPdpException
      */
-    public void sync(final String pepID) throws COPSException {
-        COPSPdpConnection pdpConn = (COPSPdpConnection) _connectionMap.get(pepID);
-        pdpConn.syncAllRequestState();
-    }
-
-    /**
-     * Removes a PEP from the connection map
-     * @param pepID PEP-ID of the PEP to be removed
-     */
-    public void delete(final String pepID) {
-        _connectionMap.remove(pepID);
+    public void sync() throws COPSException {
+        if (_pdpConn != null) _pdpConn.syncAllRequestState();
+        else logger.warn("Unable to sync, not connected to a PEP");
     }
 
-
     /**
-     * Runs the PDP process
-     */
-    public void run() {
-        try {
-            final ServerSocket serverSocket = new ServerSocket (_serverPort);
-
-            //Loop through for Incoming messages
-            // server infinite loop
-            while (true) {
-                // Wait for an incoming connection from a PEP
-                final Socket socket = serverSocket.accept();
-
-                // COPSDebug.out(getClass().getName(),"New connection accepted " +
-                //           socket.getInetAddress() +
-                //           ":" + socket.getPort());
-
-                // We're waiting for an OPN message
-                try {
-                    final COPSMsg msg = COPSTransceiver.receiveMsg(socket);
-                    logger.info("Message received - " + msg);
-                    if (msg.getHeader().getOpCode().equals(OPCode.OPN)) {
-                        handleClientOpenMsg(socket, msg);
-                    } else {
-                        logger.error("Not an open message, closing socket");
-                        try {
-                            socket.close();
-                        } catch (Exception ex) {
-                            logger.error("Unexpected exception closing socket", ex);
-                        }
-                    }
-                } catch (Exception e) { // COPSException, IOException
-                    // COPSDebug.err(getClass().getName(), COPSDebug.ERROR_EXCEPTION,
-                    //    "(" + socket.getInetAddress() + ":" + socket.getPort() + ")", e);
-                    try {
-                        socket.close();
-                    } catch (Exception ex) {
-                        logger.error("Unexpected exception closing socket", ex);
-                    }
-                }
-            }
-        } catch (IOException e) {
-            logger.error("Error caught while processing socket messages", e);
-        }
-    }
-
-    /**
-     * Handles a COPS client-open message
+     * Handles a COPS client-open message and sets the _pepId, _handle, _pdpConn, & _thread objects in the process
+     * as well as starts the PDP connection thread for receiving other COPS messages from the PDP
      * @param    conn Socket to the PEP
      * @param    msg <tt>COPSMsg</tt> holding the client-open message
      * @throws COPSException
      * @throws IOException
-     * TODO - Refactor PCMMPdpAgent#handleClientOpenMsg() as it contains much of this same logic
      */
     protected void handleClientOpenMsg(final Socket conn, final COPSMsg msg) throws COPSException, IOException {
+        logger.info("Processing client open message");
+
+        if (_pepId != null) {
+            throw new COPSException("Connection already opened");
+        }
+
         final COPSClientOpenMsg cMsg = (COPSClientOpenMsg) msg;
         _pepId = cMsg.getPepId();
 
         // Validate Client Type
         if (msg.getHeader().getClientType() != _clientType) {
-            // Unsupported client type
-            final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(_clientType,
-                    new COPSError(ErrorTypes.UNSUPPORTED_CLIENT_TYPE, ErrorTypes.NA), null, null);
-            try {
-                closeMsg.writeData(conn);
-            } catch (IOException unae) {
-                logger.error("Unexpected exception writing data", unae);
-            }
-
-            throw new COPSException("Unsupported client type");
+            sendCloseMessage(conn, ErrorTypes.UNSUPPORTED_CLIENT_TYPE, ErrorTypes.NA,
+                    "Unsupported client type");
         }
 
         // PEPId is mandatory
         if (_pepId == null) {
-            // Mandatory COPS object missing
-            final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(_clientType,
-                    new COPSError(ErrorTypes.MANDATORY_OBJECT_MISSING, ErrorTypes.NA), null, null);
-            try {
-                closeMsg.writeData(conn);
-            } catch (IOException unae) {
-                logger.error("Unexpected exception writing data", unae);
-            }
-
-            throw new COPSException("Mandatory COPS object missing (PEPId)");
+            sendCloseMessage(conn, ErrorTypes.MANDATORY_OBJECT_MISSING, ErrorTypes.NA,
+                    "Mandatory COPS object missing (PEPId)");
         }
 
+        // TODO - Determine if I should be checking for the PDPAddress and Integrity objects on the message too???
         // Support
+/*
         if ( (cMsg.getClientSI() != null) || (cMsg.getPdpAddress() != null) || (cMsg.getIntegrity() != null)) {
-            // Unsupported objects
-            final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(_clientType,
-                    new COPSError(ErrorTypes.UNKNOWN_OBJECT, ErrorTypes.NA), null, null);
-            try {
-                closeMsg.writeData(conn);
-            } catch (IOException unae) {
-                logger.error("Exception writing data", unae);
-            }
+            sendCloseMessage(conn, ErrorTypes.UNSUPPORTED_CLIENT_TYPE,
+                    "Unsupported objects (ClientSI, PdpAddress, Integrity)");
+        }
+*/
+        // Support
+        if ((cMsg.getClientSI() == null) ) {
+            sendCloseMessage(conn, ErrorTypes.UNKNOWN_OBJECT, ErrorTypes.NA,
+                    "Unsupported objects (PdpAddress, Integrity)");
+        } else {
+            final MMVersionInfo _mminfo = new MMVersionInfo(cMsg.getClientSI().getData().getData());
+            logger.debug("CMTS sent MMVersion info : major:" + _mminfo.getMajorVersionNB() + "  minor:" +
+                    _mminfo.getMinorVersionNB());
+        }
 
-            throw new COPSException("Unsupported objects (ClientSI, PdpAddress, Integrity)");
+        acceptConnection(conn);
+
+        _handle = handleAcceptResponse(conn);
+        if (_handle != null) {
+            // Connection accepted
+            _pdpConn = setputPdpConnection(conn, _handle);
+            _thread = new Thread(_pdpConn, "PDP Agent for PEP ID " + _pepId.getData().str());
+            _thread.start();
+        } else {
+            throw new COPSException("Unable to connect to PDP");
         }
+    }
 
-        // Connection accepted
+    /**
+     * Creates and sends a client close message
+     * @param conn - the socket connection
+     * @param errorType - the error type to send
+     * @param msg - the error message to log
+     * @throws COPSException
+     */
+    private void sendCloseMessage(final Socket conn, final ErrorTypes errorType, final ErrorTypes errorSubType,
+                                  final String msg)
+            throws COPSException {
+        final COPSClientCloseMsg closeMsg = new COPSClientCloseMsg(_clientType,
+                new COPSError(errorType, errorSubType), null, null);
+        try {
+            closeMsg.writeData(conn);
+        } catch (IOException unae) {
+            logger.error("Exception writing data", unae);
+        }
+
+        throw new COPSException(msg);
+    }
+
+    /**
+     * Sends a client-accept message back to the PDP
+     * @param conn - the socket connection to the PDP
+     * @throws IOException
+     */
+    private void acceptConnection(final Socket conn) throws IOException {
         final COPSClientAcceptMsg acceptMsg;
         if (_acctTimer != 0)
             acceptMsg = new COPSClientAcceptMsg(_clientType, new COPSKATimer(_kaTimer),
-                new COPSAcctTimer(_acctTimer), null);
+                    new COPSAcctTimer(_acctTimer), null);
         else
-            acceptMsg = new COPSClientAcceptMsg(_clientType, new COPSKATimer(_kaTimer),null, null);
+            acceptMsg = new COPSClientAcceptMsg(_clientType, new COPSKATimer(_kaTimer) ,null, null);
         acceptMsg.writeData(conn);
+    }
+
+    /**
+     * Waits for the response back from the PDP and handles it appropriately. When successful, the handle to the
+     * client is returned.
+     * @param conn - the socket connection to the PDP
+     * @return - the handle or null if not successful
+     */
+    private COPSHandle handleAcceptResponse(final Socket conn) {
+        try {
+            logger.debug("handleClientOpenMsg() - Waiting to receive message");
+            final COPSMsg rmsg = COPSTransceiver.receiveMsg(conn);
+            logger.debug("Received message of type - " + rmsg.getHeader().getOpCode());
+            // Client-Close
+            if (rmsg.getHeader().getOpCode().equals(OPCode.CC)) {
+                logger.info("Received client-close message");
+                sendCloseMessage(conn, ErrorTypes.SHUTTING_DOWN, ErrorTypes.NA, "Received client-close message");
+                return null;
+            } else {
+                // Request
+                if (rmsg.getHeader().getOpCode().equals(OPCode.REQ)) {
+                    final COPSReqMsg rMsg = (COPSReqMsg) rmsg;
+                    return rMsg.getClientHandle();
+                } else {
+                    sendCloseMessage(conn, ErrorTypes.UNKNOWN_OBJECT, ErrorTypes.NA, "Received unknown object");
+                    return null;
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Error COPSTransceiver.receiveMsg", e);
+            return null;
+        }
+    }
+
+    /**
+     * Creates the PDP connection object
+     * @param conn - the socket connection to the PDP
+     * @param handle - the client's handle
+     * @return - the PDP connection object
+     */
+    protected COPSPdpConnection setputPdpConnection(final Socket conn, final COPSHandle handle) {
+        logger.debug("PDPCOPSConnection");
+        final COPSPdpConnection pdpConn = new COPSPdpConnection(_pepId, conn, _process, _kaTimer, _acctTimer);
+
+        // XXX - handleRequestMsg
+        // XXX - check handle is valid
+        final COPSPdpReqStateMan man = new COPSPdpReqStateMan(_clientType, handle, _process);
+        pdpConn.addStateMan(handle, man);
+        try {
+            man.initRequestState(conn);
+        } catch (COPSException unae) {
+            logger.error("Unexpected error initializing state", unae);
+        }
+        // XXX - End handleRequestMsg
 
-        final COPSPdpConnection pdpConn = new COPSPdpConnection(_pepId, conn,  _process);
-        pdpConn.setKaTimer(_kaTimer);
-        if (_acctTimer != 0) pdpConn.setAcctTimer(_acctTimer);
-        new Thread(pdpConn).start();
-        _connectionMap.put(_pepId.getData().str(), pdpConn);
+        logger.info("Starting PDP connection thread to - " + _host);
+        return pdpConn;
     }
 
 }
index 97567d4792fa2dd290bbd46419608b630c092d9b..e1e56bcae0f993467986a434877fed0779f92d89 100644 (file)
@@ -2,7 +2,6 @@ package org.opendaylight.controller.packetcable.provider;
 
 import com.google.common.collect.Maps;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber;
 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ServiceClassName;
 import org.opendaylight.yang.gen.v1.urn.packetcable.rev150327.ServiceFlowDirection;
@@ -169,15 +168,6 @@ public class PCMMService {
                }
        }
 
-       private String getIpAddressStr(final IpAddress ipAddress) {
-               final Ipv4Address ipv4 = ipAddress.getIpv4Address();
-               if (ipv4 != null) {
-                       return ipv4.getValue();
-               } else {
-                       return ipAddress.getIpv6Address().getValue();
-               }
-       }
-
        /**
         * Used to interface with a CCAP (including CMTSs)
         */
@@ -207,7 +197,7 @@ public class PCMMService {
                        // TODO see - PCMMPdpReqStateMan#processReport() where the report type is success and the process is null
                        //            pcmmProcess = new PCMMPdpDataProcess();
                        pcmmProcess = null;
-                       pcmmPdp = new PCMMPdpAgent(clientType, ipv4, port, pcmmProcess);
+                       pcmmPdp = new PCMMPdpAgent(ipv4, port, clientType, pcmmProcess);
                }
 
                /**
@@ -232,7 +222,7 @@ public class PCMMService {
                public void disconnect() {
                        logger.info("CcapClient: disconnect(): {}:{}", ipv4, port);
                        try {
-                               pcmmPdp.disconnect(pcmmPdp.getPepIdString(), new COPSError(ErrorTypes.SHUTTING_DOWN, ErrorTypes.NA));
+                               pcmmPdp.disconnect(new COPSError(ErrorTypes.SHUTTING_DOWN, ErrorTypes.NA));
                                isConnected = false;
                        } catch (COPSException | IOException e) {
                                logger.error("CcapClient: disconnect(): {}:{} FAILED: {}", ipv4, port, e.getMessage());