@@ -175,6 +175,70 @@ public async Task Run_ShouldRetryQuery_IfDocDBThrowsCanceled()
175
175
Times . Once ) ;
176
176
}
177
177
178
+ [ Fact ]
179
+ public async Task Run_ShouldThrow_IfObserverThrows ( )
180
+ {
181
+ Mock . Get ( documentQuery )
182
+ . Reset ( ) ;
183
+
184
+ Mock . Get ( documentQuery )
185
+ . SetupSequence ( query => query . ExecuteNextAsync < Document > ( It . Is < CancellationToken > ( token => token == cancellationTokenSource . Token ) ) )
186
+ . ReturnsAsync ( feedResponse ) ;
187
+
188
+ Mock . Get ( observer )
189
+ . SetupSequence ( feedObserver => feedObserver
190
+ . ProcessChangesAsync ( It . IsAny < IChangeFeedObserverContext > ( ) , It . IsAny < IReadOnlyList < Document > > ( ) , It . IsAny < CancellationToken > ( ) ) )
191
+ . Throws ( new CustomException ( ) )
192
+ . Returns ( Task . CompletedTask ) ;
193
+
194
+ Exception exception = await Record . ExceptionAsync ( ( ) => sut . RunAsync ( cancellationTokenSource . Token ) ) ;
195
+ Assert . IsAssignableFrom < CustomException > ( exception ) ;
196
+
197
+ Mock . Get ( documentQuery )
198
+ . Verify ( query => query . ExecuteNextAsync < Document > ( It . Is < CancellationToken > ( token => token == cancellationTokenSource . Token ) ) , Times . Once ) ;
199
+
200
+ Mock . Get ( observer )
201
+ . Verify ( feedObserver => feedObserver
202
+ . ProcessChangesAsync (
203
+ It . Is < IChangeFeedObserverContext > ( context => context . PartitionKeyRangeId == processorSettings . PartitionKeyRangeId ) ,
204
+ It . Is < IReadOnlyList < Document > > ( list => list . SequenceEqual ( documents ) ) ,
205
+ It . IsAny < CancellationToken > ( ) ) ,
206
+ Times . Once ) ;
207
+ }
208
+
209
+ [ Fact ]
210
+ public async Task Run_ShouldThrow_IfObserverThrowsDocumentClientException ( )
211
+ {
212
+ // If the user code throws a DCE, we should bubble it up to stop the Observer and not treat it as a DCE from the Feed Query
213
+
214
+ Mock . Get ( documentQuery )
215
+ . Reset ( ) ;
216
+
217
+ Mock . Get ( documentQuery )
218
+ . Setup ( query => query . ExecuteNextAsync < Document > ( It . Is < CancellationToken > ( token => token == cancellationTokenSource . Token ) ) )
219
+ . ReturnsAsync ( feedResponse )
220
+ . Callback ( ( ) => cancellationTokenSource . Cancel ( ) ) ;
221
+
222
+ Mock . Get ( observer )
223
+ . Setup ( feedObserver => feedObserver
224
+ . ProcessChangesAsync ( It . IsAny < IChangeFeedObserverContext > ( ) , It . IsAny < IReadOnlyList < Document > > ( ) , It . IsAny < CancellationToken > ( ) ) )
225
+ . Throws ( DocumentExceptionHelpers . CreateRequestRateTooLargeException ( ) ) ;
226
+
227
+ Exception exception = await Record . ExceptionAsync ( ( ) => sut . RunAsync ( cancellationTokenSource . Token ) ) ;
228
+ Assert . IsAssignableFrom < DocumentClientException > ( exception ) ;
229
+
230
+ Mock . Get ( documentQuery )
231
+ . Verify ( query => query . ExecuteNextAsync < Document > ( It . Is < CancellationToken > ( token => token == cancellationTokenSource . Token ) ) , Times . Once ) ;
232
+
233
+ Mock . Get ( observer )
234
+ . Verify ( feedObserver => feedObserver
235
+ . ProcessChangesAsync (
236
+ It . Is < IChangeFeedObserverContext > ( context => context . PartitionKeyRangeId == processorSettings . PartitionKeyRangeId ) ,
237
+ It . Is < IReadOnlyList < Document > > ( list => list . SequenceEqual ( documents ) ) ,
238
+ It . IsAny < CancellationToken > ( ) ) ,
239
+ Times . Once ) ;
240
+ }
241
+
178
242
/// <summary>
179
243
/// (1) Read normal feed
180
244
/// (2) Get 400 with
@@ -250,5 +314,9 @@ public async Task Run_ShouldDecreaseMaxItemCountWhenNeeded()
250
314
251
315
Assert . Equal ( "token.token2.token3." , accumulator ) ;
252
316
}
317
+
318
+ private class CustomException : Exception
319
+ {
320
+ }
253
321
}
254
322
}
0 commit comments