當 send 被 Block 的時候
今天在工作上遇到一個問題。有兩個 process ( process A 和 process B ) 透過 unix domain socket 彼此在傳遞資料,結果發現 process A 在送資料到 process B 的時候,程式居然卡在 send 這支系統函式。send 是會卡住的函式嗎?
答案是:會!來看看「男人」怎麼說:
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
When the message does not fit into the send buffer of the socket, send() normally blocks, unless the socket has been placed in nonblocking I/O mode. In nonblocking mode it would fail with the error EAGAIN or EWOULDBLOCK in this case. The select(2) call may be used to determine when it is possible to send more data.
意思是當呼叫 send 的時候,send 會先將資料放到 kernel 的 buffer,如果 buffer 已經放不進去了,而且 send 又處於 BLOCK 的模式時,這時候 send 就會卡住。而為什麼 buffer 會滿呢?因為 process B 處理封包的時間過久,進而造成 kernel buffer 被 process A 源源不絕的封包給塞滿了,然後 process A 的 send 就被 block 了。解決這個問題的方法可以將 socket 設成 non-blocking mode,或是在傳送封包的時候把 flag 設成 MSG_DONTWAIT。而這也就是為什麼 epoll, select 這類的函式需要去偵測 fd 是否已經是可寫狀態的原因。不過更根本要處理的問題是,為什麼 process B 會處理一個封包處理這麼久啊 ...
放上測試用的小程式:
unix domain socket receiver:
sleep(10) 用來模擬處理時間過久。
unix domain socket sender:
在呼叫 send時有兩種模式,透過註解來切換。
答案是:會!來看看「男人」怎麼說:
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
When the message does not fit into the send buffer of the socket, send() normally blocks, unless the socket has been placed in nonblocking I/O mode. In nonblocking mode it would fail with the error EAGAIN or EWOULDBLOCK in this case. The select(2) call may be used to determine when it is possible to send more data.
意思是當呼叫 send 的時候,send 會先將資料放到 kernel 的 buffer,如果 buffer 已經放不進去了,而且 send 又處於 BLOCK 的模式時,這時候 send 就會卡住。而為什麼 buffer 會滿呢?因為 process B 處理封包的時間過久,進而造成 kernel buffer 被 process A 源源不絕的封包給塞滿了,然後 process A 的 send 就被 block 了。解決這個問題的方法可以將 socket 設成 non-blocking mode,或是在傳送封包的時候把 flag 設成 MSG_DONTWAIT。而這也就是為什麼 epoll, select 這類的函式需要去偵測 fd 是否已經是可寫狀態的原因。不過更根本要處理的問題是,為什麼 process B 會處理一個封包處理這麼久啊 ...
放上測試用的小程式:
unix domain socket receiver:
sleep(10) 用來模擬處理時間過久。
#include <stdio.h> // for printf() and fprintf() #include <sys/socket.h> // for socket(), bind(), and connect() #include <sys/un.h> // for struct sockaddr_un #include <stdlib.h> // for atoi() and exit() #include <string.h> // for memset() #include <unistd.h> // for close() #include <sys/time.h> // for struct timeval {} #include <fcntl.h> // for fcntl() #include <sys/epoll.h> int main( int argc, char *argv[] ) { struct sockaddr_un unixSocketAddr; // Unix Domain Socket int unfd; // Unix Socket FD struct sockaddr_un unixSocketDstAddr; // Unix Domain Socket struct sockaddr_un unixSocketSrcAddr; // Unix Domain Socket socklen_t unixSocketAddrLen; // Socket Length int epfd; // EPOLL File Descriptor. struct epoll_event ev; // Used for EPOLL. struct epoll_event events[120]; // Used for EPOLL. int noEvents; // EPOLL event number. int i, j, k, len, running = 1; const int cBufferSize = 10000; char msg[cBufferSize]; if ( argc < 3 ) { fprintf( stderr, "Usage: %s ...\n", argv[0]); exit(1); } // Create epoll file descriptor. // Standard Input + Bind Socket epfd = epoll_create( 2 ); // Create Unix domain socket + Bind + add to the EPOLL set memset( &unixSocketAddr, 0, sizeof( unixSocketAddr ) ); unixSocketAddr.sun_family = AF_UNIX; strcpy( unixSocketAddr.sun_path, argv[1] ); if( ( unfd = socket( AF_UNIX, SOCK_DGRAM, 0 ) ) < 0 ) { perror("socket error."); exit(1); } unlink( argv[1] ); if ( bind( unfd, (struct sockaddr *)&unixSocketAddr, sizeof( unixSocketAddr ) ) == -1 ) { perror("bind error."); exit(1); } ev.data.fd = unfd; ev.events = EPOLLIN | EPOLLET; epoll_ctl( epfd, EPOLL_CTL_ADD, unfd, &ev ); // Create destination Unix domain socket memset( &unixSocketDstAddr, 0, sizeof( unixSocketDstAddr ) ); unixSocketDstAddr.sun_family = AF_UNIX; strcpy( unixSocketDstAddr.sun_path, argv[2] ); // Add STDIN into the EPOLL set. ev.data.fd = STDIN_FILENO; ev.events = EPOLLIN | EPOLLET; epoll_ctl( epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev ); printf( "Starting server: \n" ); while ( running ) { printf( "Input (q to quit): " ); fflush( stdout ); noEvents = epoll_wait( epfd, events, FD_SETSIZE , -1 ); printf( "Get %d events.\n ", noEvents ); for( i = 0 ; i < noEvents; i++ ) { if( events[i].events & EPOLLIN && STDIN_FILENO == events[i].data.fd ) { memset( msg, cBufferSize, 0 ); fgets( msg, cBufferSize, stdin ); if( strlen( msg ) == 2 && msg[0] == 'q' && msg[1] == '\n' ) { running = 0; continue; } len = sendto( unfd, msg, strlen( msg ), 0, ( struct sockaddr * ) &unixSocketDstAddr, sizeof( unixSocketDstAddr ) ); if( len < 0 ) { perror( "Send Error!!\n" ); } } else if( events[i].events & EPOLLIN && unfd == events[i].data.fd ) { memset( msg, cBufferSize, 0 ); len = recvfrom( unfd, msg, cBufferSize, 0, ( struct sockaddr * ) &unixSocketSrcAddr, &unixSocketAddrLen ); if( len < 0 ) { perror( "Recv Error!!\n" ); } printf( "Receive Msg Len: %d\n", len ); sleep( 10 ); printf( "\n" ); } else { printf( "Unknown %d!\n", events[i].data.fd ); } } } close( unfd ); return 0; }
unix domain socket sender:
在呼叫 send時有兩種模式,透過註解來切換。
#include <stdio.h> // for printf() and fprintf() #include <sys/socket.h> // for socket(), bind(), and connect() #include <sys/un.h> // for struct sockaddr_un #include <stdlib.h> // for atoi() and exit() #include <string.h> // for memset() #include <unistd.h> // for close() #include <sys/time.h> // for struct timeval {} #include <fcntl.h> // for fcntl() #include <unistd.h> #include <sys/epoll.h> int main( int argc, char *argv[] ) { struct sockaddr_un unixSocketAddr; // Unix Domain Socket int unfd; // Unix Socket FD struct sockaddr_un unixSocketDstAddr; // Unix Domain Socket struct sockaddr_un unixSocketSrcAddr; // Unix Domain Socket socklen_t unixSocketAddrLen; // Socket Length int epfd; // EPOLL File Descriptor. struct epoll_event ev; // Used for EPOLL. struct epoll_event events[120]; // Used for EPOLL. int noEvents; // EPOLL event number. int i, j, k, len, running = 1; const int cBufferSize = 10000; char msg[cBufferSize]; int counter = 0; int result = 0; if ( argc < 3 ) { fprintf( stderr, "Usage: %s ...\n", argv[0]); exit(1); } // Create epoll file descriptor. // Standard Input + Bind Socket epfd = epoll_create( 2 ); // Create Unix domain socket + Bind + add to the EPOLL set memset( &unixSocketAddr, 0, sizeof( unixSocketAddr ) ); unixSocketAddr.sun_family = AF_UNIX; strcpy( unixSocketAddr.sun_path, argv[1] ); if( ( unfd = socket( AF_UNIX, SOCK_DGRAM, 0 ) ) < 0 ) { perror("socket error."); exit(1); } unlink( argv[1] ); if ( bind( unfd, (struct sockaddr *)&unixSocketAddr, sizeof( unixSocketAddr ) ) == -1 ) { perror("bind error."); exit(1); } ev.data.fd = unfd; ev.events = EPOLLIN | EPOLLET; epoll_ctl( epfd, EPOLL_CTL_ADD, unfd, &ev ); // Create destination Unix domain socket memset( &unixSocketDstAddr, 0, sizeof( unixSocketDstAddr ) ); unixSocketDstAddr.sun_family = AF_UNIX; strcpy( unixSocketDstAddr.sun_path, argv[2] ); // Add STDIN into the EPOLL set. ev.data.fd = STDIN_FILENO; ev.events = EPOLLIN | EPOLLET; epoll_ctl( epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev ); printf( "Starting server: \n" ); memset( msg, 1, 10000 ); while ( running ) { printf( "Counter: %d\n", counter ); // result = sendto( unfd, msg, 7816, 0, ( struct sockaddr * ) &unixSocketDstAddr, sizeof( unixSocketDstAddr ) ); result = sendto( unfd, msg, 7816, MSG_DONTWAIT, ( struct sockaddr * ) &unixSocketDstAddr, sizeof( unixSocketDstAddr ) ); if ( result == -1 ) { printf( "Counter: %d fail\n", counter ); } else { printf( "Counter: %d done\n", counter ); } counter++; usleep( 10 ); } close( unfd ); return 0; }
留言
張貼留言