2016-09-23 91 views

我需要與在給定端口上作爲服務器運行的C++應用程序對話。它公開了一個二進制API(協議緩衝區)以獲得更好的性能。我的RESTful服務是在Spring MVC和Jersey中開發的,並希望使用這個新功能。我已經能夠成功地使用和生成協議緩衝區消息。Java Web應用程序和C++服務器之間的套接字通信

在我的spring web應用程序中,我最初創建了一個Apache Commons Pool來創建一個套接字連接池。這是我在讀/寫插槽


public class PooledSocketConnectionFactory extends BasePooledObjectFactory<Socket> { 

    private static final Logger LOGGER = LoggerFactory.getLogger(PooledSocketConnectionFactory.class); 

    final private String hostname; 
    final private int port; 

    private PooledSocketConnectionFactory(final String hostname, final int port) { 
     this.hostname = hostname; 
     this.port = port; 

    public Socket create() throws Exception { 
     return new Socket(hostname, port); 

    public PooledObject wrap(Socket socket) { 
     return new DefaultPooledObject<>(socket); 

    public void destroyObject(final PooledObject<Socket> p) throws Exception { 
     final Socket socket = p.getObject(); 

    public boolean validateObject(final PooledObject<Socket> p) { 
     final Socket socket = p.getObject(); 
     return socket != null && socket.isConnected(); 

    public void activateObject(final PooledObject<SocketConnection> p) throws Exception { 

    public void passivateObject(final PooledObject<SocketConnection> p) throws Exception { 

public class Gateway { 
    private GenericObjectPool pool; 

    public Response sendAndReceive(Request request) throws CommunicationException { 
     Response response = null; 
     final Socket socket = pool.borrowObject(); 
     try { 
      response = Response.parseDelimitedFrom(socket.getInputStream()); 
     } catch (Exception ex) { 
      LOGGER.error("Gateway error", ex); 
      throw new CommunicationException("Gateway error", ex); 
     } finally { 
     return response; 





package com.es.socket; 

import com.es.protos.RequestProtos.Request; 
import com.es.protos.ResponseProtos.Response; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.*; 
import java.net.ServerSocket; 
import java.net.Socket; 

public class TcpServer1 { 

    final static Logger LOGGER = LoggerFactory.getLogger(TcpServer1.class.getName()); 

    public static void main(String[] args) throws Exception { 
     ServerSocket serverSocket = new ServerSocket(Integer.parseInt(args[0])); 
     Socket socket = null; 
     while (true) { 
      try { 
       socket = serverSocket.accept(); 
      } catch (IOException e) { 
       LOGGER.warn("Could not listen on port"); 

      Thread thread = new Thread(new ServerConnection1(socket)); 

class ServerConnection1 implements Runnable { 

    static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class.getName()); 

    private Socket socket = null; 

    ServerConnection1(Socket socket) { 
     this.socket = socket; 

    public void run() { 
     try { 
      serveRequest(socket.getInputStream(), socket.getOutputStream()); 
     } catch (IOException ex) { 
      LOGGER.warn("Error", ex); 

    public void serveRequest(InputStream inputStream, OutputStream outputStream) { 
     try { 
     } catch (IOException ex) { 
      LOGGER.warn("ERROR", ex); 

    private void write(OutputStream outputStream) throws IOException { 
     Response.Builder builder = Response.newBuilder(); 
     Response response = builder.setStatus("SUCCESS").setPing("PING").build(); 
     LOGGER.info("Server sent {}", response.toString()); 

    private void read(InputStream inputStream) throws IOException { 
     Request request = Request.parseDelimitedFrom(inputStream); 
     LOGGER.info("Server received {}", request.toString()); 

package com.es.socket; 

import com.es.protos.RequestProtos.Request; 
import com.es.protos.ResponseProtos.Response; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.*; 
import java.net.Socket; 

public class TcpClient1 { 

    final static Logger LOGGER = LoggerFactory.getLogger(TcpClient1.class.getName()); 

    private Socket openConnection(final String hostName, final int port) { 
     Socket clientSocket = null; 
     try { 
      clientSocket = new Socket(hostName, port); 
     } catch (IOException e) { 
      LOGGER.warn("Exception occurred while connecting to server", e); 
     return clientSocket; 

    private void closeConnection(Socket clientSocket) { 
     try { 
      LOGGER.info("Closing the connection"); 
     } catch (IOException e) { 
      LOGGER.warn("Exception occurred while closing the connection", e); 

    private void write(OutputStream outputStream) throws IOException { 
     Request.Builder builder = Request.newBuilder(); 
     Request request = builder.setPing("PING").build(); 
     LOGGER.info("Client sent {}", request.toString()); 

    private void read(InputStream inputStream) throws IOException { 
     Response response = Response.parseDelimitedFrom(inputStream); 
     LOGGER.info("Client received {}", response.toString()); 

    public static void main(String args[]) throws Exception { 
     TcpClient1 client = new TcpClient1(); 
     try { 
      Socket clientSocket = null; 

      LOGGER.info("Scenario 1 --> One socket for each call"); 
      for (int i = 0; i < 2; i++) { 
       clientSocket = client.openConnection("localhost", Integer.parseInt(args[0])); 
       OutputStream outputStream = clientSocket.getOutputStream(); 
       InputStream inputStream = clientSocket.getInputStream(); 
       LOGGER.info("REQUEST {}", i); 

      LOGGER.info("Scenario 2 --> One socket for all calls"); 
      clientSocket = client.openConnection("localhost", Integer.parseInt(args[0])); 
      OutputStream outputStream = clientSocket.getOutputStream(); 
      InputStream inputStream = clientSocket.getInputStream(); 
      for (int i = 0; i < 2; i++) { 
       LOGGER.info("REQUEST {}", i); 
     } catch (Exception e) { 
      LOGGER.warn("Exception occurred", e); 


17:03:10.508 [main] INFO c.d.e.socket.TcpClient1 - Scenario 1 --> One socket for each call 
17:03:10.537 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 0 
17:03:10.698 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.730 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.730 [main] INFO c.d.e.socket.TcpClient1 - Closing the connection 
17:03:10.731 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 1 
17:03:10.732 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Closing the connection 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Scenario 2 --> One socket for all calls 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 0 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 1 
17:03:10.735 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 

上沒有足夠的信息了'PooledObjectFactory'(由'GenericObjectPool'使用)的行爲 - 也許'passivateObject'方法關閉套接字? –


它是否也可能不是關閉連接的C++應用程序,比如說在閒置的特定時間段之後? – Gimby


@Adrian新增工廠類別 – user2459396



經過很大的痛苦,我才解決了這個問題。正在處理對套接字的讀/寫的類被定義爲原型。所以一旦檢索到套接字的引用,它就不會被清除(由Tomcat管理)。隨後對套接字的後續調用被排隊,然後超時並且Apache Commons Pool銷燬該對象。


class SocketConnection { 

    final private String identity; 
    private boolean alive; 
    final private ThreadLocal<Socket> threadLocal; 

    public SocketConnection(final String hostname, final int port) throws IOException { 
     this.identity = UUID.randomUUID().toString(); 
     this.alive = true; 
     threadLocal = ThreadLocal.withInitial(rethrowSupplier(() -> new Socket(hostname, port))); 


public class PooledSocketConnectionFactory extends BasePooledObjectFactory<SocketConnection> { 

    private static final Logger LOGGER = LoggerFactory.getLogger(PooledSocketConnectionFactory.class); 

    final private String hostname; 
    final private int port; 
    private SocketConnection connection = null; 

    private PooledSocketConnectionFactory(final String hostname, final int port) { 
     this.hostname = hostname; 
     this.port = port; 

    public SocketConnection create() throws Exception { 
     LOGGER.info("Creating Socket"); 
     return new SocketConnection(hostname, port); 

    public PooledObject wrap(SocketConnection socketConnection) { 
     return new DefaultPooledObject<>(socketConnection); 

    public void destroyObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 

    public boolean validateObject(final PooledObject<SocketConnection> p) { 
     final SocketConnection connection = p.getObject(); 
     final Socket socket = connection.get(); 
     return connection != null && connection.isAlive() && socket.isConnected(); 

    public void activateObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 

    public void passivateObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 


class SocketCallback implements Callable<Response> { 

    private SocketConnection socketConnection; 
    private Request request; 

    public SocketCallback() { 

    public SocketCallback(SocketConnection socketConnection, Request request) { 
     this.socketConnection = socketConnection; 
     this.request = request; 

    public Response call() throws Exception { 
     final Socket socket = socketConnection.get(); 
     Response response = Response.parseDelimitedFrom(socket.getInputStream()); 
     return response; 


public class SocketGateway { 

    private static final Logger LOGGER = LoggerFactory.getLogger(SocketGateway.class); 

    private GenericObjectPool<SocketConnection> socketPool; 
    private ExecutorService executorService; 

    public Response eligibility(Request request) throws DataException { 
     EligibilityResponse response = null; 
     SocketConnection connection = null; 
     if (request != null) { 
      try { 
       connection = socketPool.borrowObject(); 
       Future<Response> future = executorService.submit(new SocketCallback(connection, request)); 
       response = future.get(); 
      } catch (Exception ex) { 
       LOGGER.error("Gateway error {}"); 
       throw new DataException("Gateway error", ex); 
      } finally { 

     return response; 
