@@ -62,6 +62,7 @@ groups() ->
62
62
[
63
63
{backing_queue_tests , [], [
64
64
msg_store ,
65
+ msg_store_read_many_fanout ,
65
66
msg_store_file_scan ,
66
67
{backing_queue_v2 , [], Common ++ V2Only }
67
68
]}
@@ -320,6 +321,68 @@ msg_store1(_Config) ->
320
321
restart_msg_store_empty (),
321
322
passed .
322
323
324
+ msg_store_read_many_fanout (Config ) ->
325
+ passed = rabbit_ct_broker_helpers :rpc (Config , 0 ,
326
+ ? MODULE , msg_store_read_many_fanout1 , [Config ]).
327
+
328
+ msg_store_read_many_fanout1 (_Config ) ->
329
+ GenRefFun = fun (Key ) -> V = case get (Key ) of undefined -> 0 ; V0 -> V0 end , put (Key , V + 1 ), V end ,
330
+ GenRef = fun () -> GenRefFun (msc ) end ,
331
+ % % We will fill the first message store file with random messages
332
+ % % + 1 fanout message (written once for now). We will then write
333
+ % % two messages from our queue, then the fanout message (to +1
334
+ % % from our queue), and two more messages. We expect all messages
335
+ % % from our queue to be in the current write file, except the
336
+ % % fanout message. We then try to read the messages.
337
+ restart_msg_store_empty (),
338
+ CRef1 = rabbit_guid :gen (),
339
+ CRef2 = rabbit_guid :gen (),
340
+ {ok , FileSize } = application :get_env (rabbit , msg_store_file_size_limit ),
341
+ PayloadSizeBits = 65536 ,
342
+ Payload = <<0 :PayloadSizeBits >>,
343
+ % % @todo -7 because -1 and -hd, fix better.
344
+ NumRandomMsgs = (FileSize div (PayloadSizeBits div 8 )) - 1 ,
345
+ RandomMsgIds = [{GenRef (), msg_id_bin (X )} || X <- lists :seq (1 , NumRandomMsgs )],
346
+ FanoutMsgId = {GenRef (), msg_id_bin (NumRandomMsgs + 1 )},
347
+ [Q1 , Q2 , Q3 , Q4 ] = [{GenRef (), msg_id_bin (X )} || X <- lists :seq (NumRandomMsgs + 2 , NumRandomMsgs + 5 )],
348
+ QueueMsgIds0 = [Q1 , Q2 ] ++ [FanoutMsgId ] ++ [Q3 , Q4 ],
349
+ QueueMsgIds = [{GenRef (), M } || {_ , M } <- QueueMsgIds0 ],
350
+ BasicMsgFun = fun (MsgId ) ->
351
+ Ex = rabbit_misc :r (<<>>, exchange , <<>>),
352
+ BasicMsg = rabbit_basic :message (Ex , <<>>,
353
+ # 'P_basic' {delivery_mode = 2 },
354
+ Payload ),
355
+ {ok , Msg0 } = mc_amqpl :message (Ex , <<>>, BasicMsg # basic_message .content ),
356
+ mc :set_annotation (id , MsgId , Msg0 )
357
+ end ,
358
+ ok = with_msg_store_client (
359
+ ? PERSISTENT_MSG_STORE , CRef1 ,
360
+ fun (MSCStateM ) ->
361
+ [begin
362
+ Msg = BasicMsgFun (MsgId ),
363
+ ok = rabbit_msg_store :write (SeqId , MsgId , Msg , MSCStateM )
364
+ end || {SeqId , MsgId } <- [FanoutMsgId ] ++ RandomMsgIds ],
365
+ MSCStateM
366
+ end ),
367
+ ok = with_msg_store_client (
368
+ ? PERSISTENT_MSG_STORE , CRef2 ,
369
+ fun (MSCStateM ) ->
370
+ [begin
371
+ Msg = BasicMsgFun (MsgId ),
372
+ ok = rabbit_msg_store :write (SeqId , MsgId , Msg , MSCStateM )
373
+ end || {SeqId , MsgId } <- QueueMsgIds ],
374
+ MSCStateM
375
+ end ),
376
+ ok = with_msg_store_client (
377
+ ? PERSISTENT_MSG_STORE , CRef2 ,
378
+ fun (MSCStateM ) ->
379
+ QueueOnlyMsgIds = [M || {_ , M } <- QueueMsgIds ],
380
+ {#{}, MSCStateN } = rabbit_msg_store :read_many (
381
+ QueueOnlyMsgIds , MSCStateM ),
382
+ MSCStateN
383
+ end ),
384
+ passed .
385
+
323
386
restart_msg_store_empty () ->
324
387
ok = rabbit_variable_queue :stop_msg_store (? VHOST ),
325
388
ok = rabbit_variable_queue :start_msg_store (? VHOST ,
0 commit comments