
Commit e354925

fix errors handling
1 parent 86c856c · commit e354925

File tree

1 file changed: +32 -0 lines


src/inferencesh/models/llm.py

Lines changed: 32 additions & 0 deletions
@@ -586,6 +586,8 @@ def stream_generate(
 ) -> Generator[BaseLLMOutput, None, None]:
     """Stream generate from LLaMA.cpp model with timing and usage tracking."""

+    print("[DEBUG] Starting stream_generate")
+
     # Create queues for communication between threads
     response_queue = Queue()
     error_queue = Queue()
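For context, the function's overall shape is a producer/consumer pair: a worker thread pushes chunks onto a queue and the generator drains it. A minimal, self-contained sketch of that pattern follows; `fake_completion` stands in for the real model call, and all names are illustrative rather than the library's API.

# Minimal sketch of the queue-based streaming pattern, assuming a
# stand-in generator instead of the real model call.
import time
from queue import Empty, Queue
from threading import Thread

def fake_completion():
    """Illustrative stand-in for a streaming chat-completion call."""
    for i in range(5):
        time.sleep(0.05)
        yield {"token": f"t{i}"}

response_queue: Queue = Queue()

def worker() -> None:
    try:
        for chunk in fake_completion():
            response_queue.put(("chunk", chunk))
        response_queue.put(("done", None))  # signal normal completion
    except Exception as e:                  # report errors to the consumer
        response_queue.put(("error", e))

Thread(target=worker, daemon=True).start()
while True:
    try:
        msg_type, data = response_queue.get(timeout=1.0)
    except Empty:
        continue                            # no chunk yet; keep polling
    if msg_type == "error":
        raise RuntimeError("generation failed") from data
    if msg_type == "done":
        break
    print(data)                             # consume the chunk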
@@ -597,6 +599,7 @@ def stream_generate(
     def _generate_worker():
         """Worker thread to run the model generation."""
        try:
+            print("[DEBUG] Worker thread started")
             # Build completion kwargs
             completion_kwargs = {
                 "messages": messages,
@@ -613,17 +616,24 @@ def _generate_worker():

             # Signal that we're starting
             keep_alive_queue.put(("init", time.time()))
+            print("[DEBUG] Worker sent init signal")

             completion = model.create_chat_completion(**completion_kwargs)
+            print("[DEBUG] Got completion iterator from model")

+            chunk_count = 0
             for chunk in completion:
+                chunk_count += 1
                 if verbose:
                     print(chunk)
+                if chunk_count % 10 == 0:  # Log every 10th chunk to avoid spam
+                    print(f"[DEBUG] Processing chunk {chunk_count}")
                 response_queue.put(("chunk", chunk))
                 # Update keep-alive timestamp
                 keep_alive_queue.put(("alive", time.time()))

             # Signal completion
+            print(f"[DEBUG] Worker finished processing {chunk_count} chunks")
             response_queue.put(("done", None))

         except Exception as e:
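The `chunk_count % 10` guard is a simple log throttle: per-chunk prints would flood stdout, so only every tenth event is reported. A tiny sketch of the same idea, where `LOG_EVERY` and `log_progress` are illustrative names, not part of the library:

# Illustrative log throttle: print progress only every Nth event.
LOG_EVERY = 10  # assumed interval, mirroring the "% 10" in the diff

def log_progress(count: int, label: str) -> None:
    if count % LOG_EVERY == 0:
        print(f"[DEBUG] {label} {count}")

for n in range(1, 101):
    log_progress(n, "Processing chunk")  # prints at 10, 20, ..., 100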
@@ -639,6 +649,7 @@ def _generate_worker():
     # Start generation thread
     generation_thread = Thread(target=_generate_worker, daemon=True)
     generation_thread.start()
+    print("[DEBUG] Started worker thread")

     # Initialize response state
     response = StreamResponse()
@@ -657,10 +668,13 @@ def _generate_worker():
                 print(f"[DEBUG] Raising due to unexpected init message: {msg_type}")
                 raise RuntimeError("Unexpected initialization message")
             last_activity = timestamp
+            print("[DEBUG] Received init signal from worker")
         except Empty:
             print(f"[DEBUG] Raising due to init timeout after {init_timeout}s")
             raise RuntimeError(f"Model failed to initialize within {init_timeout} seconds")

+        chunk_count = 0
+        output_count = 0
         while True:
             # Check for errors - now with proper exception chaining
             if not error_queue.empty():
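The init handshake above relies on `Queue.get(timeout=...)` raising `queue.Empty` when the worker never reports in. A minimal sketch of that step, assuming an `init_timeout` in seconds and the same ("init", timestamp) message shape; names mirror the diff but are not the library's public API.

# Sketch of the init handshake: block for up to init_timeout seconds,
# then fail loudly. Names and the timeout value are assumptions.
import time
from queue import Empty, Queue

keep_alive_queue: Queue = Queue()
init_timeout = 30.0

def wait_for_init() -> float:
    try:
        msg_type, timestamp = keep_alive_queue.get(timeout=init_timeout)
    except Empty:
        raise RuntimeError(
            f"Model failed to initialize within {init_timeout} seconds"
        )
    if msg_type != "init":
        raise RuntimeError("Unexpected initialization message")
    return timestamp  # caller stores this as last_activity

keep_alive_queue.put(("init", time.time()))  # what the worker would send
last_activity = wait_for_init()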
@@ -689,6 +703,9 @@ def _generate_worker():
             # Get next chunk
             try:
                 msg_type, data = response_queue.get(timeout=0.1)
+                chunk_count += 1
+                if chunk_count % 10 == 0:  # Log every 10th chunk to avoid spam
+                    print(f"[DEBUG] Main loop received chunk {chunk_count}")
             except Empty:
                 continue

@@ -698,33 +715,48 @@ def _generate_worker():
                 print(f"[DEBUG] Raising due to error message: {data}")
                 raise RuntimeError(f"Generation error: {data}")
             elif msg_type == "done":
+                print("[DEBUG] Received done signal from worker")
                 break

             chunk = data

             # Mark first token time
             if not timing.first_token_time:
                 timing.mark_first_token()
+                print("[DEBUG] Marked first token time")

             # Update response state from chunk
             response.update_from_chunk(chunk, timing)

             # Yield output if we have updates
             if response.has_updates():
+                output_count += 1
+                if output_count % 10 == 0:  # Log every 10th output to avoid spam
+                    print(f"[DEBUG] Yielding output {output_count}")
+                    if hasattr(response, 'usage_stats'):
+                        print(f"[DEBUG] Current usage stats: {response.usage_stats}")
                 output, buffer = response.to_output(buffer, transformer)
                 yield output

             # Break if we're done
             if response.finish_reason:
+                print(f"[DEBUG] Breaking loop due to finish_reason: {response.finish_reason}")
                 break

+        print(f"[DEBUG] Main loop finished. Processed {chunk_count} chunks, yielded {output_count} outputs")
+        if hasattr(response, 'usage_stats'):
+            print(f"[DEBUG] Final usage stats: {response.usage_stats}")
+
         # Wait for generation thread to finish
         if generation_thread.is_alive():
+            print("[DEBUG] Waiting for worker thread to finish")
             generation_thread.join(timeout=5.0)  # Increased timeout to 5 seconds
             if generation_thread.is_alive():
                 # Thread didn't finish - this shouldn't happen normally
                 print("[DEBUG] Raising due to thread not finishing after 5s timeout")
                 raise RuntimeError("Generation thread failed to finish")
+            else:
+                print("[DEBUG] Worker thread finished successfully")

     except Exception as e:
         # Check if there's a thread error we should chain with
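The tail of the function is the shutdown path: a bounded `join` on the worker, plus exception chaining so an error raised in the worker thread surfaces as the `__cause__` of the error raised to the caller. A hedged sketch of both steps; the function names are illustrative, and `error_queue`/`generation_thread` mirror the names in the diff.

# Sketch of the shutdown path: bounded join, then exception chaining.
from queue import Empty, Queue
from threading import Thread

def join_worker(generation_thread: Thread, timeout: float = 5.0) -> None:
    """Give the worker a bounded join; fail if it never finishes."""
    if generation_thread.is_alive():
        generation_thread.join(timeout=timeout)
        if generation_thread.is_alive():
            raise RuntimeError("Generation thread failed to finish")

def reraise_with_worker_error(exc: BaseException, error_queue: Queue) -> None:
    """Re-raise exc, chaining the worker's exception if one was reported."""
    try:
        worker_exc = error_queue.get_nowait()
    except Empty:
        raise exc
    raise exc from worker_exc  # traceback then shows both failures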
