Skip to content

Commit ad1b8dc

Browse files
committed
fix error handling
1 parent 1390931 commit ad1b8dc

File tree

1 file changed

+5
-48
lines changed

1 file changed

+5
-48
lines changed

src/inferencesh/models/llm.py

Lines changed: 5 additions & 48 deletions
Original file line number · Diff line number · Diff line change
@@ -233,7 +233,7 @@ def render_message(msg: ContextMessage, allow_multipart: bool) -> str | List[dic
233233
messages = [{"role": "system", "content": input_data.system_prompt}] if input_data.system_prompt is not None and input_data.system_prompt != "" else []
234234

235235
def merge_messages(messages: List[ContextMessage]) -> ContextMessage:
236-
text = " ".join(msg.text for msg in messages if msg.text)
236+
text = "\n\n".join(msg.text for msg in messages if msg.text)
237237
images = [msg.image for msg in messages if msg.image]
238238
image = images[0] if images else None # TODO: handle multiple images
239239
return ContextMessage(role=messages[0].role, text=text, image=image)
@@ -585,9 +585,7 @@ def stream_generate(
585585
output_cls: type[BaseLLMOutput] = LLMOutput,
586586
) -> Generator[BaseLLMOutput, None, None]:
587587
"""Stream generate from LLaMA.cpp model with timing and usage tracking."""
588-
589-
print("[DEBUG] Starting stream_generate")
590-
588+
591589
# Create queues for communication between threads
592590
response_queue = Queue()
593591
error_queue = Queue()
@@ -599,7 +597,6 @@ def stream_generate(
599597
def _generate_worker():
600598
"""Worker thread to run the model generation."""
601599
try:
602-
print("[DEBUG] Worker thread started")
603600
# Build completion kwargs
604601
completion_kwargs = {
605602
"messages": messages,
@@ -616,28 +613,18 @@ def _generate_worker():
616613

617614
# Signal that we're starting
618615
keep_alive_queue.put(("init", time.time()))
619-
print("[DEBUG] Worker sent init signal")
620616

621617
completion = model.create_chat_completion(**completion_kwargs)
622-
print("[DEBUG] Got completion iterator from model")
623618

624-
chunk_count = 0
625619
for chunk in completion:
626-
chunk_count += 1
627-
if verbose:
628-
print(chunk)
629-
if chunk_count % 10 == 0: # Log every 10th chunk to avoid spam
630-
print(f"[DEBUG] Processing chunk {chunk_count}")
631620
response_queue.put(("chunk", chunk))
632621
# Update keep-alive timestamp
633622
keep_alive_queue.put(("alive", time.time()))
634623

635624
# Signal completion
636-
print(f"[DEBUG] Worker finished processing {chunk_count} chunks")
637625
response_queue.put(("done", None))
638626

639627
except Exception as e:
640-
print(f"[DEBUG] Worker thread caught exception: {type(e).__name__}: {str(e)}")
641628
# Preserve the full exception with traceback
642629
import sys
643630
error_queue.put((e, sys.exc_info()[2]))
@@ -649,7 +636,6 @@ def _generate_worker():
649636
# Start generation thread
650637
generation_thread = Thread(target=_generate_worker, daemon=True)
651638
generation_thread.start()
652-
print("[DEBUG] Started worker thread")
653639

654640
# Initialize response state
655641
response = StreamResponse()
@@ -665,25 +651,18 @@ def _generate_worker():
665651
try:
666652
msg_type, timestamp = keep_alive_queue.get(timeout=init_timeout)
667653
if msg_type != "init":
668-
print(f"[DEBUG] Raising due to unexpected init message: {msg_type}")
669654
raise RuntimeError("Unexpected initialization message")
670655
last_activity = timestamp
671-
print("[DEBUG] Received init signal from worker")
672656
except Empty:
673-
print(f"[DEBUG] Raising due to init timeout after {init_timeout}s")
674657
raise RuntimeError(f"Model failed to initialize within {init_timeout} seconds")
675658

676-
chunk_count = 0
677-
output_count = 0
678659
while True:
679660
# Check for errors - now with proper exception chaining
680661
if not error_queue.empty():
681662
exc, tb = error_queue.get()
682663
if isinstance(exc, Exception):
683-
print(f"[DEBUG] Raising worker thread exception: {type(exc).__name__}: {str(exc)}")
684664
raise exc.with_traceback(tb)
685665
else:
686-
print(f"[DEBUG] Raising unknown worker thread error: {exc}")
687666
raise RuntimeError(f"Unknown error in worker thread: {exc}")
688667

689668
# Check keep-alive
@@ -697,76 +676,54 @@ def _generate_worker():
697676

698677
# Check for timeout
699678
if time.time() - last_activity > chunk_timeout:
700-
print(f"[DEBUG] Raising due to chunk timeout after {chunk_timeout}s")
701679
raise RuntimeError(f"No response from model for {chunk_timeout} seconds")
702680

703681
# Get next chunk
704682
try:
705683
msg_type, data = response_queue.get(timeout=0.1)
706-
chunk_count += 1
707-
if chunk_count % 10 == 0: # Log every 10th chunk to avoid spam
708-
print(f"[DEBUG] Main loop received chunk {chunk_count} chunk sample: {data}")
709684
except Empty:
710685
continue
711686

712687
if msg_type == "error":
713688
# If we get an error message but no exception in error_queue,
714689
# create a new error
715-
print(f"[DEBUG] Raising due to error message: {data}")
716690
raise RuntimeError(f"Generation error: {data}")
717691
elif msg_type == "done":
718-
print("[DEBUG] Received done signal from worker")
719692
break
720693

721694
chunk = data
722695

696+
if verbose:
697+
print(chunk)
698+
723699
# Mark first token time
724700
if not timing.first_token_time:
725701
timing.mark_first_token()
726-
print("[DEBUG] Marked first token time")
727702

728703
# Update response state from chunk
729704
response.update_from_chunk(chunk, timing)
730705

731706
# Yield output if we have updates
732707
if response.has_updates():
733-
output_count += 1
734-
if output_count % 10 == 0: # Log every 10th output to avoid spam
735-
print(f"[DEBUG] Yielding output {output_count}")
736-
if hasattr(response, 'usage_stats'):
737-
print(f"[DEBUG] Current usage stats: {response.usage_stats}")
738708
output, buffer = response.to_output(buffer, transformer)
739709
yield output
740710

741711
# Break if we're done
742712
if response.finish_reason:
743-
print(f"[DEBUG] Breaking loop due to finish_reason: {response.finish_reason}")
744713
break
745714

746-
print(f"[DEBUG] Main loop finished. Processed {chunk_count} chunks, yielded {output_count} outputs")
747-
if hasattr(response, 'usage_stats'):
748-
print(f"[DEBUG] Final usage stats: {response.usage_stats}")
749-
750715
# Wait for generation thread to finish
751716
if generation_thread.is_alive():
752-
print("[DEBUG] Waiting for worker thread to finish")
753717
generation_thread.join(timeout=5.0) # Increased timeout to 5 seconds
754718
if generation_thread.is_alive():
755719
# Thread didn't finish - this shouldn't happen normally
756-
print("[DEBUG] Raising due to thread not finishing after 5s timeout")
757720
raise RuntimeError("Generation thread failed to finish")
758-
else:
759-
print("[DEBUG] Worker thread finished successfully")
760721

761722
except Exception as e:
762723
# Check if there's a thread error we should chain with
763724
if not error_queue.empty():
764725
thread_exc, thread_tb = error_queue.get()
765726
if isinstance(thread_exc, Exception):
766-
print(f"[DEBUG] Chaining main exception with worker thread exception:")
767-
print(f"[DEBUG] Main exception: {type(e).__name__}: {str(e)}")
768-
print(f"[DEBUG] Worker exception: {type(thread_exc).__name__}: {str(thread_exc)}")
769727
raise e from thread_exc
770728
# If no thread error, raise the original exception
771-
print(f"[DEBUG] Raising main thread exception: {type(e).__name__}: {str(e)}")
772729
raise

0 commit comments

Comments
 (0)