Understand Async Iterators Without Really Trying

Matt Krick
ITNEXT
Published in
7 min readAug 1, 2018

--

Photo by paolo candelo

Nobody likes a callback. It’s like a modern day equivalent of a GOTO statement. But how can we replace those pesky ones like click handlers? Chances are, you’ve refactored all all the easy ones into async/await statements, but there are still those pesky holdouts like DOM events, WebSockets, event emitters, and any other stream-like object. Sure, we could turn those into Observables, but turning 1 callback into 3 doesn’t seem like much of an improvement. If I can promisify a function, why can’t I iterable-ify an event stream? The answer is the new 2018 async iterables, and since it was first proposed a few years ago, there has been a lot of old, confusing, and just plain wrong info out there. So, through the magic of trial and error, here’s how I wrote a helper to turn click handlers into for-loops.

Starting Simple

In programming, just like in life, lazy is usually better. So, when I build something new, I like to be as lazy as possible and delay writing code that makes me think. So, let’s just write what we want it to look like & stick all the hard stuff in a black box:

// I want to go from this
document.addEventListener('click', (event) => console.log('click')

// to this
const
clicks = streamify(document, 'click')
for await (const event of clicks) {
console.log('click')
}

So far so good! Now we just need that magical streamify function to actually do something. We know it’s going to call addEventListener, which takes a handler that gets the event. So let’s fill in just the easy parts and put the hard stuff in another black box:

function streamify(element, listener) {
const handler = (event) => {
// do magic here
}
element.addEventListener(listener, handler)
}

Progress! Now avoiding the difficult things, let’s figure how to return an “async iterator”. To be honest, I’m not really sure what that looks like, I can’t just write new AsyncIterator(), but I know async means promise and an iterator is something that returns more than 1 value, kinda like a generator, so let’s roll with that and return an endless supply of promises. Useless functions are my specialty, so that’s easy:

function* streamify(element, listener) {
const handler = (event) => {}
element.addEventListener(listener, handler)
while (true) {
yield new Promise(resolve => resolve(event))
}

}

Now we’re hoeing where there’s taters! It looks like the handler is getting an event and the Promise is yielding an event, so now I just have to somehow move the event from the handler to the Promise. The dumbest thing I can think of is just sharing a variable, so let’s do that:

function* streamify(element, event) {
let nextResolve
const handler = (event) => {
nextResolve(event)
}
element.addEventListener(event, handler)
while (true) {
yield new Promise(resolve => {
nextResolve = resolve
})
}
}

And there we have it! We have a working streamifier! When a click event comes in, it resolves with the promise we gave it. Pretty sweet! …until someone clicks really fast.

Robustifying the Streamifier

The naive solution only works for slow events. If 2 events get fired in quick succession, the 2nd call will get swallowed. We don’t want that, especially if we’re using this for something like websockets. So, what the heck, if 1 shared resolver is good, an array of them must be better! Let’s just queue the events as they come in:

function* streamify(element, event) {
const pushQueue = []
const handler = (event) => {
pushQueue.push(event)
}
element.addEventListener(event, handler)
while (true) {
yield new Promise(resolve => {
const nextEvent = pushQueue.shift()
resolve(nextEvent) // error! no nextEvent
})
}
}

Succe…er, no. This solves the push problem, but now we’re getting an error because the iterator is requesting a click event before one exists. Dang. Well, a queue worked for pushing events, why not use one for pulling in resolvers?

function* streamify(element, event) {
const pushQueue = []
const pullQueue = []
const handler = (event) => {
const nextResolve = pullQueue.shift()
if (nextResolve) {
nextResolve(event)
} else {
pushQueue.push(event)
}

}
element.addEventListener(event, handler)
const pullValue = () => new Promise(resolve => {
const nextEvent = pushQueue.shift()
if (nextEvent) {
resolve(nextEvent)
} else {
pullQueue.push(resolve)
}
})

while (true) {
yield pullValue()
}
}

Wooo! it works! Now I can click as fast as I want forever! …but what if I only want to listen to the first couple? Natively, I would call removeEventListener, but the handler function is hidden away in our sweet new wrapper. How can I remove it when the stream ends? Well, I’ll start simple & just write how I want it to look:

// from this
let
clickCount = 0
if (clickCount++ > 2) {
document.removeEventListener('click', handler)
console.log('Bye!')
}


// to this
let
clickCount = 0
for await (const event of clicks) {
console.log('click', event)
if (clickCount++ > 2) clicks.return() // this breaks the loop
}
console.log('Bye!')

I know the generator value has that magic return() method, but I wish i could write it myself so I could also call removeEventListener. I know that little * isn’t actually magic; it just tells the function to wrap the return value in a special object with 4 methods. So, let’s ditch the star & write the generator how the engine actually reads it (plus add our little extra handler):

function streamify(element, event) {
...
return {
[Symbol.asyncIterator] () {
return this
},
next: () => ({
done, // TODO how do we calculate this?
value: pullValue()
}),
return: () => {
element.removeEventListener(listener, handler)
return {done: true}
},
throw: (error) => ({done, value: Promise.reject(error)})
}
}

That may look daunting, but it’s really just a bunch of boilerplate that the * gives us for free. In the bad old days before generators, we used to have to write similar hacky things all the time. Today, that boilerplate should look pretty familiar because it’s the same stuff you see when you log Set or Map to the console.

Let’s see what we got: TheSymbol.asyncIterator() method tells the rest of the world to treat this like a generator instead of an object that just happens to have the exact same fields. Everything else returns an object with a done field and maybe a value. If done is false, we know we can expect some kind of value — and since this is an async iterator, I’m guessing that value is gonna be the same pullValue() promise we were yielding before. All that’s left to do is figure out the value of done when next() gets called. Since we know done should be true if throw or return gets called, let’s do like we did before & and share the variable in the outer scope:

function streamify(element, event) {
...
let done = false
return {
[Symbol.asyncIterator] () {
return this
},
next: () => ({done, value: pullValue()}),
return: () => {
done = true
element.removeEventListener(listener, handler)
return {done}
},
throw: (error) => {
done = true
return {done, value: Promise.reject(error)}
}
}
}

… and we’re done! I can now write a for-loop that iterates every time an event comes in. If I don’t like the value, I can call throw. When I’ve had enough, I can call return.

Here’s how it looks all put together:

const streamify = function (element, event) {
const pullQueue = []
const pushQueue = []
let done = false
const
pushValue = async (args) => {
if (pullQueue.length !== 0) {
const resolver = pullQueue.shift()
resolver(...args)
} else {
pushQueue.push(args)
}
}

const pullValue = () => {
return new Promise((resolve) => {
if (pushQueue.length !== 0) {
const args = pushQueue.shift()
resolve(...args)
} else {
pullQueue.push(resolve)
}
})
}

const handler = (...args) => {
pushValue(args)
}

element.addEventListener(event, handler)
return {
[Symbol.asyncIterator]() {
return this
},
next: () => ({
done,
value: done ? undefined : pullValue()
}),
return: () => {
done = true
element.removeEventListener(event, handler)
return {done}
},
throw: (error) => {
done = true
return
{
done,
value: Promise.reject(error)
}
}
}
}

Conclusion

If you’ve scrolled all the way to the bottom to find the link to the GitHub repo, here it is.

In 5 minutes, we’ve written a 50 LOC wrapper that goes toe-to-toe with all the competing dependency-infested packages out there. After wrapping our events, we can finally proclaim that we no longer write code with callbacks. The dream of 2012 is here! Best of all, instead of learning what an async iterable is, we learned how to use it. We didn’t have to break out the functional programming textbooks to study the definition of a Subject, Deferrable, or Disposable. Instead, we built something practical — like taking a cooking class and walking away with dinner. Way more fun than CS theory.

The only question that remains: just because we can, does that mean we should? From a clean code perspective, maybe! But what about performance? Promises are inherently slower than callbacks, and on top of that, we’re calling shift() a bunch on 2 queues. Running the code in Chrome 67, it’s 2x-100x+ slower than native callbacks for 100–1,000,000 events. When the code gets transpiled to ES5, that grows to 20x-1000x slower! So from a benchmarking standpoint, it’s plain awful. But in the real world, that equates to literally an extra 0.1ms to handle 10 concurrent events. Pretty fair trade for ditching the callbacks!

--

--