基础知识
reactor
图片来自 POSA 卷4
redis event
Redis Server 是一个事件驱动程序,事件有文件事件和时间事件。
event loop
先看main方法,以下选自 redis/server.c (省略部分)
int main(int argc, char **argv) {
initServer();
aeSetBeforeSleepProc(server.el, beforeSleep);
aeSetAfterSleepProc(server.el, afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
}
int initServer(void) {
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
}
文件事件,常用的handler为 acceptTcpHandler、acceptUnixHandler、readQueryFromClient、sendReplyToClient。
时间事件,常用的handler为 serverCron。
ae
关于event-driven programming library,Redis没有使用第三方的libevent,而是自己写了一个,ae.c
。
- struct
- aeFileEvent
- aeFileEvent
- aeFiredEvent
- aeEventLoop
- Prototypes
- lifecycle
- aeCreateEventLoop
- aeDeleteEventLoop
- aeStop
- aeWait
- aeMain
- file event
- aeCreateFileEvent
- aeDeleteFileEvent
- aeGetFileEvents
- time event
- aeCreateTimeEvent
- aeDeleteTimeEvent
- process
- aeProcessEvents
- other
- ...
- lifecycle
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
有一点,ae_epoll.c 中用的 LT (默认); 一般来说,如果 server 响应通常较小,不会多次触发 EPOLLOUT,用 LT 就行,简单高效;况且还要兼顾 select、evport、kqueue。
accept 一个新的连接,就在 createClient() 中添加读事件(readQueryFromClient),等 EPOLLIN,读到 EAGAIN。
要写数据,就 addReplyXXX(),在 prepareClientToWrite() 中挂在 clients_pending_write 列表,在 beforeSleep() 做发送,发送不完就添加写事件(sendReplyToClient)。
int handleClientsWithPendingWrites(void) {
for c in clients_pending_write
if (writeToClient(c->fd,c,0) == C_ERR) continue;
if (clientHasPendingReplies(c)) {
if (aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
rof
}
关于 epoll LT 中修改事件的操作,可以在 networking.c
中找到。
https://blog.csdn.net/dongfuye/article/details/50880251
ET还是LT?
LT的处理过程:
. accept一个连接,添加到epoll中监听EPOLLIN事件
. 当EPOLLIN事件到达时,read fd中的数据并处理
. 当需要写出数据时,把数据write到fd中;如果数据较大,无法一次性写出,那么在epoll中监听EPOLLOUT事件
. 当EPOLLOUT事件到达时,继续把数据write到fd中;如果数据写出完毕,那么在epoll中关闭EPOLLOUT事件ET的处理过程:
. accept一个一个连接,添加到epoll中监听EPOLLIN|EPOLLOUT事件
. 当EPOLLIN事件到达时,read fd中的数据并处理,read需要一直读,直到返回EAGAIN为止
. 当需要写出数据时,把数据write到fd中,直到数据全部写完,或者write返回EAGAIN
. 当EPOLLOUT事件到达时,继续把数据write到fd中,直到数据全部写完,或者write返回EAGAIN从ET的处理过程中可以看到,ET的要求是需要一直读写,直到返回EAGAIN,否则就会遗漏事件。 而LT的处理过程中,直到返回EAGAIN不是硬性要求,但通常的处理过程都会读写直到返回EAGAIN,但LT比ET多了一个开关EPOLLOUT事件的步骤