Skip to content

Commit a88db17

Browse files
Add mqtt wildcard support for # and + symbols
1 parent 545105a commit a88db17

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace SlimMessageBus.Host.Mqtt;
22

33
using System.Collections.Generic;
4+
using System.Text.RegularExpressions;
45
using System.Threading;
56

67
using Microsoft.Extensions.DependencyInjection;
@@ -107,9 +108,16 @@ protected override async Task DestroyConsumers()
107108
}
108109
}
109110

111+
private static bool CheckTopic(string allowedTopic, string topic)
112+
{
113+
var realTopicRegex = allowedTopic.Replace(@"/", @"\/").Replace("+", @"[a-zA-Z0-9 _.-]*").Replace("#", @"[a-zA-Z0-9 \/_#+.-]*");
114+
var regex = new Regex(realTopicRegex);
115+
return regex.IsMatch(topic);
116+
}
117+
110118
private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
111119
{
112-
var consumer = Consumers.Cast<MqttTopicConsumer>().FirstOrDefault(x => x.Path == arg.ApplicationMessage.Topic);
120+
var consumer = Consumers.Cast<MqttTopicConsumer>().FirstOrDefault(x => CheckTopic(x.Path, arg.ApplicationMessage.Topic));
113121
if (consumer != null)
114122
{
115123
var headers = new Dictionary<string, object>();

src/Tests/SlimMessageBus.Host.Mqtt.Test/MqttMessageBusIt.cs

+20
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,26 @@ public async Task BasicPubSubOnTopic(bool bulkProduce)
5555
await BasicPubSub(1, bulkProduce: bulkProduce);
5656
}
5757

58+
[Theory]
59+
[InlineData("test-ping/+/first", "test-ping/test/first", 1)]
60+
[InlineData("test-ping/+/first", "test-ping/test/first/first", 0)]
61+
[InlineData("test-ping/+/first", "test-ping/first/test", 0)]
62+
[InlineData("test-ping/#", "test-ping/test/first", 1)]
63+
[InlineData("test-ping/#", "test-ping/test/first/first", 1)]
64+
public async Task WildCardSubOnTopic(string wildCardTopic, string topic, int expected)
65+
{
66+
var concurrency = 2;
67+
68+
AddBusConfiguration(mbb =>
69+
{
70+
mbb
71+
.Produce<PingMessage>(x => x.DefaultTopic(topic))
72+
.Consume<PingMessage>(x => x.Topic(wildCardTopic).Instances(concurrency));
73+
});
74+
75+
await BasicPubSub(expected, false);
76+
}
77+
5878
private async Task BasicPubSub(int expectedMessageCopies, bool bulkProduce)
5979
{
6080
// arrange

0 commit comments

Comments
 (0)