Skip to content

Commit

Permalink
Merge pull request eclipse-paho#120 from eclipse/develop
Browse files Browse the repository at this point in the history
Add Pre-defined-topic
  • Loading branch information
ty4tw authored Jul 16, 2018
2 parents e2abc87 + a658bd5 commit 70b2c8c
Show file tree
Hide file tree
Showing 38 changed files with 1,115 additions and 771 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@
*.pyc
/doc/MQTTSNClient/
/doc/MQTTSNPacket/
/rbmutex.key
/ringbuffer.key
/Release/
/Debug/
/core
15 changes: 8 additions & 7 deletions MQTTSNGateway/GatewayTester/samples/ClientPub/mainPub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
*
* Supported functions.
*
* void PUBLISH ( const char* topicName, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void PUBLISH ( uint16_t topicId, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish,
* uint8_t qos );
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE( const char* topicName );
* void SUBSCRIBE ( uint16_t topicId, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE ( const char* topicName );
*
* void UNSUBSCRIBE ( uint16_t topicId );
*
* void DISCONNECT ( uint16_t sleepInSecs );
*
Expand Down
27 changes: 15 additions & 12 deletions MQTTSNGateway/GatewayTester/samples/ClientSub/mainSub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
*
* Supported functions.
*
* void PUBLISH ( const char* topicName, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void PUBLISH ( uint16_t topicId, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish,
* uint8_t qos );
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE( const char* topicName );
* void SUBSCRIBE ( uint16_t topicId, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE ( const char* topicName );
*
* void UNSUBSCRIBE ( uint16_t topicId );
*
* void DISCONNECT ( uint16_t sleepInSecs );
*
Expand Down Expand Up @@ -112,11 +113,13 @@ int on_Topic03(uint8_t* pload, uint16_t ploadlen)
* A Link list of Callback routines and Topics
*------------------------------------------------------*/

SUBSCRIBE_LIST = {// e.g. SUB(topic, callback, QoS),
//SUB(topic1, on_Topic01, 1),
//SUB(topic4, on_Topic03, 1),
END_OF_SUBSCRIBE_LIST
};
SUBSCRIBE_LIST = {// e.g. SUB(TopicType, topicName, TopicId, callback, QoSx),

// SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic1, 0, on_Topic01, QoS1),
// SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1),
END_OF_SUBSCRIBE_LIST
};



/*------------------------------------------------------
Expand Down
43 changes: 25 additions & 18 deletions MQTTSNGateway/GatewayTester/samples/mainTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
*
* Supported functions.
*
* void PUBLISH ( const char* topicName, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void PUBLISH ( uint16_t topicId, uint8_t* payload,
* uint16_t len, uint8_t qos, bool retain = false );
* void PUBLISH ( uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false );
*
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish,
* uint8_t qos );
* void SUBSCRIBE ( const char* topicName, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE( const char* topicName );
* void SUBSCRIBE ( uint16_t topicId, TopicCallback onPublish, uint8_t qos );
*
* void UNSUBSCRIBE ( const char* topicName );
*
* void UNSUBSCRIBE ( uint16_t topicId );
*
* void DISCONNECT ( uint16_t sleepInSecs );
*
Expand Down Expand Up @@ -76,6 +77,8 @@ MQTTSNCONF = {
const char* topic1 = "ty4tw/topic1";
const char* topic2 = "ty4tw/topic2";
const char* topic3 = "ty4tw/topic3";
const char* topic4 = "ty4tw/topic4";
const char* topic5 = "ty4tw/topic5";


/*------------------------------------------------------
Expand All @@ -100,7 +103,7 @@ int on_Topic02(uint8_t* pload, uint16_t ploadlen)

int on_Topic03(uint8_t* pload, uint16_t ploadlen)
{
DISPLAY("\n\nNew callback recv Topic2\n");
DISPLAY("\n\nNew callback recv Topic3\n");
pload[ploadlen-1]= 0; // set null terminator
DISPLAY("Payload -->%s <--\n\n",pload);
return 0;
Expand All @@ -110,47 +113,50 @@ int on_Topic03(uint8_t* pload, uint16_t ploadlen)
* A Link list of Callback routines and Topics
*------------------------------------------------------*/

SUBSCRIBE_LIST = {// e.g. SUB(topic, callback, QoS),
SUB(topic1, on_Topic01, 1),
SUBSCRIBE_LIST = {// e.g. SUB(TopicType, topicName, TopicId, callback, QoSx),
SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic1, 0, on_Topic01, QoS1),
SUB(MQTTSN_TOPIC_TYPE_NORMAL, topic2, 0, on_Topic02, QoS1),
END_OF_SUBSCRIBE_LIST
};


/*------------------------------------------------------
* Test functions
*------------------------------------------------------*/
void subscribePredefTopic1(void)
{
SUBSCRIBE(1, on_Topic03, QoS1);
}

void publishTopic1(void)
{
char payload[300];
sprintf(payload, "publish \"ty4tw/Topic1\" \n");
uint8_t qos = 0;
PUBLISH(topic1,(uint8_t*)payload, strlen(payload), qos);
PUBLISH(topic1,(uint8_t*)payload, strlen(payload), QoS0);
}

void subscribeTopic2(void)
{
uint8_t qos = 1;
SUBSCRIBE(topic2, on_Topic02, qos);
SUBSCRIBE(10, on_Topic02, QoS1);
}

void publishTopic2(void)
{
char payload[300];
sprintf(payload, "publish \"ty4tw/topic2\" \n");
uint8_t qos = 0;
PUBLISH(topic2,(uint8_t*)payload, strlen(payload), qos);
PUBLISH(topic2,(uint8_t*)payload, strlen(payload), QoS1);
}



void unsubscribe(void)
{
UNSUBSCRIBE(topic2);
}

void subscribechangeCallback(void)
{
uint8_t qos = 1;
SUBSCRIBE(topic2, on_Topic03, qos);
SUBSCRIBE(topic2, on_Topic02, QoS1);
}

void test3(void)
Expand Down Expand Up @@ -178,6 +184,7 @@ void asleep(void)
*------------------------------------------------------*/

TEST_LIST = {// e.g. TEST( Label, Test),
TEST("Step0:Subscribe predef topic1", subscribePredefTopic1),
TEST("Step1:Publish topic1", publishTopic1),
TEST("Step2:Publish topic2", publishTopic2),
TEST("Step3:Subscribe topic2", subscribeTopic2),
Expand Down
9 changes: 7 additions & 2 deletions MQTTSNGateway/GatewayTester/src/LMqttsnClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,21 @@ void LMqttsnClient::subscribe(const char* topicName, TopicCallback onPublish, ui
_subMgr.subscribe(topicName, onPublish, qos);
}

void LMqttsnClient::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType)
void LMqttsnClient::subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos)
{
_subMgr.subscribe(topicId, onPublish, qos, topicType);
_subMgr.subscribe(topicId, onPublish, qos);
}

void LMqttsnClient::unsubscribe(const char* topicName)
{
_subMgr.unsubscribe(topicName);
}

void LMqttsnClient::unsubscribe(const uint16_t topicId)
{
_subMgr.unsubscribe(topicId);
}

void LMqttsnClient::disconnect(uint16_t sleepInSecs)
{
_gwProxy.disconnect(sleepInSecs);
Expand Down
5 changes: 4 additions & 1 deletion MQTTSNGateway/GatewayTester/src/LMqttsnClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ namespace linuxAsyncClient {

struct OnPublishList
{
MQTTSN_topicTypes type;
const char* topic;
uint16_t id;
int (*pubCallback)(uint8_t* payload, uint16_t payloadlen);
uint8_t qos;
};
Expand All @@ -52,8 +54,9 @@ class LMqttsnClient{
void publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain = false);
void publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false);
void subscribe(const char* topicName, TopicCallback onPublish, uint8_t qos);
void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos, uint8_t topicType);
void subscribe(uint16_t topicId, TopicCallback onPublish, uint8_t qos);
void unsubscribe(const char* topicName);
void unsubscribe(const uint16_t topicId);
void disconnect(uint16_t sleepInSecs);
void initialize(LUdpConfig netconf, LMqttsnConfig mqconf);
void run(void);
Expand Down
25 changes: 15 additions & 10 deletions MQTTSNGateway/GatewayTester/src/LMqttsnClientApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ struct LUdpConfig{
uint16_t uPortNo;
};


typedef enum
{
MQTTSN_TOPIC_TYPE_NORMAL,
MQTTSN_TOPIC_TYPE_PREDEFINED,
MQTTSN_TOPIC_TYPE_SHORT
} MQTTSN_topicTypes;


/*======================================
MACROs for Application
=======================================*/
Expand All @@ -93,7 +102,7 @@ struct LUdpConfig{
#define END_OF_TEST_LIST {0, 0, 0}
#define SUBSCRIBE_LIST OnPublishList theOnPublishList[]
#define SUB(...) {__VA_ARGS__}
#define END_OF_SUBSCRIBE_LIST {0,0,0}
#define END_OF_SUBSCRIBE_LIST {MQTTSN_TOPIC_TYPE_NORMAL,0,0,0, 0}
#define UDPCONF LUdpConfig theNetcon
#define MQTTSNCONF LMqttsnConfig theMqcon
#ifdef CLIENT_MODE
Expand Down Expand Up @@ -129,6 +138,9 @@ struct LUdpConfig{
/*======================================
MQTT-SN Defines
========================================*/
#define QoS0 0
#define QoS1 1
#define QoS2 2
#define MQTTSN_TYPE_ADVERTISE 0x00
#define MQTTSN_TYPE_SEARCHGW 0x01
#define MQTTSN_TYPE_GWINFO 0x02
Expand Down Expand Up @@ -157,10 +169,7 @@ struct LUdpConfig{
#define MQTTSN_TYPE_WILLMSGUPD 0x1C
#define MQTTSN_TYPE_WILLMSGRESP 0x1D

#define MQTTSN_TOPIC_TYPE_NORMAL 0x00
#define MQTTSN_TOPIC_TYPE_PREDEFINED 0x01
#define MQTTSN_TOPIC_TYPE_SHORT 0x02
#define MQTTSN_TOPIC_TYPE 0x03
#define MQTTSN_TOPIC_TYPE 0x03

#define MQTTSN_FLAG_DUP 0x80
#define MQTTSN_FLAG_QOS_0 0x0
Expand All @@ -179,14 +188,10 @@ struct LUdpConfig{
#define MQTTSN_RC_REJECTED_INVALID_TOPIC_ID 0x02
#define MQTTSN_RC_REJECTED_NOT_SUPPORTED 0x03

#define PREDEFINEDID_OTA_REQ (0x0ff0)
#define PREDEFINEDID_OTA_READY (0x0ff1)
#define PREDEFINEDID_OTA_NO_CLIENT (0x0ff2)

/*=================================
* Starting prompt
==================================*/
#define TESTER_VERSION " * Version: 1.0.0"
#define TESTER_VERSION " * Version: 2.0.0"

#define PAHO_COPYRIGHT0 " * MQTT-SN Gateway Tester"
#define PAHO_COPYRIGHT1 " * Part of Project Paho in Eclipse"
Expand Down
21 changes: 9 additions & 12 deletions MQTTSNGateway/GatewayTester/src/LPublishManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,22 @@ void LPublishManager::publish(const char* topicName, Payload* payload, uint8_t q
publish(topicName, payload->getRowData(), payload->getLen(), qos, retain);
}


void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain)
{
uint8_t topicType = MQTTSN_TOPIC_TYPE_NORMAL;
if ( strlen(topicName) < 2 )
{
topicType = MQTTSN_TOPIC_TYPE_SHORT;
}
publish(topicName, payload, len, qos, topicType, retain);
}

void LPublishManager::publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, uint8_t topicType, bool retain)
{
uint16_t msgId = 0;
uint8_t topicType = MQTTSN_TOPIC_TYPE_SHORT;
if ( strlen(topicName) > 2 )
{
topicType = MQTTSN_TOPIC_TYPE_NORMAL;
}

if ( qos > 0 )
{
msgId = theClient->getGwProxy()->getNextMsgId();
}

PubElement* elm = add(topicName, 0, payload, len, qos, retain, msgId, topicType);

if (elm->status == TOPICID_IS_READY)
{
sendPublish(elm);
Expand Down Expand Up @@ -286,7 +283,7 @@ void LPublishManager::published(uint8_t* msg, uint16_t msglen)
}

_publishedFlg = NEG_TASK_INDEX;
theClient->getTopicTable()->execCallback(topicId, msg + 6, msglen - 6, msg[1] & 0x03);
theClient->getTopicTable()->execCallback(topicId, msg + 6, msglen - 6, (MQTTSN_topicTypes)(msg[1] & MQTTSN_TOPIC_TYPE));
_publishedFlg = SAVE_TASK_INDEX;
}

Expand Down
1 change: 0 additions & 1 deletion MQTTSNGateway/GatewayTester/src/LPublishManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class LPublishManager{
~LPublishManager();
void publish(const char* topicName, Payload* payload, uint8_t qos, bool retain = false);
void publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false);
void publish(const char* topicName, uint8_t* payload, uint16_t len, uint8_t qos, uint8_t topicType, bool retain = false);
void publish(uint16_t topicId, Payload* payload, uint8_t qos, bool retain = false);
void publish(uint16_t topicId, uint8_t* payload, uint16_t len, uint8_t qos, bool retain = false);
void responce(const uint8_t* msg, uint16_t msglen);
Expand Down
8 changes: 4 additions & 4 deletions MQTTSNGateway/GatewayTester/src/LRegisterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,13 @@ void LRegisterManager::registerTopic(char* topicName)
void LRegisterManager::responceRegAck(uint16_t msgId, uint16_t topicId)
{
const char* topicName = getTopic(msgId);
MQTTSN_topicTypes type = MQTTSN_TOPIC_TYPE_NORMAL;
if (topicName)
{
uint8_t topicType = strlen((char*) topicName) > 2 ? MQTTSN_TOPIC_TYPE_NORMAL : MQTTSN_TOPIC_TYPE_SHORT;
theClient->getGwProxy()->getTopicTable()->setTopicId((char*) topicName, topicId, topicType); // Add Topic to TopicTable
theClient->getGwProxy()->getTopicTable()->setTopicId((char*) topicName, topicId, type); // Add Topic to TopicTable
RegQueElement* elm = getElement(msgId);
remove(elm);
theClient->getPublishManager()->sendSuspend((char*) topicName, topicId, topicType);
theClient->getPublishManager()->sendSuspend((char*) topicName, topicId, type);
}
}

Expand All @@ -213,7 +213,7 @@ void LRegisterManager::responceRegister(uint8_t* msg, uint16_t msglen)
{
TopicCallback callback = tp->getCallback();
void* topicName = calloc(strlen((char*) msg + 5) + 1, sizeof(char));
theClient->getGwProxy()->getTopicTable()->add((char*) topicName, 0, MQTTSN_TOPIC_TYPE_NORMAL, callback, 1);
theClient->getGwProxy()->getTopicTable()->add((char*) topicName, MQTTSN_TOPIC_TYPE_NORMAL, 0, callback, 1);
regack[6] = MQTTSN_RC_ACCEPTED;
}
else
Expand Down
Loading

0 comments on commit 70b2c8c

Please # to comment.