Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion sigllm/primitives/forecasting/huggingface.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
DEFAULT_PAD_TOKEN = '<pad>'

VALID_NUMBERS = list('0123456789')
VALID_MULTIVARIATE_SYMBOLS = []

DEFAULT_MODEL = 'mistralai/Mistral-7B-Instruct-v0.2'

Expand Down Expand Up @@ -53,6 +54,7 @@ def __init__(
raw=False,
samples=1,
padding=0,
multivariate_allowed_symbols = [],
):
self.name = name
self.sep = sep
Expand All @@ -62,6 +64,7 @@ def __init__(
self.raw = raw
self.samples = samples
self.padding = padding
self.multivariate_allowed_symbols = multivariate_allowed_symbols

self.tokenizer = AutoTokenizer.from_pretrained(self.name, use_fast=False)

Expand All @@ -85,6 +88,9 @@ def __init__(
token = self.tokenizer.convert_tokens_to_ids(number)
valid_tokens.append(token)

for symbol in self.multivariate_allowed_symbols:
valid_tokens.append(self.tokenizer.convert_tokens_to_ids(symbol))

valid_tokens.append(self.tokenizer.convert_tokens_to_ids(self.sep))
self.invalid_tokens = [
[i] for i in range(len(self.tokenizer) - 1) if i not in valid_tokens
Expand Down Expand Up @@ -116,7 +122,7 @@ def forecast(self, X, **kwargs):
tokenized_input = self.tokenizer([text], return_tensors='pt').to('cuda')

input_length = tokenized_input['input_ids'].shape[1]
average_length = input_length / len(text.split(','))
average_length = input_length / len(text.split(self.sep))
max_tokens = (average_length + self.padding) * self.steps

generate_ids = self.model.generate(
Expand Down
21 changes: 21 additions & 0 deletions sigllm/primitives/formatting/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Multivariate formatting methods for time series data."""

from sigllm.primitives.formatting.multivariate_formatting import MultivariateFormattingMethod
from sigllm.primitives.formatting.json_format import JSONFormat
from sigllm.primitives.formatting.univariate_control import UnivariateControl
from sigllm.primitives.formatting.persistence_control import PersistenceControl
from sigllm.primitives.formatting.value_concatenation import ValueConcatenation
from sigllm.primitives.formatting.value_interleave import ValueInterleave
from sigllm.primitives.formatting.digit_interleave import DigitInterleave

__all__ = [
'MultivariateFormattingMethod',
'JSONFormat',
'UnivariateControl',
'PersistenceControl',
'ValueConcatenation',
'ValueInterleave',
'DigitInterleave',
]


72 changes: 72 additions & 0 deletions sigllm/primitives/formatting/digit_interleave.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from .multivariate_formatting import MultivariateFormattingMethod
import numpy as np


class DigitInterleave(MultivariateFormattingMethod):
def __init__(self, verbose: bool = False, **kwargs):
super().__init__("digit_interleave", verbose=verbose, **kwargs)


def format_as_string(self, data: np.ndarray, digits_per_timestamp = 3, separator = ",") -> str:
max_digits = max(len(str(abs(int(v)))) for window in data for ts in window for v in ts)
width_used = max(digits_per_timestamp, max_digits)
self.metadata['width_used'] = width_used

def interleave_digits(timestamp):
str_values = [str(int(val)) for val in timestamp]
padded_values = [s.zfill(width_used) for s in str_values]
result_str = ''
for digit_pos in range(width_used):
for padded_val in padded_values:
result_str += padded_val[digit_pos]

return result_str

result = [
separator.join(interleave_digits(timestamp) for timestamp in window) + separator
for window in data
]
return result


def format_as_integer(self, data: list[str], separator = ",", trunc = None, digits_per_timestamp = 3) -> np.ndarray:
width_used = self.metadata['width_used']

def deinterleave_timestamp(interleaved_str):
"""Convert interleaved digits back to original values"""
total_digits = len(interleaved_str)
num_values = total_digits // width_used

values = []
for value_idx in range(num_values):
value_digits = []
for digit_pos in range(width_used):
pos = digit_pos * num_values + value_idx
if pos < total_digits:
value_digits.append(interleaved_str[pos])

if value_digits:
values.append(int(''.join(value_digits)))

return np.array(values)[:trunc] if trunc else np.array(values)

result = np.array([
[
deinterleave_timestamp(timestamp)
for sample in entry
for timestamp in sample.lstrip(separator).rstrip(separator).split(separator)[:trunc]
if timestamp.strip()
]
for entry in data
], dtype=object)
return result



if __name__ == "__main__":
method = DigitInterleave(digits_per_timestamp=3)
method.test_multivariate_formatting_validity(verbose=False)
errs, y_hat, y = method.run_pipeline(return_y_hat=True)
print(errs)
print(y_hat)
print(y)
48 changes: 48 additions & 0 deletions sigllm/primitives/formatting/json_format.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from .multivariate_formatting import MultivariateFormattingMethod
import numpy as np
import re

class JSONFormat(MultivariateFormattingMethod):
def __init__(self, verbose: bool = False, **kwargs):
super().__init__("json_format", verbose=verbose, **kwargs)

def format_as_string(self, data: np.ndarray, separator = ",") -> str:
def window_to_json(data):
rows = []
for row in data:
parts = [f"d{i}:{val}" for i, val in enumerate(row)]
rows.append(",".join(parts))
return ",".join(rows)

out = [window_to_json(window) for window in data]
return out


def format_as_integer(self, data, trunc=None):
batch_rows = []
for window in data:
samples = []
for sample in window:
tokens = re.findall(r'd\d+:\d+', sample)
flat, current = [], []
for token in tokens:
key, val = token.split(":")
if key == "d0" and current:
flat.extend(current)
current = []
current.append(int(val))
if current:
flat.extend(current)
if trunc:
flat = flat[:trunc]
samples.append(flat)
batch_rows.append(samples)
return np.array(batch_rows, dtype=object)




if __name__ == "__main__":
method = JSONFormat()
method.test_multivariate_formatting_validity(verbose=False)
method.run_pipeline(multivariate_allowed_symbols=["d", ":", ","])
161 changes: 161 additions & 0 deletions sigllm/primitives/formatting/multivariate_formatting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import numpy as np
from mlblocks import MLPipeline
import pandas as pd

class MultivariateFormattingMethod:
def __init__(self, method_name: str, verbose: bool = False, **kwargs):
self.method_name = method_name
self.config = kwargs
self.metadata = {}
self.verbose = verbose

if self.method_name != "persistence_control":
self.test_multivariate_formatting_validity(verbose=verbose)


def format_as_string(self, data: np.ndarray, **kwargs) -> str:
raise NotImplementedError()


def format_as_integer(self, data: str, **kwargs) -> np.ndarray:
raise NotImplementedError()


def normalize_data(self, df: pd.DataFrame) -> pd.DataFrame:
ts = df[["timestamp"]]
vals = df.drop(columns=["timestamp"])
normed = (vals - vals.mean(axis=0)) / vals.std(axis=0)
return pd.concat([ts, normed], axis=1)[df.columns]


@staticmethod
def create_test_data(N = 25):
x1 = np.linspace(10, 9+N, N) / 100
x2 = np.array([i % 2 for i in range(N)])
x3 = np.linspace(N+40, 41, N) / 100

return pd.DataFrame({
'timestamp': np.linspace(0, 3600*(N-1), N),
'x1': x1,
'x2': x2,
'x3': x3,
})


def run_pipeline(self, data=create_test_data(),
interval=3600,
window_size=15,
verbose=True,
samples=7,
normalize=False,
temp=0.1,
return_y_hat = False,
multivariate_allowed_symbols = [],
pipeline_name = 'mistral_detector',
stride = 1,
n_clusters = 2,
strategy = 'binning'):
pipeline = MLPipeline(pipeline_name)
digits_per_timestamp = self.config.get('digits_per_timestamp', 2)

test_hyperparameters = {
"mlstars.custom.timeseries_preprocessing.time_segments_aggregate#1": {
"interval": interval
},
"sigllm.primitives.forecasting.custom.rolling_window_sequences#1": {
"target_column": 0,
"window_size": window_size,
"target_size": 1,
"step_size": stride,
},
"sigllm.primitives.forecasting.huggingface.HF#1": {
"samples": samples,
"temp": temp,
"multivariate_allowed_symbols": multivariate_allowed_symbols,
},
}

if strategy == 'binning':
test_hyperparameters["sigllm.primitives.transformation.Float2Scalar#1"] = {
"strategy": "binning",
"n_clusters": n_clusters,
}

elif strategy == 'scaling':
test_hyperparameters["sigllm.primitives.transformation.Float2Scalar#1"] = {
"strategy": "scaling",
"decimal": digits_per_timestamp,
"rescale": True,
}
else:
raise ValueError(f"Invalid strategy: {strategy}")

pipeline.set_hyperparameters(test_hyperparameters)
if normalize:
data = self.normalize_data(data)
context = pipeline.fit(data, start_=0, output_=3)
context['X'] = self.format_as_string(context['X'], **self.config)

if self.method_name == "persistence_control":
context['y_hat'] = context['X']

else:
context = pipeline.fit(**context, start_=5, output_=5)

if verbose:
print(f"y_hat example: {context['y_hat'][0][0]}")

context['y_hat'] = self.format_as_integer(context['y_hat'], trunc=1)
if verbose:
print(f"y_hat example: {context['y_hat'][0][0]}")
context = pipeline.fit(**context, start_=7, output_=10)

errors = np.round(context['errors'], 7)
MAE = np.mean(abs(errors))

if verbose:
print(f"y_hat: {context['y_hat']}")
print(f"y: {context['y']}")
print(f"errors: {errors}")

if return_y_hat:
return errors, context['y_hat'], context['y']
else:
return errors


def test_multivariate_formatting_validity(self, data=None, verbose=False):
if verbose:
print("Testing multivariate formatting method validity")

if data is None:
raw_data = np.array(self.create_test_data())[:, 1:]
windowed_data = np.array([raw_data[i:i+15,:] for i in range(0, len(raw_data)-15, 1)])
data = (1000 * windowed_data).astype(int)
if verbose:
print(data.shape)

string_data = self.format_as_string(data, **self.config)
LLM_mock_output = np.array(string_data).reshape(-1, 1)
if verbose:
print(LLM_mock_output)
integer_data = self.format_as_integer(LLM_mock_output, **self.config)
if verbose:
print(f"Format as string output: {string_data}")

assert isinstance(string_data, list)
assert isinstance(string_data[0], str)
assert isinstance(integer_data, np.ndarray)

if self.method_name == "univariate_control":
assert np.all(integer_data.flatten() == data[:, :, 0].flatten())
else:
assert np.all(integer_data.flatten() == data.flatten())

if verbose:
print("Validation suite passed")


if __name__ == "__main__":
method = MultivariateFormattingMethod(method_name="test")
print(method.normalize_data(method.create_test_data()))
27 changes: 27 additions & 0 deletions sigllm/primitives/formatting/persistence_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from .multivariate_formatting import MultivariateFormattingMethod
import numpy as np


class PersistenceControl(MultivariateFormattingMethod):
def __init__(self, verbose: bool = False, **kwargs):
super().__init__("persistence_control", verbose=verbose, **kwargs)

def format_as_string(self, data: np.ndarray, separator = ",") -> str:
result = []
for row in data[:, :, 0]:
result.append(separator.join(map(str, row.flatten())))
return result

def format_as_integer(self, data: list[str], separator = ",", trunc = None) -> np.ndarray:
result = [
[np.array([int(x) for x in entry.lstrip(separator).split(separator) if x])[-1:]]
for entry in data
]
out = np.array(result, dtype=object)
return out



if __name__ == "__main__":
method = PersistenceControl()
method.run_pipeline(stride=5)
Loading