/**
 * @file tuya_svc_mqtt_direct.h
 * @brief TUYA mqtt direct service
 * @version 0.1
 * @date 2021-04-06
 *
 * @copyright Copyright 2021 Tuya Inc. All Rights Reserved.
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "uni_log.h"
#include "tuya_error_code.h"
#include "tal_security.h"
#include "tal_semaphore.h"
#include "tal_workq_service.h"
#include "tuya_svc_mqtt_client.h"

typedef struct {
    BOOL_T inited;
    BOOL_T connected;
    MQTT_HANDLE mqtt_handler;
} custom_connect_t;

typedef struct {
    OPERATE_RET rt;
    SEM_HANDLE sem_handle;
} custom_mqtt_sync_t;

typedef struct {
    CHAR_T * client_id;
    CHAR_T * user_name;
    CHAR_T * passwd;
} custom_signature_t;

/**
 * @brief custom mqtt client
 * 
 */
STATIC custom_connect_t s_custom_con = {0};

/**
 * @brief quit custom mqtt client
 * 
 * @param[in/out] data: should be NULL
 * @return VOID
 */
STATIC VOID __mqtt_custom_quit(VOID *data)
{
    PR_DEBUG("quit custom mqtt");

    if (s_custom_con.inited) {
        s_custom_con.inited = FALSE;
        s_custom_con.connected = FALSE;

        tuya_svc_mqtt_client_destroy(s_custom_con.mqtt_handler);
    }
}

/**
 * @brief custom mqtt client receive data
 * 
 * @param[in] topic : received topic 
 * @param[in] data : received data
 * @param[in] len : received data len
 * @return VOID 
 */
STATIC VOID __mqtt_custom_recv(CHAR_T *topic, IN BYTE_T *data, IN UINT_T len)
{
    // TBD...
    // custom receive DATA process
}

/**
 * @brief custom mqtt client connected callback
 * 
 * @return VOID 
 */
STATIC VOID __mqtt_custom_conn_cb(VOID)
{
    PR_DEBUG("custom mqtt connected");
    s_custom_con.connected = TRUE;
}

/**
 * @brief custom mqtt client disconnected callback
 * 
 * @return VOID 
 */
STATIC VOID __mqtt_custom_disconn_cb(VOID)
{
    PR_DEBUG("custom mqtt disconnected");
    s_custom_con.connected = FALSE;
}

/**
 * @brief custom mqtt client connect denyed callback
 * 
 * @param[in] deny_times : time denyed
 * @return VOID 
 */
STATIC VOID __mqtt_custom_conn_deny_cb(IN BYTE_T deny_times)
{
    PR_DEBUG("custom mqtt connect deny:%d", deny_times);
    s_custom_con.connected = FALSE;
}

/**
 * @brief custom mqtt client init
 * 
 * @param[in] domain : the URL of the custom mqtt server
 * @param[in] port : the port numbed of the custom mqtt server
 * @return OPRT_OK on success, others on failed 
 */
STATIC OPERATE_RET __mqtt_custom_init(CHAR_T *domain, UINT_T port, CHAR_T* topic, custom_signature_t *signature)
{
    OPERATE_RET rt = OPRT_OK;
    s_custom_con.connected = FALSE;
    PR_DEBUG("custom connect MQTT URL:%s Port:%d", domain, port);
    PR_DEBUG("custom MQTT client_id:%s", signature->client_id);   
    PR_DEBUG("custom MQTT user_name:%s", signature->user_name);
    PR_DEBUG("custom MQTT passwd:%s", signature->passwd);
    PR_DEBUG("custom MQTT subcribe topic:%s", topic);

    mqtt_ctx_t mqtt_ctx = {0};
    mqtt_ctx.broker_domain = domain;
    mqtt_ctx.broker_port = port;
    mqtt_ctx.cb_data_recv = __mqtt_custom_recv;
    mqtt_ctx.client_id = signature->client_id;
    mqtt_ctx.user_name = signature->user_name;
    mqtt_ctx.passwd = signature->passwd;
    mqtt_ctx.subcribe_topic = topic;
    mqtt_ctx.heartbeat = 0;
    TUYA_CALL_ERR_GOTO(tuya_svc_mqtt_client_create(&mqtt_ctx, &s_custom_con.mqtt_handler), ERR_EXIT);
    TUYA_CALL_ERR_GOTO(tuya_svc_mqtt_client_register_cb(s_custom_con.mqtt_handler, __mqtt_custom_conn_cb, __mqtt_custom_disconn_cb, __mqtt_custom_conn_deny_cb), ERR_EXIT);
    TUYA_CALL_ERR_GOTO(tuya_svc_mqtt_client_start(s_custom_con.mqtt_handler), ERR_EXIT);

    return rt;
ERR_EXIT:
    PR_ERR("custom mqtt init failed, rt %d!", rt);
    tuya_svc_mqtt_client_destroy(s_custom_con.mqtt_handler);
    return rt;
}

/**
 * @brief custom mqtt client start
 * 
 * @param[in] domain : the URL of the custom mqtt server
 * @param[in] port : the port numbed of the custom mqtt server
 * @return OPRT_OK on success, others on failed 
 */
OPERATE_RET mqtt_custom_start(CHAR_T *domain, UINT_T port, CHAR_T* topic, custom_signature_t *signature)
{
    TUYA_CHECK_NULL_RETURN(domain, OPRT_INVALID_PARM);
    TUYA_CHECK_NULL_RETURN(topic, OPRT_INVALID_PARM);
    TUYA_CHECK_NULL_RETURN(signature, OPRT_INVALID_PARM);

    // psk mode cannot support custom mqtt client
    if (TUYA_SECURITY_LEVEL == 0) {
        return OPRT_NOT_SUPPORTED;
    }

    if (s_custom_con.inited) {
        return OPRT_OK;
    }
    
    OPERATE_RET rt = OPRT_OK;

    PR_DEBUG("start custom mqtt");
    s_custom_con.inited = TRUE;
    TUYA_CALL_ERR_GOTO(__mqtt_custom_init(domain, port, topic, signature), ERR_EXIT);
    return OPRT_OK;

ERR_EXIT:
    s_custom_con.inited = FALSE;
    return rt;
}

/**
 * @brief 
 * 
 * @return OPRT_OK on success, others on failed 
 */
OPERATE_RET mqtt_custom_stop(VOID)
{
    if (!s_custom_con.inited) {
        return OPRT_OK;
    }

    PR_DEBUG("stop custom mqtt");
    return tal_workq_schedule(WORKQ_SYSTEM, __mqtt_custom_quit, NULL);
}

/**
 * @brief publish data to topic async
 * 
 * @param[in] topic : dest topic
 * @param[in] data : publish data
 * @param[in] len : publish data len
 * @param[in] qos : publish qos
 * @param[in] timeout : timeout in seconds, works when qos > 0
 * 
 * @note: if qos > 0, retransmit should be done in cb_pub_inform
 * 
 * @return OPERATE_RET 
 */
OPERATE_RET mqtt_custom_publish_async(CHAR_T *topic, BYTE_T* data, UINT_T len, UINT_T qos, UINT_T timeout, CB_MQTT_PUB_INFORM cb_pub_inform)
{
    OPERATE_RET rt = OPRT_OK;
    mqtt_msg_t msg = {0};
    msg.publish_topic = topic;
    msg.cb_pub_inform = cb_pub_inform;
    msg.data = data;
    msg.len = len;
    msg.qos = qos;
    msg.timeout = timeout;    
    TUYA_CALL_ERR_RETURN(tuya_svc_mqtt_client_publish(s_custom_con.mqtt_handler, &msg));

    return OPRT_OK;
}

/**
 * @brief 
 * 
 * @param[in/out] rt 
 * @param[in/out] prv_data 
 * @return STATIC 
 */
STATIC VOID __custom_mqtt_sync_cb(OPERATE_RET rt, VOID *prv_data)
{
    custom_mqtt_sync_t *sync = (custom_mqtt_sync_t *)prv_data;
    PR_DEBUG("custom sync cb rt: %d", rt);
    sync->rt = rt;
    tal_semaphore_post(sync->sem_handle);
}

/**
 * @brief publish data to topic sync
 * 
 * @param[in] topic : dest topic
 * @param[in] data : publish data
 * @param[in] len : publish data len
 * @param[in] qos : publish qos
 * @param[in] timeout : timeout in seconds, works when qos > 0
 * 
 * @note: if qos > 0, retransmit should be done in cb_pub_inform
 * 
 * @return OPERATE_RET 
 */
OPERATE_RET mqtt_custom_publish_sync(CHAR_T *topic, BYTE_T *data, UINT_T len, UINT_T qos, UINT_T timeout)
{
    OPERATE_RET rt = OPRT_OK;

    custom_mqtt_sync_t *sync = tal_malloc(SIZEOF(custom_mqtt_sync_t));
    TUYA_CHECK_NULL_RETURN(sync, OPRT_MALLOC_FAILED);
    sync->rt = OPRT_COM_ERROR;
    TUYA_CALL_ERR_GOTO(tal_semaphore_create_init(&(sync->sem_handle), 0, 10), SEM_ERR_EXIT);

    mqtt_msg_t msg = {0};
    msg.publish_topic = topic;
    msg.cb_pub_inform = __custom_mqtt_sync_cb;
    msg.ctx = sync;
    msg.qos = qos;
    msg.data = data;
    msg.len = len;
    msg.timeout = timeout;
    TUYA_CALL_ERR_GOTO(tuya_svc_mqtt_client_publish(s_custom_con.mqtt_handler, &msg), PUB_ERR_EXIT);

    tal_semaphore_wait_forever(sync->sem_handle);
    PR_DEBUG("pub finish:%d", sync->rt);
    rt = sync->rt;

PUB_ERR_EXIT:
    tal_semaphore_release(sync->sem_handle);

SEM_ERR_EXIT:
    if (sync)
        tal_free(sync);

    return rt;
}
