@@ -254,6 +254,9 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None,
254
254
else :
255
255
self .upstreams = []
256
256
257
+ # Lazily loaded exception handler to avoid recursion
258
+ self ._on_exception = None
259
+
257
260
self ._set_asynchronous (asynchronous )
258
261
self ._set_loop (loop )
259
262
if ensure_io_loop and not self .loop :
@@ -445,14 +448,18 @@ def _emit(self, x, metadata=None):
445
448
446
449
result = []
447
450
for downstream in list (self .downstreams ):
448
- r = downstream .update (x , who = self , metadata = metadata )
449
-
450
- if type (r ) is list :
451
- result .extend (r )
452
- else :
453
- result .append (r )
451
+ try :
452
+ r = downstream .update (x , who = self , metadata = metadata )
454
453
455
- self ._release_refs (metadata )
454
+ if type (r ) is list :
455
+ result .extend (r )
456
+ else :
457
+ result .append (r )
458
+ except Exception as exc :
459
+ # Push this exception to the on_exception handler on the downstream that raised
460
+ downstream .on_exception ().update ((x , exc ) , who = self , metadata = metadata )
461
+ finally :
462
+ self ._release_refs (metadata )
456
463
457
464
return [element for element in result if element is not None ]
458
465
@@ -671,6 +678,30 @@ def _release_refs(self, metadata, n=1):
671
678
if 'ref' in m :
672
679
m ['ref' ].release (n )
673
680
681
+ def on_exception (self ):
682
+ """Returns the exception handler associated with this stream
683
+ """
684
+ self ._on_exception = self ._on_exception or _on_exception ()
685
+ return self ._on_exception
686
+
687
+
688
+ class InvalidDataError (Exception ):
689
+ pass
690
+
691
+ class _on_exception (Stream ):
692
+
693
+ def __init__ (self , * args , ** kwargs ):
694
+ self .silent = False
695
+ Stream .__init__ (self , * args , ** kwargs )
696
+
697
+ def update (self , x , who = None , metadata = None ):
698
+ cause , exc = x
699
+
700
+ if self .silent or len (self .downstreams ) > 0 :
701
+ self ._emit (x , metadata = metadata )
702
+ else :
703
+ logger .exception (exc )
704
+ raise InvalidDataError (cause ) from exc
674
705
675
706
@Stream .register_api ()
676
707
class map (Stream ):
@@ -706,13 +737,8 @@ def __init__(self, upstream, func, *args, **kwargs):
706
737
Stream .__init__ (self , upstream , stream_name = stream_name )
707
738
708
739
def update (self , x , who = None , metadata = None ):
709
- try :
710
- result = self .func (x , * self .args , ** self .kwargs )
711
- except Exception as e :
712
- logger .exception (e )
713
- raise
714
- else :
715
- return self ._emit (result , metadata = metadata )
740
+ result = self .func (x , * self .args , ** self .kwargs )
741
+ self ._emit (result , metadata = metadata )
716
742
717
743
718
744
@Stream .register_api ()
@@ -890,11 +916,7 @@ def update(self, x, who=None, metadata=None):
890
916
else :
891
917
return self ._emit (x , metadata = metadata )
892
918
else :
893
- try :
894
- result = self .func (self .state , x , ** self .kwargs )
895
- except Exception as e :
896
- logger .exception (e )
897
- raise
919
+ result = self .func (self .state , x , ** self .kwargs )
898
920
if self .returns_state :
899
921
state , result = result
900
922
else :
0 commit comments