
    i                        d Z ddlmZ ddlZddlZddlZddlZddlZddlm	Z	 ddl
mZmZ  ej        d          Z e            Z e            Z e            Ze	 G d d                      Z G d	 d
          ZdS )u~  Gateway streaming consumer — bridges sync agent callbacks to async platform delivery.

The agent fires stream_delta_callback(text) synchronously from its worker thread.
GatewayStreamConsumer:
  1. Receives deltas via on_delta() (thread-safe, sync)
  2. Queues them to an asyncio task via queue.Queue
  3. The async run() task buffers, rate-limits, and progressively edits
     a single message on the target platform

Design: Uses the edit transport (send initial message, then editMessageText).
This is universally supported across Telegram, Discord, and Slack.

Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
    )annotationsN)	dataclass)AnyOptionalzgateway.stream_consumerc                  L    e Zd ZU dZdZded<   dZded<   dZd	ed
<   dZded<   dS )StreamConsumerConfigz5Runtime config for a single stream consumer instance.g      ?floatedit_interval(   intbuffer_thresholdu    ▉strcursorFboolbuffer_onlyN)	__name__
__module____qualname____doc__r
   __annotations__r   r   r        ?/home/agentuser/.hermes/hermes-agent/gateway/stream_consumer.pyr   r   (   s]         ??MFKr   r   c                  2   e Zd ZdZdZdZdZ	 	 d1d2dZed3d            Z	ed3d            Z
d4dZd5dZddd6dZd5dZd4dZd5dZd4dZd4dZ ej        d           Zed7d!            Zd8d$Zd9d%Zd:d'Zed;d+            Zd5d,Zd3d-Zd4d.Zd<d/Zd<d0Z dS )=GatewayStreamConsumera2  Async consumer that progressively edits a platform message with streamed tokens.

    Usage::

        consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata)
        # Pass consumer.on_delta as stream_delta_callback to AIAgent
        agent = AIAgent(..., stream_delta_callback=consumer.on_delta)
        # Start the consumer as an asyncio task
        task = asyncio.create_task(consumer.run())
        # ... run agent in thread pool ...
        consumer.finish()  # signal completion
        await task         # wait for final edit
       )z<REASONING_SCRATCHPAD>z<think>z<reasoning>z
<THINKING>z
<thinking>z	<thought>)z</REASONING_SCRATCHPAD>z</think>z</reasoning>z</THINKING>z</thinking>z
</thought>Nadapterr   chat_idr   configOptional[StreamConsumerConfig]metadataOptional[dict]c                T   || _         || _        |pt                      | _        || _        t          j                    | _        d| _        d | _	        d| _
        d| _        d| _        d| _        d| _        d| _        d| _        | j        j        | _        d| _        d| _        d| _        d S )N FTg        r   )r   r   r   cfgr!   queueQueue_queue_accumulated_message_id_already_sent_edit_supported_last_edit_time_last_sent_text_fallback_final_send_fallback_prefix_flood_strikesr
   _current_edit_interval_final_response_sent_in_think_block_think_buffer)selfr   r   r   r!   s        r   __init__zGatewayStreamConsumer.__init__P   s     3133 #(;==*."#"!$)! "&*h&<#$)!  %r   returnr   c                    | j         S )z?True if at least one message was sent or edited during the run.)r+   r6   s    r   already_sentz"GatewayStreamConsumer.already_sentl   s     !!r   c                    | j         S )zBTrue when the stream consumer delivered the final assistant reply.)r3   r:   s    r   final_response_sentz)GatewayStreamConsumer.final_response_sentq   s     ((r   Nonec                D    | j                             t                     dS )z>Finalize the current stream segment and start a fresh message.N)r(   put_NEW_SEGMENTr:   s    r   on_segment_breakz&GatewayStreamConsumer.on_segment_breakv   s    %%%%%r   textc                P    |r#| j                             t          |f           dS dS )z7Queue a completed interim assistant commentary message.N)r(   r@   _COMMENTARYr6   rC   s     r   on_commentaryz#GatewayStreamConsumer.on_commentaryz   s3     	1KOO[$/00000	1 	1r   Fpreserve_no_editrI   c               j    |r| j         dk    rd S d | _         d| _        d| _        d| _        d| _        d S )N__no_edit__r$   F)r*   r)   r.   r/   r0   )r6   rI   s     r   _reset_segment_statez*GatewayStreamConsumer._reset_segment_state   sJ     	 0M A AF!$)! "r   c                r    |r| j                             |           dS ||                                  dS dS )u2  Thread-safe callback — called from the agent's worker thread.

        When *text* is ``None``, signals a tool boundary: the current message
        is finalized and subsequent text will be sent as a new message so it
        appears below any tool-progress messages the gateway sent in between.
        N)r(   r@   rB   rF   s     r   on_deltazGatewayStreamConsumer.on_delta   sJ      	$KOOD!!!!!\!!##### \r   c                D    | j                             t                     dS )z#Signal that the stream is complete.N)r(   r@   _DONEr:   s    r   finishzGatewayStreamConsumer.finish   s    r   c                   | j         |z   }d| _         |rm| j        rd}d}| j        D ]:}|                    |          }|dk    r|dk    s||k     r|}t	          |          };|rd| _        |||z   d         }nt          d | j        D                       }t	          |          |k    r|| d         n|| _         dS d}d}| j        D ]}d}	 |                    ||          }|dk    rn|dk    r#| j         p| j                            d          }	n|d|         }
|
	                    d          }|dk    r;| j         s| j                            d          o|

                                dk    }	n#|
|d	z   d         
                                dk    }	|	r|dk    s||k     r|}t	          |          }n|d	z   }|r-| xj        |d|         z  c_        d| _        |||z   d         }nd}| j        D ]G}t          d	t	          |                    D ]'}|                    |d|                   r||k    r|}(H|r*| xj        |d|          z  c_        || d         | _         n| xj        |z  c_        dS |kdS dS )
ab  Add a text delta to the accumulated buffer, suppressing think blocks.

        Uses a state machine that tracks whether we are inside a
        reasoning/thinking block.  Text inside such blocks is silently
        discarded.  Partial tags at buffer boundaries are held back in
        ``_think_buffer`` until enough characters arrive to decide.
        r$   r   FNc              3  4   K   | ]}t          |          V  d S )N)len).0ts     r   	<genexpr>z?GatewayStreamConsumer._filter_and_accumulate.<locals>.<genexpr>   s(      !I!IQ#a&&!I!I!I!I!I!Ir   T
   )r5   r4   _CLOSE_THINK_TAGSfindrU   max_OPEN_THINK_TAGSr)   endswithrfindstriprange)r6   rC   bufbest_idxbest_lentagidxmax_tagsearch_startis_boundary	precedinglast_nl	held_backis                 r   _filter_and_accumulatez,GatewayStreamConsumer._filter_and_accumulate   sd     4' K	# J1 , ,C((3--Cbyyh"nnh#&#&s88 	+0D(h1223CC "!I!I$2H!I!I!IIIG;>s88g;M;MgXYYSVD&F 0 / /C#$L/!hhsL99"99!!88$($5 5 !D#'#4#=#=d#C#C (K
 ),DSD	I&/ood&;&;G&"}})-):%: &I(,(9(B(B4(H(H%@(1(9(9R(? !, /8!.E.K.K.M.MQS.S& "HNNcHnn'*H'*3xxH!'*Qw5/8  %%YhY7%%+/D(h1223CC !"I#4 . .!&q#c((!3!3 . .A"||CG44 .Y,-	. ! 1))S9*-==))-0)-=**))S0))FW  K	 K	 K	 K	 K	r   c                b    | j         r%| j        s | xj        | j         z  c_        d| _         dS dS dS )zFlush any held-back partial-tag buffer into accumulated text.

        Called when the stream ends (got_done) so that partial text that
        was held back waiting for a possible opening tag is not lost.
        r$   N)r5   r4   r)   r:   s    r   _flush_think_bufferz)GatewayStreamConsumer._flush_think_buffer   sR      	$d&: 	$!33!#D	$ 	$ 	$ 	$r   c                L
  K   t          | j        dd          }t          d|t          | j        j                  z
  dz
            }	 	 d}d}d}	 	 | j                                        }|t          u rd}nx|t          u rd}nlt          |t                    r+t          |          dk    r|d	         t          u r	|d
         }n,|                     |           n# t          j        $ r Y nw xY w|r|                                  t#          j                    }|| j        z
  }|p|p|du}	| j        j        s6|	p3|| j        k    r| j        p!t          | j                  | j        j        k    }	d}
|	r| j        rt          | j                  |k    r| j        | j                            | j        |          }|D ]#}|                     || j                   d{V  $d| _        d| _        t#          j                    | _        |r| j        | _        dS |rd| _        d| _        d| _        t          | j                  |k    r| j        | j         r| j        !                    dd	|          }||dz  k     r|}| j        d|         }| "                    |           d{V }| j        s|sn[| j        |d         #                    d          | _        d| _        d| _        t          | j                  |k    r| j        | j         | j        }|s|s||| j        j        z  }| "                    |           d{V }
t#          j                    | _        |r| j        r| j        r!| $                    | j                   d{V  nc|
rd| _        nY| j        r&| "                    | j                   d{V | _        n,| j        s%| "                    | j                   d{V | _        dS |[| %                                 | &                    |           d{V  t#          j                    | _        | %                                 |r| %                    d           tO          j(        d           d{V  5# tN          j)        $ ri d}| j        rF| j        r?	 tU          | "                    | j                   d{V           }n# tV          $ r Y nw xY w|r| j        sd| _        Y dS Y dS Y dS tV          $ r&}tX          -                    d|           Y d}~dS d}~ww xY w)z@Async task that drains the queue and edits the platform message.MAX_MESSAGE_LENGTH     d   TFN   r   rZ   r$   rY   rH   g?zStream consumer error: %s).getattrr   r]   rU   r%   r   r(   
get_nowaitrP   rA   
isinstancetuplerE   ro   r&   Emptyrq   time	monotonicr-   r   r2   r)   r   r*   truncate_message_send_new_chunkr.   r+   r3   r/   r0   r,   r`   _send_or_editlstrip_send_fallback_finalrL   _send_commentaryasynciosleepCancelledErrorr   	Exceptionloggererror)r6   
_raw_limit_safe_limitgot_donegot_segment_breakcommentary_textitemnowelapsedshould_editcurrent_update_visiblechunkschunksplit_atokdisplay_text_best_effort_okes                     r   runzGatewayStreamConsumer.run  s      T\+?FF
#zC,@,@@3FGGb	9M* $)!"&#{55775=='+H!<//04-!%dE22 "s4yyA~~$q'U`J`J`.21gO!33D9999 ;   &  /,,... n&& 44 3(3&d2 
 x+ "- # D$?? 2 $ 1Ot011TX5NN   */& 9<4#4 9< D-..<< ,4 "&!>!> -{" " &, P PE"&"6"6ud>N"O"OOOOOOOOO,.)/1,/3~/?/?,# #8<8JD5"F, 7/3D,8=D546D1 
 D-..<< ,8 0 9 $(#4#:#:4K#P#P#kQ&666'2H $ 1)8) <#'#5#5e#<#<<<<<<<4 "B " ",0,=hii,H,O,OPT,U,U)+/(/1,% D-..<< ,8 0 9& $(#4L# 8,= 8/BY$7373E3El3S3S-S-S-S-S-S-S*+/>+;+;D( 
 ( d4 d"&";";D<M"N"NNNNNNNNN3 d8<D55!- d>B>P>PQUQb>c>c8c8c8c8c8c8cD55!%!3 d>B>P>PQUQb>c>c8c8c8c8c8c8cD5F".--/////@@@@@@@@@+/>+;+;D(--///  % E--t-DDDmD)))))))))[M*^ % 	1 	1 	1#O  T%5 &*1C1CDDU1V1V+V+V+V+V+V+V&W&WOO    D  1t'@ 1,0))))1 1 1 1 1 1 	9 	9 	9LL4a888888888	9s   Q= $C 4Q= 5C  Q= ?C  Q= C Q= C)&Q= (C))D'Q= GQ= ,BQ= =T#-S
T#
ST#ST#4	T#=TT#z[`"']?MEDIA:\s*\S+[`"']?c                    d| vrd| vr| S |                      dd          }t          j                            d|          }t	          j        dd|          }|                                S )u  Strip MEDIA: directives and internal markers from text before display.

        The streaming path delivers raw text chunks that may include
        ``MEDIA:<path>`` tags and ``[[audio_as_voice]]`` directives meant for
        the platform adapter's post-processing.  The actual media files are
        delivered separately via ``_deliver_media_from_response()`` after the
        stream finishes — we just need to hide the raw directives from the
        user.
        zMEDIA:z[[audio_as_voice]]r$   z\n{3,}z

)replacer   	_MEDIA_REsubrerstrip)rC   cleaneds     r   _clean_for_displayz(GatewayStreamConsumer._clean_for_display  sn     4$8$D$DK,,3R88'155b'BB&FG44~~r   reply_to_idOptional[str]c                  K   |                      |          }|                                s|S 	 | j        rt          | j                  ni }| j                            | j        |||           d{V }|j        rB|j        r;t          |j                  | _
        d| _        || _        t          |j                  S d| _        |S # t          $ r'}t                              d|           |cY d}~S d}~ww xY w)zSend a new message chunk, optionally threaded to a previous message.

        Returns the message_id so callers can thread subsequent chunks.
        )r   contentreply_tor!   NTFzStream send chunk error: %s)r   ra   r!   dictr   sendr   success
message_idr   r*   r+   r.   r,   r   r   r   )r6   rC   r   metaresultr   s         r   r   z%GatewayStreamConsumer._send_new_chunk  s:     
 &&t,,zz|| 		*.-?4&&&RD<,,$	 -        F ~ #&"3 ##&v'8#9#9 %)"'+$6,---',$"" 	 	 	LL6:::	s$   BC >C 
C8C3-C83C8c                    | j         pd}| j        j        rA|                    | j        j                  r"|dt	          | j        j                            }|                     |          S )z>Return the visible text already shown in the streamed message.r$   N)r.   r%   r   r_   rU   r   r6   prefixs     r   _visible_prefixz%GatewayStreamConsumer._visible_prefix  se    %+8? 	4vtx?? 	42c$(/22223F&&v...r   
final_textc                    | j         p|                                 }|r>|                    |          r)|t          |          d                                         S |S )zAReturn only the part of final_text the user has not already seen.N)r0   r   
startswithrU   r   )r6   r   r   s      r   _continuation_textz(GatewayStreamConsumer._continuation_text  s_    &@$*>*>*@*@ 	5j++F33 	5c&kkll+22444r   limitr   	list[str]c                l   t          |           |k    r| gS g }| }t          |          |k    ro|                    dd|          }||dz  k     r|}|                    |d|                    ||d                             d          }t          |          |k    o|r|                    |           |S )z;Split text into reasonably sized chunks for fallback sends.rY   r   rw   N)rU   r`   appendr   )rC   r   r   	remainingr   s        r   _split_text_chunksz(GatewayStreamConsumer._split_text_chunks  s     t996M	)nnu$$ tQ66H%1*$$ MM)IXI.///!()),33D99I )nnu$$  	%MM)$$$r   c                  K   |                      |          }|                     |          }d| _        |                                s?|                                r||                                 k    r|}nd| _        d| _        dS t          | j        dd          }t          d|dz
            }| 
                    ||          }d}d}d}	|D ]}
d}t          d	          D ]}| j                            | j        |
| j        
           d{V }|j        r nQ|dk    rJ|                     |          r5t"                              d           t'          j        d           d{V   |r|j        sG|	r&d| _        d| _        || _        || _        d| _         dS d| _        d| _        d| _        d| _         dS d}	|
}|j        p|}|| _        d| _        d| _        |d         | _        d| _        dS )zSend the final continuation after streaming edits stop working.

        Retries each chunk once on flood-control failures with a short delay.
        FTNrs   rt   ru   rv   r$   rw   r   r   r!   r   z.Flood control on fallback send, retrying in 3sg      @rS   )r   r   r/   ra   r   r+   r3   rx   r   r]   r   rb   r   r   r!   r   _is_flood_errorr   debugr   r   r*   r.   r0   r   )r6   rC   r   continuation	raw_limit
safe_limitr   last_message_idlast_successful_chunksent_any_chunkr   r   attempts                r   r   z*GatewayStreamConsumer._send_fallback_final   s~     
 ,,T22
..z::$)!!!## 	 !! jD4H4H4J4J&J&J)%)",0)DL*>EE	i#o..
((zBB)- " '	C '	CEF 88  #|00 L!!]  1          
 > Ea<<D$8$8$@$@<LLH   "-,,,,,,,,,,  ! 	 *.D&04D-'6D$+@D(,.D)FF &+"#' ')$(*%!N$)!$/B?OO*!$(!%bz "r   c                h    t          |dd          pd}|                                }d|v pd|v pd|v S )zFCheck if a SendResult failure is due to flood control / rate limiting.r   r$   floodzretry afterrate)rx   lower)r6   r   err	err_lowers       r   r   z%GatewayStreamConsumer._is_flood_errorL  sD    fgr**0bIIKK	)#X}	'AXVyEXXr   c                  K   | j         r| j         dk    rdS |                                 }|r|                                sdS 	 | j                            | j        | j         |           d{V  || _        dS # t          $ r Y dS w xY w)u   Best-effort edit to remove the cursor from the last visible message.

        Called when entering fallback mode so the user doesn't see a stuck
        cursor (▉) in the partial message.
        rK   Nr   r   r   )r*   r   ra   r   edit_messager   r.   r   r   s     r   _try_strip_cursorz'GatewayStreamConsumer._try_strip_cursorR  s        	4#3}#D#DF%%'' 	V\\^^ 	F	,+++ ,         
 $*D    	 	 	DD	s   4A: :
BBc                ,  K   |                      |          }|                                sdS 	 | j                            | j        || j                   d{V }|j        S # t          $ r&}t          	                    d|           Y d}~dS d}~ww xY w)z6Send a completed interim assistant commentary message.Fr   NzCommentary send error: %s)
r   ra   r   r   r   r!   r   r   r   r   )r6   rC   r   r   s       r   r   z&GatewayStreamConsumer._send_commentaryg  s      &&t,,zz|| 	5	<,, -        F >! 	 	 	LL4a88855555	s   3A# #
B-BBc                  K   |                      |          }|}| j        j        r |                    | j        j        d          }|                                }|sdS |                                sdS d}| j        /| j        j        r#| j        j        |v rt          |          |k     rdS 	 | j        d| j        rZ|| j        k    rdS | j	        
                    | j        | j        |           d{V }|j        rd| _        || _        d| _        dS |                     |          r| xj        dz  c_        t!          | j        dz  d	          | _        t$                              d
| j        | j        | j                   | j        | j        k     rt+          j                    | _        dS t$                              d| j                   |                                 | _        d| _        d| _        d| _        |                                  d{V  dS dS | j	                            | j        || j                   d{V }|j        rY|j        r|j        | _        nd| _        d| _        || _        |j        s'|                                 | _        d| _        d| _        dS d| _        dS # t>          $ r&}t$                               d|           Y d}~dS d}~ww xY w)a  Send or edit the streaming message.

        Returns True if the text was successfully delivered (sent or edited),
        False otherwise.  Callers like the overflow split loop use this to
        decide whether to advance past the delivered chunk.
        r$   T   Nr   r   rZ   rw   g      $@u@   Flood control on edit (strike %d/%d), backoff interval → %.1fsFz0Edit failed (strikes=%d), entering fallback moder   rK   zStream send/edit error: %s)!r   r%   r   r   ra   r*   rU   r,   r.   r   r   r   r   r+   r1   r   minr2   r   r   _MAX_FLOOD_STRIKESr}   r~   r-   r   r0   r/   r   r   r!   r   r   r   )r6   rC   visible_without_cursor_visible_stripped_MIN_NEW_MSG_CHARSr   r   s          r   r   z#GatewayStreamConsumer._send_or_edit|  s      &&t,, "&8? 	Y%;%C%CDHOUW%X%X"288::  	4zz|| 	4 $HO %HOt++)**-???4Y	+' :!t333#t#'<#<#< $#'#3 $ $= $ $      F
 ~ ,%-1*/3,./+#t  //77 - //14//:= $ ;a ?; ;D7 #LL!= $ 3 $ 7 $ ;    $2T5LLL 8<~7G7G 4',u
 N /   150D0D0F0F-481/4,-1* #44666666666$u !5  $|00 L !]  1          
 > !( 5+1+<((/4,)-D&+/D(!, 9040D0D0F0F-481 ,9(4 ,1D( 5 	 	 	LL5q99955555	s:   -J/ 
A	J/ BJ/ -A(J/ BJ/ &J/ /
K9KK)NN)r   r   r   r   r   r    r!   r"   )r8   r   )r8   r>   )rC   r   r8   r>   )rI   r   r8   r>   )rC   r   r8   r   )rC   r   r   r   r8   r   )r8   r   )r   r   r8   r   )rC   r   r   r   r8   r   )rC   r   r8   r   )!r   r   r   r   r   r^   r[   r7   propertyr;   r=   rB   rG   rL   rN   rQ   ro   rq   r   r   compiler   staticmethodr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   1   sS          
 26#'         8 " " " X" ) ) ) X)& & & &1 1 1 1
 @E # # # # # #
$ 
$ 
$ 
$   V V V Vp$ $ $ $h9 h9 h9 h9Z 
:;;I      \ &   8/ / / /       \ J# J# J# J#XY Y Y Y   *   *} } } } } }r   r   )r   
__future__r   r   loggingr&   r   r}   dataclassesr   typingr   r   	getLoggerr   objectrP   rA   rE   r   r   r   r   r   <module>r      s$    # " " " " "    				  ! ! ! ! ! !                		4	5	5 	 vxx fhh        H H H H H H H H H Hr   