Conversation
|
Just added a quick performance check we have Main: This PR |
RubenVerborgh
left a comment
There was a problem hiding this comment.
Yes, good stuff, @jeswr. We're absolutely on the right track.
In general, we need to be conservative/defensive when programming derived classes, and not assume too many behaviors of them. I should probably have a list of allowed assumptions, but it's not black and white. Briefly, let's not assume that source is not one of the implementations we've written, but rather a more unstable one. We have to assume its state and its readable are correct, but that's about it.
asynciterator.ts
Outdated
| if (this._source.done) { | ||
| this.close(); | ||
| return null; | ||
| } |
There was a problem hiding this comment.
| if (this._source.done) { | |
| this.close(); | |
| return null; | |
| } | |
| if (!this._source.readable) { | |
| this.readable = false; | |
| if (this._source.done) | |
| this.close(); | |
| return null; | |
| } |
^ We should be maximally well-behaved and not attempt to call .read on the source unless it advertises itself as potentially readable.
There was a problem hiding this comment.
@RubenVerborgh Is it worth opening an issue to apply this idea to other iterators, for instance in _readAndTransform and _readAndTransformSimple.
There was a problem hiding this comment.
Note that TransformIterator and SimpleTransformIterator also accept regular Node.js Readable instances, which do not define readable. So the goal there is to also work with non-AsyncIterators (see wrap).
We can have the same goal for MappingIterator, but that requires different tests.
|
Thanks, @jeswr, looking really good. Amazing what two fresh pairs of eyes can do, innit? 😉 |
|
I think that since this is close, clean it up and then we can release. After that is done we could put most of the logic into an abstract class that I have in mind that it will look something like this (which does have all tests current passing and is over 2x faster on the current performance tests): /**
* A synchronous mapping function from one element to another.
* A return value of `null` means that nothing should be emitted for a particular item.
*/
export type MapFunction<S, D = S> = (item: S) => D | null;
/**
An iterator that calls a synchronous operation
on every item from its source iterator.
@extends module:asynciterator.AsyncIterator
*/
export abstract class SynchronousTransformIterator<S, D = S> extends AsyncIterator<D> {
private readonly _destroySource: boolean;
protected _source: InternalSource<S>;
/**
* Applies the given mapping to the source iterator.
*/
constructor(
_source: AsyncIterator<S>,
options: SourcedIteratorOptions = {}
) {
super();
this._source = _validateSource(_source);
this._destroySource = options.destroySource !== false;
if (_source.done) {
this.close();
}
else {
this._source._destination = this;
this._source.on('end', destinationClose);
this._source.on('error', destinationEmitError);
this._source.on('readable', destinationSetReadable);
// If we are given a source that is already readable
// then we need to set the state of this iterable to readable
// also as there is no guarantee that the is no forthcoming
// readable event from the source
this.readable = this._source.readable;
}
}
protected abstract safeRead(): D | null;
read(): D | null {
// Do not read the source if the current iterator is ended
if (this.done)
return null;
// A source should only be read from if readable is true
if (!this._source.readable) {
this.readable = false;
if (this._source.done)
this.close();
return null;
}
const item = this.safeRead();
if (item !== null) {
return item;
}
// This will set readable to false on the current iterator
// if the source is no longer readable
this.readable = false;
return null;
}
/* Cleans up the source iterator and ends. */
protected _end(destroy: boolean) {
this._source.removeListener('end', destinationClose);
this._source.removeListener('error', destinationEmitError);
this._source.removeListener('readable', destinationSetReadable);
delete this._source._destination;
if (this._destroySource)
this._source.destroy();
super._end(destroy);
}
}
export class MappingIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
constructor(
source: AsyncIterator<S>,
private readonly _map: MapFunction<S, D>,
options: SourcedIteratorOptions = {}
) {
super(source, options);
}
safeRead() {
let item: S | null;
while ((item = this._source.read()) !== null) {
const mapped = this._map(item);
if (mapped !== null)
return mapped;
}
return null;
}
map<K>(map: MapFunction<D, K>, self?: any): AsyncIterator<K> {
return new CompositeMappingIterator(this._source, [this._map, bind(map, self)], this);
}
}
export class CompositeMappingIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
constructor(
private root: AsyncIterator<S>,
private mappings: MapFunction<any, any>[] = [],
source: AsyncIterator<any>,
options: SourcedIteratorOptions = {},
) {
super(source, options);
}
safeRead() {
// TODO: See if this is actually necessary
// A source should only be read from if readable is true
if (!this.root.readable) {
this.readable = false;
// TODO: See if this should be here
if (this.root.done)
this.close();
return null;
}
let mapped : any = null;
while (mapped === null && (mapped = this.root.read()) !== null) {
for (let i = 0; i < this.mappings.length; i++) {
mapped = this.mappings[i](mapped);
if (mapped === null)
break;
}
}
return mapped;
}
map<K>(map: MapFunction<D, K>, self?: any): AsyncIterator<K> {
return new CompositeMappingIterator(this.root, [...this.mappings, bind(map, self)], this);
}
}
export class UntilIterator<S> extends SynchronousTransformIterator<S> {
constructor(
source: AsyncIterator<S>,
private limit: number,
options: SourcedIteratorOptions = {}
) {
super(source, options);
}
safeRead() {
if (this.limit <= 0) {
this.close();
return null;
}
this.limit -= 1;
return this._source.read();
}
}I'll open up a branch with a cleaned up, and more heavily tested version of the code above (minus the |
f75f86a to
eeb4796
Compare
|
@jeswr I have done a review and have arrived at the code you can now see in this PR.
|
|
Ooh interesting - I thought those were failing before without. I think it was historical when @jacoscaz needed a similar pattern to get unit tests passing. If tests are working then I think it is safe to get rid of it provided we have good enough coverage with asynchronous |
Nope - tests still pass making the changes to convert to the
|
There was a problem hiding this comment.
"Approved" @RubenVerborgh with one suggested change (I don't have permissions to click the actual review button :))
asynciterator.ts
Outdated
| return mapped; | ||
| } | ||
| } | ||
| else if (this._source.done) { |
There was a problem hiding this comment.
| else if (this._source.done) { | |
| if (this._source.done) { |
Only one suggested change - see #75 (comment).
7b010d5 to
43c502f
Compare
Yeah; and I tend to optimize for fewer
Yes—and that's the benefit of not messing with the base
And fully agreed—the code is even leaner now. I really like what we landed on: read(): D | null {
if (!this.done) {
// Try to read an item that maps to a non-null value
if (this._source.readable) {
let item: S | null, mapped: D | null;
while ((item = this._source.read()) !== null) {
if ((mapped = this._map(item)) !== null)
return mapped;
}
}
this.readable = false;
// Close this iterator if the source is empty
if (this._source.done)
this.close();
}
return null;
}This has been a common occurrence for me with |
43c502f to
42cae48
Compare
42cae48 to
def9c09
Compare
| return this.transform({ filter: self ? filter.bind(self) : filter }); | ||
| return this.map(function (this: any, item: T) { | ||
| return filter.call(self || this, item) ? item : null; | ||
| }); |
There was a problem hiding this comment.
@jeswr Found this nasty bug still when restoring all of the main tests. (The this pointer filter is called with would be wrong because of the nested function.)
|
I had very little to do with this! Great job. I'll rebase #63 ASAP. |
|
@jacoscaz Reviews and chats are important! See Gitter 😉 |
|
Nice work! @jeswr Do you have any insights into the performance impact on Comunica? |
A fresh branch, that does not use transduction, as discussed in #59.
I think this resolves all readability problems @RubenVerborgh though I'm hesitant to say that for certain until I look with fresh eyes in the morning.
Getting close (I think) to correct readable behaviour - I just need to get one more set of new tests that I just created working. The currently failing tests test the case where the upstream iterator becomes non-readable without closing.I'll come back to this in a few hrs later this evening :)