
Ruby 3 Fiber Scheduler

In July 2020 I wrote an article, Ruby 3 Fiber changes preview (in Chinese), and followed it up in August with another post, A Walkthrough of Ruby 3 Scheduler. Ruby 3 has gone through several releases since then, including ruby-3.0.0-preview1, ruby-3.0.0-preview2, and ruby-3.0.0-rc1, which brought many improvements to the Fiber Scheduler API.

But as I mentioned before, what Ruby 3 implements is only the interface. Nothing is scheduled unless a scheduler implementation is provided.

I have been very busy with work and study over the past four months, but I took some time in recent days to catch up with the API.

GitHub: Evt

Use of Fiber Scheduler

Suppose we have a pair of fds generated by IO.pipe. When we write Hello World to one of them, we could read it from the other side of the pipe. We would have code like this:

rd, wr = IO.pipe

wr.write("Hello World")
wr.close

message = rd.read(20)
puts message
rd.close

This program has lots of limitations. For example, you can't write a string longer than the pipe buffer: since the other side is not reading at the same time, the write would block forever if the string is too long. You also have to write first; otherwise the read would block forever. Of course, we could use multi-threading to solve this problem.

require 'thread'

rd, wr = IO.pipe

t1 = Thread.new do
  message = rd.read(20)
  puts message
  rd.close
end

t2 = Thread.new do
  wr.write("Hello World")
  wr.close
end

t1.join
t2.join

But as we all know, using threads to solve I/O problems is inefficient. OS context switches are slow, and fair thread scheduling is still a hard problem in operating systems. An I/O problem is not CPU-bound: all we need is to suspend the task and resume it when the I/O is ready. In this case, all you need is the Ruby 3 scheduler.

require 'evt'

rd, wr = IO.pipe
scheduler = Evt::Scheduler.new

Fiber.set_scheduler scheduler

Fiber.schedule do
  message = rd.read(20)
  puts message
  rd.close
end

Fiber.schedule do
  wr.write("Hello World")
  wr.close
end

scheduler.run

In general, async code requires constructs like callbacks, async, or await. This is not necessary in Ruby 3. Ruby 3 identifies the common situations where you need async behavior: I/O multiplexing, waiting on processes, Kernel#sleep, and mutexes. It exposes hooks for all of these to the scheduler, so performance improves without adding any new keywords. My project evt is a scheduler that implements the Ruby 3 Scheduler interface.
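To make the hook idea concrete, here is a minimal sketch of a scheduler written in plain Ruby on top of IO.select. It is not how evt is implemented (evt uses C backends); the class name TinySelectScheduler and its internals are made up for illustration, timeouts in io_wait and block are ignored, and cross-thread wake-ups are not handled. It assumes Ruby 3.0 or later.

```ruby
# A minimal, hypothetical IO.select-based scheduler (not evt's implementation).
class TinySelectScheduler
  def initialize
    @readable = {} # io => fiber waiting to read
    @writable = {} # io => fiber waiting to write
    @sleeping = {} # fiber => monotonic wake-up time
    @ready = []    # fibers made runnable by unblock
  end

  # Called by Fiber.schedule: create a non-blocking fiber and start it.
  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  # Hook for I/O that would block. The timeout is ignored in this sketch.
  def io_wait(io, events, _timeout)
    @readable[io] = Fiber.current unless (events & IO::READABLE).zero?
    @writable[io] = Fiber.current unless (events & IO::WRITABLE).zero?
    Fiber.yield
    events
  end

  # Hook for Kernel#sleep.
  def kernel_sleep(duration = nil)
    @sleeping[Fiber.current] = now + duration if duration
    Fiber.yield
  end

  # Hooks for Mutex, Queue and friends (timeouts are not handled here).
  def block(_blocker, _timeout = nil)
    Fiber.yield
  end

  def unblock(_blocker, fiber)
    @ready << fiber
  end

  # Called when the scheduler is replaced or the thread exits.
  def close
    run
  end

  # Event loop: wait for readiness or the next timer, then resume fibers.
  def run
    until [@readable, @writable, @sleeping, @ready].all?(&:empty?)
      readable, writable = IO.select(@readable.keys, @writable.keys, nil, next_timeout)
      runnable = @ready.shift(@ready.size)
      ((readable || []) + (writable || [])).each do |io|
        runnable << @readable.delete(io) << @writable.delete(io)
      end
      due = @sleeping.select { |_, wake_at| wake_at <= now }.keys
      due.each { |fiber| @sleeping.delete(fiber) }
      (runnable + due).compact.uniq.each { |fiber| fiber.resume if fiber.alive? }
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def next_timeout
    return 0 unless @ready.empty?
    wake_at = @sleeping.values.min
    wake_at && [wake_at - now, 0].max
  end
end

scheduler = TinySelectScheduler.new
Fiber.set_scheduler(scheduler)

rd, wr = IO.pipe
messages = []

Fiber.schedule do
  messages << rd.read(20) # suspends in io_wait until data or EOF arrives
  rd.close
end

Fiber.schedule do
  sleep 0.01              # goes through kernel_sleep instead of blocking
  wr.write("Hello World")
  wr.close
end

scheduler.run
Fiber.set_scheduler(nil)
puts messages
```

The reading and writing code contains no async keywords at all: the blocking calls reach the hooks through the interpreter, and the run loop decides which fiber to resume next.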

For something more realistic than the pipe example, here is a minimal HTTP/1.1 server built on evt:

require 'socket'
require 'evt'

@scheduler = Evt::Scheduler.new
Fiber.set_scheduler @scheduler

@server = Socket.new Socket::AF_INET, Socket::SOCK_STREAM
@server.bind Addrinfo.tcp '127.0.0.1', 3002
@server.listen Socket::SOMAXCONN

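def handle_socket(socket)
  until socket.closed?
    # Skip the request line and headers until the blank line
    line = socket.gets
    until line == "\r\n" || line.nil?
      line = socket.gets
    end
    if line.nil?
      # EOF: the client closed the connection, so stop serving it
      socket.close
      break
    end
    socket.write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
  end
end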

Fiber.schedule do
  loop do
    socket, addr = @server.accept
    Fiber.schedule do
      handle_socket(socket)
    end
  end
end

@scheduler.run

We can see that the code is almost the same as synchronous code. All you need to do is set up the scheduler with Fiber.set_scheduler, and use Fiber.schedule in the places you would otherwise handle with multi-threading. Finally, call scheduler.run to start the scheduler.

Backend support

io_uring Support

Not only the Ruby API but also my scheduler has received lots of updates in recent months, especially better support for I/O multiplexing backends. io_uring has been available since Linux 5.1. Since io_uring reduces the number of syscalls and can issue direct iov calls, it can achieve better performance than epoll, so supporting it is important. Direct iov support requires further changes to the Ruby Fiber scheduler; these were introduced by ioquatix in Ruby 3.0.0-preview2. We need to implement two parts. One of them is an epoll-compatible API:

#include <ruby.h>
#include <stdbool.h>
#include <poll.h>
#include <liburing.h>

#define URING_ENTRIES 64
#define URING_MAX_EVENTS 64

struct uring_data {
  bool is_poll;
  short poll_mask;
  VALUE io;
};

void uring_payload_free(void* data);
size_t uring_payload_size(const void* data);

static const rb_data_type_t type_uring_payload = {
  .wrap_struct_name = "uring_payload",
  .function = {
    .dmark = NULL,
    .dfree = uring_payload_free,
    .dsize = uring_payload_size,
  },
  .data = NULL,
  .flags = RUBY_TYPED_FREE_IMMEDIATELY,
};

void uring_payload_free(void* data) {
    io_uring_queue_exit((struct io_uring*) data);
    xfree(data);
}

size_t uring_payload_size(const void* data) {
    return sizeof(struct io_uring);
}

VALUE method_scheduler_init(VALUE self) {
    int ret;
    struct io_uring* ring;
    ring = xmalloc(sizeof(struct io_uring));
    ret = io_uring_queue_init(URING_ENTRIES, ring, 0);
    if (ret < 0) {
        rb_raise(rb_eIOError, "unable to initialize io_uring");
    }
    rb_iv_set(self, "@ring", TypedData_Wrap_Struct(Payload, &type_uring_payload, ring));
    return Qnil;
}

VALUE method_scheduler_register(VALUE self, VALUE io, VALUE interest) {
    VALUE ring_obj;
    struct io_uring* ring;
    struct io_uring_sqe *sqe;
    struct uring_data *data;
    short poll_mask = 0;
    ID id_fileno = rb_intern("fileno");

    ring_obj = rb_iv_get(self, "@ring");
    TypedData_Get_Struct(ring_obj, struct io_uring, &type_uring_payload, ring);
    sqe = io_uring_get_sqe(ring);
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));

    int ruby_interest = NUM2INT(interest);
    int readable = NUM2INT(rb_const_get(rb_cIO, rb_intern("READABLE")));
    int writable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WRITABLE")));

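    // POLLIN / POLLOUT are the poll(2) event bits from <poll.h>
    // (POLL_IN / POLL_OUT are unrelated siginfo codes).
    if (ruby_interest & readable) {
        poll_mask |= POLLIN;
    }

    if (ruby_interest & writable) {
        poll_mask |= POLLOUT;
    }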

    data = (struct uring_data*) xmalloc(sizeof(struct uring_data));
    data->is_poll = true;
    data->io = io;
    data->poll_mask = poll_mask;
    
    io_uring_prep_poll_add(sqe, fd, poll_mask);
    io_uring_sqe_set_data(sqe, data);
    io_uring_submit(ring);
    return Qnil;
}

VALUE method_scheduler_deregister(VALUE self, VALUE io) {
    // io_uring runs under oneshot mode. No need to deregister.
    return Qnil;
}
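The register functions for every backend start by decoding the interest bitmask that Ruby passes to the scheduler. As a rough sketch of that step in plain Ruby (decode_interest is a hypothetical helper, not part of evt or Ruby, and it assumes Ruby 3.0 or later, where IO::READABLE and IO::WRITABLE exist):

```ruby
# Hypothetical helper: turn the interest bitmask passed to the scheduler
# into a list of symbolic events, mirroring the bit tests in the C code.
def decode_interest(interest)
  wanted = []
  wanted << :readable unless (interest & IO::READABLE).zero?
  wanted << :writable unless (interest & IO::WRITABLE).zero?
  wanted
end

p decode_interest(IO::READABLE)
p decode_interest(IO::READABLE | IO::WRITABLE)
```

Each backend then maps these events to its own representation, such as poll event bits for io_uring.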

The other part is direct iov support:

VALUE method_scheduler_io_read(VALUE self, VALUE io, VALUE buffer, VALUE offset, VALUE length) {
    struct io_uring* ring;
    struct uring_data *data;
    char* read_buffer;
    ID id_fileno = rb_intern("fileno");
    // @iov[io] = Fiber.current
    VALUE iovs = rb_iv_get(self, "@iovs");
    rb_hash_aset(iovs, io, rb_funcall(Fiber, rb_intern("current"), 0));
    // register
    VALUE ring_obj = rb_iv_get(self, "@ring");
    TypedData_Get_Struct(ring_obj, struct io_uring, &type_uring_payload, ring);
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));

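    // Allocate one extra zeroed byte so the buffer is always NUL-terminated.
    size_t read_length = NUM2SIZET(length);
    read_buffer = (char*) xcalloc(read_length + 1, sizeof(char));
    struct iovec iov = {
        .iov_base = read_buffer,
        .iov_len = read_length,
    };

    data = (struct uring_data*) xmalloc(sizeof(struct uring_data));
    data->is_poll = false;
    data->io = io;
    data->poll_mask = 0;
    
    io_uring_prep_readv(sqe, fd, &iov, 1, NUM2SIZET(offset));
    io_uring_sqe_set_data(sqe, data);
    io_uring_submit(ring);

    // Yield first: the buffer is only filled once the completion arrives.
    rb_funcall(Fiber, rb_intern("yield"), 0); // Fiber.yield

    // NOTE: strlen assumes text data; the exact byte count is the cqe result.
    VALUE result = rb_str_new(read_buffer, strlen(read_buffer));
    xfree(read_buffer);
    if (buffer != Qnil) {
        rb_str_append(buffer, result);
    }
    return result;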
}

VALUE method_scheduler_io_write(VALUE self, VALUE io, VALUE buffer, VALUE offset, VALUE length) {
    struct io_uring* ring;
    struct uring_data *data;
    char* write_buffer;
    ID id_fileno = rb_intern("fileno");
    // @iov[io] = Fiber.current
    VALUE iovs = rb_iv_get(self, "@iovs");
    rb_hash_aset(iovs, io, rb_funcall(Fiber, rb_intern("current"), 0));
    // register
    VALUE ring_obj = rb_iv_get(self, "@ring");
    TypedData_Get_Struct(ring_obj, struct io_uring, &type_uring_payload, ring);
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));

    write_buffer = StringValueCStr(buffer);
    struct iovec iov = {
        .iov_base = write_buffer,
        .iov_len = NUM2SIZET(length),
    };

    data = (struct uring_data*) xmalloc(sizeof(struct uring_data));
    data->is_poll = false;
    data->io = io;
    data->poll_mask = 0;
    
    io_uring_prep_writev(sqe, fd, &iov, 1, NUM2SIZET(offset));
    io_uring_sqe_set_data(sqe, data);
    io_uring_submit(ring);
    rb_funcall(Fiber, rb_intern("yield"), 0); // Fiber.yield
    return length;
}

But in some cases, the iov hooks are not called. I'm still tracking down the bug. At least the performance is already very close to epoll.

IOCP Support

Another problem is supporting Windows IOCP. I tried to implement something like this:

VALUE method_scheduler_init(VALUE self) {
    HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    rb_iv_set(self, "@iocp", TypedData_Wrap_Struct(Payload, &type_iocp_payload, iocp));
    return Qnil;
}

VALUE method_scheduler_register(VALUE self, VALUE io, VALUE interest) {
    HANDLE iocp;
    VALUE iocp_obj = rb_iv_get(self, "@iocp");
    struct iocp_data* data;
    TypedData_Get_Struct(iocp_obj, HANDLE, &type_iocp_payload, iocp);
    int fd = NUM2INT(rb_funcallv(io, rb_intern("fileno"), 0, 0));
    HANDLE io_handler = (HANDLE)rb_w32_get_osfhandle(fd);
    
    int ruby_interest = NUM2INT(interest);
    int readable = NUM2INT(rb_const_get(rb_cIO, rb_intern("READABLE")));
    int writable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WRITABLE")));
    data = (struct iocp_data*) xmalloc(sizeof(struct iocp_data));
    data->io = io;
    data->is_poll = true;
    data->interest = 0;

    // Record the interest on the payload, not on the VALUE argument
    if (ruby_interest & readable) {
        data->interest |= readable;
    }

    if (ruby_interest & writable) {
        data->interest |= writable;
    }

    HANDLE res = CreateIoCompletionPort(io_handler, iocp, (ULONG_PTR) data, 0);
    printf("IO at address: %p\n", (void *)data);

    return Qnil;
}

VALUE method_scheduler_wait(VALUE self) {
    ID id_next_timeout = rb_intern("next_timeout");
    ID id_push = rb_intern("push");
    VALUE iocp_obj = rb_iv_get(self, "@iocp");
    VALUE next_timeout = rb_funcall(self, id_next_timeout, 0);
    
    int readable = NUM2INT(rb_const_get(rb_cIO, rb_intern("READABLE")));
    int writable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WRITABLE")));

    HANDLE iocp;
    OVERLAPPED_ENTRY lpCompletionPortEntries[IOCP_MAX_EVENTS];
    ULONG ulNumEntriesRemoved;
    TypedData_Get_Struct(iocp_obj, HANDLE, &type_iocp_payload, iocp);

    DWORD timeout;
    if (next_timeout == Qnil) {
        timeout = 0x5000;
    } else {
        timeout = NUM2INT(next_timeout) * 1000; // seconds to milliseconds
    }

    DWORD NumberOfBytesTransferred;
    LPOVERLAPPED pOverlapped;
    ULONG_PTR CompletionKey;

    BOOL res = GetQueuedCompletionStatus(iocp, &NumberOfBytesTransferred, &CompletionKey, &pOverlapped, timeout);
    // BOOL res = GetQueuedCompletionStatusEx(
    //    iocp, lpCompletionPortEntries, IOCP_MAX_EVENTS, &ulNumEntriesRemoved, timeout, TRUE);

    VALUE result = rb_ary_new2(2);

    VALUE readables = rb_ary_new();
    VALUE writables = rb_ary_new();

    rb_ary_store(result, 0, readables);
    rb_ary_store(result, 1, writables);

    // Nothing was dequeued (timeout or error): return the empty lists
    if (!res) {
        return result;
    }

    printf("--------- Received! ---------\n");
    printf("Received IO at address: %p\n", (void *)CompletionKey);
    printf("dwNumberOfBytesTransferred: %lu\n", (unsigned long) NumberOfBytesTransferred);

    // if (ulNumEntriesRemoved > 0) {
    //     printf("Entries: %ld\n", ulNumEntriesRemoved);
    // }

    // for (ULONG i = 0; i < ulNumEntriesRemoved; i++) {
    //     OVERLAPPED_ENTRY entry = lpCompletionPortEntries[i];
        
    //     struct iocp_data *data = (struct iocp_data*) entry.lpCompletionKey;

    //     int interest = data->interest;
    //     VALUE obj_io = data->io;
    //     if (interest & readable) {
    //         rb_funcall(readables, id_push, 1, obj_io);
    //     } else if (interest & writable) {
    //         rb_funcall(writables, id_push, 1, obj_io);
    //     }

    //     xfree(data);
    // }

    return result;
}

But the I/O scheduler receives the wrong pointers in the completion callback. After some research, I found that to support IOCP you have to open the I/O handle with the FILE_FLAG_OVERLAPPED flag. This may need some changes in Ruby's win32/win32.c. But at least I solved the problems of the IO.select fallback. That is good enough for now, since nobody cares about Windows production performance…

kqueue Improvements

Another improvement is to kqueue on macOS. kqueue on FreeBSD is good, but its performance on macOS is really weird. Since all of our I/O registrations are one-shot, I used the one-shot mode of kqueue to reduce the number of syscalls.

VALUE method_scheduler_register(VALUE self, VALUE io, VALUE interest) {
    struct kevent events[2];
    int event_count = 0;
    ID id_fileno = rb_intern("fileno");
    int kq = NUM2INT(rb_iv_get(self, "@kq"));
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));
    int ruby_interest = NUM2INT(interest);
    int readable = NUM2INT(rb_const_get(rb_cIO, rb_intern("READABLE")));
    int writable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WRITABLE")));
    
    // EVFILT_* values are distinct filter IDs, not bit flags, so each
    // interest gets its own one-shot kevent instead of being OR-ed together.
    if (ruby_interest & readable) {
        EV_SET(&events[event_count], fd, EVFILT_READ, EV_ADD|EV_ENABLE|EV_ONESHOT, 0, 0, (void*) io);
        event_count++;
    }

    if (ruby_interest & writable) {
        EV_SET(&events[event_count], fd, EVFILT_WRITE, EV_ADD|EV_ENABLE|EV_ONESHOT, 0, 0, (void*) io);
        event_count++;
    }

    kevent(kq, events, event_count, NULL, 0, NULL); // TODO: Check the return value
    return Qnil;
}

Overall

In the end, we support almost all I/O multiplexing backends of the most commonly used operating systems:

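|                  | Linux       | Windows       | macOS         | FreeBSD     |
| ---------------- | ----------- | ------------- | ------------- | ----------- |
| io_uring         | ✅ (See 1)  |               |               |             |
| epoll            | ✅ (See 2)  |               |               |             |
| kqueue           |             |               | ✅ (⚠️See 5)  |             |
| IOCP             |             | ❌ (⚠️See 3)  |               |             |
| Ruby (IO.select) | ✅ Fallback | ✅ (⚠️See 4)  | ✅ Fallback   | ✅ Fallback |
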
  1. when liburing is installed
  2. when kernel version >= 2.6.8
  3. WOULD NOT WORK until FILE_FLAG_OVERLAPPED is included in I/O initialization process.
  4. Some I/Os are not able to be nonblock under Windows. See Scheduler Docs.
  5. kqueue performance in Darwin is very poor. MAY BE DISABLED IN THE FUTURE.
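
As a rough illustration of how the table translates into a selection order, the sketch below maps a host OS string to candidate backends, with IO.select always last as the fallback. candidate_backends is a hypothetical helper, not evt's API, and whether a backend is really usable also depends on the notes above (for example, liburing being installed). Listing kqueue for FreeBSD is an assumption based on the kqueue section rather than on the table.

```ruby
require 'rbconfig'

# Hypothetical helper: candidate backends per platform, most preferred first.
def candidate_backends(host_os = RbConfig::CONFIG['host_os'])
  case host_os
  when /linux/
    [:io_uring, :epoll, :select]
  when /darwin|freebsd/
    [:kqueue, :select]
  else
    # Windows and anything unknown: IOCP does not work yet, so only the fallback
    [:select]
  end
end

p candidate_backends
```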

Benchmark

How is the overall performance?

The benchmark was run with evt v0.2.2 on Ruby 3.0.0-rc1, using a single-threaded server. See evt-server-benchmark for the test code.

The test command is wrk -t4 -c8192 -d30s http://localhost:3001.

All of the systems have their file descriptor limit set to the maximum.

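| OS    | CPU         | Memory | Backend                | req/s    |
| ----- | ----------- | ------ | ---------------------- | -------- |
| Linux | Ryzen 2700x | 64GB   | epoll                  | 54680.08 |
| Linux | Ryzen 2700x | 64GB   | io_uring               | 50245.53 |
| Linux | Ryzen 2700x | 64GB   | IO.select (using poll) | 44159.23 |
| macOS | i7-6820HQ   | 16GB   | kqueue                 | 37855.53 |
| macOS | i7-6820HQ   | 16GB   | IO.select (using poll) | 28293.36 |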

Very impressive. The improvements come from several aspects. Current async frameworks like Falcon use nio4r, whose backend is libev. The performance of libev is only average because it is designed for extreme compatibility. Existing async frameworks also require lots of meta-programming. This extension, in contrast, is written almost entirely in C, with only the features the scheduler needs.

Compared to my previous tests on preview 1, this version uses keep-alive connections, and many issues in Ruby's non-blocking I/O have been fixed. Together, these changes make it about 10 times faster than what we had 3 months ago.

wrk is very sensitive to errors, and the parser in the benchmark is incorrect: it cannot close the socket properly. I updated my Midori project to use the Ruby 3 Scheduler, and its performance can reach 247k req/s with kqueue and 647k req/s with epoll, which is more than 100 times faster than blocking I/O.

Combining with Ractor

In November I also wrote a post about Ractor, Ruby 3 Ractor Dev Guide (in Chinese). Combining Fiber with Ractor is always an interesting topic. We have two routes for that:

  1. Accept connections in the main Ractor and dispatch the requests to sub-Ractors. After the results are transferred back, return them from the main Ractor with the scheduler.
  2. Use the Linux SO_REUSEPORT feature to let all Ractors listen on the port at the same time, which is very easy to fit into existing server architectures.
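
The second route relies on several listening sockets sharing one port. A minimal sketch of just that part, without Ractor, might look like the following. reuseport_listener is a hypothetical helper name, and the code assumes a platform that defines Socket::SO_REUSEPORT (such as Linux 3.9 or later).

```ruby
require 'socket'

# Hypothetical helper: open a listening TCP socket with SO_REUSEPORT set,
# so that several sockets can be bound to the same address and port.
def reuseport_listener(host, port)
  sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM)
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEPORT, true)
  sock.bind(Addrinfo.tcp(host, port))
  sock.listen(Socket::SOMAXCONN)
  sock
end

# Port 0 lets the OS pick a free port; the second listener then joins it.
first = reuseport_listener('127.0.0.1', 0)
port = first.local_address.ip_port
second = reuseport_listener('127.0.0.1', port)

puts "two listeners share port #{port}"
```

On Linux, the kernel distributes incoming connections across the listeners, so each worker can run its own accept loop on its own socket.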

Unfortunately, neither of these works correctly right now. Some Fiber features are not available in Ractor. I suppose this is a bug, and have submitted a patch, GitHub #3971. According to my previous benchmarks, Ractor may increase performance by about 4 times on multi-core machines.

But since API servers are usually stateless, these improvements can also be achieved with multiple processes. Ractor's major contribution may be lower memory consumption.

I will test it again with future Ruby 3.0 updates.

Conclusion

We achieved a 10x performance improvement compared to preview 1, and are almost 36 times faster than blocking I/O. The major performance issue of Ruby servers is blocking I/O rather than VM performance. With the I/O scheduler included, the I/O performance of Ruby 3 enters a new era. The remaining work is to wait for updates to C extension libraries such as database connectors. Then, with an async scheduler and a Fiber-based web server like Falcon, you won't have to change anything in your business code to get performance improvements of dozens of times.

Let’s continue happy programming with Ruby.
