Building ComputedAsync for Signals in Angular

Handling async operations using Signals and RxJS in Angular

Enea Jahollari
ITNEXT


Photo by Tuân Nguyễn Minh on Unsplash

A little bit of history

Handling async operations in Angular has always been the job of Observables, and they are a great fit for it. Since Signals arrived in Angular, though, many developers try to use them for everything, even though Signals are designed to model values, not events, and have no built-in way to handle async operations. So, how do we handle async operations using Signals? Let’s find out.

Motivation

I will use a simple example for an API call. Let’s say we have a component that calls an API and displays the data. We will use the `HttpClient` to make the API call. The `HttpClient` returns an `Observable` which we can subscribe to and get the data. Let’s see how we can do this using Signals.

export class UserComponent {
  private imagesService = inject(ImagesService);

  user = input.required<User>();

  favoriteImages = signal<string[]>([]);

  constructor() {
    effect(() => {
      this.imagesService.getImages(this.user().favoriteImages).subscribe(images => {
        this.favoriteImages.set(images);
      });
    });
  }
}

As you can see, I’m also using signal inputs to use the built-in reactivity.

But we never unsubscribe, so we need to do that manually. The effect callback receives an `onCleanup` function as its argument. The callback we register with it is called before the effect runs again (and when the effect is destroyed), so we can use it to unsubscribe from the previous subscription.

This is how we can do it.

export class UserComponent {
  // imagesService, user and favoriteImages are declared as in the previous snippet

  constructor() {
    // onCleanup registers a callback that is called before the effect runs again
    // (and when the effect is destroyed)
    effect((onCleanup) => {
      const sub = this.imagesService.getImages(this.user().favoriteImages).subscribe(images => {
        this.favoriteImages.set(images);
      });
      onCleanup(() => sub.unsubscribe()); // unsubscribe from the previous subscription
    });
  }
}

To sum up, here’s what’s happening in the above code:

  • We register an effect that will run whenever the `user` input changes.
  • The effect runs at least once by default, so it will make that initial API call.
  • The effect will run again whenever the `user` input changes.
  • Every time the effect re-runs, the callback we registered through `onCleanup` is called first.
  • Our callback unsubscribes from the previous subscription, so the effect behaves like the `switchMap` operator in RxJS.
  • When the API call returns, we set the value of the `favoriteImages` signal.
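The cleanup ordering described in the list above can be illustrated with a small, dependency-free sketch. The `createRunner` helper below is a hypothetical stand-in written only for this illustration; it is not Angular’s `effect` implementation, and it has to be re-run by hand instead of reacting to signals.

```typescript
// Toy stand-in for an effect runner. NOT Angular's implementation:
// `createRunner`, `run` and `destroy` are hypothetical names.
type CleanupFn = () => void;
type EffectFn = (onCleanup: (fn: CleanupFn) => void) => void;

function createRunner(effectFn: EffectFn) {
  let cleanup: CleanupFn | undefined;

  const runCleanup = () => {
    const fn = cleanup;
    cleanup = undefined;
    fn?.();
  };

  return {
    // Called once initially and again whenever a dependency changes.
    run() {
      runCleanup(); // tear down the previous run first
      effectFn((fn) => (cleanup = fn));
    },
    // Called when the owner (for example the component) is destroyed.
    destroy: runCleanup,
  };
}

const log: string[] = [];
let userId = 1;

const runner = createRunner((onCleanup) => {
  const id = userId;
  log.push(`subscribe ${id}`);
  onCleanup(() => log.push(`unsubscribe ${id}`));
});

runner.run();     // initial run
userId = 2;
runner.run();     // "dependency" changed: the previous run is cleaned up first
runner.destroy(); // final cleanup

console.log(log);
```

Each run tears down the subscription of the run before it, which is why the effect ends up with `switchMap`-like "latest wins" semantics.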

Problem

What we are really trying to do in the above example is derive some state from the user’s `favoriteImages` ids. An effect is not the most straightforward way to express that, and it is easy to forget to unsubscribe from our subscription.

Another solution is to combine the `toObservable` and `toSignal` helper functions from `@angular/core/rxjs-interop`.

export class UserComponent {
  private imagesService = inject(ImagesService);

  user = input.required<User>();

  favoriteImages = toSignal(toObservable(this.user).pipe(
    switchMap(user => this.imagesService.getImages(user.favoriteImages))
  ), { initialValue: [] });
}

This is a better solution, but it’s not perfect. What happens if we add another input and need to include it in our API call? We would have to reach for `combineLatest`.

export class UserComponent {
  private imagesService = inject(ImagesService);

  user = input.required<User>();
  otherInput = input.required<string>();

  favoriteImages = toSignal(combineLatest([
    toObservable(this.user),
    toObservable(this.otherInput)
  ]).pipe(
    switchMap(([user, otherInput]) => this.imagesService.getImages(user.favoriteImages, otherInput))
  ), { initialValue: [] });
}

This gets messy very quickly, because we keep adding more RxJS operators and have to use `toObservable` and `toSignal` everywhere.

We can do better!

Building ComputedAsync

We want our `computedAsync` function to behave like the `computed` function, but it should handle async operations. Basically, it should return a signal that will have the value of the async operation.

favoriteImages = computedAsync(() =>
  this.imagesService.getImages(this.user().favoriteImages)
);

We want to return an observable (or promise) from the callback, and we want the `computedAsync` function to take care of subscribing to it and unsubscribing from it.

Handling the callback function

We want the developer to be able to return an observable, a promise, or just a plain value. So, we need to handle all of these cases.

Here are the possible cases:

type ComputationResult<T> = Promise<T> | Observable<T> | T | undefined;

We want to accept a callback function that returns a `ComputationResult<T>`, and we want to return a `Signal`.

export function computedAsync<T>(
  computation: () => ComputationResult<T>
): Signal<T> {
  // ...
}

Handling the current value and the result

We need to handle the current value and to return the result of the computation. We can use a `WritableSignal` to handle the current value and a computed signal to return the result of the computation.

export function computedAsync<T>(
  computation: () => ComputationResult<T>
): Signal<T> {
  const sourceValue = signal<T | undefined>(undefined);
  return computed(() => sourceValue()!);
}

Handling the computation

Because the computation reads signals, and `effect` is the way to react to signal changes outside of a template or another `computed`, we need to use `effect` to run the computation.

import { isObservable } from 'rxjs';

export function computedAsync<T>(
  computation: () => ComputationResult<T>
): Signal<T> {
  const sourceValue = signal<T | undefined>(undefined);

  effect(() => {
    const value = computation(); // store the result of the computation

    // handle the result if it's an observable or a promise or a normal value
    if (isObservable(value) || isPromise(value)) {
      // TODO: handle observable and promise
    } else {
      // TODO: handle normal value
    }
  });

  return computed(() => sourceValue()!);
}

// helper function to check if the value is a promise
function isPromise<T>(value: any): value is Promise<T> {
  return value && typeof value.then === 'function';
}
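As a side note on the `isPromise` helper: it is a duck-typing check, so any object with a `then` method passes, and for falsy inputs such as `undefined` or `0` the `&&` expression returns that falsy value itself instead of `false`. The sketch below shows a stricter variant; the name `isPromiseLike` is an assumption made for this illustration and is not taken from the article or from ngxtension.

```typescript
// Hypothetical stricter variant of the helper: it always returns a real boolean.
function isPromiseLike<T>(value: unknown): value is PromiseLike<T> {
  return (
    (typeof value === 'object' || typeof value === 'function') &&
    value !== null &&
    typeof (value as { then?: unknown }).then === 'function'
  );
}

const results = [
  isPromiseLike(Promise.resolve(1)), // native promise
  isPromiseLike({ then: () => {} }), // any "thenable" passes the duck-typing check
  isPromiseLike(undefined),          // plain values do not
  isPromiseLike(0),
  isPromiseLike('text'),
];

console.log(results); // [ true, true, false, false, false ]
```

Inside an `if` condition both versions behave the same; the difference only matters if the result is stored or compared as a boolean.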

But, `effect` depends on the `DestroyRef` token, so it needs to be in an injection context, or we need to pass an injector to it. Let’s handle that.

Handling injection context

I will use the assertInjector helper function (created by Chau Tran) provided in the `ngxtension` library.

The `assertInjector` function checks that we are in an injection context when no injector is provided, and throws an error if we are not. As the third argument, we can pass a callback function that will be run in the injection context of the resolved injector.

Let’s create a `ComputedAsyncOptions` interface that will include the injector and the equal function (that the normal `computed` function also includes).

interface ComputedAsyncOptions<T> extends CreateComputedOptions<T> {
  injector?: Injector;
}

Now, we can use the `assertInjector` function.

export function computedAsync<T>(
  computation: () => ComputationResult<T>,
  options?: ComputedAsyncOptions<T>
): Signal<T> {
  return assertInjector(computedAsync, options?.injector, () => {
    // here we can safely use effect and inject function because we are in an injection context
    effect(() => { /* ... */ }, { injector: options?.injector });
  });
}

Handling the subscription

Just like we have a `sourceValue` signal to handle the current value, we need a `sourceEvent$` observable to handle the subscription. We will use a `Subject` to handle the subscription.

We want the values emitted by `sourceEvent$` to be either promises or observables.

const sourceEvent$ = new Subject<Promise<T> | Observable<T>>();

Let’s subscribe to the `sourceEvent$` and set the value of the `sourceValue` signal.

We also have to take care of flattening the observable, because we will pass an observable or promise to our `sourceEvent$`.

That’s why we will use `switchMap` operator to flatten the observable.

As a side effect, `switchMap` will also unsubscribe from the previous subscription 🎉.

const sourceResult = sourceEvent$
  .pipe(switchMap(s$ => s$))
  .subscribe({
    // set the value of the sourceValue signal when the source$ emits a value
    next: (value) => sourceValue.set(value),
    error: (error) => {
      // NOTE: Error should be handled by the user (using catchError or .catch())
      sourceValue.set(error);
    }
  });

`switchMap(s$ => s$)` can be replaced with the `switchAll()` operator.

Thanks to Petrus Nguyễn Thái Học and Lucas Garcia for pointing that out.

const sourceResult = sourceEvent$
  .pipe(switchAll())
  .subscribe();

Handling the subscription cleanup

Because we subscribed to the `sourceEvent$` observable, we need to unsubscribe from it. We can use the `DestroyRef` token to handle that. `DestroyRef` has an `onDestroy` method that will call the callback function passed to it when the current injection context is destroyed, in our case, when the component is destroyed.

export function computedAsync<T>(
  computation: () => ComputationResult<T>,
  options?: ComputedAsyncOptions<T>
): Signal<T> {
  return assertInjector(computedAsync, options?.injector, () => {
    const destroyRef = inject(DestroyRef);

    const sourceEvent$ = new Subject<Promise<T> | Observable<T>>();

    effect(() => { /* ... */ });

    const sourceResult = sourceEvent$.pipe(switchAll()).subscribe(/* ... */);

    destroyRef.onDestroy(() => {
      sourceResult.unsubscribe();
    });
  });
}

That’s it! We subscribe to get the value and we unsubscribe when the component is destroyed.

Handling the returned observable or promise in the computation

This is what `computedAsync` currently looks like:

export function computedAsync<T>(
  computation: () => ComputationResult<T>,
  options?: ComputedAsyncOptions<T>
): Signal<T> {
  return assertInjector(computedAsync, options?.injector, () => {
    const destroyRef = inject(DestroyRef);

    const sourceValue = signal<T | undefined>(undefined);

    const sourceEvent$ = new Subject<Promise<T> | Observable<T>>();

    effect(() => {
      const value = computation(); // store the result of the computation

      // handle the result if it's an observable or a promise or a normal value
      if (isObservable(value) || isPromise(value)) {
        // TODO: handle observable and promise
      } else {
        // TODO: handle normal value
      }
    });

    const sourceResult = sourceEvent$
      .pipe(switchAll())
      .subscribe({
        next: (value) => sourceValue.set(value),
        error: (error) => sourceValue.set(error)
      });

    destroyRef.onDestroy(() => {
      sourceResult.unsubscribe();
    });

    return computed(() => sourceValue()!);
  });
}

Let’s handle the TODOs in the above code.

First, let’s handle the normal value case. We just need to set the value of the `sourceValue` signal.

effect(() => {
  const value = computation(); // store the result of the computation

  // handle the result if it's an observable or a promise or a normal value
  if (isObservable(value) || isPromise(value)) {
    // TODO: handle observable and promise
  } else {
    sourceValue.set(value);
  }
});

This will throw an error, because we cannot set the value of a signal inside an effect, without first enabling it for the effect.

effect(() => {
  // ...
}, { allowSignalWrites: true }) // enable signal writes

But, there’s another way to solve this problem. We can use the `untracked` function to set the value of the signal without enabling it for the effect (this is basically doing the same thing as the above code). Read more about this trick here.

Let’s use it:

untracked(() => sourceValue.set(value));

Let’s handle the observable and promise case. Just like we set the value in the signal, we need to push the observable or promise into the `sourceEvent$` subject by calling `next`.

effect(() => {
  const value = computation(); // store the result of the computation

  // handle the result if it's an observable or a promise or a normal value
  if (isObservable(value) || isPromise(value)) {
    sourceEvent$.next(value);
  } else {
    untracked(() => sourceValue.set(value));
  }
});

This may cause some issues, if we just leave it like that. How so?

Look at this example:

export class UserComponent {
  private imagesService = inject(ImagesService);
  user = input.required<User>();

  someValue = signal<string>('');

  favoriteImages = computedAsync(() => {
    return this.imagesService.getImages(this.user().favoriteImages).pipe(
      tap(() => this.someValue.set('some value'))
    );
  });
}

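The `someValue` signal is set inside the observable that our computation returns. If we call `sourceEvent$.next(value)` directly inside the effect, the flattening operator subscribes to that observable synchronously, so if it emits synchronously, the `tap` runs while the effect is still executing. Basically, we would be setting the value of a signal inside an effect, which throws an error. So, we need to untrack the `sourceEvent$.next()` call.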

effect(() => {
  const value = computation(); // store the result of the computation

  // handle the result if it's an observable or a promise or a normal value
  if (isObservable(value) || isPromise(value)) {
    untracked(() => sourceEvent$.next(value));
  } else {
    untracked(() => sourceValue.set(value));
  }
});

And now, our `computedAsync` function is complete 🎉!

Initial value

By default, the initial value of the `sourceValue` signal is `undefined`. But, we can pass an initial value to the `computedAsync` function.

interface ComputedAsyncOptions<T> extends CreateComputedOptions<T> {
  initialValue?: T;
  injector?: Injector;
}

export function computedAsync<T>(
  computation: () => ComputationResult<T>,
  options?: ComputedAsyncOptions<T>
): Signal<T> {
  return assertInjector(computedAsync, options?.injector, () => {
    // ...
    const sourceValue = signal<T | undefined>(options?.initialValue);
    // ...
  });
}

Now, we can pass an initial value to the `computedAsync` function.

export class UserComponent {
  private imagesService = inject(ImagesService);
  user = input.required<User>();

  favoriteImages = computedAsync(() => {
    return this.imagesService.getImages(this.user().favoriteImages);
  }, { initialValue: [] });
}
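One thing to keep in mind: the signature still returns `Signal<T>` even when no `initialValue` is passed, although the signal then holds `undefined` until the first value arrives. A possible refinement is to use overloads so that the return type reflects whether an initial value was provided. The sketch below is an assumption for illustration only: `signalWithInitial` is a hypothetical helper, and `Signal` is a simplified stand-in type so the snippet runs without Angular. It is not necessarily how ngxtension types the function.

```typescript
// Simplified stand-in for Angular's Signal type (a zero-argument getter).
type Signal<T> = () => T;

// Hypothetical helper, not part of Angular or ngxtension.
// Overload 1: an initial value is given, so the signal never holds undefined.
function signalWithInitial<T>(options: { initialValue: T }): Signal<T>;
// Overload 2: no initial value, so undefined is part of the type.
function signalWithInitial<T>(options?: { initialValue?: undefined }): Signal<T | undefined>;
function signalWithInitial<T>(options?: { initialValue?: T }): Signal<T | undefined> {
  const value = options?.initialValue;
  return () => value;
}

const withDefault = signalWithInitial({ initialValue: [] as string[] });
const withoutDefault = signalWithInitial<string[]>();

// No undefined check needed: the type is Signal<string[]>.
const count: number = withDefault().length;
// The compiler forces the check: the type is Signal<string[] | undefined>.
const maybeCount: number | undefined = withoutDefault()?.length;

console.log(count, maybeCount);
```

With this kind of typing, the non-null assertion in `computed(() => sourceValue()!)` no longer hides a possible `undefined` from callers that did not pass an initial value.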

Handling race conditions (behavior)

Currently, we only use the `switchAll` operator to handle the subscription, so each new call cancels the previous one. But developers may want a different behavior, and for this, we can add a `behavior` option to the `computedAsync` function.

type ComputedAsyncBehavior = 'switch' | 'merge' | 'concat' | 'exhaust';

interface ComputedAsyncOptions<T> extends CreateComputedOptions<T> {
  initialValue?: T;
  injector?: Injector;
  behavior?: ComputedAsyncBehavior;
}

We can use the `behavior` option to handle the subscription.

Let’s create a `createFlattenObservable` function that will handle the operator based on the `behavior` option.

function createFlattenObservable<T>(
  source: Subject<Promise<T> | Observable<T>>,
  behavior: ComputedAsyncBehavior,
): Observable<T> {
  const KEY_OPERATOR_MAP = {
    merge: mergeAll,
    concat: concatAll,
    exhaust: exhaustAll,
    switch: switchAll,
  };

  return source.pipe(KEY_OPERATOR_MAP[behavior]());
}

Now, we can use the `createFlattenObservable` function to handle the subscription.

const source$: Observable<T> = createFlattenObservable(
  sourceEvent$,
  options?.behavior ?? 'switch',
);

By default, we use `switch` behavior, but we can pass a different behavior.

export class UserComponent {
  private imagesService = inject(ImagesService);
  user = input.required<User>();

  favoriteImages = computedAsync(() =>
    this.imagesService.getImages(this.user().favoriteImages),
    { initialValue: [], behavior: 'merge' }
  );
}
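To make the difference between the four behaviors concrete, here is a dependency-free toy model of the flattening strategies. It is a sketch for illustration, not the RxJS implementation: `Source`, `createFlattener`, `manualSource` and `demo` are hypothetical names, and `Source` is a deliberately minimal stand-in for an observable.

```typescript
type Behavior = 'switch' | 'merge' | 'concat' | 'exhaust';
type Unsubscribe = () => void;

// Minimal stand-in for an observable: pushes values, then completes.
interface Source<T> {
  subscribe(next: (value: T) => void, complete: () => void): Unsubscribe;
}

// Returns a function that accepts inner sources and forwards their values
// to `onValue` according to the chosen behavior.
function createFlattener<T>(behavior: Behavior, onValue: (value: T) => void) {
  let active: Unsubscribe | undefined; // the inner source currently running
  const queue: Source<T>[] = [];       // waiting sources (concat only)

  const start = (source: Source<T>) => {
    let done = false;
    const unsubscribe = source.subscribe(onValue, () => {
      done = true;
      active = undefined;
      const next = queue.shift();
      if (next) start(next);
    });
    if (!done) active = unsubscribe;
  };

  return (source: Source<T>) => {
    switch (behavior) {
      case 'merge': // run everything concurrently
        source.subscribe(onValue, () => {});
        break;
      case 'switch': // drop the running source, start the new one
        active?.();
        active = undefined;
        start(source);
        break;
      case 'exhaust': // ignore new sources while one is running
        if (!active) start(source);
        break;
      case 'concat': // queue new sources until the running one completes
        if (active) queue.push(source);
        else start(source);
        break;
    }
  };
}

// A source we can drive by hand; it logs when it gets subscribed to.
function manualSource(name: string, log: string[]) {
  let next: ((value: string) => void) | undefined;
  let complete: (() => void) | undefined;
  const source: Source<string> = {
    subscribe(n, c) {
      log.push(`start:${name}`);
      next = n;
      complete = c;
      return () => { next = complete = undefined; };
    },
  };
  return { source, emit: (v: string) => next?.(v), complete: () => complete?.() };
}

function demo(behavior: Behavior): string[] {
  const log: string[] = [];
  const push = createFlattener<string>(behavior, (v) => log.push(v));
  const first = manualSource('first', log);
  const second = manualSource('second', log);

  push(first.source);  // request for the first user
  push(second.source); // the user changes before the first request has responded
  first.emit('A');
  first.complete();
  second.emit('B');
  second.complete();
  return log;
}

console.log(demo('switch'));  // [ 'start:first', 'start:second', 'B' ]
console.log(demo('merge'));   // [ 'start:first', 'start:second', 'A', 'B' ]
console.log(demo('concat'));  // [ 'start:first', 'A', 'start:second', 'B' ]
console.log(demo('exhaust')); // [ 'start:first', 'A' ]
```

In this model, `switch` drops the stale response, `merge` keeps both, `concat` delays the second request until the first completes, and `exhaust` ignores the second request entirely.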

Because the RxJS flattening operators accept any `ObservableInput`, which includes Promises, we can pass a promise to `sourceEvent$` and it will be handled just like an observable.

export class UserComponent {
  private imagesService = inject(ImagesService);
  user = input.required<User>();

  favoriteImages = computedAsync(() =>
    fetch(`https://localhost/api/images/${this.user().favoriteImages}`).then(res => res.json()),
    { initialValue: [], behavior: 'merge' }
  );
}

How to use the previous value in the computation?

Inside the effect, we can get the current value from the `sourceValue` signal. But reading a signal inside an effect registers it as a dependency, so we need to read it inside `untracked` and then pass it to the `computation` function, which now accepts the previous value as its argument.

effect(() => {
  const currentSourceValue = untracked(() => sourceValue());
  const value = computation(currentSourceValue); // store the result of the computation
  // ...
});

This enables us to use the previous value in the computation.

export class UserComponent {
  private imagesService = inject(ImagesService);
  user = input.required<User>();

  favoriteImages = computedAsync((previousFavoriteImages) => {
    if (previousFavoriteImages) { /* do something */ }
    return this.imagesService.getImages(this.user().favoriteImages);
  },
  { initialValue: [], behavior: 'merge' });
}
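Why reading the previous value has to go through `untracked` can be shown with a toy reactive core. The code below is a heavily simplified model written for this illustration, not Angular’s implementation: `createSignal`, `createEffect` and this `untracked` are hypothetical stand-ins, and the toy effect re-runs synchronously, whereas Angular schedules effects.

```typescript
// Toy reactive core to illustrate dependency tracking. NOT Angular's implementation.
type Effect = () => void;
let activeEffect: Effect | null = null;

function createSignal<T>(initial: T) {
  let value = initial;
  const subscribers = new Set<Effect>();
  const read = () => {
    if (activeEffect) subscribers.add(activeEffect); // register the dependency
    return value;
  };
  const set = (next: T) => {
    value = next;
    [...subscribers].forEach((run) => run()); // re-run dependents synchronously
  };
  return { read, set };
}

function createEffect(fn: () => void) {
  const run = () => {
    const previous = activeEffect;
    activeEffect = run;
    try { fn(); } finally { activeEffect = previous; }
  };
  run();
}

function untracked<T>(fn: () => T): T {
  const previous = activeEffect;
  activeEffect = null; // reads inside fn are invisible to the running effect
  try { return fn(); } finally { activeEffect = previous; }
}

const user = createSignal('ada');
const result = createSignal<string | undefined>(undefined);
let runs = 0;

createEffect(() => {
  runs++;
  const previous = untracked(() => result.read()); // read without subscribing
  const current = user.read();                     // tracked dependency
  untracked(() => result.set(`${current} (previous: ${previous})`));
});

result.set('manual'); // the effect does not depend on `result`, so it does not re-run
user.set('grace');    // tracked dependency changed, so the effect runs again

console.log(runs, result.read()); // 2 'grace (previous: manual)'
```

If the effect read `result` without `untracked`, every write to `result` would schedule the effect again, which is exactly the feedback loop the `untracked` read avoids.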

Use computedAsync from ngxtension

The `computedAsync` function is available in the `ngxtension` library.

npm install ngxtension
# or
yarn add ngxtension

Then, you can import it from the `ngxtension` library.

import { computedAsync } from 'ngxtension/computed-async';

And use it like this:

export class UserComponent {
  private imagesService = inject(ImagesService);
  user = input.required<User>();

  favoriteImages = computedAsync(() =>
    this.imagesService.getImages(this.user().favoriteImages),
    { initialValue: [] }
  );
}

📚 Documentation is available here.

🔨 Read more about the development of `computedAsync` here.

Thanks for reading!

If this article was interesting and useful to you, and you want to learn more about Angular, support me by buying me a coffee ☕️ or follow me on X (formerly Twitter) @Enea_Jahollari where I tweet and blog a lot about Angular latest news, signals, videos, podcasts, updates, RFCs, pull requests and so much more. 💎


GDE for Angular | Software Engineer @ push-based.io | Performance & architecture enthusiast