Skip to content

Commit 3bbd297

Browse files
committed
implementation
1 parent a4e3144 commit 3bbd297

File tree

2 files changed

+98
-4
lines changed

2 files changed

+98
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System.Collections.Generic;
5+
using System.Diagnostics;
6+
using System.IO;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Azure.Core;
10+
11+
namespace Azure.Messaging.EventGrid.Namespaces
12+
{
13+
internal class CloudEventRequestContent : RequestContent
14+
{
15+
private readonly IEnumerable<CloudEvent> _cloudEvents;
16+
private const string TraceParentHeaderName = "traceparent";
17+
private const string TraceStateHeaderName = "tracestate";
18+
private readonly bool _isDistributedTracingEnabled;
19+
private RequestContent _serializedContent;
20+
21+
public CloudEventRequestContent(IEnumerable<CloudEvent> cloudEvents, bool isDistributedTracingEnabled)
22+
{
23+
_cloudEvents = cloudEvents;
24+
_isDistributedTracingEnabled = isDistributedTracingEnabled;
25+
}
26+
27+
public CloudEventRequestContent(CloudEvent cloudEvent, bool isDistributedTracingEnabled)
28+
{
29+
_cloudEvents = [ cloudEvent ];
30+
_isDistributedTracingEnabled = isDistributedTracingEnabled;
31+
}
32+
33+
public override void Dispose()
34+
{
35+
}
36+
37+
public override bool TryComputeLength(out long length)
38+
{
39+
EnsureSerialized();
40+
return _serializedContent.TryComputeLength(out length);
41+
}
42+
43+
public override void WriteTo(Stream stream, CancellationToken cancellationToken)
44+
{
45+
EnsureSerialized();
46+
_serializedContent.WriteTo(stream, cancellationToken);
47+
}
48+
49+
public override async Task WriteToAsync(Stream stream, CancellationToken cancellationToken)
50+
{
51+
EnsureSerialized();
52+
await _serializedContent.WriteToAsync(stream, cancellationToken).ConfigureAwait(false);
53+
}
54+
55+
private void EnsureSerialized()
56+
{
57+
if (_serializedContent != null)
58+
{
59+
return;
60+
}
61+
62+
if (_isDistributedTracingEnabled)
63+
{
64+
string currentActivityId = null;
65+
string traceState = null;
66+
Activity currentActivity = Activity.Current;
67+
if (currentActivity != null && (currentActivity.IdFormat == ActivityIdFormat.W3C))
68+
{
69+
currentActivityId = currentActivity.Id;
70+
traceState = currentActivity.TraceStateString;
71+
}
72+
73+
foreach (CloudEvent cloudEvent in _cloudEvents)
74+
{
75+
if (currentActivityId != null &&
76+
!cloudEvent.ExtensionAttributes.ContainsKey(TraceParentHeaderName) &&
77+
!cloudEvent.ExtensionAttributes.ContainsKey(TraceStateHeaderName))
78+
{
79+
cloudEvent.ExtensionAttributes.Add(TraceParentHeaderName, currentActivityId);
80+
if (traceState != null)
81+
{
82+
cloudEvent.ExtensionAttributes.Add(TraceStateHeaderName, traceState);
83+
}
84+
}
85+
}
86+
}
87+
88+
_serializedContent = RequestContent.Create(_cloudEvents);
89+
}
90+
}
91+
}

sdk/eventgrid/Azure.Messaging.EventGrid.Namespaces/src/Customization/EventGridSenderClient.cs

+7-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ namespace Azure.Messaging.EventGrid.Namespaces
1818
public partial class EventGridSenderClient
1919
{
2020
private readonly string _topicName;
21+
private readonly bool _isDistributedTracingEnabled;
2122

2223
/// <summary> Initializes a new instance of EventGridSenderClient. </summary>
2324
/// <param name="endpoint"> The host name of the namespace, e.g. namespaceName1.westus-1.eventgrid.azure.net. </param>
@@ -66,6 +67,7 @@ public EventGridSenderClient(Uri endpoint,
6667
_endpoint = endpoint;
6768
_apiVersion = options.Version;
6869
_topicName = topicName;
70+
_isDistributedTracingEnabled = options.Diagnostics.IsDistributedTracingEnabled;
6971
}
7072

7173
/// <summary> Initializes a new instance of EventGridSenderClient. </summary>
@@ -93,6 +95,7 @@ public EventGridSenderClient(Uri endpoint,
9395
_endpoint = endpoint;
9496
_apiVersion = options.Version;
9597
_topicName = topicName;
98+
_isDistributedTracingEnabled = options.Diagnostics.IsDistributedTracingEnabled;
9699
}
97100

98101
/// <summary> Publish Single Cloud Event to namespace topic. In case of success, the server responds with an HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return various error codes. For example, 401: which indicates authorization failure, 403: which indicates quota exceeded or message is too large, 410: which indicates that specific topic is not found, 400: for bad request, and 500: for internal server error. </summary>
@@ -104,7 +107,7 @@ public virtual Response Send(CloudEvent cloudEvent, CancellationToken cancellati
104107
Argument.AssertNotNull(cloudEvent, nameof(cloudEvent));
105108

106109
RequestContext context = FromCancellationToken(cancellationToken);
107-
return Send(_topicName, RequestContent.Create(cloudEvent), context);
110+
return Send(_topicName, new CloudEventRequestContent(cloudEvent, _isDistributedTracingEnabled), context);
108111
}
109112

110113
/// <summary> Publish Single Cloud Event to namespace topic. In case of success, the server responds with an HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return various error codes. For example, 401: which indicates authorization failure, 403: which indicates quota exceeded or message is too large, 410: which indicates that specific topic is not found, 400: for bad request, and 500: for internal server error. </summary>
@@ -118,7 +121,7 @@ public virtual async Task<Response> SendAsync(
118121
Argument.AssertNotNull(cloudEvent, nameof(cloudEvent));
119122

120123
RequestContext context = FromCancellationToken(cancellationToken);
121-
return await SendAsync(_topicName, RequestContent.Create(cloudEvent), context).ConfigureAwait(false);
124+
return await SendAsync(_topicName, new CloudEventRequestContent(cloudEvent, _isDistributedTracingEnabled), context).ConfigureAwait(false);
122125
}
123126

124127
/// <summary> Publish Batch Cloud Event to namespace topic. In case of success, the server responds with an HTTP 200 status code with an empty JSON object in response. Otherwise, the server can return various error codes. For example, 401: which indicates authorization failure, 403: which indicates quota exceeded or message is too large, 410: which indicates that specific topic is not found, 400: for bad request, and 500: for internal server error. </summary>
@@ -136,7 +139,7 @@ public virtual async Task<Response> SendAsync(IEnumerable<CloudEvent> cloudEvent
136139
scope.Start();
137140
try
138141
{
139-
return await SendEventsAsync(_topicName, RequestContent.Create(cloudEvents), context).ConfigureAwait(false);
142+
return await SendEventsAsync(_topicName, new CloudEventRequestContent(cloudEvents, _isDistributedTracingEnabled), context).ConfigureAwait(false);
140143
}
141144
catch (Exception e)
142145
{
@@ -160,7 +163,7 @@ public virtual Response Send(IEnumerable<CloudEvent> cloudEvents, CancellationTo
160163
scope.Start();
161164
try
162165
{
163-
return SendEvents(_topicName, RequestContent.Create(cloudEvents), context);
166+
return SendEvents(_topicName, new CloudEventRequestContent(cloudEvents, _isDistributedTracingEnabled), context);
164167
}
165168
catch (Exception e)
166169
{

0 commit comments

Comments
 (0)