memcached: every one of us Linux back-end code farmers knows it! Here's a quick look at how memcached handles huge numbers of concurrent connections.
As the title says, memcached is a single-process program, with multiple threads inside that one process (Linux folks may smile knowingly here: on Linux, threads are essentially lightweight processes anyway). Under the hood, memcached uses libevent to manage its events, so below we'll watch this classic libevent application in motion. memcached actually started out as a genuinely single-threaded program, and with asynchronous I/O a single thread can already drive the CPU and the NIC close to their limits (in that regime, bolting on extra threads can even make performance worse). Once multi-core CPUs became the norm, though, adding worker threads to squeeze every core was the natural next step.
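To make the libevent part concrete before diving into memcached itself, here is a minimal, self-contained, single-threaded accept loop in the same legacy libevent 1.x style memcached uses (event_set/event_base_set/event_add). This is purely illustrative, not memcached code; the port number and the handler are made up for the demo:

```c
#include <event.h>          /* legacy libevent 1.x API */
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* called by libevent whenever the listening socket becomes readable */
static void on_accept(int fd, short which, void *arg) {
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);
    int client = accept(fd, (struct sockaddr *)&addr, &addrlen);
    if (client >= 0) {
        write(client, "hi\r\n", 4);
        close(client);
    }
}

int main(void) {
    struct event_base *base = event_init();  /* memcached's "main_base" */
    struct event ev;
    struct sockaddr_in sin;

    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(INADDR_ANY);
    sin.sin_port = htons(11212);             /* arbitrary demo port */
    bind(lfd, (struct sockaddr *)&sin, sizeof(sin));
    listen(lfd, 1024);

    /* register the listening fd and hand control to the event loop */
    event_set(&ev, lfd, EV_READ | EV_PERSIST, on_accept, NULL);
    event_base_set(base, &ev);
    event_add(&ev, NULL);
    event_base_loop(base, 0);                /* runs until the loop exits */
    return 0;
}
```

One thread, nothing blocking except inside the event loop: this is the shape memcached had before worker threads were added.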
The memcached source tree is very simple; almost all of the thread-related code lives in thread.c. In short, memcached's threads form a Master-Worker model: the main thread accepts connections and hands each one to a worker thread, and the worker then receives commands, processes them, and returns the results.
OK, let's start from the main function and walk through it step by step.
In main, the thread-related code boils down to these lines:
```c
case 't':
    settings.num_threads = atoi(optarg);
    if (settings.num_threads <= 0) {
        fprintf(stderr, "Number of threads must be greater than 0\n");
        return 1;
    }
    if (settings.num_threads > 64) {
        fprintf(stderr, "WARNING: Setting a high number of worker"
                "threads is not recommended.\n"
                " Set this value to the number of cores in"
                " your machine or less.\n");
    }
    break;

/* ... later in main, after option parsing and other setup ... */
thread_init(settings.num_threads, main_base);
```
Now for thread initialization. Before reading thread_init itself, two structs are worth knowing: each worker thread is described by a LIBEVENT_THREAD, and the dispatcher (main) thread by a LIBEVENT_DISPATCHER_THREAD:
```c
typedef struct {
    pthread_t thread_id;               /* unique ID of this thread */
    struct event_base *base;           /* this thread's own libevent handle */
    struct event notify_event;         /* event on the notify pipe below */
    int notify_receive_fd;             /* read end of the notify pipe */
    int notify_send_fd;                /* write end of the notify pipe */
    struct thread_stats stats;         /* stats generated by this thread */
    struct conn_queue *new_conn_queue; /* connections queued by the dispatcher */
    cache_t *suffix_cache;             /* suffix cache */
} LIBEVENT_THREAD;

typedef struct {
    pthread_t thread_id;               /* unique ID of this thread */
    struct event_base *base;           /* libevent handle (main_base) */
} LIBEVENT_DISPATCHER_THREAD;
```
Now the thread initialization function itself; note how it creates one notify pipe per worker and wires everything together:
```c
void thread_init(int nthreads, struct event_base *main_base) {
    int i;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);
    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);
    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (!threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    /* the dispatcher is the main thread itself, reusing main_base */
    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        /* each worker gets its own notify pipe */
        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
    }

    /* start the workers, then wait until every one has checked in */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}
```
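thread_init delegates the per-thread libevent wiring to setup_thread and starts each worker via create_worker (a thin pthread_create wrapper). Those two helpers aren't quoted in this article, so here is a condensed sketch of what they do in the thread.c of that era. This is reconstructed from memory with error handling trimmed, so treat it as a guide and check the real source:

```c
/* Condensed sketch of setup_thread/worker_libevent (thread.c). */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();    /* each worker gets its own event_base */

    /* watch the read end of the notify pipe; the dispatcher writes one
     * byte there whenever it queues a new connection for this worker */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);
    event_add(&me->notify_event, 0);

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    cq_init(me->new_conn_queue);
}

static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* tell thread_init we are up, then loop on events forever */
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);

    event_base_loop(me->base, 0);
    return NULL;
}
```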
Good: initialization is done and every thread (the main thread included) is up and running. Next, let's see how an actual connection gets handled.
Start with the main thread. After thread_init returns (meaning all the workers have finished initializing) and main completes some other setup, it calls event_base_loop(main_base, 0); from this point on the main thread is processing network events and accepting connections. By this time main has already added the listening socket's event to main_base while binding the listen port (see the server_socket function; I won't dwell on it). The callback on the listening event is event_handler, the one callback shared by every network event in memcached, and event_handler does essentially nothing itself: it immediately calls drive_machine, a big state machine built around one giant switch. That is the hub of all of memcached's network event handling; let's take a look.
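event_handler itself is only a few lines. Paraphrased from memory of memcached.c (the real function also asserts on its arguments), it looks roughly like this:

```c
/* Paraphrased sketch of event_handler: every network event, on every
 * thread, funnels into the drive_machine state machine. */
void event_handler(const int fd, const short which, void *arg) {
    conn *c = (conn *)arg;
    c->which = which;

    if (fd != c->sfd) {          /* sanity check */
        conn_close(c);
        return;
    }
    drive_machine(c);            /* all the real work happens here */
}
```

And here is drive_machine, trimmed to the conn_listening case that matters for this discussion: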
```c
static void drive_machine(conn *c) {
    bool stop = false;
    int sfd, flags = 1;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int nreqs = settings.reqs_per_event;
    int res;

    while (!stop) {
        switch (c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    /* nothing to accept right now; leave the loop */
                    stop = true;
                } else if (errno == EMFILE) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Too many open connections\n");
                    accept_new_conns(false);
                    stop = true;
                } else {
                    perror("accept()");
                    stop = true;
                }
                break;
            }
            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                perror("setting O_NONBLOCK");
                close(sfd);
                break;
            }

            /* hand the accepted socket to a worker thread */
            dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                              DATA_BUFFER_SIZE, tcp_transport);
            stop = true;
            break;

        case conn_waiting:
        case conn_read:
            /* ... the remaining states (reading, parsing, writing
             * results, etc.) are elided in this article ... */
            break;
        }
    }

    return;
}
```
Next, look at dispatch_conn_new, which hands the accepted socket to a worker:
```c
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    int tid = (last_thread + 1) % settings.num_threads;  /* round-robin */
    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    /* wake the worker: one byte down its notify pipe */
    if (write(thread->notify_send_fd, "", 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
```
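The conn_queue that dispatch_conn_new pushes into is just a mutex-protected singly-linked list. Roughly, condensed from memory of thread.c (so check the real file):

```c
/* Condensed sketch of the connection queue (thread.c). Each worker owns
 * one of these; the dispatcher pushes, the worker pops. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};

static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;           /* queue was empty */
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;
    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)      /* queue is now empty */
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}
```

Note that the actual wake-up is the one-byte write to notify_send_fd: the queue itself never blocks the worker, libevent does the waiting.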
And that is essentially all of the main thread's connection-handling logic. Now for the worker side. During initialization, each worker registered thread_libevent_process as the libevent callback for its notify_event (as the setup_thread sketch above showed). Let's look at it:
```c
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                            item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}
```
With that, the worker thread owns one more connection. From then on it simply keeps servicing two kinds of events in its own loop: notify-pipe events that add new connections, and network I/O events on its client sockets, where the actual command processing happens.
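For completeness: the step that actually wires the client socket into the worker's event loop happens inside conn_new, called above. Condensed from memory of memcached.c (allocation and most error handling omitted), the key lines look roughly like this:

```c
/* Inside conn_new (condensed sketch): register the client fd on the
 * worker's event_base, reusing the shared event_handler callback.
 * From here on, I/O readiness on this client drives drive_machine
 * inside the worker thread. */
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
if (event_add(&c->event, 0) == -1) {
    return NULL;   /* caller logs and closes the fd, as seen above */
}
```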
This libevent-based multithreading scheme of memcached has become practically a textbook for high-performance servers. Required reading for any Linux back-end code farmer.