Skip to content

Commit 74c8a5c

Browse files
committed
feat: add observeAsync function
1 parent 82a580c commit 74c8a5c

File tree

4 files changed

+66
-1
lines changed

4 files changed

+66
-1
lines changed

index.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import {Observable, Subscription} from "rxjs";
2+
3+
export interface Sink<T> {
4+
subscribed: () => boolean;
5+
emit: (value: T) => void;
6+
emitAll: (values: Observable<T>) => Promise<void>;
7+
}
8+
9+
export class EmitAfterUnsubscribeError extends Error {
10+
public constructor() {
11+
super("Emitted a value after subscriber unsubscribed");
12+
}
13+
}
14+
15+
export function observeAsync<T>(action: (sink: Sink<T>) => Promise<void>): Observable<T> {
16+
return new Observable<T>(subscriber => {
17+
let subscribed = true;
18+
const subscription = new Subscription(() => {
19+
subscribed = false;
20+
});
21+
22+
const emit = (value: T): void => {
23+
if (subscribed) {
24+
void subscriber.next(value);
25+
} else {
26+
throw new EmitAfterUnsubscribeError();
27+
}
28+
};
29+
30+
void action({
31+
subscribed: () => subscribed,
32+
emit,
33+
emitAll: async values =>
34+
new Promise((resolve, reject) => {
35+
const innerSubscription = values.subscribe({
36+
next: value => void emit(value),
37+
error: (error: unknown) => {
38+
subscriber.error(error);
39+
reject(error);
40+
},
41+
complete: () => {
42+
resolve();
43+
subscription.remove(innerSubscription);
44+
}
45+
});
46+
subscription.add(innerSubscription);
47+
})
48+
}).then(
49+
() => {
50+
subscriber.complete();
51+
subscription.unsubscribe();
52+
},
53+
(error: unknown) => {
54+
subscriber.error(error);
55+
subscriber.complete();
56+
subscription.unsubscribe();
57+
}
58+
);
59+
60+
return subscription;
61+
});
62+
}

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
"node": "^18 || ^20 || >=21"
2727
},
2828
"dependencies": {
29+
"rxjs": "^6.0.0 || ^7.0.0",
2930
"tslib": "^2.6.2"
3031
},
3132
"devDependencies": {

tsconfig.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
{
22
"extends": "@softwareventures/tsconfig/commonjs",
33
"compilerOptions": {
4-
"types": []
4+
"types": [],
5+
"skipLibCheck": true
56
},
67
"exclude": ["**/*.test.ts", "**/test.ts"]
78
}

yarn.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,7 @@ __metadata:
10361036
inquirer: "npm:9.2.17"
10371037
pinst: "npm:3.0.0"
10381038
prettier: "npm:3.2.5"
1039+
rxjs: "npm:^7.8.1"
10391040
semantic-release: "npm:23.0.6"
10401041
ts-node: "npm:10.9.2"
10411042
tslib: "npm:^2.6.2"

0 commit comments

Comments
 (0)