0
我寫了一個mqtt的客戶端代碼,它應該連接(現在只需連接到服務器)到活動的mq服務器。以下是代碼:MQTT連接activeMQ
#include "stdafx.h"
#include<iostream>
#include<winsock2.h>
#include "mqtt.h"
#pragma comment(lib,"ws2_32.lib") //Winsock Library
#include <stdlib.h>
#include <winsock2.h>
void mqtt_init(mqtt_broker_handle_t* broker, const char* clientid) {
// Connection options
broker->alive = 300; // 300 seconds = 5 minutes
broker->seq = 1; // Sequence for message identifiers
// Client options
memset(broker->clientid, 0, sizeof(broker->clientid));
memset(broker->username, 0, sizeof(broker->username));
memset(broker->password, 0, sizeof(broker->password));
if(clientid) {
strncpy_s(broker->clientid, clientid, sizeof(broker->clientid));
} else {
strcpy_s(broker->clientid, "emqtt");
}
// Will topic
broker->clean_session = 1;
}
void mqtt_init_auth(mqtt_broker_handle_t* broker, const char* username, const char* password)
{
if(username && username[0] != '\0')
strncpy_s(broker->username, username, sizeof(broker->username)-1);
if(password && password[0] != '\0')
strncpy_s(broker->password, password, sizeof(broker->password)-1);
}
using namespace std;
int main(int argc , char *argv[])
{
WSADATA wsa;
SOCKET s;
struct sockaddr_in server;
char message[1000] , server_reply[2000];
int recv_size;
int packet_length;
uint16_t msg_id, msg_id_rcv;
mqtt_broker_handle_t broker;
mqtt_init(&broker, "localhost");
mqtt_init_auth(&broker, "cid", "campeador");
printf("\nInitialising Winsock...");
if (WSAStartup(MAKEWORD(2,2),&wsa) != 0) {
cout<<"Failed. Error Code : "<<WSAGetLastError();
return 1;
}
cout<<"Initialised.\n";
//Create a socket
if((s = socket(AF_INET , SOCK_STREAM , 0)) == INVALID_SOCKET)
{
cout<<"Could not create socket : " << WSAGetLastError();
}
cout<<"Socket created.\n";
server.sin_addr.s_addr = inet_addr("127.0.0.1");
server.sin_family = AF_INET;
server.sin_port = htons(1993);
//Connect to remote server
if (connect(s , (struct sockaddr *)&server , sizeof(server)) < 0)
{
puts("connect error");
return 1;
}
puts("Connected");
mqtt_connect(&broker);
recv_size = recv(s, server_reply, 2000, 0);
server_reply[recv_size] = '\0';
while (server_reply != "end"){
cout<<server_reply<<endl;
recv_size = recv(s, server_reply, 2000, 0);
server_reply[recv_size] = '\0';
}
closesocket(s);
WSACleanup();
return 0;
}
而且還有以下是mqtt.h
#ifndef __LIBEMQTT_H__
#define __LIBEMQTT_H__
#endif
#include <stdint.h>
#include <string.h>
#ifndef MQTT_CONF_USERNAME_LENGTH
#define MQTT_CONF_USERNAME_LENGTH 13 // Recommended by MQTT Specification (12 + '\0')
#endif
#ifndef MQTT_CONF_PASSWORD_LENGTH
#define MQTT_CONF_PASSWORD_LENGTH 13 // Recommended by MQTT Specification (12 + '\0')
#endif
#define MQTT_MSG_CONNECT 1<<4
#define MQTT_MSG_CONNACK 2<<4
#define MQTT_MSG_PUBLISH 3<<4
#define MQTT_MSG_PUBACK 4<<4
#define MQTT_MSG_PUBREC 5<<4
#define MQTT_MSG_PUBREL 6<<4
#define MQTT_MSG_PUBCOMP 7<<4
#define MQTT_MSG_SUBSCRIBE 8<<4
#define MQTT_MSG_SUBACK 9<<4
#define MQTT_MSG_UNSUBSCRIBE 10<<4
#define MQTT_MSG_UNSUBACK 11<<4
#define MQTT_MSG_PINGREQ 12<<4
#define MQTT_MSG_PINGRESP 13<<4
#define MQTT_MSG_DISCONNECT 14<<4
#define MQTT_DUP_FLAG 1<<3
#define MQTT_QOS0_FLAG 0<<1
#define MQTT_QOS1_FLAG 1<<1
#define MQTT_QOS2_FLAG 2<<1
#define MQTT_RETAIN_FLAG 1
#define MQTT_CLEAN_SESSION 1<<1
#define MQTT_WILL_FLAG 1<<2
#define MQTT_WILL_RETAIN 1<<5
#define MQTT_USERNAME_FLAG 1<<7
#define MQTT_PASSWORD_FLAG 1<<6
typedef struct {
void* socket_info;
int (*send)(void* socket_info, const void* buf, unsigned int count);
// Connection info
char clientid[50];
// Auth fields
char username[MQTT_CONF_USERNAME_LENGTH];
char password[MQTT_CONF_PASSWORD_LENGTH];
// Will topic
uint8_t will_retain;
uint8_t will_qos;
uint8_t clean_session;
// Management fields
uint16_t seq;
uint16_t alive;
} mqtt_broker_handle_t;
int mqtt_connect(mqtt_broker_handle_t* broker)
{
uint8_t flags = 0x00;
uint16_t clientidlen = strlen(broker->clientid);
uint16_t usernamelen = strlen(broker->username);
uint16_t passwordlen = strlen(broker->password);
uint16_t payload_len = clientidlen + 2;
// Preparing the flags
if(usernamelen) {
payload_len += usernamelen + 2;
flags |= MQTT_USERNAME_FLAG;
}
if(passwordlen) {
payload_len += passwordlen + 2;
flags |= MQTT_PASSWORD_FLAG;
}
if(broker->clean_session) {
flags |= MQTT_CLEAN_SESSION;
}
// Variable header
uint8_t var_header[] = {
0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70, // Protocol name: MQTT
0x03, // Protocol version
flags, // Connect flags
broker->alive>>8, broker->alive&0xFF, // Keep alive
};
// Fixed header
uint8_t fixedHeaderSize = 2; // Default size = one byte Message Type + one byte Remaining Length
uint8_t remainLen = sizeof(var_header)+payload_len;
if (remainLen > 127) {
fixedHeaderSize++; // add an additional byte for Remaining Length
}
uint8_t fixed_header[2];
// Message Type
fixed_header[0] = MQTT_MSG_CONNECT;
// Remaining Length
if (remainLen <= 127) {
fixed_header[1] = remainLen;
} else {
// first byte is remainder (mod) of 128, then set the MSB to indicate more bytes
fixed_header[1] = remainLen % 128;
fixed_header[1] = fixed_header[1] | 0x80;
// second byte is number of 128s
fixed_header[2] = remainLen/128;
}
uint16_t offset = 0;
uint8_t packet[sizeof(fixed_header)+sizeof(var_header)+ sizeof(payload_len)];
memset(packet, 0, sizeof(packet));
memcpy(packet, fixed_header, sizeof(fixed_header));
offset += sizeof(fixed_header);
memcpy(packet+offset, var_header, sizeof(var_header));
offset += sizeof(var_header);
// Client ID - UTF encoded
packet[offset++] = clientidlen>>8;
packet[offset++] = clientidlen&0xFF;
memcpy(packet+offset, broker->clientid, clientidlen);
offset += clientidlen;
if(usernamelen) {
// Username - UTF encoded
packet[offset++] = usernamelen>>8;
packet[offset++] = usernamelen&0xFF;
memcpy(packet+offset, broker->username, usernamelen);
offset += usernamelen;
}
if(passwordlen) {
// Password - UTF encoded
packet[offset++] = passwordlen>>8;
packet[offset++] = passwordlen&0xFF;
memcpy(packet+offset, broker->password, passwordlen);
offset += passwordlen;
}
return 1;
}
代碼當我調試的代碼有沒有錯誤,但是當我運行代碼,我得到錯誤,指出「變量'數據包'周圍的堆棧已損壞」。我對套接字編程和mqtt一般都很陌生,所以任何幫助都會很棒。