SRS之StateThreads學習

2023-07-12 18:00:50

最近在看SRS的原始碼。SRS是基於協程開發的,底層使用了StateThreads。所以為了充分的理解SRS原始碼,需要先學習一下StateThreads。這裡對StateThreads的學習做了一些總結和記錄。

StateThreads是什麼

StateThreads是一個使用者級執行緒庫,用於多執行緒程式設計。它提供了一種輕量級的執行緒模型,允許開發人員以更簡單的方式編寫並行程式。

StateThreads有什麼用

StateThreads 的主要目標是提供一種高效的使用者級執行緒模型,以減少執行緒切換和上下文切換的開銷。它採用共同作業式排程策略,即執行緒在主動釋放執行權之前不會被搶佔。這種方式可以減少執行緒切換的開銷,但也需要開發人員在適當的時機主動釋放執行權,以避免長時間的阻塞導致程式響應性下降。

StateThreads 提供了一組簡單的函數和宏,用於建立和管理執行緒、同步和通訊等操作。它支援執行緒的建立、銷燬、休眠、喚醒等基本操作,以及互斥鎖、條件變數、號誌等同步機制。開發人員可以使用這些函數和宏來編寫並行程式,而不需要直接操作作業系統提供的執行緒和同步原語。

總的來說,StateThreads是一個高效能、高並行、高擴充套件性和可讀性的網路伺服器架構。

StateThreads怎麼用

下載

git clone -b srs https://github.com/ossrs/state-threads.git

編譯

make linux-debug
編譯完成後,將標頭檔案匯入需要使用到StateThreads的專案。並在編譯專案時連結st庫即可。

使用範例

範例一

下面是用StateThreads實現的一個簡單的服務,可以監聽使用者端的連線。

#include <iostream>
#include <stdio.h>
#include <arpa/inet.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <st.h>

#define LISTEN_PORT 8000

#define ERR_EXIT(m) \
  do                \
  {                 \
    perror(m);      \
    exit(-1);       \
  } while (0)

void *client_thread(void *arg)
{
  st_netfd_t client_st_fd = (st_netfd_t)arg;
  // 用於獲取與 st_netfd_t 物件關聯的檔案描述符(File Descriptor)。它返回一個整數值,表示檔案描述符的值。
  // 將 st_netfd_t 物件轉換為普通的檔案描述符
  int client_fd = st_netfd_fileno(client_st_fd);

  sockaddr_in client_addr;
  socklen_t client_addr_len = sizeof(client_addr);
  // 獲取與通訊端連線的對端的地址資訊
  int ret = getpeername(client_fd, (sockaddr *)&client_addr, &client_addr_len);
  if (ret == -1)
  {
    printf("[WARN] Failed to get client ip: %s\n", strerror(ret));
  }

  char ip_buf[INET_ADDRSTRLEN];
  // 記憶體區域清零
  memset(ip_buf, 0, sizeof(ip_buf));
  inet_ntop(client_addr.sin_family, &client_addr.sin_addr, ip_buf,
            sizeof(ip_buf));

  while (1)
  {
    char buf[1024] = {0};
    // 從給定的通訊端中讀取指定位元組數的資料,並將其儲存在提供的緩衝區 buf 中
    ssize_t ret = st_read(client_st_fd, buf, sizeof(buf), ST_UTIME_NO_TIMEOUT);
    if (ret == -1)
    {
      printf("client st_read error\n");
      break;
    }
    else if (ret == 0)
    {
      printf("client quit, ip = %s\n", ip_buf);
      break;
    }

    printf("recv from %s, data = %s", ip_buf, buf);

    ret = st_write(client_st_fd, buf, ret, ST_UTIME_NO_TIMEOUT);
    if (ret == -1)
    {
      printf("client st_write error\n");
    }
  }
}

void *listen_thread(void *arg)
// 監聽
{
  while (1)
  {
    st_netfd_t client_st_fd =
        st_accept((st_netfd_t)arg, NULL, NULL, ST_UTIME_NO_TIMEOUT);
    if (client_st_fd == NULL)
    {
      continue;
    }

    printf("get a new client, fd = %d\n", st_netfd_fileno(client_st_fd));

    st_thread_t client_tid =
        st_thread_create(client_thread, (void *)client_st_fd, 0, 0);
    if (client_tid == NULL)
    {
      printf("Failed to st create client thread\n");
    }
  }
}

int main()
{
  // 用於設定 ST 庫的事件系統。
  int ret = st_set_eventsys(ST_EVENTSYS_ALT);
  if (ret == -1)
  {
    printf("st_set_eventsys use linux epoll failed\n");
  }
  // st初始化
  ret = st_init();
  if (ret != 0)
  {
    printf("st_init failed. ret = %d\n", ret);
    return -1;
  }
  // 建立通訊端
  int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  if (listen_fd == -1)
  {
    ERR_EXIT("socket");
  }

  int reuse_socket = 1;

  // 設定通訊端選項
  ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket,
                   sizeof(int));
  if (ret == -1)
  {
    ERR_EXIT("setsockopt");
  }

  struct sockaddr_in server_addr;            // 用於表示 IPv4 地址的結構體
  server_addr.sin_family = AF_INET;          // 地址族,一般為 AF_INET
  server_addr.sin_port = htons(LISTEN_PORT); // 埠
  server_addr.sin_addr.s_addr = INADDR_ANY;  // ipv4地址結構
  // 將通訊端與特定的 IP 地址和埠號進行繫結
  ret =
      bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr));
  if (ret == -1)
  {
    ERR_EXIT("bind");
  }

  ret = listen(listen_fd, 128);
  if (ret == -1)
  {
    ERR_EXIT("listen");
  }
  // st_netfd_open_socket() 是 State Threads (ST) 庫中的一個函數,用於建立一個 st_netfd_t 型別的檔案描述符物件,以便進行非同步 I/O 操作。
  st_netfd_t st_listen_fd = st_netfd_open_socket(listen_fd);
  if (!st_listen_fd)
  {
    printf("st_netfd_open_socket open socket failed.\n");
    return -1;
  }

  // 建立執行緒監聽來一個建立連線的請求
  st_thread_t listen_tid =
      st_thread_create(listen_thread, (void *)st_listen_fd, 1, 0);
  if (listen_tid == NULL)
  {
    printf("Failed to st create listen thread\n");
  }

  while (1)
  {
    st_sleep(1);
  }

  return 0;
}

範例二

StateThreads建立多執行緒

#include <stdio.h>
#include <st.h>
#include <string>

void *do_calc(void *arg)
{
    int sleep_ms = (int)(long int)(char *)arg * 10;

    for (;;)
    {
        printf("in sthread #%dms\n", sleep_ms);
        st_usleep(sleep_ms * 1000);
    }

    return NULL;
}

int main(int argc, char **argv)
{
    if (argc <= 1)
    {
        printf("Test the concurrence of state-threads!\n"
               "Usage: %s <sthread_count>\n"
               "eg. %s 10000\n",
               argv[0], argv[0]);
        return -1;
    }

    if (st_init() < 0)
    {
        printf("error!");
        return -1;
    }

    int i;
    int count = std::stoi(argv[1]);
    for (i = 1; i <= count; i++)
    {
        if (st_thread_create(do_calc, (void *)i, 0, 0) == NULL)
        {
            printf("error!");
            return -1;
        }
    }

    st_thread_exit(NULL);

    return 0;
}

關於StateThreads的執行原理,可以看文章《SRS開源直播服務 - StateThreads微執行緒框架學習

SRS中的StateThreads

使用的原始碼為SRS4.0

系統架構圖:

在SRS的原始碼中,StateThreads在srs_st_init()函數中完成初始化。具體的呼叫流程如下。
SRS的main函數在檔案srs_main_server.cpp中。
srs_main_server.cpp

......
int main(int argc, char** argv)
{
    srs_error_t err = do_main(argc, argv);
    ......
}

srs_error_t do_main(int argc, char** argv)
{
    srs_error_t err = srs_success;

    // Initialize global or thread-local variables.
    if ((err = srs_thread_initialize()) != srs_success) {
        return srs_error_wrap(err, "thread init");
    }
    ......
}

srs_app_threads.cpp

......
srs_error_t srs_thread_initialize()
{
    srs_error_t err = srs_success;
    ......
    // Initialize ST, which depends on pps cids.
    if ((err = srs_st_init()) != srs_success) {
        return srs_error_wrap(err, "initialize st failed");
    }
    ......
}
......

srs_service_st.cpp

......
srs_error_t srs_st_init()
{
  ......
    int r0 = 0;
    if((r0 = st_init()) != 0){
        return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0);
    }
  ......

在srs_service_st.cpp中呼叫StateThreads庫的初始化函數,完成StateThreads的初始化。