2013-09-25 34 views
1

我正在使用pthread + ACE來編寫假客戶端。線程被ACE_SOCK_Stream函數send()或recv()暫停了嗎?

該客戶端有3個線程,每個線程都可以通過使用ACE來無休止地發送和接收消息。然而,這些線程總是被send()或recv()函數暫停。也就是說,如果發送或接收有問題,線程將退出,不幸的是,我不知道錯誤是什麼,我也無法理解。代碼是:

struct thread_data { 
    int thread_id; 
    string ip; 
    uint32_t port; 
    uint32_t timeout; 
}; 
std::vector<struct thread_data> m_thread; 

void * test_fun1(void * threadid) 
{ 
    struct thread_data * tmp_thread_data = (struct thread_data *)threadid; 
    long tmp_threadid = (long)tmp_thread_data->thread_id; 
    string tmp_ip = tmp_thread_data->ip; 
    uint32_t tmp_port = tmp_thread_data->port; 
    uint32_t tmp_timeout = tmp_thread_data->timeout; 

    ACE_INET_Addr addr(tmp_port, tmp_ip.c_str()); 
    ACE_Time_Value timeout(0, tmp_timeout * 1000); 
    ACE_SOCK_Connector connector; 
    ACE_SOCK_Stream peer; 

    // connect 
    if(connector.connect(peer, addr, &timeout) != 0) 
      pthread_exit((void *) threadid); 

    // send msg 
    while (1) 
    { 
      ssize_t tmp_ret1 = peer.send("hello world", 12); 
      if (tmp_ret1 <= 0) 
        continue; 

      char tmp_buf[1024] = '\0'; 
      ssize_t tmp_ret2 = peer.recv(tmp_buf, 1024, &timeout); 
      if (tmp_ret2 <= 0) 
        continue; 
      else 
        fprintf(stderr, "recv:%s\n", tmp_buf); 
    } 

    // close 
    peer.close(); 
    pthread_exit((void *) threadid); 
} 

int main(int argc, char *argv[]) 
{ 
    std::vector<pthread_t> threads; 
    pthread_attr_t attr; 
    int rc; 
    int i = 0; 
    void * status; 

    pthread_attr_init(&attr); 
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 

    // thread create 
    int tmp_num = 3; 
    for(i = 0; i < tmp_num; i++) 
    { 
      pthread_t tmp_thread_handler; 
      struct thread_data tmp_thread_info; 
      tmp_thread_info.thread_id = i; 
      tmp_thread_info.ip = "127.0.0.1"; 
      tmp_thread_info.port = 8001; 
      tmp_thread_info.timeout = 100; 
      rc = pthread_create(&tmp_thread_handler, NULL, test_fun1, (void *)&tmp_thread_info); 
      if (rc != 0) 
        return -1; 

      threads.push_back(tmp_thread_handler); 
      m_thread.push_back(tmp_thread_info); 
    } 

    // thread start 
    pthread_attr_destroy(&attr); 
    for(i = 0; i < tmp_num; i++) 
    { 

      rc = pthread_join(threads[i], &status); 
      if (rc != 0) 
        return -1; 
    } 

    pthread_exit(NULL); 
    return 0; 
} 

如果我想要無休止地發送和接收消息,該怎麼辦?任何幫助,將不勝感激!

+0

您不知道錯誤是什麼的唯一原因是您沒有通過errno或strerror或perror()打印它。這只是代碼中的一個錯誤,而不是你需要提出的關於SO的問題。當然,如果出現錯誤,您應該退出循環:連接幾乎肯定會中斷。如果你得到EOS(recv()返回零),它*被打破。 – EJP

+0

謝謝,我可以打印錯誤信息,並且我已經修復了它。 – Mark

回答

1

您在此代碼中存在重大的範圍生命期問題。您正在向您的線程發送一個數據塊,該線程在每次循環迭代時創建/銷燬。退出邊線:

// thread create 
int tmp_num = 3; 
for(i = 0; i < tmp_num; i++) 
{ 
    pthread_t tmp_thread_handler; 
    struct thread_data tmp_thread_info; 
    // --- snip ---- 
    rc = pthread_create(&tmp_thread_handler, NULL, test_fun1, (void *)&tmp_thread_info); 
    // --- snip ---- 
} 

是否將得到的塊複製到基礎信息向量中是不相關的。一旦循環徘徊,對象就會被銷燬,並使用std::string成員變量ip。它可能駐留在同一個內存空間(結構)中,但是字符串的重新分配幾乎肯定會是而不是。換句話說,您正在調用未定義的行爲

一個可能的解決方案是對結構使用智能指針,將它的get()成員傳遞給線程,並將所述相同的智能指針推入您的信息向量(爲智能指針重新調整)。我個人比較喜歡更簡單的方法,即在之前分配矢量,線程被啓動,從而修復它們的「內部」以用於地址使用。從那裏,因爲矢量持有它,你可以發送結構地址:

// thread create 
size_t tmp_num = 3; 
m_thread.resize(tmp_num); 
threads.resize(tmp_num); 
for(size_t i = 0; i < tmp_num; i++) 
{ 
    m_thread[i].thread_id = i; 
    m_thread[i].ip = "127.0.0.1"; 
    m_thread[i].port = 8001; 
    m_thread[i].timeout = 100; 

    rc = pthread_create(&threads[i], NULL, test_fun1, &m_thread[i]); 
    if (rc != 0) 
     return -1; 
} 

至於其他問題,你的線程PROC這並不編譯:

char tmp_buf[1024] = '\0'; 

但相比這是相當小的你有沒有定義的行爲。

+0

謝謝,這是我的錯。現在它可以工作。 – Mark