-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathCosmosDBBindingAsyncCollector.cs
74 lines (64 loc) · 2.75 KB
/
CosmosDBBindingAsyncCollector.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace CosmosDBBinding.Step3
{
public class CosmosDBBindingAsyncCollector<T>: IAsyncCollector<T>
{
private CosmosDBBindingContext cosmosContext;
public CosmosDBBindingAsyncCollector(CosmosDBBindingContext cosmosContext) => this.cosmosContext = cosmosContext;
public async Task AddAsync(
T item,
CancellationToken cancellationToken = default(CancellationToken))
{
if (item == null)
{
throw new ArgumentNullException(nameof(item));
}
if (this.cosmosContext.ResolvedAttribute.CreateIfNotExists)
{
await InitializeContainer(this.cosmosContext);
}
await UpsertDocument(this.cosmosContext, item);
}
public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
// no-op
return Task.FromResult(0);
}
private static async Task InitializeContainer(CosmosDBBindingContext context)
{
DatabaseResponse databaseResponse = await context.CosmosClient.GetDatabase(context.ResolvedAttribute.DatabaseName).ReadAsync();
if (databaseResponse.StatusCode == System.Net.HttpStatusCode.NotFound)
{
await context.CosmosClient.CreateDatabaseAsync(context.ResolvedAttribute.DatabaseName);
}
ContainerResponse containerResponse = await context.CosmosClient.GetContainer(context.ResolvedAttribute.DatabaseName, context.ResolvedAttribute.ContainerName).ReadAsync();
if (containerResponse.StatusCode == System.Net.HttpStatusCode.NotFound)
{
await context.CosmosClient.GetDatabase(context.ResolvedAttribute.DatabaseName).CreateContainerAsync(
new CosmosContainerSettings(context.ResolvedAttribute.ContainerName, context.ResolvedAttribute.PartitionKey),
context.ResolvedAttribute.ContainerThroughput
);
}
}
private static async Task UpsertDocument(CosmosDBBindingContext context, T item)
{
// DocumentClient does not accept strings directly.
object convertedItem = item;
if (item is string)
{
convertedItem = JObject.Parse(item.ToString());
}
await context.CosmosClient
.GetContainer(context.ResolvedAttribute.DatabaseName,
context.ResolvedAttribute.ContainerName)
.UpsertItemAsync<T>(item);
}
}
}