Commit fc46fa3

Evaluation: Uploading dataset concurrently (#461)
* fix: add threadpool based concurrency to speed up langfuse dataset upload
* chore: fix precommit linting issues
* fix: cleanup and deleted CELERY.md
* chore: formatting

Co-authored-by: Akhilesh Negi <akhileshnegi.an3@gmail.com>
1 parent 974884c commit fc46fa3

File tree

2 files changed: +44 −30 lines

backend/app/crud/evaluations/langfuse.py

Lines changed: 40 additions & 26 deletions
@@ -9,6 +9,7 @@
 """
 
 import logging
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from typing import Any
 
 import numpy as np
@@ -247,42 +248,55 @@ def upload_dataset_to_langfuse(
         f"duplication_factor={duplication_factor}"
     )
 
+    def upload_item(item: dict[str, str], duplicate_num: int) -> bool:
+        try:
+            langfuse.create_dataset_item(
+                dataset_name=dataset_name,
+                input={"question": item["question"]},
+                expected_output={"answer": item["answer"]},
+                metadata={
+                    "original_question": item["question"],
+                    "duplicate_number": duplicate_num + 1,
+                    "duplication_factor": duplication_factor,
+                },
+            )
+            return True
+        except Exception as e:
+            logger.error(
+                f"[upload_dataset_to_langfuse] Failed to upload item | "
+                f"duplicate={duplicate_num + 1} | "
+                f"question={item['question'][:50]}... | {e}"
+            )
+            return False
+
     try:
         # Create or get dataset in Langfuse
         dataset = langfuse.create_dataset(name=dataset_name)
 
-        # Upload items with duplication
+        upload_tasks = [
+            (item, duplicate_num)
+            for item in items
+            for duplicate_num in range(duplication_factor)
+        ]
+
+        # Upload items concurrently using ThreadPoolExecutor
         total_uploaded = 0
-        for item in items:
-            # Duplicate each item N times
-            for duplicate_num in range(duplication_factor):
-                try:
-                    langfuse.create_dataset_item(
-                        dataset_name=dataset_name,
-                        input={"question": item["question"]},
-                        expected_output={"answer": item["answer"]},
-                        metadata={
-                            "original_question": item["question"],
-                            "duplicate_number": duplicate_num + 1,
-                            "duplication_factor": duplication_factor,
-                        },
-                    )
+        with ThreadPoolExecutor(max_workers=4) as executor:
+            # Submit all upload tasks and collect the futures
+            futures = []
+            for item, dup_num in upload_tasks:
+                future = executor.submit(upload_item, item, dup_num)
+                futures.append(future)
+
+            for future in as_completed(futures):
+                upload_successful = future.result()
+                if upload_successful:
                     total_uploaded += 1
-                except Exception as e:
-                    logger.error(
-                        f"[upload_dataset_to_langfuse] Failed to upload item | "
-                        f"duplicate={duplicate_num + 1} | "
-                        f"question={item['question'][:50]}... | {e}"
-                    )
-
-        # Flush after each original item's duplicates to prevent race conditions
-        # in Langfuse SDK's internal batching that could mix up Q&A pairs
-        langfuse.flush()
 
         # Final flush to ensure all items are uploaded
         langfuse.flush()
 
-        langfuse_dataset_id = dataset.id if hasattr(dataset, "id") else None
+        langfuse_dataset_id = dataset.id
 
         logger.info(
             f"[upload_dataset_to_langfuse] Successfully uploaded to Langfuse | "

backend/app/tests/crud/evaluations/test_langfuse.py

Lines changed: 4 additions & 4 deletions
@@ -415,8 +415,8 @@ def test_upload_dataset_to_langfuse_success(self, valid_items):
         # Verify dataset items were created (3 original * 5 duplicates = 15)
         assert mock_langfuse.create_dataset_item.call_count == 15
 
-        # Verify flush was called (once per original item + final flush = 4 times for 3 items)
-        assert mock_langfuse.flush.call_count == 4  # 3 items + 1 final
+        # Verify flush was called once (final flush)
+        assert mock_langfuse.flush.call_count == 1
 
     def test_upload_dataset_to_langfuse_duplication_metadata(self, valid_items):
         """Test that duplication metadata is included."""
@@ -483,8 +483,8 @@ def test_upload_dataset_to_langfuse_single_duplication(self, valid_items):
 
         assert total_items == 3  # 3 items * 1 duplication
         assert mock_langfuse.create_dataset_item.call_count == 3
-        # 3 items + 1 final flush
-        assert mock_langfuse.flush.call_count == 4
+        # final flush once
+        assert mock_langfuse.flush.call_count == 1
 
     def test_upload_dataset_to_langfuse_item_creation_error(self, valid_items):
         """Test that item creation errors are logged but don't stop processing."""
