2 * Copyright (c) 2004 University of Murcia. All rights reserved.
3 * --------------------------------------------------------------
4 * For more information, please see <http://www.umu.euro6ix.org/>.
7 package org.umu.cops.prpep;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.umu.cops.COPSConnection;
12 import org.umu.cops.stack.*;
13 import org.umu.cops.stack.COPSDecision.Command;
14 import org.umu.cops.stack.COPSDecision.DecisionFlag;
16 import javax.annotation.concurrent.ThreadSafe;
17 import java.io.IOException;
18 import java.net.Socket;
19 import java.util.Date;
22 import java.util.concurrent.ConcurrentHashMap;
25 * COPSPepConnection represents a PEP-PDP Connection Manager.
26 * Responsible for processing messages received from PDP.
// NOTE(review): this listing has the original file's line numbers fused onto every line and is
// missing its shortest lines (comment delimiters, braces, annotations); restore them from the
// real file before compiling.
29 public class COPSPepConnection extends COPSConnection {
// Class-wide SLF4J logger.
31 private final static Logger logger = LoggerFactory.getLogger(COPSPepConnection.class);
33 /** Time to wait for responses (milliseconds); the constructor sets it to 10000, i.e. 10 seconds */
34 protected transient int _responseTime;
36 /** COPS Client-type */
37 protected final short _clientType;
40 Maps a COPS Client Handle to a Request State Manager
// Created as a ConcurrentHashMap by the constructor; addRequestState() inserts entries and the
// handle*/notify* methods read them.
42 protected final Map<COPSHandle, COPSPepReqStateMan> _managerMap;
/**
 * Builds the PEP side of a PEP-PDP connection.
 * <p>
 * The superclass receives the socket together with two zero-valued short arguments. The
 * response wait time starts at 10000 milliseconds and the handle-to-manager map starts empty.
 *
 * @param clientType PEP's client-type
 * @param sock Socket connected to PDP
 */
public COPSPepConnection(final short clientType, final Socket sock) {
    super(sock, (short)0, (short)0);
    _managerMap = new ConcurrentHashMap<>();
    _responseTime = 10000;
    _clientType = clientType;
}
// Message-processing loop of the connection.
// NOTE(review): the method header, the `try {` openers, closing braces and other short
// statements are not visible in this listing (presumably this is the body of run() — confirm
// against the real file). The notes below describe only what the surviving lines show.
57 * Message-processing loop
// Timestamps, all initialised to "now": last keep-alive sent, last accounting report sent,
// last message received.
60 Date lastSendKa = new Date();
61 Date lastSendAcc = new Date();
62 Date lastRecKa = new Date();
// Keep polling until the socket is closed.
63 while (!_sock.isClosed()) {
// Only read when bytes are already available; a processed message refreshes lastRecKa.
65 if (_sock.getInputStream().available() != 0) {
66 processMessage(_sock);
67 lastRecKa = new Date();
// Receive-side keep-alive timeout: elapsed time since the last received message.
// NOTE(review): the (int) casts discard the high bits of the epoch-millisecond values. The
// int subtraction still yields the true elapsed time while the real interval stays below
// 2^31 ms (about 24.8 days), but long arithmetic would be clearer. The *1000 scaling suggests
// _kaTimer is held in seconds — confirm in COPSConnection.
73 int _startTime = (int) (lastRecKa.getTime());
74 int cTime = (int) (new Date().getTime());
76 if ((cTime - _startTime) > _kaTimer*1000) {
78 // Notify all Request State Managers
79 notifyNoKAAllReqStateMan();
// Send-side keep-alive: once three quarters of the keep-alive timer have elapsed since the
// last one sent, send a new COPSKAMsg (constructed with a null argument) and record the time.
83 _startTime = (int) (lastSendKa.getTime());
84 cTime = (int) (new Date().getTime());
86 if ((cTime - _startTime) > ((_kaTimer*3/4) * 1000)) {
87 final COPSKAMsg msg = new COPSKAMsg(null);
88 COPSTransceiver.sendMsg(msg, _sock);
89 lastSendKa = new Date();
// Accounting: same three-quarter rule applied to _acctTimer and the last accounting report.
95 int _startTime = (int) (lastSendAcc.getTime());
96 int cTime = (int) (new Date().getTime());
98 if ((cTime - _startTime) > ((_acctTimer*3/4)*1000)) {
99 // Notify all Request State Managers
100 notifyAcctAllReqStateMan();
101 lastSendAcc = new Date();
// NOTE(review): the visible handler only logs; if no dropped line restores the interrupt flag,
// Thread.currentThread().interrupt() should be added here.
107 } catch (InterruptedException e) {
108 logger.error("Closing connection");
110 } catch (Exception e) {
111 logger.error("Unexpected exception while sleeping. Continue processing messages", e);
// Per the log texts: an Exception keeps the loop going, a Throwable stops the thread.
113 } catch (Exception e) {
114 logger.error("Unexpected error while processing socket messages. Continue processing", e);
115 } catch (Throwable e) {
116 logger.error("Unexpected fatal error while processing COPS messages. Stopping thread", e);
// After the loop: the socket is closed (the close call itself is not visible in this listing;
// only its IOException handler is).
121 // connection closed by server
122 // COPSDebug.out(getClass().getName(),"Connection closed by server");
125 } catch (IOException e) {
126 logger.error("Error closing socket", e);
129 // Notify all Request State Managers
131 notifyCloseAllReqStateMan();
// NOTE(review): the caught COPSException is not passed to the logger, so its message and stack
// trace are lost.
132 } catch (COPSException e) {
133 logger.error("Error closing state managers");
// Reads one COPS message from the socket via COPSTransceiver.receiveMsg and hands it to the
// handler matching its op-code.
138 * Gets a COPS message from the socket and processes it
139 * @param conn Socket connected to the PDP
140 * @throws COPSException
141 * @throws IOException
143 protected void processMessage(final Socket conn) throws COPSException, IOException {
144 final COPSMsg msg = COPSTransceiver.receiveMsg(conn);
// NOTE(review): the `case` labels and `break` statements of this switch are not visible in
// this listing; each call below presumably sits under its own op-code label — confirm.
146 switch (msg.getHeader().getOpCode()) {
// handleClientCloseMsg is not defined in the visible part of this class (presumably inherited
// from COPSConnection — confirm).
148 handleClientCloseMsg(conn, (COPSClientCloseMsg)msg);
151 handleDecisionMsg((COPSDecisionMsg)msg);
154 handleSyncStateReqMsg((COPSSyncStateMsg)msg);
157 handleKeepAliveMsg((COPSKAMsg)msg);
// Presumably the `default` branch: a message with any other op-code is rejected.
160 throw new COPSPepException("Message not expected (" + msg.getHeader().getOpCode() + ").");
165 * Handle Keep Alive Message
166 * @param cMsg a COPSKAMsg
168 private void handleKeepAliveMsg(final COPSKAMsg cMsg) {
169 logger.info("Get KAlive Msg");
172 if (cMsg.getIntegrity() != null) {
173 logger.warn("Unsupported objects (Integrity)");
176 // should we do anything else?? ....
178 } catch (Exception unae) {
179 logger.error("Unexpected exception while writing COPS data", unae);
184 * Method handleDecisionMsg
185 * @param dMsg a COPSDecisionMsg
187 protected void handleDecisionMsg(final COPSDecisionMsg dMsg) throws COPSException {
188 final COPSHandle handle = dMsg.getClientHandle();
189 final Map<COPSContext, Set<COPSDecision>> decisions = dMsg.getDecisions();
191 for (final Set<COPSDecision> copsDecisions: decisions.values()) {
192 for (final COPSDecision decision : copsDecisions) {
193 // Get the associated manager
194 final COPSPepReqStateMan manager = _managerMap.get(handle);
195 if (manager == null) {
196 logger.warn("Unable to find state manager with key - " + handle);
201 // Check message type
202 // TODO FIXME - Use of manager object could result in a NPE
203 if (decision.getFlag().equals(DecisionFlag.REQSTATE)) {
204 if (decision.getCommand().equals(Command.REMOVE)) {
205 // Delete Request State
206 manager.processDeleteRequestState(dMsg);
207 } else if (decision.getCommand().equals(Command.INSTALL)) {
208 // Open new Request State
209 handleOpenNewRequestStateMsg(handle);
212 logger.error("Unknown command");
216 manager.processDecision(dMsg);
224 * Method handleOpenNewRequestStateMsg
225 * @param handle a COPSHandle
227 private void handleOpenNewRequestStateMsg(final COPSHandle handle) throws COPSPepException {
228 final COPSPepReqStateMan manager = _managerMap.get(handle);
229 if (manager == null) {
230 logger.warn("Unable to find state manager with key - " + handle.getId().str());
232 manager.processOpenNewRequestState();
237 * Method handleSyncStateReqMsg
238 * @param cMsg a COPSSyncStateMsg
240 private void handleSyncStateReqMsg(final COPSSyncStateMsg cMsg) throws COPSException {
241 if (cMsg.getIntegrity() != null) {
242 logger.warn("Unsupported objects (Integrity)");
245 final COPSPepReqStateMan manager = _managerMap.get(cMsg.getClientHandle());
246 if (manager == null) {
247 logger.warn("Unable to find state manager with key - " + cMsg.getClientHandle().getId().str());
249 manager.processSyncStateRequest(cMsg);
254 * Method createRequestState
255 * @param clientHandle a String
256 * @param process a COPSPepDataProcess
257 * @return a COPSPepmanager
258 * @throws COPSException
259 * @throws COPSPepException
261 public COPSPepReqStateMan addRequestState(final COPSHandle clientHandle, final COPSPepDataProcess process)
262 throws COPSException {
263 final COPSPepReqStateMan manager = new COPSPepReqStateMan(_clientType, clientHandle, process, _sock);
264 if (_managerMap.get(clientHandle) != null)
265 throw new COPSPepException("Duplicate Handle, rejecting " + clientHandle);
267 _managerMap.put(clientHandle, manager);
268 logger.info("Added state manager with key - " + clientHandle);
269 manager.initRequestState();
274 * Method deleteRequestState
275 * @param manager a COPSPepReqStateMan
276 * @throws COPSException
278 public void deleteRequestState(COPSPepReqStateMan manager) throws COPSException {
279 manager.finalizeRequestState();
282 private void notifyCloseAllReqStateMan() throws COPSException {
283 for (final COPSPepReqStateMan man: _managerMap.values()) {
284 man.processClosedConnection(_error);
288 private void notifyNoKAAllReqStateMan() throws COPSException {
289 for (final COPSPepReqStateMan man: _managerMap.values()) {
290 man.processNoKAConnection();
294 private void notifyAcctAllReqStateMan() throws COPSException {
295 for (final COPSPepReqStateMan man: _managerMap.values()) {
296 man.processAcctReport();