當 send 被 Block 的時候

今天在工作上遇到一個問題。有兩個 process ( process A 和 process B ) 透過 unix domain socket 彼此在傳遞資料,結果發現 process A 在送資料到 process B  的時候,程式居然卡在 send 這支系統函式。send 是會卡住的函式嗎?

答案是:會!來看看「男人」(man page)怎麼說:

ssize_t send(int sockfd, const void *buf, size_t len, int flags);

When the message does not fit into the send  buffer  of  the  socket,  send()  normally blocks, unless the socket has been placed in nonblocking I/O mode.  In nonblocking mode it would fail with the error EAGAIN or EWOULDBLOCK in this case.   The  select(2)  call may be used to determine when it is possible to send more data.

意思是當呼叫 send 的時候,send 會先將資料放到 kernel 的 buffer,如果 buffer 已經放不進去了,而且 send 又處於 BLOCK 的模式時,這時候 send 就會卡住。而為什麼 buffer 會滿呢?因為 process B 處理封包的時間過久,進而造成 kernel buffer 被 process A 源源不絕的封包給塞滿了,然後 process A 的 send 就被 block 了。解決這個問題的方法可以將 socket 設成 non-blocking mode,或是在傳送封包的時候把 flag 設成 MSG_DONTWAIT。而這也就是為什麼 epoll, select 這類的函式需要去偵測 fd 是否已經是可寫狀態的原因。不過更根本要處理的問題是,為什麼 process B 會處理一個封包處理這麼久啊 ...

放上測試用的小程式:

unix domain socket receiver:
sleep(10) 用來模擬處理時間過久。

#include <errno.h>          // for errno and EINTR
#include <stdio.h>          // for printf() and fprintf() 
#include <sys/socket.h>     // for socket(), bind(), and connect() 
#include <sys/un.h>         // for struct sockaddr_un
#include <stdlib.h>         // for atoi() and exit() 
#include <string.h>         // for memset() 
#include <unistd.h>         // for close() 
#include <sys/time.h>       // for struct timeval {} 
#include <fcntl.h>          // for fcntl() 

#include <sys/epoll.h>

/*
 * Fill *addr with the AF_UNIX address for `path`.
 *
 * Returns 0 on success, or -1 when `path` (including its terminating NUL)
 * does not fit into addr->sun_path; *addr is left untouched in that case.
 */
static int make_unix_addr( struct sockaddr_un *addr, const char *path )
{
    size_t pathLen = strlen( path );

    if ( pathLen >= sizeof( addr->sun_path ) )
    {
        return -1;
    }

    memset( addr, 0, sizeof( *addr ) );
    addr->sun_family = AF_UNIX;
    memcpy( addr->sun_path, path, pathLen + 1 );

    return 0;
}

/*
 * Unix domain datagram receiver used to demonstrate a slow consumer.
 *
 * argv[1]: path this process binds its socket to.
 * argv[2]: path that lines typed on stdin are sent to.
 *
 * The program waits on both stdin and the bound socket with epoll.  Each
 * received datagram is followed by sleep( 10 ) to simulate a long processing
 * time.  Typing "q" followed by Enter (or closing stdin) ends the program.
 *
 * Returns 0 on normal termination; exits with status 1 on setup errors.
 */
int main( int argc, char *argv[] )
{
    enum { cMaxEvents = 120, cBufferSize = 10000 };

    struct sockaddr_un      unixSocketAddr;         // Local (bound) address
    struct sockaddr_un      unixSocketDstAddr;      // Destination address
    struct sockaddr_un      unixSocketSrcAddr;      // Sender of a received datagram
    socklen_t               unixSocketAddrLen;      // Value-result length for recvfrom()
    int                     unfd;                   // Unix Socket FD

    int                     epfd;                   // EPOLL File Descriptor.
    struct epoll_event      ev;                     // Used for EPOLL.
    struct epoll_event      events[cMaxEvents];     // Used for EPOLL.
    int                     noEvents;               // EPOLL event number.

    int                     i, running = 1;
    ssize_t                 len;
    char                    msg[cBufferSize];

    if ( argc < 3 )
    {
        fprintf( stderr, "Usage:  %s <local socket path> <destination socket path>\n", argv[0] );
        exit(1);
    }

    // Reject paths that would overflow sun_path instead of strcpy()ing blindly.
    if ( make_unix_addr( &unixSocketAddr, argv[1] ) != 0 ||
         make_unix_addr( &unixSocketDstAddr, argv[2] ) != 0 )
    {
        fprintf( stderr, "Socket path too long (max %zu bytes).\n",
                 sizeof( unixSocketAddr.sun_path ) - 1 );
        exit(1);
    }

    // Create epoll file descriptor.
    // Standard Input + Bind Socket
    epfd = epoll_create( 2 );
    if ( epfd < 0 )
    {
        perror("epoll_create error.");
        exit(1);
    }

    // Create Unix domain socket + Bind + add to the EPOLL set
    if( ( unfd = socket( AF_UNIX, SOCK_DGRAM, 0 ) ) < 0 )
    {
        perror("socket error.");
        exit(1);
    }

    // Remove a stale socket file left by a previous run so bind() can succeed.
    unlink( argv[1] );
    if ( bind( unfd, (struct sockaddr *)&unixSocketAddr, sizeof( unixSocketAddr ) ) == -1 )
    {
        perror("bind error.");
        exit(1);
    }

    // NOTE(review): edge-triggered mode combined with a single read per event
    // can leave queued datagrams unread until the next arrival; kept as in the
    // original demo -- confirm this is intended.
    memset( &ev, 0, sizeof( ev ) );
    ev.data.fd = unfd;
    ev.events = EPOLLIN | EPOLLET;
    if ( epoll_ctl( epfd, EPOLL_CTL_ADD, unfd, &ev ) == -1 )
    {
        perror("epoll_ctl error.");
        exit(1);
    }

    // Add STDIN into the EPOLL set.
    // Not fatal: stdin may be something epoll cannot watch (e.g. a regular
    // file), in which case the socket side of the demo still works.
    memset( &ev, 0, sizeof( ev ) );
    ev.data.fd = STDIN_FILENO;
    ev.events = EPOLLIN | EPOLLET;
    if ( epoll_ctl( epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev ) == -1 )
    {
        perror("epoll_ctl (stdin) error.");
    }

    printf( "Starting server:  \n" );

    while ( running )
    {
        printf( "Input (q to quit): " );
        fflush( stdout );

        // maxevents must not exceed the capacity of events[].
        noEvents = epoll_wait( epfd, events, cMaxEvents, -1 );

        if ( noEvents < 0 )
        {
            if ( errno == EINTR )
            {
                continue;   // Interrupted by a signal: just wait again.
            }

            perror("epoll_wait error.");
            break;
        }

        printf( "Get %d events.\n ", noEvents );

        for( i = 0 ; i < noEvents; i++ )
        {
            if( ( events[i].events & EPOLLIN ) && STDIN_FILENO == events[i].data.fd )
            {
                memset( msg, 0, sizeof( msg ) );

                // EOF or a read error on stdin ends the program.
                if ( fgets( msg, sizeof( msg ), stdin ) == NULL )
                {
                    running = 0;
                    continue;
                }

                if( strcmp( msg, "q\n" ) == 0 )
                {
                    running = 0;
                    continue;
                }

                len = sendto( unfd, msg, strlen( msg ), 0, ( struct sockaddr * ) &unixSocketDstAddr, sizeof( unixSocketDstAddr ) );

                if( len < 0 )
                {
                    perror( "Send Error!!\n" );
                }
            }
            else if( ( events[i].events & EPOLLIN ) && unfd == events[i].data.fd )
            {
                memset( msg, 0, sizeof( msg ) );

                // recvfrom() reads this as the size of the address buffer, so
                // it must be initialised before every call.
                unixSocketAddrLen = sizeof( unixSocketSrcAddr );
                len = recvfrom( unfd, msg, sizeof( msg ), 0, ( struct sockaddr * ) &unixSocketSrcAddr, &unixSocketAddrLen );

                if( len < 0 )
                {
                    perror( "Recv Error!!\n" );
                    continue;
                }

                printf( "Receive Msg Len: %d\n", (int) len );

                // Simulate a receiver that takes too long to process a packet.
                sleep( 10 );

                printf( "\n" );
            }
            else
            {
                printf( "Unknown %d!\n", events[i].data.fd );
            }
        }
    }

    close( unfd );
    close( epfd );

    return 0;
}

unix domain socket sender:
在呼叫 sendto 時有兩種模式(blocking 與 MSG_DONTWAIT),透過註解來切換。

#include <stdio.h>          // for printf() and fprintf() 
#include <sys/socket.h>     // for socket(), bind(), and connect() 
#include <sys/un.h>         // for struct sockaddr_un
#include <stdlib.h>         // for atoi() and exit() 
#include <string.h>         // for memset() 
#include <unistd.h>         // for close() 
#include <sys/time.h>       // for struct timeval {} 
#include <fcntl.h>          // for fcntl() 
#include <unistd.h>

#include <sys/epoll.h>

/*
 * Fill *addr with the AF_UNIX address for `path`.
 *
 * Returns 0 on success, or -1 when `path` (including its terminating NUL)
 * does not fit into addr->sun_path; *addr is left untouched in that case.
 */
static int make_unix_addr( struct sockaddr_un *addr, const char *path )
{
    size_t pathLen = strlen( path );

    if ( pathLen >= sizeof( addr->sun_path ) )
    {
        return -1;
    }

    memset( addr, 0, sizeof( *addr ) );
    addr->sun_family = AF_UNIX;
    memcpy( addr->sun_path, path, pathLen + 1 );

    return 0;
}

/*
 * Unix domain datagram sender used to demonstrate send() blocking.
 *
 * argv[1]: path this process binds its socket to.
 * argv[2]: path of the receiver's socket.
 *
 * Sends a fixed-size datagram to the receiver in a tight loop (one attempt
 * roughly every 10 microseconds) and prints whether each attempt succeeded.
 * The loop has no exit condition: the program runs until it is killed.
 *
 * Exits with status 1 on setup errors.
 */
int main( int argc, char *argv[] )
{
    enum { cBufferSize = 10000, cPayloadSize = 7816 };

    struct sockaddr_un      unixSocketAddr;         // Local (bound) address
    struct sockaddr_un      unixSocketDstAddr;      // Destination address
    int                     unfd;                   // Unix Socket FD

    char                    msg[cBufferSize];
    int                     counter = 0;
    ssize_t                 result;

    if ( argc < 3 )
    {
        fprintf( stderr, "Usage:  %s <local socket path> <destination socket path>\n", argv[0] );
        exit(1);
    }

    // Reject paths that would overflow sun_path instead of strcpy()ing blindly.
    if ( make_unix_addr( &unixSocketAddr, argv[1] ) != 0 ||
         make_unix_addr( &unixSocketDstAddr, argv[2] ) != 0 )
    {
        fprintf( stderr, "Socket path too long (max %zu bytes).\n",
                 sizeof( unixSocketAddr.sun_path ) - 1 );
        exit(1);
    }

    // Create Unix domain socket + Bind
    if( ( unfd = socket( AF_UNIX, SOCK_DGRAM, 0 ) ) < 0 )
    {
        perror("socket error.");
        exit(1);
    }

    // Remove a stale socket file left by a previous run so bind() can succeed.
    unlink( argv[1] );
    if ( bind( unfd, (struct sockaddr *)&unixSocketAddr, sizeof( unixSocketAddr ) ) == -1 )
    {
        perror("bind error.");
        exit(1);
    }

    printf( "Starting server:  \n" );

    // Payload content is irrelevant; only its size matters for the demo.
    memset( msg, 1, sizeof( msg ) );

    for ( ;; )
    {
        printf( "Counter: %d\n", counter );

        // Two modes, switched by (un)commenting:
        //   flags = 0            -> sendto() blocks while the receiver's queue is full.
        //   flags = MSG_DONTWAIT -> sendto() fails immediately instead of blocking.
        // result = sendto( unfd, msg, cPayloadSize, 0, ( struct sockaddr * ) &unixSocketDstAddr, sizeof( unixSocketDstAddr ) );
        result = sendto( unfd, msg, cPayloadSize, MSG_DONTWAIT, ( struct sockaddr * ) &unixSocketDstAddr, sizeof( unixSocketDstAddr ) );

        if ( result == -1 )
        {
            printf( "Counter: %d fail\n", counter );
            // Report why: a full queue and a missing receiver look identical otherwise.
            perror( "sendto" );
        }
        else
        {
            printf( "Counter: %d done\n", counter );
        }

        counter++;
        usleep( 10 );
    }
}

留言

這個網誌中的熱門文章

如何將Linux打造成OpenFlow Switch:Openvswitch

我弟家的新居感恩禮拜分享:善頌善禱

Linux Virtual Interface: TUN/TAP