Zig NEWS

jack
jack

Posted on

A simple async task library

Hello everyone, hopes you're all doing well. Today I want to share my experience writing a very simple async task library.

Recently I need to write a desktop application for processing and displaying very large datasets. Of course, we need to put data processing into some background threads, so that user's mouse/keyboard can continue interacting with application fluently.

The solution seems very simple: I can directly call std.Thread.spawn to put works into background. But wait, how do I know the job is done? Furthermore, how do I get the results? The problem immediately reminds me of C++'s async task, using which I can fire up tasks in background and then check tasks' result when convenient. After some googling, I didn't find any zig based async library suits my needs very well, so I decide to write my own. Maybe there is, but it's always fun to write some interesting code, right? :-)

Let's begin the work by defining Future, which is meant to be returned by async tasks, here's simplified code:

pub fn Future(comptime T: type) type {
    return struct {
        const Self = @This();

        allocator: std.mem.Allocator,
        mutex: std.Thread.Mutex,
        cond: std.Thread.Condition,
        data: ?T,

        pub fn init(allocator: std.mem.Allocator) !*Self {
            var self = try allocator.create(Self);
            self.* = .{
                .allocator = allocator,
                .mutex = .{},
                .cond = .{},
                .data = null,
            };
            return self;
        }

        /// Wait until data is granted
        /// WARNING: data must be granted after this call, or the function won't return forever
        pub fn wait(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.data == null) {
                self.cond.wait(&self.mutex);
            }
            std.debug.assert(self.data != null);
            return self.data.?;
        }

        /// Wait until data is granted or timeout happens
        pub fn timedWait(self: *Self, time_ms: u64) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            var total_wait_time = @intCast(i64, time_ms);
            while (self.data == null and total_wait_time > 0) {
                const wait_begin = std.time.milliTimestamp();
                if (self.cond.timedWait(&self.mutex, @intCast(u64, total_wait_time))) {
                    total_wait_time -= (std.time.milliTimestamp() - wait_begin);
                } else |e| {
                    switch (e) {
                        error.Timeout => break,
                        else => {},
                    }
                }
            }
            return self.data;
        }

        /// Grant data and send signal to waiting threads
        pub fn grant(self: *Self, data: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.data = data;
            self.cond.broadcast();
        }
    };
}
Enter fullscreen mode Exit fullscreen mode

As you can see, I'm using conditional variable to provide wait/signal mechanism. The code mostly works, with annoying flaws however. First, wait and timedWait need to be called before grant, otherwise the condvar's wakeup event might not be received by waiting threads, which means we need to check data ourself after aquired mutex. Second, threads waiting for condvar could be wakeup incidentally without actually being notified, which is called spurious wakeup. To deal with this problem, threads have to repeatedly check data's status each time been wakeup from waiting. As you can see, the code is not good looking at all.

After digging more into zig's standard library, I found a nice synchronization tool: std.Thread.ResetEvent, which takes care of all the problem I just mentioned above. The code now looks like this:

/// Represent a value returned by async task in the future.
pub fn Future(comptime T: type) type {
    return struct {
        const Self = @This();

        allocator: std.mem.Allocator,
        done: std.Thread.ResetEvent,
        data: ?T,

        pub fn init(allocator: std.mem.Allocator) !*Self {
            var self = try allocator.create(Self);
            self.* = .{
                .allocator = allocator,
                .done = .{},
                .data = null,
            };
            return self;
        }

        /// Wait until data is granted
        pub fn wait(self: *Self) T {
            self.done.wait();
            std.debug.assert(self.data != null);
            return self.data.?;
        }

        /// Wait until data is granted or timeout happens
        pub fn timedWait(self: *Self, time_ns: u64) ?T {
            self.done.timedWait(time_ns) catch {};
            return self.data;
        }

        /// Grant data and send signal to waiting threads
        pub fn grant(self: *Self, data: T) void {
            self.data = data;
            self.done.set();
        }
    };
}
Enter fullscreen mode Exit fullscreen mode

How simple! Not only is code less, it's even more stable in my opinion. Next, we need to implement Task, which is in charge of creating thread and wrap it's return value into Future. The code is actually very short, here it goes:

/// Async task runs in another thread
pub fn Task(comptime fun: anytype) type {
    return struct {
        pub const FunType = @TypeOf(fun);
        pub const ArgsType = std.meta.ArgsTuple(FunType);
        pub const ReturnType = @typeInfo(FunType).Fn.return_type.?;
        pub const FutureType = Future(ReturnType);

        /// Internal thread function, run user's function and
        /// grant result to future.
        fn task(future: *FutureType, args: ArgsType) void {
            const ret = @call(.{}, fun, args);
            future.grant(ret);
        }

        /// Create task thread and detach from it
        pub fn launch(allocator: std.mem.Allocator, args: ArgsType) !*FutureType {
            var future = try FutureType.init(allocator);
            errdefer future.deinit();
            var thread = try std.Thread.spawn(.{}, task, .{ future, args });
            thread.detach();
            return future;
        }
    };
}
Enter fullscreen mode Exit fullscreen mode

Essentially, we take advantage of Zig's compile-time feature to grab new thread's main function's return type, based on which Future's concrete type is established. The main function's argments tuple type is also established using this technique. With all the information, we can very easily write easy to use api.

The code is done. Let's write some tests!

const S = struct {
    fn add(f1: *Future(u128), f2: *Future(u128)) u128 {
        const a = f1.wait();
        const b = f2.wait();
        return a + b;
    }
};

const TestTask = Task(S.add);
var fs: [100]*TestTask.FutureType = undefined;
fs[0] = try TestTask.FutureType.init(std.testing.allocator);
fs[1] = try TestTask.FutureType.init(std.testing.allocator);
fs[0].grant(0);
fs[1].grant(1);

// compute 100th fibonacci number
var i: u32 = 2;
while (i < 100) : (i += 1) {
    fs[i] = try TestTask.launch(std.testing.allocator, .{ fs[i - 2], fs[i - 1] });
}
try testing.expectEqual(@as(u128, 218922995834555169026), fs[99].wait());
for (fs) |f| f.deinit();
Enter fullscreen mode Exit fullscreen mode

Works nicely! Now I can go ahead write my application :-).

The code has been simplified for blogging purpose, you can checkout the full source if interested.

Top comments (3)

Collapse
 
kristoff profile image
Loris Cro

Thank you for sharing!

Have you considered using Zig's async/await to implement your tasks? Admittedly there isn't a ton of information about it out there yet, but it seems that most of what you implemented in your Future abstraction can be offered out-of-the-box by async frames.

Or is there some specific reason why you wanted a different approach?

Collapse
 
jackji profile image
jack • Edited

Hi Loris, glad you like it. I've considered zig's async/await feature actually. But as you said, there isn't enough information about it yet. I think I'll give it a shot eventually!

EDIT: Another reason is I want to make code easy to understand. Async/await is cool, but they'll make code harder to reason about (a controversial topic).

Collapse
 
kristoff profile image
Loris Cro

Understood, makes sense, thanks for the clarification.