說到linux下多程序通訊,有好幾種,之前也在喵哥的公眾號回覆過,這裡再拿出來,重新寫一遍:多程序通訊有管道,而管道分為匿名和命名管道 ,後者比前者優勢在於可以進行無親緣程序通訊;此外訊號也是程序通訊的一種,比如我們最常用的就是設定ctrl+c的kill訊號傳送給程序;其次號誌一般來說是一種同步機制但是也可以認為是通訊,需要注意的是號誌、共用記憶體、訊息佇列在使用時候也有posix和system v的區別;還有我們今天的主角通訊端( socket ) :通訊端也是一種程序間通訊機制。
執行緒間的通訊的話,共用變數,此外在unpipc書描述的話,同步也屬於通訊機制,那麼就要補充上執行緒中我們最多用的互斥量、條件變數、讀寫鎖、記錄鎖和執行緒中的號誌使用。
今天想分享一些socket程式設計的例子,socket嵌入式。linux開發很常用,用於程序間通訊很方便,也有很多介紹,今天我也也來做自己的介紹分享。和別人不一樣的地方,我主要想分享socket 伺服器端在linux寫的程式碼,使用vscode偵錯執行,並且同時分享自己使用tcp監控軟體去判斷socket通訊正確性。
作者:良知猶存
轉載授權以及圍觀:歡迎關注微信公眾號:羽林君
或者新增作者個人微信:become_me
在這裡有一個簡單demo演示以及函數的介紹,大家開啟這個連結就可以看到哈:
socket通訊有些固定的函數,這裡先給大家做簡單的分享:
int socket(int domain, int type, int protocol);
該函數用於建立一個socket描述符,它唯一標識一個socket,這個socket描述字跟檔案描述字一樣,後續的操作都有用到它,把它作為引數,通過它來進行一些讀寫操作。建立socket的時候,也可以指定不同的引數建立不同的socket描述符,socket函數的三個引數分別為:
1.domain:引數domain表示該通訊端使用的協定族,在Linux系統中支援多種協定族,對於TCP/IP協定來說,選擇AF_INET就足以,當然如果你的IP協定的版本支援IPv6,那麼可以選擇AF_INET6,可選的協定族具體見:
- AF_UNIX, AF_LOCAL: 本地通訊-AF_INET : IPv4
- AF_INET6 : IPv6
- AF_IPX : IPX - Novell 協定
- AF_NETLINK : 核心使用者介面裝置
- AF_X25 : ITU-T X.25 / ISO-8208 協定
- AF_AX25 : 業餘無線電 AX.25 協定
- AF_ATMPVC : 存取原始ATM PVC
- AF_APPLETALK : AppleTalk
- AF_PACKET : 底層封包介面
- AF_ALG : 核心加密API的AF_ALG介面
2.type:引數type指定了通訊端使用的服務型別,可能的型別有以下幾種:
- SOCK_STREAM:提供可靠的(即能保證資料正確傳送到對方)面向連線的Socket服務,多用於資料(如檔案)傳輸,如TCP協定。
- SOCK_DGRAM:是提供無保障的訊息導向的Socket 服務,主要用於在網路上發廣播資訊,如UDP協定,提供無連線不可靠的資料包交付服務。
- SOCK_SEQPACKET:為固定最大長度的資料包提供有序的,可靠的,基於雙向連線的資料傳輸路徑。
- SOCK_RAW:表示原始通訊端,它允許應用程式存取網路層的原始封包,這個通訊端用得比較少,暫時不用理會它。
- SOCK_RDM:提供不保證排序的可靠資料包層。
3.protocol:引數protocol指定了通訊端使用的協定,在IPv4中,只有TCP協定提供SOCK_STREAM這種可靠的服務,只有UDP協定提供SOCK_DGRAM服務,對於這兩種協定,protocol的值均為0,因為當protocol為0時,會自動選擇type型別對應的預設協定。
int bind(int sockfd, struct sockaddr *my_addr, socklen_t addrlen);
在進行網路通訊的時候,必須把一個通訊端與一個IP地址或埠號相關聯,這個bind就是繫結的過程。
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
這個connect()函數用於使用者端中,將sockfd與遠端IP地址、埠號進行繫結,在TCP使用者端中呼叫這個函數將發生握手過程(會傳送一個TCP連線請求),並最終建立一個TCP連線,而對於UDP協定來說,呼叫這個函數只是在sockfd中記錄遠端IP地址與埠號,而不傳送任何資料,引數資訊與bind()函數是一樣的。
int listen(int s, int backlog);
listen()函數只能在TCP伺服器程序中使用,讓伺服器程序進入監聽狀態,等待使用者端的連線請求,listen()函數在一般在bind()函數之後呼叫,在accept()函數之前呼叫,它的函數原型是:
int accept(int s, struct sockaddr *addr, socklen_t *addrlen);
accept()函數就是用於處理連線請求的,accept()函數用於TCP伺服器中,等待著遠端主機的連線請求,並且建立一個新的TCP連線,在呼叫這個函數之前需要通過呼叫listen()函數讓伺服器進入監聽狀態,如果佇列中沒有未完成連線通訊端,並且通訊端沒有標記為非阻塞模式,accept()函數的呼叫會阻塞應用程式直至與遠端主機建立TCP連線;如果一個通訊端被標記為非阻塞式而佇列中沒有未完成連線通訊端, 呼叫accept()函數將立即返回EAGAIN。
ssize_t read(int fd, void *buf, size_t count);
read 從描述符 fd 中讀取 count 位元組的資料並放入從 buf 開始的緩衝區中.
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
不論是客戶還是伺服器應用程式都可以用recv()函數從TCP連線的另一端接收資料,它與read()函數的功能是差不多的。
ssize_t write(int fd, const void *buf, size_t count);
write()函數一般用於處於穩定的TCP連線中傳輸資料,當然也能用於UDP協定中,它向通訊端描述符 fd 中寫入 count 位元組的資料,資料起始地址由 buf 指定,函數呼叫成功返回寫的位元組數,失敗返回-1,並設定errno變數。
int send(int s, const void *msg, size_t len, int flags);
無論是使用者端還是伺服器應用程式都可以用send()函數來向TCP連線的另一端傳送資料。
int sendto(int s, const void *msg, size_t len, int flags, const struct sockaddr *to, socklen_t tolen);
sendto()函數與send函數非常像,但是它會通過 struct sockaddr 指向的 to 結構體指定要傳送給哪個遠端主機,在to引數中需要指定遠端主機的IP地址、埠號等,而tolen引數則是指定to 結構體的位元組長度。
int close(int fd);
close()函數是用於關閉一個指定的通訊端,在關閉通訊端後,將無法使用對應的通訊端描述符
// 建立通訊端描述符
((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
// 建立TCP連線
(connect(sockfd, (struct sockaddr *)&server, sizeof(struct sockaddr))
write(sockfd, buffer, sizeof(buffer))
close(sockfd);
// socket create and verification
sockfd = socket(AF_INET, SOCK_STREAM, 0);
// binding newly created socket to given IP and verification
if ((bind(sockfd, (struct sockaddr*)&server, sizeof(server))) != 0)
// now server is ready to listen and verification
if ((listen(sockfd, 5)) != 0) {
// accept the data packet from client and verification
connfd = accept(sockfd, (struct sockaddr*)&client, &len);
if (read(connfd, buff, sizeof(buff)) <= 0) {
close(connfd);
close(sockfd);
這裡也順帶分享一個socket 阻塞和非阻塞的機制 前面提到accept函數中,描述通訊端沒有標記為非阻塞模式,accept()函數的呼叫會阻塞應用程式直至與遠端主機建立TCP連線;如果一個通訊端被標記為非阻塞式而佇列中沒有未完成連線通訊端, 呼叫accept()函數將立即返回EAGAIN。但是socket預設初始化是阻塞的,正常初始化後accept沒有收到使用者端的連結請求的話,就會一直的等待阻塞當前執行緒,直到有使用者端進行連結請求。
那麼如何才能把socket設定為非阻塞呢?用ioctl(sockfd, FIONBIO, &mode);
//-------------------------
// Set the socket I/O mode: In this case FIONBIO
// enables or disables the blocking mode for the
// socket based on the numerical value of iMode.
// If iMode = 0, blocking is enabled;
// If iMode != 0, non-blocking mode is enabled.
u_long iMode = 1; //non-blocking mode is enabled.
ioctlsocket(m_socket, FIONBIO, &iMode); //設定為非阻塞模式
一般大家介紹會說使用ioctlsocket,但是有些系統使用會報錯。如下:
ioctlsocket
會報錯,所以使用 ioctl
就好了,操作都是一樣的。
#include <sys/ioctl.h>
ioctl(sockfd, FIONBIO, &mode);
這是一個簡單的圖表分析,來自下面文章連結,大家有興趣也可以自行檢視。
阻塞非阻塞的介紹 連結:
程式碼有test_socket_client.cpp
、test_socket_server.h
、test_socket_server.cpp
三個檔案,互動機制以及實現功能如下:
首先test_socket_client.cpp 是使用者端程式碼,用來測試連結伺服器端互動,用select進行接收資料,並監聽執行終端是否有輸入資訊,輸入資訊立刻傳送。
test_socket_server.h是test_socket_server.cpp使用定義的類和api的標頭檔案,而在test_socket_server.cpp實現了定義了一個支援多使用者端連線的通訊介面,同時也時刻檢測執行終端輸入資訊,並廣播到全部連結的使用者端;而使用者端發過來的資訊,伺服器端針對的對等收發,即接收到特定使用者端的資訊只傳送到該使用者端。其中使用了std::future + std::async
實現了通訊的非同步操作,並使用 impl模式
包裹了socket介面。在監聽執行終端資訊時候分別使用了std::condition
和std::async
實現,大家可以通過宏開關自行選擇測試。
還有些其他的技術使用,多執行緒的排程以及流的輸出,忽略SIGPIPE訊號用來控制使用者端連結斷開之後程式碼正常執行等,再後面我一一給大家分析介紹。
test_socket_client.cpp 這個檔案就是隨便找了一個socket使用者端程式碼,這個test_socket_client程式碼來源是網路,大家也可以自己去寫或者網上自己找相關的用例,因為本次的重要部分是伺服器端server程式碼,所以這塊就貼一下程式碼。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/time.h>
//g++ test_socket_client.cpp -o test_socket_client
#define BUFLEN 1024
#define PORT 8555
int main(int argc, char **argv)
{
int sockfd;
struct sockaddr_in s_addr;
socklen_t len;
unsigned int port;
char buf[BUFLEN];
fd_set rfds;
struct timeval tv;
int retval, maxfd;
/*建立socket*/
if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
perror("socket");
exit(errno);
}else
printf("socket create success!\n");
/*設定伺服器ip*/
memset(&s_addr,0,sizeof(s_addr));
s_addr.sin_family = AF_INET;
s_addr.sin_port = htons(PORT);
if (inet_aton("127.0.0.1", (struct in_addr *)&s_addr.sin_addr.s_addr) == 0) {
perror("127.0.0.1");
exit(errno);
}
/*開始連線伺服器*/
while(connect(sockfd,(struct sockaddr*)&s_addr,sizeof(struct sockaddr)) == -1){
perror("connect");
sleep(1);
exit(errno);
}
while(1){
FD_ZERO(&rfds);
FD_SET(0, &rfds);
maxfd = 0;
FD_SET(sockfd, &rfds);
if(maxfd < sockfd)
maxfd = sockfd;
tv.tv_sec = 6;
tv.tv_usec = 0;
retval = select(maxfd+1, &rfds, NULL, NULL, &tv);
if(retval == -1){
printf("select出錯,使用者端程式退出\n");
break;
}else if(retval == 0){
printf("waiting...\n");
continue;
}else{
/*伺服器發來了訊息*/
if(FD_ISSET(sockfd,&rfds)){
/******接收訊息*******/
bzero(buf,BUFLEN);
len = recv(sockfd,buf,BUFLEN,0);
if(len > 0)
printf("伺服器發來的訊息是:%s\n",buf);
else{
if(len < 0 )
printf("接受訊息失敗!\n");
else
printf("伺服器退出了,聊天終止!\n");
break;
}
}
/*使用者輸入資訊了,開始處理資訊並行送*/
if(FD_ISSET(0, &rfds)){
/******傳送訊息*******/
bzero(buf,BUFLEN);
fgets(buf,BUFLEN,stdin);
if(!strncasecmp(buf,"quit",4)){
printf("client 請求終止聊天!\n");
break;
}
len = send(sockfd,buf,strlen(buf),0);
if(len > 0)
printf("\t訊息傳送成功:%s\n",buf);
else{
printf("訊息傳送失敗!\n");
break;
}
}
}
}
/*關閉連線*/
close(sockfd);
return 0;
}
test_socket_server.h 使用的標頭檔案,定義一些外部api
#ifndef _TEST_SOCKET_H
#define _TEST_SOCKET_H
#include <functional>
#include <memory>
#include <thread>
#include <vector>
namespace linx_socket {
int Writen(int fd, const void *vptr, int n);
int Readn(int fd, void *vptr, int maxlen);
int CreatSocket(const char *ip, int port);
int StartLisen(int fd);
bool Close(int fd);
} // namespace linx_socket
class DevSocket {
public:
using CallBack = std::function<void(int ,std::vector<uint8_t>&&)>;
DevSocket();
DevSocket(const CallBack& callback);
bool Send(int fd,const std::vector<uint8_t> &data) const ;
// std::vector<uint8_t> Recive() const ; //當建立連線後 就會線上程裡面迴圈讀取使用者端發來的資訊, 所以不需要專門寫rx函數
~DevSocket(){}
private:
class Socket;
std::unique_ptr<Socket> SocketImpl;
};
#endif
test_socket_server.cpp
裡面包含的#include "log.h"
這個檔案是我自己寫的log輸出檔案,列印時間和顏色等,看著比較方便,大家使用程式碼時候自行替換成自己需要printf或者std::cout或者自己的列印檔案
#include <stdio.h>
#include <algorithm>
#include <array>
#include <chrono>
#include <boost/thread/mutex.hpp>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <iterator>
#include <string>
#include <thread>
#include <vector>
#include <arpa/inet.h>
#include <errno.h>
#include <net/if.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>
#include <future>
#include "test_socket_server.h"
#include "log.h"
// g++ test_socket_server_optimiza_2.cpp -o test_socket_server_optimiza -lboost_thread -lpthread
namespace linx_socket
{
constexpr int socket_que_size = 3;
//使用select進行寫入
int Writen(int fd, const void *vptr, int n)
{
ssize_t nleft = n;
const char *ptr = (const char *)vptr;
fd_set write_fd_set;
struct timeval timeout;
while (nleft > 0)
{
ssize_t nwriten = 0;
timeout.tv_sec = 1;
timeout.tv_usec = 0;
FD_ZERO(&write_fd_set);
FD_SET(fd, &write_fd_set);
int s_ret = select(FD_SETSIZE, NULL, &write_fd_set, NULL, &timeout);
if (s_ret < 0)
{
EXC_ERROR("-------write_fd_set error------------");
return -1;
}
else if (s_ret == 0)
{
usleep(100 * 1000);
EXC_ERROR("-------write_fd_set timeout ------------");
continue;
}
if ((nwriten = write(fd, ptr, nleft)) < 0)
{
if (nwriten < 0 && errno == EINTR)
{
nwriten = 0;
}
else
{
EXC_ERROR("-------nwriten error = %d ------------", nwriten);
return -1;
}
}
nleft -= nwriten;
ptr += nwriten;
}
return n;
}
//使用select進行讀取
int Readn(int fd, void *vptr, int maxlen)
{
bool ret = false;
ssize_t nread = 0;
fd_set read_fd_set;
struct timeval timeout;
while (!ret)
{
// EXC_INFO("Readn begine.");
timeout.tv_sec = 1;
timeout.tv_usec = 0;
FD_ZERO(&read_fd_set);
FD_SET(fd, &read_fd_set);
int s_ret = select(FD_SETSIZE, &read_fd_set, NULL, NULL, &timeout);
if (s_ret < 0)
{
EXC_ERROR("-------select error------------");
return -1;
}
else if (s_ret == 0)
{
usleep(100 * 1000);
// EXC_ERROR("-------select timeout ------------");
continue;
}
if ((nread = read(fd, vptr, maxlen)) < 0)
{
if (errno == EINTR)
{
EXC_ERROR("buff = %d, fd=%d, errno=%d.", vptr, fd, errno);
nread = 0;
}
else
{
EXC_ERROR("buff = %d, fd=%d, errno=%d.", vptr, fd, errno);
return -1;
}
}
else
{
if (nread == 0)
{
EXC_ERROR("buff = %d, fd=%d, nread=%d. data:%s", vptr, fd, nread, vptr);
}
// else
// {
// EXC_INFO("buff = %d, fd=%d, nread=%d. data:%s", vptr, fd, nread, vptr);
// }
ret = 1;
}
}
return nread;
}
//進行處理來自使用者端的連線請求
int IsListened(int fd)
{
struct sockaddr_in c_addr;
socklen_t c_lent = sizeof(c_addr);
int fd_c = accept(fd, (struct sockaddr *)&c_addr, &c_lent);
if (fd_c < 0)
{
if (errno == EPROTO || errno == ECONNABORTED)
{
return -1;
}
}
EXC_INFO("accept %s: %d sucess", inet_ntoa(c_addr.sin_addr), ntohs(c_addr.sin_port));
return fd_c;
}
//建立一個通訊端描述符
int CreatSocket(const char *ip, int port)
{
int ret = -1;
// EXC_INFO("CreatSocket");
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd < 0)
{
return -1;
}
int reuse = 1;
//設定通訊端的一些選項 SOL_SOCKET:表示在Socket層 SO_REUSEADDR(允許重用本地地址和埠)
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
{
return -1;
}
struct sockaddr_in s_addr;
memset(&s_addr, 0, sizeof(s_addr));
s_addr.sin_addr.s_addr = htonl(INADDR_ANY);
s_addr.sin_port = htons(port);
s_addr.sin_family = AF_INET;
if (bind(fd, (struct sockaddr *)&s_addr, sizeof(s_addr)) < 0)
{
EXC_ERROR("bind %s: %d error", inet_ntoa(s_addr.sin_addr), ntohs(s_addr.sin_port));
close(fd);
return -2;
}
if (listen(fd, socket_que_size) < 0)
{
close(fd);
return -3;
}
return fd;
}
int CreatSocket(const char *ip, int port, int socket_que_size)
{
int ret = -1;
EXC_INFO("");
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd < 0)
{
return -1;
}
struct sockaddr_in s_addr;
memset(&s_addr, 0, sizeof(s_addr));
s_addr.sin_addr.s_addr = htonl(INADDR_ANY);
s_addr.sin_port = htons(port);
s_addr.sin_family = AF_INET;
if (bind(fd, (struct sockaddr *)&s_addr, sizeof(s_addr)) < 0)
{
close(fd);
return -2;
}
if (listen(fd, socket_que_size) < 0)
{
close(fd);
return -3;
}
return fd;
}
bool Close(int fd)
{
close(fd);
return true;
}
} // namespace linx_socket
class Connection
{
public:
Connection(int fd, DevSocket::CallBack c) : call_back_f_(c), fd_(fd)
{
read_sta = std::async(std::launch::async, [this]()
{ Read(); }); //迴圈讀取socket連線的資料
};
void Read()
{
while (!kill_thread_)
{
if (fd_ < 0)
break;
std::array<uint8_t, kBuffSize> buf;
int len = linx_socket::Readn(fd_, buf.data(), kBuffSize);
if (len > 0)
{
if (call_back_f_)
{
data_parser_mutex_.lock();
call_back_f_(fd_, {buf.begin(), buf.begin() + len});
data_parser_mutex_.unlock();
}
}
else if (len < 0)
{
kill_thread_ = true;
EXC_ERROR("read error, fd= %d, rev len= %d.", fd_, len);
break;
}
else if (len == 0)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
EXC_ERROR("call_back_f_ = %d, fd=%d, rev len=%d.", call_back_f_, fd_, len);
}
}
}
bool Write(std::vector<uint8_t> data)
{
if (linx_socket::Writen(fd_, data.data(), data.size()) < 0)
{
kill_thread_ = true;
EXC_ERROR("Writen error.");
return false;
}
return true;
}
bool GetIsKillThread() { return kill_thread_; }
~Connection()
{
EXC_INFO("kill_thread_ is %d", kill_thread_);
kill_thread_ = true;
if (fd_ != -1)
{
linx_socket::Close(fd_);
fd_ = -1;
}
}
std::future<void> &GetReadSta() { return read_sta; }
int GetFd() { return fd_; }
private:
int fd_ = -1;
bool kill_thread_ = false;
DevSocket::CallBack call_back_f_ = nullptr; /**/
boost::mutex data_parser_mutex_;
std::future<void> read_sta;
constexpr static int kBuffSize = 1024;
};
class DevSocket::Socket
{
public:
Socket(){};
Socket(std::pair<std::string, int> port, const CallBack &callback_)
: call_back_f_(callback_)
{
EXC_WARN("Socket ");
int n;
if ((n = linx_socket::CreatSocket(port.first.c_str(), port.second)) < 0)
{
throw std::string("CreatSocket error ") + std::to_string(n);
}
fd = n;
auto threa_func = [this]()
{
while (!kill_thread_)
{
//迴圈std::launch::async 傳遞的可呼叫物件非同步執行
std::future<int> listened_status = std::async(std::launch::async, [this]()
{
EXC_INFO("Listened .");
return linx_socket::IsListened(fd);
});
//lister 通訊端有沒有偵聽到連線,任務沒返回,沒有偵聽到連線通訊端
while (listened_status.wait_for(std::chrono::seconds(0)) !=
std::future_status::ready)
{
if (kill_thread_)
return;
for (auto it = connections_.begin(); it != connections_.end();)
{
//任務返回了,說明該連線結束了
if ((*it)->GetReadSta().wait_for(std::chrono::seconds(0)) ==
std::future_status::ready)
{
if ((*it)->GetReadSta().valid())
{
EXC_ERROR("connection_kill_thread is %d, socket_kill_thread_ is =%d", (*it)->GetIsKillThread(), kill_thread_);
(*it)->GetReadSta().get();//主動退出
}
EXC_INFO("dis connection_ fd=%d.", (*it)->GetFd());
boost::mutex::scoped_lock lock(connection_mutex_);
it = connections_.erase(it);
if (connections_.size() <= 0)
{
EXC_ERROR("all is dis connected");
}
}
if (it != connections_.end())
{
++it;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// EXC_INFO( "==================== thread id: %d" ,std::this_thread::get_id());
//有新的連線
int clien_fd = listened_status.get();
if (clien_fd > 0)
{
boost::mutex::scoped_lock lock(connection_mutex_);
connections_.push_back(
std::make_shared<Connection>(clien_fd, call_back_f_));
EXC_INFO("connection_ fd=%d.", clien_fd);
}
}
};
// EXC_INFO("before move threa_func=%d.", threa_func);
thread_ = std::thread(std::move(threa_func)); //左值變右值傳入 減少拷貝
// EXC_INFO("after move threa_func=%d.", threa_func);
}
bool SendData(const std::vector<uint8_t> &data, std::shared_ptr<Connection> connection)
{
boost::mutex::scoped_lock lock(connection_mutex_);
return connection->Write(data);
}
std::vector<std::shared_ptr<Connection>> GetConnections()
{
return connections_;
}
~Socket()
{
kill_thread_ = true;
if (fd != -1)
{
linx_socket::Close(fd);
}
if (thread_.joinable())
{
thread_.join();
}
}
private:
int fd = -1;
bool kill_thread_ = false;
CallBack call_back_f_ = nullptr;
std::thread thread_;
std::vector<std::shared_ptr<Connection>> connections_;
boost::mutex connection_mutex_;
};
#define HOST "127.0.0.1" // 根據你伺服器的IP地址修改
#define PORT 8555 // 根據你伺服器程序繫結的埠號修改
DevSocket::DevSocket()
{
EXC_WARN("new DevSocket");
std::pair<std::string, int> par{HOST, PORT};
SocketImpl = std::unique_ptr<Socket>(new Socket(par, nullptr));
}
DevSocket::DevSocket(const CallBack &callback)
{
EXC_WARN("new DevSocket");
std::pair<std::string, int> par{HOST, PORT};
SocketImpl = std::unique_ptr<Socket>(new Socket(par, callback));
}
bool DevSocket::Send(int fd, const std::vector<uint8_t> &data) const
{
for (auto connection : SocketImpl->GetConnections())
{
if (nullptr == connection)
continue;
if (fd == connection->GetFd() || fd == 0) //fd ==0 全部傳送
{
int ret = SocketImpl->SendData(data, connection);
EXC_WARN("fd %d send status :%d", connection->GetFd(), ret);
}
}
return true;
}
std::ostream &operator<<(std::ostream &out, std::vector<uint8_t> &data)
{
EXC_WARN("operator 1<<<<<<<<<<<<<<");
out << "hex ";
out << std::hex;
for (auto &d : data)
{
out << "0x" << std::hex << (int)d << " ";
}
out << std::endl;
EXC_WARN("operator 2<<<<<<<<<<<<<");
return out;
}
#include <signal.h>
void pipesig_handler(int sig)
{
EXC_ERROR("receive signal %d", sig);
}
#if 1 //std::async 控制傳送
int main(int argc, char *argv[])
{
DevSocket *Device;
// 為SIGPIPE新增訊號處理常式,處理完程式繼續執行 1
signal(SIGPIPE, pipesig_handler);
bool SendFlag = false;
std::vector<uint8_t> send_data;
int read_fd{-1};
try
{
EXC_INFO("device socket init");
Device =
new DevSocket([&](int fd, std::vector<uint8_t> &&d)
{
EXC_INFO("recive call fd :%d", fd);
send_data = d;
SendFlag = true;
std::ostringstream ss;
ss << "recive data:[";
std::for_each(send_data.begin(), send_data.end(),
[&](uint8_t temp)
{ ss << " " << temp << ","; });
EXC_WARN("%s]", ss.str().c_str());
// std::cout << send_data; //使用operator<< 函數
});
}
catch (const std::string s)
{
EXC_INFO("Device.emplace_back:%s", s.c_str());
return EXIT_FAILURE;
}
const int BUFLEN = 1024;
char buf[BUFLEN];
std::thread input_keyboard = std::thread([&]
{
while (true)
{
memset(buf, 0, sizeof(buf));
/*fgets函數:從流中讀取BUFLEN-1個字元*/
fgets(buf, BUFLEN, stdin);
EXC_INFO("from terminal:%s", buf);
if (!strncasecmp(buf, "quit", 4))
{
EXC_INFO("server quit!");
exit(0);
}
std::vector<uint8_t> send_msg;
for (int i = 0; buf[i] != '\0'; i++)
{
// EXC_INFO("data:index[%d] :%d", i, buf[i]);
send_msg.emplace_back(buf[i]);
}
Device->Send(0, send_msg); //代表全部連結的都傳送 fd =0
}
});
while (true)
{
// EXC_INFO(" ");
std::future<bool> send_future = std::async(std::launch::async, [&]()
{
while(true)
{
if(SendFlag)
{
return true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
return false;
}
});
{
if (send_future.wait_for(std::chrono::milliseconds(30)) == std::future_status::ready) //子執行緒已執行完
{
// EXC_INFO( "ready...");
if(send_future.get())
{
SendFlag=false;
std::ostringstream ss;
ss.clear();
ss << "send date :[";
std::for_each(send_data.begin(), send_data.end(),
[&](uint8_t &temp)
{ ss << " " << temp << ","; });
EXC_INFO("%s]", ss.str().c_str());
if (!send_data.empty())
{
Device->Send(read_fd, send_data);
send_data.clear();
}
}
}
}
}
input_keyboard.join();
}
#elif 1 //std::condition_variable 選擇傳送
int main(int argc, char *argv[])
{
std::mutex SendMutex;
std::condition_variable SendCondition;
bool SendFlag = false;
DevSocket *Device;
// 為SIGPIPE新增訊號處理常式,處理完程式繼續執行 1
signal(SIGPIPE, pipesig_handler);
std::vector<uint8_t> send_data; //(8, 1);
int read_fd{-1};
EXC_WARN("");
{
std::ostringstream ss;
ss << "send_date 1:[";
std::for_each(send_data.begin(), send_data.end(),
[&](uint8_t temp)
{ ss << " " << temp << ","; });
EXC_INFO("%s]", ss.str().c_str());
}
const int BUFLEN = 1024;
char buf[BUFLEN];
try
{
EXC_INFO("Device.emplace_back");
Device =
new DevSocket([&](int fd,std::vector<uint8_t> &&d)
{
EXC_INFO("recive call fd :%d",fd);
{
std::lock_guard<std::mutex> m(SendMutex);
send_data = d;
SendFlag = true;
std::ostringstream ss;
ss.clear();
ss << "recive 2:[";
std::for_each(send_data.begin(), send_data.end(),
[&](uint8_t temp)
{ ss << " " << temp << ","; });
EXC_WARN("%s]", ss.str().c_str());
std::cout << send_data;
}
SendCondition.notify_one();
EXC_INFO("");
});
}
catch (const std::string s)
{
EXC_INFO("Device.emplace_back:%s", s.c_str());
return EXIT_FAILURE;
}
std::thread input_keyboard = std::thread([&]
{
while (true)
{
memset(buf, 0, sizeof(buf));
/*fgets函數:從流中讀取BUFLEN-1個字元*/
fgets(buf, BUFLEN, stdin);
EXC_INFO("from terminal:%s", buf);
if (!strncasecmp(buf, "quit", 4))
{
EXC_INFO("server quit!");
exit(0);
}
std::vector<uint8_t> send_msg;
for (int i = 0; buf[i] != '\0'; i++)
{
EXC_INFO("data:index[%d] :%d", i, buf[i]);
send_msg.emplace_back(buf[i]);
}
Device->Send(0,send_msg);//代表全部連結的都傳送 fd =0
}
});
while (true)