7 changes: 7 additions & 0 deletions .gitignore
@@ -1,3 +1,10 @@
+# Ignore data files in notebooks folder
+notebooks/**/*.json
+notebooks/**/*.yaml
+notebooks/**/*.parquet
+notebooks/**/*.pkl
+notebooks/**/*.db
+
 # Ignore vscode settings
 .vscode/
5 changes: 3 additions & 2 deletions notebooks/02_orcabridge_basic_usage.ipynb
@@ -803,9 +803,10 @@
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import tempfile\n",
"import json\n",
"import tempfile\n",
"\n",
"import numpy as np\n",
"\n",
"\n",
"def compute_stats(bin_file: PathLike, output_file=None):\n",
3 changes: 2 additions & 1 deletion notebooks/03_orcabridge_qol_features.ipynb
@@ -100,11 +100,12 @@
"metadata": {},
"outputs": [],
"source": [
"from orcabridge.pod import function_pod\n",
"import json\n",
"import tempfile\n",
"from pathlib import Path\n",
"\n",
"from orcabridge.pod import function_pod\n",
"\n",
"json_source = ob.GlobSource(\"json_file\", \"../examples/dataset2\", \"*.json\")\n",
"\n",
"\n",
7 changes: 4 additions & 3 deletions notebooks/04_orcabridge_tracker.ipynb
@@ -23,10 +23,10 @@
"metadata": {},
"outputs": [],
"source": [
"from orcabridge.tracker import Tracker\n",
"from orcabridge.pod import function_pod\n",
"from orcabridge.source import GlobSource\n",
"from orcabridge.store import DirDataStore\n",
"from orcabridge.pod import function_pod"
"from orcabridge.tracker import Tracker"
]
},
{
@@ -62,9 +62,10 @@
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import tempfile\n",
"from pathlib import Path\n",
"import json\n",
"\n",
"import yaml\n",
"\n",
"# use default data store location of `./pod_data`\n",
12 changes: 7 additions & 5 deletions notebooks/05_orcabridge_dj_integration.ipynb
@@ -35,14 +35,15 @@
"metadata": {},
"outputs": [],
"source": [
"from orcabridge.pod import function_pod\n",
"from orcabridge.source import GlobSource\n",
"from orcabridge.store import DirDataStore\n",
"import json\n",
"import tempfile\n",
"from pathlib import Path\n",
"import json\n",
"\n",
"import yaml\n",
"\n",
"from orcabridge.pod import function_pod\n",
"from orcabridge.source import GlobSource\n",
"from orcabridge.store import DirDataStore\n",
"\n",
"# define data source\n",
"data_source = GlobSource(\n",
@@ -162,9 +163,10 @@
 }
 ],
 "source": [
-"from orcabridge.dj.tracker import QueryTracker\n",
 "import datajoint as dj\n",
 "\n",
+"from orcabridge.dj.tracker import QueryTracker\n",
+"\n",
 "schema = dj.schema(\"enigma_orcabridge_test\")\n",
 "\n",
 "\n",
13 changes: 10 additions & 3 deletions pyproject.toml
@@ -8,9 +8,13 @@ description = "Function-based Oracapod Pipeline implementation in Python"
dynamic = ["version"]
dependencies = [
"xxhash",
"networkx",
"typing_extensions",
"matplotlib>=3.10.3",
"networkx",
"typing_extensions",
"matplotlib>=3.10.3",
"pandas>=2.2.3",
"pyyaml>=6.0.2",
"pyarrow>=20.0.0",
"polars>=1.30.0",
]
readme = "README.md"
requires-python = ">=3.10"
@@ -36,10 +40,13 @@ version_file = "src/orcabridge/_version.py"

 [dependency-groups]
 dev = [
+    "deltalake>=1.0.2",
     "httpie>=3.2.4",
     "ipykernel>=6.29.5",
+    "pyiceberg>=0.9.1",
     "pytest>=8.3.5",
     "pytest-cov>=6.1.1",
+    "redis>=6.2.0",
     "ruff>=0.11.11",
     "tqdm>=4.67.1",
 ]
20 changes: 7 additions & 13 deletions src/orcabridge/__init__.py
@@ -1,15 +1,9 @@
-from . import hashing
-from . import pod
-from . import mapper
-from . import stream
-from . import source
-from . import store
-from .mapper import MapTags, MapPackets, Join, tag, packet
+from . import hashing, mappers, pod, sources, store, streams
+from .mappers import Join, MapPackets, MapTags, packet, tag
 from .pod import FunctionPod, function_pod
-from .source import GlobSource
+from .sources import GlobSource
 from .store import DirDataStore, SafeDirDataStore
+from .tracker import GraphTracker
 
-from .pipeline import GraphTracker
-
 DEFAULT_TRACKER = GraphTracker()
 DEFAULT_TRACKER.activate()
@@ -20,9 +14,9 @@
"store",
"pod",
"dir_data_store",
"mapper",
"stream",
"source",
"mappers",
"streams",
"sources",
"MapTags",
"MapPackets",
"Join",
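In practice, the renames in this file (mapper → mappers, source → sources, stream → streams, and GraphTracker moving from .pipeline to .tracker) are what downstream imports have to track. A minimal sketch of updated caller code, assuming the re-exported names themselves are otherwise unchanged:

import orcabridge as ob
from orcabridge.mappers import Join, MapPackets, MapTags
from orcabridge.sources import GlobSource
from orcabridge.tracker import GraphTracker  # previously: from orcabridge.pipeline

# Package-level re-exports keep working, as the notebooks rely on:
json_source = ob.GlobSource("json_file", "../examples/dataset2", "*.json")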
98 changes: 88 additions & 10 deletions src/orcabridge/base.py
@@ -1,11 +1,14 @@
-from orcabridge.hashing import HashableMixin
-from orcabridge.types import Tag, Packet
-from typing import Any
+# Collection of base classes for operations and streams in the orcabridge framework.
 import threading
-from collections.abc import Collection, Callable, Iterator
+from abc import ABC, abstractmethod
+from collections.abc import Callable, Collection, Iterator
+from typing import Any
+
+from orcabridge.hashing import HashableMixin
+from orcabridge.types import Packet, Tag
 
 
-class Operation(HashableMixin):
+class Operation(ABC, HashableMixin):
     """
     Operation defines a generic operation that can be performed on a stream of data.
     It is a base class for all operations that can be performed on a collection of streams
@@ -79,10 +82,24 @@ def __str__(self):
return f"{self.__class__.__name__}({self._label})"
return self.__class__.__name__

def claims_unique_tags(
self, *streams: "SyncStream", trigger_run: bool = True
) -> bool:
"""
Returns True if the operation claims that it has unique tags, False otherwise.
This method is useful for checking if the operation can be used as a source
for other operations that require unique tags.
Subclasses should override this method if it can provide reasonable check/guarantee
of unique tags. The default implementation returns False, meaning that the operation
does not claim to have unique tags.
"""
return False

@abstractmethod
def forward(self, *streams: "SyncStream") -> "SyncStream": ...


class Tracker:
class Tracker(ABC):
"""
A tracker is a class that can track the invocations of operations. Only "active" trackers
participate in tracking and its `record` method gets called on each invocation of an operation.
@@ -124,9 +141,12 @@ def __enter__(self):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.deactivate()
 
+    @abstractmethod
     def record(self, invocation: "Invocation") -> None: ...
 
 
+# This is NOT an abstract class, but rather a concrete class that
+# represents an invocation of an operation on a collection of streams.
 class Invocation(HashableMixin):
     """
     This class represents an invocation of an operation on a collection of streams.
@@ -138,6 +158,7 @@ class Invocation(HashableMixin):
     def __init__(
         self,
         operation: Operation,
+        # TODO: technically this should be Stream to stay consistent with the Stream interface
         streams: Collection["SyncStream"],
     ) -> None:
         self.operation = operation
@@ -171,8 +192,20 @@ def __lt__(self, other: Any) -> bool:
         # otherwise, order by the operation
         return hash(self.operation) < hash(other.operation)
 
+    def claims_unique_tags(self, trigger_run: bool = True) -> bool:
+        """
+        Returns True if the invocation claims to have unique tags, False otherwise.
+        This method is useful for checking if the invocation can be used as a source
+        for other operations that require unique tags.
+        Note that uniqueness is best thought of as a "claim" by the operation
+        that it has unique tags; the actual uniqueness can only be verified
+        by iterating over the streams and checking the tags.
+        """
+        return self.operation.claims_unique_tags(*self.streams, trigger_run=trigger_run)
 
 
-class Stream(HashableMixin):
+class Stream(ABC, HashableMixin):
     """
     A stream is a collection of tagged-packets that are generated by an operation.
     The stream is iterable and can be used to access the packets in the stream.
@@ -242,6 +275,20 @@ def keys(self) -> tuple[Collection[str] | None, Collection[str] | None]:
         tag, packet = next(iter(self))
         return list(tag.keys()), list(packet.keys())
 
+    def claims_unique_tags(self) -> bool | None:
+        """
+        Returns True if the stream claims unique tags, False otherwise. None is
+        returned if the uniqueness of tags cannot be determined.
+        This method is useful for checking if the stream can be used as a source
+        for other operations that require unique tags.
+        If the stream is generated by an operation, the invocation is consulted for
+        the information about unique tags.
+        """
+        if self.invocation is not None:
+            return self.invocation.claims_unique_tags()
+        return None
 
+    @abstractmethod
     def __iter__(self) -> Iterator[tuple[Tag, Packet]]:
         raise NotImplementedError("Subclasses must implement __iter__ method")
 
@@ -260,6 +307,29 @@ class SyncStream(Stream):
     will have to wait for the stream to finish before proceeding.
     """
 
+    def claims_unique_tags(self, *, trigger_run: bool = True) -> bool | None:
+        """
+        For synchronous streams, if the stream is generated by an operation, the
+        invocation is consulted first to see if the uniqueness of tags can be
+        determined without iterating over the stream. If uniqueness cannot be
+        determined from the invocation and trigger_run is True, uniqueness is
+        checked by iterating over all elements and verifying the tags.
+        Consequently, this may trigger upstream computations and can be expensive.
+        If trigger_run is False, the method returns None when uniqueness cannot be
+        determined. Since this consults the invocation, the resulting value is
+        ultimately a claim and not a guarantee of uniqueness. If a guarantee of
+        uniqueness is required, use the has_unique_tags method instead.
+        """
+        result = super().claims_unique_tags()
+        if result is not None or not trigger_run:
+            return result
+
+        # Uniqueness could not be determined from the invocation: iterate over
+        # the stream and check each tag against those seen so far.
+        unique_tags = set()
+        for tag, _packet in self:
+            if tag in unique_tags:
+                return False
+            unique_tags.add(tag)
+        return True
 
     def head(self, n: int = 5) -> None:
         """
         Print the first n elements of the stream.
@@ -281,27 +351,32 @@ def __len__(self) -> int:
         return sum(1 for _ in self)
 
     def __rshift__(
-        self, transformer: Callable[["SyncStream"], "SyncStream"]
+        self, transformer: dict | Callable[["SyncStream"], "SyncStream"]
     ) -> "SyncStream":
         """
         Returns a new stream that is the result of applying the mapping to the stream.
         The mapping is applied to each packet in the stream and the resulting packets
         are returned in a new stream.
         """
-        from .mapper import MapPackets
+        # TODO: remove just-in-time import
+        from .mappers import MapPackets
 
         if isinstance(transformer, dict):
             return MapPackets(transformer)(self)
         elif isinstance(transformer, Callable):
             return transformer(self)
 
+        # Otherwise, we do not know how to handle the transformer
         raise TypeError(
             "transformer must be a dictionary or a callable that takes a SyncStream"
         )
 
     def __mul__(self, other: "SyncStream") -> "SyncStream":
         """
         Returns a new stream that is the result of joining with the other stream.
         """
-        from .mapper import Join
+        # TODO: remove just-in-time import
+        from .mappers import Join
 
         if not isinstance(other, SyncStream):
             raise TypeError("other must be a SyncStream")
@@ -321,6 +396,9 @@ class Source(Operation, SyncStream):
     type of Operation that takes no input and produces a stream of packets.
     For convenience, the source itself is also a stream and thus can be used
     as an input to other operations directly.
+    However, note that Source is still best thought of as an Operation that
+    produces a stream of packets, rather than a stream itself. On almost all
+    occasions, Source acts as an Operation.
     """
 
     def __init__(self, label: str | None = None, **kwargs) -> None:
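Taken together, the operator overloads and the claims_unique_tags chain above can be exercised end to end. A hedged sketch, assuming a GlobSource as used in the notebooks — the "dataset1"/"*.bin" path and the key names are hypothetical, and MapPackets is assumed to interpret the dict as an old-key → new-key mapping:

from orcabridge.sources import GlobSource

# Hypothetical example sources; argument order follows the notebook usage.
recordings = GlobSource("bin_file", "../examples/dataset1", "*.bin")
metadata = GlobSource("json_file", "../examples/dataset2", "*.json")

# A dict on the right of >> dispatches to MapPackets (packet-key renaming).
renamed = recordings >> {"bin_file": "recording_file"}

# * dispatches to Join, combining the two streams.
joined = renamed * metadata

# Consults the invocation chain first; with trigger_run=True it may fall
# back to iterating the stream, which can trigger upstream computation.
print(joined.claims_unique_tags(trigger_run=True))
joined.head()  # print the first few (tag, packet) pairs

The dict form is the new dispatch path added to __rshift__ above; a callable on the right-hand side behaves as before.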
9 changes: 5 additions & 4 deletions src/orcabridge/dj/mapper.py
@@ -1,8 +1,9 @@
-from .stream import QueryStream
-from .operation import QueryOperation
-from ..mapper import Mapper, Join, MapPackets, MapTags
-from typing import Optional
 import warnings
+from typing import Optional
+
+from orcabridge.mappers import Join, MapPackets, Mapper, MapTags
+from .operation import QueryOperation
+from .stream import QueryStream
 
 
 class QueryMapper(QueryOperation, Mapper):
2 changes: 1 addition & 1 deletion src/orcabridge/dj/operation.py
@@ -1,5 +1,5 @@
-from .stream import QueryStream
 from ..base import Operation
+from .stream import QueryStream
 
 
 class QueryOperation(Operation):