我有一個應用程序運行在otp-19.0,其中有一個erlang節點和兩個c節點;來自erlang節點的幾個進程不斷地向c節點服務器發送消息。c節點只接受單個連接;除ERL_TICK、ERL_ERROR等之外,所有收到的消息(ERL_MSG -> ERL_REG_SEND、ERL_SEND)都在單獨的線程中處理。問題是:Erlang的c節點會隨機崩潰,並出現malloc報告的雙重釋放(double free)、內存損壞(memory corruption)消息。
在c-node的main()中,連接被接受後,所有消息都通過erl_receive_msg接收;如果消息是ERL_TICK、ERL_ERROR等,則直接釋放內存來處理它們;否則,如果消息是ERL_MSG,則創建一個線程並把消息傳遞給該線程處理,線程使用erl_send發送回覆。之所以採用線程來處理消息,是因爲c節點執行的某些操作耗時相當長,超過了tick時間(是否有更好的方法來做到這一點?)。
現在兩個c節點中的一個會隨機崩潰(24小時內大約10次);兩個c節點遵循相同的體系結構,只是它們執行的操作不同。在大多數情況下,c節點只是直接退出,沒有給出任何錯誤原因;而在2或3次情況下,c節點因malloc報告雙重釋放(double free)或內存損壞而崩潰,回溯指向erl_receive_msg。另一點觀察是:在線程中調用erl_free_compound之後,當我們使用erl_eterm_statistics(allocated, freed)查看已分配的塊時,大部分時間是0,但有時是非零值,例如9、18等。
任何幫助表示讚賞。
格雷格
這裏是main():
#include "error_code.h"
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <limits.h>
#include <pthread.h>
#include <semaphore.h>
#include "erl_interface.h"
#include "ei.h"
#include "i2c_cnode.h"
#include "i2c_cnode_query_handler.h"
/*
 * Initialises erl_interface, registers this C node under
 * "<node_name_without_ip>@<ip_address>", opens the listening socket on
 * socket_port_number and publishes the port to EPMD. Returns the listening
 * socket descriptor; every failure path in the definition calls exit().
 */
static int setup_connection_to_communicate_with_erlang_node(char *node_name_without_ip,
char *ip_address, char *magic_cookie, int socket_port_number);
/*
 * Creates an IPv4 stream socket, enables SO_REUSEADDR, binds it to
 * INADDR_ANY:socket_port_number and starts listening. On SUCCESS the
 * descriptor is stored in *socket_file_descriptor_return; otherwise FAILURE
 * is returned.
 */
static int initialize_socket_connection(int socket_port_number, int *socket_file_descriptor_return);
/*
 * Parses a base-10 integer from the start of the string with strtol() and
 * stores it in *integer_value_return. Returns SUCCESS or FAILURE.
 */
static int convert_string_to_integer(char *string_to_convert_into_integer, int *integer_value_return);
/**
 * Free everything attached to a received ErlMessage and the container itself.
 *
 * The message is expected to have been zero-initialised before it was handed
 * to erl_receive_msg(), so any field the library did not fill in is NULL and
 * is skipped here. erl_eterm_release() is called afterwards, exactly as the
 * original code did after freeing the terms.
 *
 * @param message  heap-allocated message owned by the caller; may be NULL.
 */
static void free_received_erl_message(ErlMessage *message)
{
    if(NULL == message)
    {
        return;
    }
    if(NULL != message->msg)
    {
        erl_free_compound(message->msg);
    }
    if(NULL != message->from)
    {
        erl_free_compound(message->from);
    }
    if(NULL != message->to)
    {
        erl_free_compound(message->to);
    }
    erl_eterm_release();
    free(message);
}

/**
 * Entry point of the I2C C node.
 *
 * Validates the command line, publishes the node, initialises the I2C access
 * semaphores and then loops forever: accept one Erlang connection, receive
 * messages on it, and hand every ERL_SEND / ERL_REG_SEND message to a
 * detached query_handler thread, which takes ownership of the message.
 *
 * Ownership rule: a message is freed here unless pthread_create() succeeded,
 * in which case the worker thread frees it.
 *
 * @return never returns normally; EXIT_FAILURE if the outer loop is left.
 */
int main(int argc, char *argv[])
{
    char *node_name_without_ip = NULL;
    char *ip_address_of_card = NULL;
    char *magic_cookie = NULL;
    int socket_port_number = 0;
    int semaphore_count = 0;
    int i = 0; /* loop variable for creation of the semaphores */
    int socket_file_descriptor = 0;
    int connection_is_open = 0;
    int receive_error = 0;
    int thread_creation_status = 0;
    unsigned char erlang_message_buffer[ERLANG_MESSAGE_BUFFER_SIZE] = {0};
    int return_status_of_erl_receive_msg = ERL_ERROR;
    int return_status_of_erl_close_connection = 0;
    ErlMessage *erlmessage = NULL;
    ErlConnect erlang_node_connection_information = {
        .ipadr = {0},
        .nodename = {0}
    };
    pthread_t query_handler_thread_id;

    if(NUMBER_OF_COMMAND_LINE_ARGS != argc)
    {
        printf("\nError: Arguments validation from " __FILE__ " %s():%d failed\n"
               "Usage : <executable_name> <node_name_without_ip> <ip_address> <cookie> <socket_port_number> <semaphore_count>\n"
               "For example : %s i2c_cnode 10.3.31.127 utl 4022 1 \n"
               "Terminating C node\n\n", __func__, __LINE__, argv[0]);
        exit(EXIT_FAILURE);
    }
    node_name_without_ip = argv[1];
    ip_address_of_card = argv[2];
    magic_cookie = argv[3];

    if(FAILURE == convert_string_to_integer(argv[4], &socket_port_number))
    {
        printf("\nError: Get Socket port number from "
               __FILE__ " %s():%d failed\n", __func__, __LINE__);
        exit(EXIT_FAILURE);
    }
    if(FAILURE == convert_string_to_integer(argv[5], &semaphore_count))
    {
        printf("\nError: Get Semaphore count from "
               __FILE__ " %s():%d failed\n", __func__, __LINE__);
        exit(EXIT_FAILURE);
    }
    if(MAX_COUNT_OF_SEMAPHORE_FOR_I2C_ACCESS < semaphore_count)
    {
        printf("\nError: Too may Semaphores requested from "
               __FILE__ " %s():%d failed\n", __func__, __LINE__);
        exit(EXIT_FAILURE);
    }

    socket_file_descriptor = setup_connection_to_communicate_with_erlang_node(
        node_name_without_ip, ip_address_of_card, magic_cookie, socket_port_number);

    /* NOTE(review): every slot up to the maximum is initialised, not just
     * semaphore_count of them; kept as in the original - confirm intent. */
    for(i = 0; i < MAX_COUNT_OF_SEMAPHORE_FOR_I2C_ACCESS; i++)
    {
        if(FAILURE == sem_init(&i2c_access_semaphore[i], 0, 1))
        {
            printf("\nError: Create POSIX semaphore from "
                   __FILE__ " %s():%d failed\n", __func__, __LINE__);
            exit(EXIT_FAILURE);
        }
    }

    while(1)
    {
        erlang_node_file_descriptor = erl_accept(socket_file_descriptor, &erlang_node_connection_information);
        if(ERL_ERROR == erlang_node_file_descriptor)
        {
            printf("\nError: Accepting connection from peer node from "
                   __FILE__ " %s():%d failed: %s\n", __func__, __LINE__, strerror(erl_errno));
            continue;
        }

        connection_is_open = 1;
        while(connection_is_open)
        {
            /* Zero-initialise so that msg/from/to are NULL unless the library
             * really stored a term there; they are freed on every path. */
            erlmessage = calloc(1, sizeof *erlmessage);
            if(NULL == erlmessage)
            {
                printf("\nError: Get heap space for ErlMessage from "
                       __FILE__ " %s():%d failed\n", __func__, __LINE__);
                exit(EXIT_FAILURE);
            }

            return_status_of_erl_receive_msg = erl_receive_msg(erlang_node_file_descriptor,
                                                               erlang_message_buffer,
                                                               ERLANG_MESSAGE_BUFFER_SIZE,
                                                               erlmessage);
            if(ERL_TICK == return_status_of_erl_receive_msg)
            {
                free_received_erl_message(erlmessage);
                erlmessage = NULL;
                continue;
            }

            if(ERL_ERROR == return_status_of_erl_receive_msg)
            {
                /* Capture the reason before any other library call runs. */
                receive_error = erl_errno;
                printf("\nError: Message receiving from the Erlang node %s, from "
                       __FILE__ " %s():%d failed: %s\n",
                       erlang_node_connection_information.nodename,
                       __func__, __LINE__, strerror(receive_error));
                free_received_erl_message(erlmessage);
                erlmessage = NULL;

                if(EIO == receive_error)
                {
                    return_status_of_erl_close_connection = erl_close_connection(
                        erlang_node_file_descriptor);
                    if(SUCCESS != return_status_of_erl_close_connection)
                    {
                        printf("\nError: Closing Erlang file descriptor from "
                               __FILE__ " %s():%d failed: %s\n",
                               __func__, __LINE__, strerror(erl_errno));
                    }
                    connection_is_open = 0;
                }
                else if((ENOMEM == receive_error) || (EMSGSIZE == receive_error))
                {
                    exit(EXIT_FAILURE);
                }
                continue;
            }

            switch(erlmessage->type)
            {
                case ERL_REG_SEND:
                case ERL_SEND:
                    thread_creation_status = pthread_create(&query_handler_thread_id, NULL,
                                                            query_handler, erlmessage);
                    if(0 != thread_creation_status)
                    {
                        /* No worker took ownership, so the message is ours to free. */
                        printf("\nError: Create query handler thread from "
                               __FILE__ " %s():%d failed: %s\n",
                               __func__, __LINE__, strerror(thread_creation_status));
                        free_received_erl_message(erlmessage);
                    }
                    break;
                case ERL_EXIT:
                    printf("\nError: Message receive from the Erlang node %s, from "
                           __FILE__ " %s():%d failed: %s\n",
                           erlang_node_connection_information.nodename,
                           __func__, __LINE__, strerror(erl_errno));
                    printf("Received a ERL_EXIT with message: ");
                    erl_print_term(stdout, erlmessage->msg);
                    printf("\n");
                    return_status_of_erl_close_connection = erl_close_connection(erlang_node_file_descriptor);
                    if(SUCCESS != return_status_of_erl_close_connection)
                    {
                        printf("\nError: Close Erlang file descriptor from "
                               __FILE__ " %s():%d failed: %s\n",
                               __func__, __LINE__, strerror(erl_errno));
                    }
                    free_received_erl_message(erlmessage);
                    /* The descriptor is closed: go back to erl_accept() instead
                     * of receiving on (and later re-closing) a dead or reused fd. */
                    connection_is_open = 0;
                    break;
                case ERL_LINK:
                    printf("\nError : Link Message received from Erlang node %s in "
                           __FILE__ " %s():%d failed: %s\n",
                           erlang_node_connection_information.nodename,
                           __func__, __LINE__, strerror(erl_errno));
                    free_received_erl_message(erlmessage);
                    break;
                case ERL_UNLINK:
                    printf("\nError : Unlink Message received from Erlang node %s in "
                           __FILE__ " %s():%d failed: %s\n",
                           erlang_node_connection_information.nodename,
                           __func__, __LINE__, strerror(erl_errno));
                    free_received_erl_message(erlmessage);
                    break;
                default:
                    printf("\nError : Unknown Message type received from Erlang node %s in "
                           __FILE__ " %s():%d failed: %s\n",
                           erlang_node_connection_information.nodename,
                           __func__, __LINE__, strerror(erl_errno));
                    free_received_erl_message(erlmessage);
            }
            /* Either freed above or owned by a worker thread from here on. */
            erlmessage = NULL;
        }
    }
    return EXIT_FAILURE;
}
/**
 * Bring this process up as a C node and open its listening socket.
 *
 * Steps, in order: erl_init(), look up the local host name, build the long
 * node name "<node_name_without_ip>@<ip_address>", convert the dotted IPv4
 * address, erl_connect_xinit(), create the listening socket, and publish the
 * port with erl_publish(). Any failure prints a diagnostic and terminates the
 * process with exit(EXIT_FAILURE).
 *
 * @param node_name_without_ip  alive name of the node (part before '@').
 * @param ip_address            dotted-quad IPv4 address used as host part.
 * @param magic_cookie          cookie passed to erl_connect_xinit().
 * @param socket_port_number    TCP port to listen on and publish.
 * @return the listening socket descriptor.
 */
static int setup_connection_to_communicate_with_erlang_node(char *node_name_without_ip,
    char *ip_address, char *magic_cookie, int socket_port_number)
{
    char long_node_name_with_ip[MAX_BYTES_OF_FULL_NODE_NAME] = {0};
    /* One spare byte keeps the name terminated even at maximum length. */
    char hostname[HOST_NAME_MAX + 1] = {0};
    struct in_addr ipv4_address_as_binary_data = {0};
    int socket_file_descriptor = 0;
    int length_of_long_node_name = 0;
    int return_status_of_gethostname = 0;
    int return_status_of_erl_connect_xinit = 0;
    int return_status_of_erl_publish = 0;
    int status_of_get_socket_file_descriptor = FAILURE;

    erl_init(NULL, 0);

    return_status_of_gethostname = gethostname(hostname, HOST_NAME_MAX);
    if(FAILURE == return_status_of_gethostname)
    {
        printf("\nError: Get hostname from " __FILE__ " %s():%d failed: %s\n",
               __func__, __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    /* Long node name has the form name@host; bounded and checked so an
     * over-long argument cannot overrun the buffer. */
    length_of_long_node_name = snprintf(long_node_name_with_ip, sizeof long_node_name_with_ip,
                                        "%s@%s", node_name_without_ip, ip_address);
    if((length_of_long_node_name < 0) ||
       ((size_t) length_of_long_node_name >= sizeof long_node_name_with_ip))
    {
        printf("\nError: Build long node name from " __FILE__ " %s():%d failed\n",
               __func__, __LINE__);
        exit(EXIT_FAILURE);
    }

    ipv4_address_as_binary_data.s_addr = inet_addr(ip_address);
    if(BROADCAST_IP_ADDRESS == ipv4_address_as_binary_data.s_addr)
    {
        printf("\nError: Convert of IPv4 dotted notation to binary data from "
               __FILE__ " %s():%d failed\n", __func__, __LINE__);
        exit(EXIT_FAILURE);
    }

    return_status_of_erl_connect_xinit = erl_connect_xinit(hostname, node_name_without_ip,
        long_node_name_with_ip, &ipv4_address_as_binary_data, magic_cookie, 0);
    if(return_status_of_erl_connect_xinit <= FAILURE)
    {
        printf("\nError: Initialize C Node connection from " __FILE__ " %s():%d failed: %s\n",
               __func__, __LINE__, strerror(erl_errno));
        exit(EXIT_FAILURE);
    }

    status_of_get_socket_file_descriptor = initialize_socket_connection(socket_port_number,
        &socket_file_descriptor);
    if(FAILURE == status_of_get_socket_file_descriptor)
    {
        printf("\nError: Initialize socket connection from " __FILE__ " %s():%d failed: %s\n",
               __func__, __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    return_status_of_erl_publish = erl_publish(socket_port_number);
    if(FAILURE == return_status_of_erl_publish)
    {
        printf("\nError: Publish C node to EPMD from " __FILE__ " %s():%d failed: %s\n",
               __func__, __LINE__, strerror(errno));
        exit(EXIT_FAILURE);
    }

    printf("\nC Node name: %s\n", erl_thisnodename());
    return socket_file_descriptor;
}
/**
 * Create the TCP listening socket the C node accepts Erlang connections on.
 *
 * The socket is an IPv4 stream socket with SO_REUSEADDR enabled, bound to
 * INADDR_ANY on the requested port and put into the listening state.
 *
 * @param socket_port_number             TCP port to bind to (host byte order).
 * @param socket_file_descriptor_return  receives the descriptor on SUCCESS;
 *                                       left untouched on FAILURE.
 * @return SUCCESS, or FAILURE with errno describing the failing call. On
 *         FAILURE no descriptor is left open.
 */
static int initialize_socket_connection(int socket_port_number, int *socket_file_descriptor_return)
{
    int socket_file_descriptor = 0;
    int boolean_option = BOOLEAN_OPTION_ENABLE;
    int saved_errno = 0;
    struct sockaddr_in socket_address = {0};

    socket_file_descriptor = socket(AF_INET, SOCK_STREAM, DEFAULT_PROTOCOL);
    if (FAILURE == socket_file_descriptor)
    {
        printf("\nError : Creation of socket from %s() failed with error %s\n", __func__, strerror(errno));
        return FAILURE;
    }

    if (FAILURE == setsockopt(socket_file_descriptor, SOL_SOCKET, SO_REUSEADDR,
                              &boolean_option, sizeof boolean_option))
    {
        saved_errno = errno;
        printf("\nError : Setting of options to socket from %s() failed with %s\n", __func__, strerror(saved_errno));
        goto close_socket_and_fail;
    }

    socket_address.sin_family = AF_INET;
    socket_address.sin_port = htons(socket_port_number);
    socket_address.sin_addr.s_addr = htonl(INADDR_ANY);
    if (FAILURE == bind(socket_file_descriptor, (struct sockaddr*)&socket_address, sizeof socket_address))
    {
        saved_errno = errno;
        printf("\nError : Binding to socket from %s() failed with error %s\n", __func__, strerror(saved_errno));
        goto close_socket_and_fail;
    }

    if (FAILURE == listen(socket_file_descriptor, MAX_LENGTH_PENDING_CONNECTIONS_QUEUE))
    {
        saved_errno = errno;
        printf("\nError : Listening from socket from %s() failed with %s\n", __func__, strerror(saved_errno));
        goto close_socket_and_fail;
    }

    *socket_file_descriptor_return = socket_file_descriptor;
    return SUCCESS;

close_socket_and_fail:
    /* Do not leak the descriptor, and keep errno from the call that failed
     * because the caller reports it with strerror(errno). */
    close(socket_file_descriptor);
    errno = saved_errno;
    return FAILURE;
}
/**
 * Parse a base-10 integer from the start of a string.
 *
 * Leading whitespace and trailing non-digit characters are tolerated, as
 * strtol() does; only the numeric prefix is converted.
 *
 * @param string_to_convert_into_integer  NUL-terminated text to parse.
 * @param integer_value_return            receives the value on SUCCESS;
 *                                        left untouched on FAILURE.
 * @return SUCCESS, or FAILURE when the value does not fit in an int or the
 *         string does not start with a number.
 */
static int convert_string_to_integer(char *string_to_convert_into_integer, int *integer_value_return)
{
    char *end_pointer = NULL;
    long parsed_value = 0;

    /* strtol() only sets errno on failure; clear it so that a stale ERANGE
     * from an earlier call is not mistaken for an overflow here. */
    errno = 0;
    parsed_value = strtol(string_to_convert_into_integer, &end_pointer, 10);

    /* Reject values that overflow long as well as values that fit in a long
     * but would be silently truncated when stored in an int. */
    if((ERANGE == errno) || (parsed_value > INT_MAX) || (parsed_value < INT_MIN))
    {
        printf("\nError: Convert String to integer number from "
               __FILE__ " %s():%d failed: %s\n", __func__, __LINE__, strerror(ERANGE));
        return FAILURE;
    }
    if(end_pointer == string_to_convert_into_integer)
    {
        printf("\nError: Search integer number in string from "
               __FILE__ " %s():%d failed\n", __func__, __LINE__);
        return FAILURE;
    }

    *integer_value_return = (int) parsed_value;
    return SUCCESS;
}
這裏是線程函數
#include "error_code.h"
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <math.h>
#include "erl_interface.h"
#include "ei.h"
/*
 * Argument bundle for the thread cleanup handler release_all_eterm_resources():
 * it carries the two things the handler has to free when the worker ends.
 */
struct erlmessage_and_reply_to_erlang_node
{
/* The received message; its msg/from/to terms and the struct itself are freed by the handler. */
ErlMessage *erlmessage;
/* Address of the worker's reply pointer, read at cleanup time so the handler
 * sees whatever reply term the worker has assigned by then. */
ETERM **reply_to_erlang_node;
};
/* Cleanup handler: frees the request message and the reply term referenced by
 * a struct erlmessage_and_reply_to_erlang_node passed as the argument. */
static void release_all_eterm_resources(void *erlmessage_and_reply_to_erlang_node);
/* Cleanup handler: posts the POSIX semaphore whose address is passed in. */
static void semaphore_post_cleanup(void *semaphore_id);
void* query_handler(void *thread_arguments)
{
ETERM *query_msg_from_erlang_node = NULL;
ETERM *message_id = NULL;
ETERM *hardware_connection_info_from_erlang_node = NULL;
ETERM *set_value_from_erlang_node = NULL;
ETERM *set_or_get_data_from_erlang_node = NULL;
ETERM *reply_to_erlang_node = NULL;
ETERM *i2c_bus_number_eterm = NULL;
ETERM *i2c_device_address_eterm = NULL;
int status_of_send_erl_message_to_erlang = 0;
int function_id = 0;
unsigned char i2c_bus_number = 0;
unsigned char i2c_device_address = 0;
int i2c_function_return_status = {0};
unsigned char data_to_be_written[400] = {0};
unsigned int data_to_be_read[60] = {0};
double data_as_double = 0;
short int data_as_short_int = 0;
unsigned char data_value_as_unsigned_char = 0;
double data_value_return_as_double = 0;
short int data_value_return_as_short_int = 0;
unsigned char data_value_return_as_char = 0;
long long int data_value_return_as_long_long_int= 0;
char eterm_in_form_of_string[15];
unsigned char gpio_pin_number = 0;
unsigned char gpio_port_number = 0;
unsigned char channel_number = 0;
unsigned int start_memory_address = 0;
unsigned int number_of_bytes_requested_to_read = 0;
unsigned int number_of_bytes_requested_to_write = 0;
ErlMessage *erlmessage = (ErlMessage *) thread_arguments;
struct erlmessage_and_reply_to_erlang_node clean_up_eterms = {
.erlmessage = erlmessage,
.reply_to_erlang_node = &reply_to_erlang_node
};
pthread_cleanup_push(release_all_eterm_resources, (void *) &clean_up_eterms);
pthread_detach(pthread_self());
query_msg_from_erlang_node = erlmessage->msg;
function_id = ERL_INT_VALUE(erl_element(1, query_msg_from_erlang_node));
message_id = erl_element(2, query_msg_from_erlang_node);
hardware_connection_info_from_erlang_node = erl_element(3, query_msg_from_erlang_node);
i2c_bus_number_eterm = erl_element(2, hardware_connection_info_from_erlang_node);
if(NULL != i2c_bus_number_eterm)
{
i2c_bus_number = (unsigned char)ERL_INT_UVALUE(i2c_bus_number_eterm);
i2c_device_address_eterm = erl_element(3, hardware_connection_info_from_erlang_node);
if(NULL != i2c_device_address_eterm)
{
i2c_device_address = (unsigned char)ERL_INT_UVALUE(i2c_device_address_eterm);
switch(function_id)
{
case CHECK_IF_XYZ_IS_PRESENT :
gpio_pin_number = (unsigned char) ERL_INT_UVALUE(erl_element(4, hardware_connection_info_from_erlang_node));
sem_wait(&i2c_access_semaphore[0]);
pthread_cleanup_push(semaphore_post_cleanup, (void *) &i2c_access_semaphore[0]);
i2c_function_return_status =
check_if_xyz_is_present(i2c_bus_number,
i2c_device_address,
gpio_pin_number,
&data_value_return_as_char);
pthread_cleanup_pop(1);
if(SUCCESS == i2c_function_return_status)
{
reply_to_erlang_node = erl_format("{ success, ~w, ~w }",
erl_mk_int(data_value_return_as_char), message_id);
}
else
{
reply_to_erlang_node = erl_format("{ error, ~i, ~w }",
i2c_function_return_status, message_id);
}
break;
case CHECK_IF_ON :
gpio_pin_number = (unsigned char) ERL_INT_UVALUE(erl_element(4, hardware_connection_info_from_erlang_node));
sem_wait(&i2c_access_semaphore[0]);
pthread_cleanup_push(semaphore_post_cleanup, (void *) &i2c_access_semaphore[0]);
i2c_function_return_status =
check_if_on(i2c_bus_number,
i2c_device_address,
gpio_pin_number,
&data_value_return_as_char);
pthread_cleanup_pop(1);
if(SUCCESS == i2c_function_return_status)
{
reply_to_erlang_node = erl_format("{ success, ~w, ~w }",
erl_mk_int(data_value_return_as_char), message_id);
}
else
{
reply_to_erlang_node = erl_format("{ error, ~i, ~w }",
i2c_function_return_status, message_id);
}
break;
//There are several more cases...
default :
printf("\nError: Search 'function_id': %d from "
__FILE__ " %s():%d failed\n", function_id, __func__, __LINE__);
reply_to_erlang_node = erl_format("{ error, command_not_found, ~w }", message_id);
}
}
else
{
reply_to_erlang_node = erl_format("{ error, i2c_device_address_not_received, ~w }", message_id);
}
}
else
{
reply_to_erlang_node = erl_format("{ error, i2c_bus_number_not_received, ~w }", message_id);
}
status_of_send_erl_message_to_erlang = erl_send(
erlang_node_file_descriptor,
erlmessage->from,
reply_to_erlang_node
);
if(ERL_SEND_FAILURE == status_of_send_erl_message_to_erlang)
{
printf("\nError: Sending message to Erlang node from "
__FILE__ " %s():%d failed\n", __func__, __LINE__);
if(ENOMEM == erl_errno)
{
exit(EXIT_FAILURE);
}
}
pthread_cleanup_pop(1);
return NULL;
}
/**
 * Thread cleanup handler: free the request message and the reply term.
 *
 * Frees, in this order, the message's msg, from and to terms, then the reply
 * term currently stored in the worker's reply pointer, then calls
 * erl_eterm_release() and finally frees the ErlMessage container.
 *
 * NOTE(review): erl_free_compound() is documented to release sub-terms
 * regardless of reference counts, so this is only safe when the reply does
 * not share any sub-term with the request - confirm against the caller.
 *
 * @param argument  pointer to a struct erlmessage_and_reply_to_erlang_node.
 */
static void release_all_eterm_resources(void *argument)
{
    struct erlmessage_and_reply_to_erlang_node *bundle = argument;
    ErlMessage *received_message = bundle->erlmessage;
    ETERM *pending_reply = *(bundle->reply_to_erlang_node);

    erl_free_compound(received_message->msg);
    erl_free_compound(received_message->from);
    erl_free_compound(received_message->to);
    erl_free_compound(pending_reply);
    erl_eterm_release();
    free(received_message);
}
/**
 * Thread cleanup handler: post (unlock) the given POSIX semaphore once.
 *
 * @param semaphore_id  address of the sem_t to post, passed as void* to fit
 *                      the pthread cleanup handler signature.
 */
static void semaphore_post_cleanup(void *semaphore_id)
{
    sem_t *semaphore_to_post = semaphore_id;

    sem_post(semaphore_to_post);
}
Greg,沒有代碼的話很難調試......但我會先仔細檢查處理錯誤的if-else語句。有可能在錯誤處理分支中消息已經被釋放,但該消息仍然被轉發到任務線程(即使那塊內存此時已經無效)。另外,請確保你使用的是erl_free_term函數而不是free,因爲有些內存可能會被重新用作內部緩衝區的一部分。 – Myst
感謝Myst,添加了代碼 – Greg