Introduction
This is the first part in a series I'm planning to write about how to consume HTTP APIs with rate limiting in mind. This post focuses on how to handle rate limiting when the limit is set on the number of concurrent requests. The next post will focus on the "requests per second" kind of rate limiting.
The problem
We need to fetch weather data for 10 000 different locations, and we need to fetch it as fast as possible. The API only allows 10 requests at the same time. If we send more than that, it responds with the 429 Too Many Requests status code.
Prerequisite
This is the code that fetches the data from the external API. It stays the same across all the approaches below. As you can see, there is no error handling except the EnsureSuccessStatusCode call. If you are interested in how to handle common errors when using HttpClient, check out my post about that here.
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

public class DummyApiHttpClient
{
    private static readonly JsonSerializerOptions DefaultJsonSerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.General)
    {
        PropertyNameCaseInsensitive = true
    };

    private readonly HttpClient _httpClient;

    public DummyApiHttpClient(HttpClient httpClient)
    {
        _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
    }

    public async Task<IReadOnlyCollection<WeatherForecastResponse>> GetWeatherForecast(string location)
    {
        // Escape the location so names containing spaces or '&' still produce a valid query string.
        using var request = new HttpRequestMessage(HttpMethod.Get, $"/weatherforecast?location={Uri.EscapeDataString(location)}");
        // ResponseHeadersRead lets us stream the body instead of buffering it first.
        using var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
        response.EnsureSuccessStatusCode();
        await using var responseContent = await response.Content.ReadAsStreamAsync();
        return await JsonSerializer.DeserializeAsync<IReadOnlyCollection<WeatherForecastResponse>>(responseContent, DefaultJsonSerializerOptions);
    }
}
WeatherForecastResponse
public class WeatherForecastResponse
{
    public DateTime Date { get; set; }
    public int TemperatureC { get; set; }
    public string Summary { get; set; }
}
Fake API
I've created a fake API with the following controller. Every request is delayed by 25 ms to simulate some work:
[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{
    private static readonly List<WeatherForecast> Forecasts = new List<WeatherForecast>
    {
        new WeatherForecast
        {
            Date = DateTime.UtcNow.Date,
            TemperatureC = 30,
            Summary = "Freezing"
        },
        new WeatherForecast
        {
            Date = DateTime.UtcNow.AddDays(1).Date,
            TemperatureC = 30,
            Summary = "Freezing"
        },
        new WeatherForecast
        {
            Date = DateTime.UtcNow.AddDays(2).Date,
            TemperatureC = 30,
            Summary = "Freezing"
        },
        new WeatherForecast
        {
            Date = DateTime.UtcNow.AddDays(3).Date,
            TemperatureC = 30,
            Summary = "Freezing"
        },
        new WeatherForecast
        {
            Date = DateTime.UtcNow.AddDays(4).Date,
            TemperatureC = 30,
            Summary = "Freezing"
        },
    };

    [HttpGet]
    public async Task<IEnumerable<WeatherForecast>> Get()
    {
        await Task.Delay(25);
        return Forecasts;
    }
}
Approaches
Sequential
We start off with the most basic approach. Given a list of locations, we just loop through them one by one and call the API sequentially.
public class GetWeatherForecastQuerySequentially : IGetWeatherForecastQuery
{
    private readonly DummyApiHttpClient _dummyApiHttpClient;

    public GetWeatherForecastQuerySequentially(DummyApiHttpClient dummyApiHttpClient)
    {
        _dummyApiHttpClient = dummyApiHttpClient ?? throw new ArgumentNullException(nameof(dummyApiHttpClient));
    }

    public async Task<IReadOnlyCollection<WeatherForecastResponse>> Execute(IReadOnlyList<string> locations)
    {
        var responses = new List<IReadOnlyCollection<WeatherForecastResponse>>(locations.Count);
        for (var i = 0; i < locations.Count; i++)
        {
            var result = await _dummyApiHttpClient.GetWeatherForecast(locations[i]);
            responses.Add(result);
        }
        return responses.SelectMany(x => x).ToArray();
    }
}
This is a straightforward solution:
- Call the API
- Add the response to the responses list
- Flatten the responses and return a new array.
With the amount of data that we are dealing with (10 000 locations), this is a really slow approach. Imagine that each call takes 1 second: the whole run would then take 10 000 seconds, which is almost three hours.
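To put rough numbers on this, here is a back-of-the-envelope sketch. It assumes that every call takes exactly the same time and it ignores all overhead, so treat the result as a lower bound. DurationEstimate and Estimate are hypothetical names used for illustration only.

```csharp
using System;

public static class DurationEstimate
{
    // Hypothetical helper: estimated time to finish `requests` calls when at most
    // `maxConcurrency` calls run at once and each call takes `latency`.
    public static TimeSpan Estimate(int requests, int maxConcurrency, TimeSpan latency)
    {
        // Ceiling division: a partially filled last "wave" still costs a full latency.
        var waves = (requests + maxConcurrency - 1) / maxConcurrency;
        return TimeSpan.FromTicks(latency.Ticks * waves);
    }
}
```

With 1 second per call, Estimate(10_000, 1, TimeSpan.FromSeconds(1)) gives 02:46:40 for the sequential case, while allowing 10 concurrent calls brings the estimate down to 00:16:40.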
No limits
Don't use this code in production.
This is our first, naive, approach to speed things up.
public class GetWeatherForecastQueryNoLimits : IGetWeatherForecastQuery
{
    private readonly DummyApiHttpClient _dummyApiHttpClient;

    public GetWeatherForecastQueryNoLimits(DummyApiHttpClient dummyApiHttpClient)
    {
        _dummyApiHttpClient = dummyApiHttpClient ?? throw new ArgumentNullException(nameof(dummyApiHttpClient));
    }

    public async Task<IReadOnlyCollection<WeatherForecastResponse>> Execute(IReadOnlyList<string> locations)
    {
        var responses = new ConcurrentBag<IReadOnlyCollection<WeatherForecastResponse>>();
        var tasks = locations.Select(async location =>
        {
            var response = await _dummyApiHttpClient.GetWeatherForecast(location);
            responses.Add(response);
        });
        await Task.WhenAll(tasks);
        return responses.SelectMany(x => x).ToArray();
    }
}
- We create a ConcurrentBag where we will store all responses.
- We create a task for each location -> fetch the response.
- We add the response to the ConcurrentBag.
- We wait for all the calls to complete with Task.WhenAll
- We flatten the responses and return a new array.
This code is not throttled in any way, and you can run into all kinds of problems by using it (running out of resources, getting rate limited...). Don't use it.
SemaphoreSlim
This approach is inspired by this answer found on Stack Overflow.
SemaphoreSlim represents a lightweight alternative to Semaphore that limits the number of threads that can access a resource or pool of resources concurrently.
We can't use a regular lock here, since C# does not allow await inside the body of a lock statement.
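To illustrate the difference, here is a minimal sketch of a SemaphoreSlim with a single slot acting as an await-friendly lock. AsyncCounter and its members are hypothetical names that are not part of the original code.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class AsyncCounter
{
    // A semaphore with exactly one slot lets only one caller in at a time.
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private int _value;

    public int Value => _value;

    public async Task IncrementAsync()
    {
        await _gate.WaitAsync();
        try
        {
            var current = _value;
            // This await would be a compile error inside a lock block.
            await Task.Yield();
            _value = current + 1;
        }
        finally
        {
            // Always free the slot, even if the code above throws.
            _gate.Release();
        }
    }
}
```

Because the read, the await and the write all happen while the slot is held, concurrent calls to IncrementAsync do not lose updates.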
public async Task<IReadOnlyCollection<WeatherForecastResponse>> Execute(IReadOnlyList<string> locations)
{
    // SemaphoreSlim is IDisposable, so dispose it when the method is done.
    using var semaphoreSlim = new SemaphoreSlim(
        initialCount: 10,
        maxCount: 10);
    var responses = new ConcurrentBag<IReadOnlyCollection<WeatherForecastResponse>>();
    var tasks = locations.Select(async location =>
    {
        // Wait for one of the 10 slots to become free.
        await semaphoreSlim.WaitAsync();
        try
        {
            var response = await _dummyApiHttpClient.GetWeatherForecast(location);
            responses.Add(response);
        }
        finally
        {
            // Free the slot even if the request failed.
            semaphoreSlim.Release();
        }
    });
    await Task.WhenAll(tasks);
    return responses.SelectMany(x => x).ToArray();
}
- We create a new SemaphoreSlim where we set initialCount and maxCount to 10. This is how we throttle our requests.
- We create a ConcurrentBag that will hold the forecast responses.
- For each location we create a task which represents the fetching of the forecast.
- Inside each task, we wait until the SemaphoreSlim allows us to "enter".
- We call the API and add the response to the ConcurrentBag.
- We then call Release on the SemaphoreSlim (in a finally block, so the slot is freed even if the call throws), which means that a new request is now allowed to be sent.
- We wait for all tasks to finish.
- We flatten the list and return a new array.
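If you want to convince yourself that the semaphore really caps the number of calls in flight, a small self-contained experiment along these lines can help. This is only a sketch: SemaphoreThrottleDemo and MaxObservedConcurrency are hypothetical names, and Task.Delay(25) stands in for the real HTTP call.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreThrottleDemo
{
    // Runs `total` fake calls with at most `limit` in flight and returns
    // the highest number of calls that were observed running at the same time.
    public static async Task<int> MaxObservedConcurrency(int total, int limit)
    {
        using var semaphore = new SemaphoreSlim(limit, limit);
        var sync = new object();
        var inFlight = 0;
        var maxInFlight = 0;

        var tasks = Enumerable.Range(0, total).Select(async _ =>
        {
            await semaphore.WaitAsync();
            try
            {
                // A plain lock is fine here because nothing is awaited inside it.
                lock (sync)
                {
                    inFlight++;
                    maxInFlight = Math.Max(maxInFlight, inFlight);
                }

                await Task.Delay(25); // stand-in for the HTTP call

                lock (sync)
                {
                    inFlight--;
                }
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
        return maxInFlight;
    }
}
```

Calling MaxObservedConcurrency(50, 10) should never return more than 10, no matter how many tasks are created up front.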
TransformBlock
Another solution to the problem is to use a TransformBlock.
public class GetWeatherForecastQueryTransformBlock : IGetWeatherForecastQuery
{
    private readonly DummyApiHttpClient _dummyApiHttpClient;

    public GetWeatherForecastQueryTransformBlock(DummyApiHttpClient dummyApiHttpClient)
    {
        _dummyApiHttpClient = dummyApiHttpClient ?? throw new ArgumentNullException(nameof(dummyApiHttpClient));
    }

    public async Task<IReadOnlyCollection<WeatherForecastResponse>> Execute(IReadOnlyList<string> locations)
    {
        var transformBlock = new TransformBlock<string, IReadOnlyCollection<WeatherForecastResponse>>(
            _dummyApiHttpClient.GetWeatherForecast,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 10
            }
        );
        var buffer = new BufferBlock<IReadOnlyCollection<WeatherForecastResponse>>();
        transformBlock.LinkTo(buffer);
        for (var i = 0; i < locations.Count; i++)
        {
            await transformBlock.SendAsync(locations[i]);
        }
        transformBlock.Complete();
        await transformBlock.Completion;
        // Note: TryReceiveAll returns false when the buffer is empty,
        // so an empty locations list also ends up in the throw branch.
        return buffer.TryReceiveAll(out var forecasts)
            ? forecasts.SelectMany(x => x).ToArray()
            : throw new Exception("Error when trying to receive items from Buffer");
    }
}
You can think of the TransformBlock as a message queue.
- We create a new TransformBlock and specify 10 as MaxDegreeOfParallelism. This is how we throttle our requests.
- We create a BufferBlock and link it to the TransformBlock. The BufferBlock will hold our responses.
- We loop through all locations and call SendAsync on the TransformBlock. This publishes a "message" to the TransformBlock, basically queueing up the work that should be done. Note that we are passing the location as a parameter.
- We then call Complete on the TransformBlock, meaning that it should not accept any more "messages".
- We then wait for all the "messages" to be processed.
- We then try to get all "messages" from the buffer and flatten them into a new array.
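The same pipeline can be tried out without any HTTP calls. The sketch below follows the steps above with a fake 25 ms operation; TransformBlockDemo and SquareAll are hypothetical names. Depending on your target framework, you may need to reference the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransformBlockDemo
{
    // Squares every input with at most `limit` fake calls running at once.
    public static async Task<IList<int>> SquareAll(IReadOnlyList<int> inputs, int limit)
    {
        var block = new TransformBlock<int, int>(
            async x =>
            {
                await Task.Delay(25); // stand-in for the HTTP call
                return x * x;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = limit });

        var buffer = new BufferBlock<int>();
        block.LinkTo(buffer);

        foreach (var input in inputs)
        {
            await block.SendAsync(input);
        }

        // No more input; wait until every result has been handed to the buffer.
        block.Complete();
        await block.Completion;

        // TryReceiveAll returns false for an empty buffer, so fall back to an empty list.
        return buffer.TryReceiveAll(out var results) ? results : new List<int>();
    }
}
```

One difference from the ConcurrentBag-based versions is that a TransformBlock emits its results in input order by default (EnsureOrdered is true unless you change it), so the output here lines up with the inputs.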
Benchmarks
We will now run a couple of different benchmarks to see how each version performs.
10 Locations
| Method | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
|--------------- |----------:|----------:|---------:|------:|--------:|------:|------:|------:|----------:|
| Sequentially | 309.23 ms | 41.27 ms | 2.262 ms | 1.00 | 0.00 | - | - | - | 48.8 KB |
| TransformBlock | 32.24 ms | 114.33 ms | 6.267 ms | 0.10 | 0.02 | - | - | - | 58.97 KB |
| SemaphoreSlim | 28.92 ms | 33.52 ms | 1.837 ms | 0.09 | 0.01 | - | - | - | 53.59 KB |
| NoLimits | 43.75 ms | 44.22 ms | 2.424 ms | 0.14 | 0.01 | - | - | - | 53.8 KB |
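As one would expect, the sequential approach is really slow, about ten times slower than the two throttled approaches. With only 10 locations, NoLimits never sends more than 10 concurrent requests either, so it has no advantage here.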
100 Locations
| Method | Mean | Error | StdDev | Ratio | Gen 0 | Gen 1 | Gen 2 | Allocated |
|--------------- |-----------:|---------:|---------:|------:|------:|------:|------:|----------:|
| Sequentially | 3,199.8 ms | 647.2 ms | 35.48 ms | 1.00 | - | - | - | 487.42 KB |
| TransformBlock | 318.8 ms | 167.0 ms | 9.15 ms | 0.10 | - | - | - | 509.91 KB |
| SemaphoreSlim | 328.1 ms | 471.2 ms | 25.83 ms | 0.10 | - | - | - | 495.21 KB |
| NoLimits | 216.0 ms | 465.4 ms | 25.51 ms | 0.07 | - | - | - | 487.07 KB |
The "NoLimits" approach is now faster than the TransformBlock and SemaphoreSlim approaches, since it's not throttled at all.
1000 Locations
| Method | Mean | Error | StdDev | Ratio | Gen 0 | Gen 1 | Gen 2 | Allocated |
|--------------- |---------:|---------:|---------:|------:|----------:|------:|------:|----------:|
| Sequentially | 31.492 s | 3.6081 s | 0.1978 s | 1.00 | 1000.0000 | - | - | 4.52 MB |
| TransformBlock | 3.197 s | 0.2569 s | 0.0141 s | 0.10 | 1000.0000 | - | - | 4.86 MB |
| SemaphoreSlim | 3.224 s | 0.7269 s | 0.0398 s | 0.10 | 1000.0000 | - | - | 4.84 MB |
| NoLimits | 2.301 s | 0.2653 s | 0.0145 s | 0.07 | 1000.0000 | - | - | 4.75 MB |
Same story here, as expected.
10 000 Locations
| Method | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
|--------------- |---------:|--------:|--------:|------:|--------:|-----------:|----------:|------:|----------:|
| Sequentially | 318.11 s | 9.195 s | 0.504 s | 1.00 | 0.00 | 10000.0000 | 3000.0000 | - | 45.46 MB |
| TransformBlock | 32.05 s | 2.752 s | 0.151 s | 0.10 | 0.00 | 10000.0000 | 1000.0000 | - | 48.61 MB |
| SemaphoreSlim | 32.03 s | 2.216 s | 0.121 s | 0.10 | 0.00 | 10000.0000 | 1000.0000 | - | 48.46 MB |
| NoLimits | NA | NA | NA | ? | ? | - | - | - | - |
What happened here? Well, it turns out that my computer really didn't like spawning (and running) 10 000 tasks at the same time. I've tried it multiple times but I can't get any results for the NoLimits method. Either my benchmark crashes or my fake API does. This is why you should not use this method in production. By not limiting your code you are at risk of crashing either your own application or the API that you are consuming.
Conclusions
- When dealing with multiple I/O operations that can be processed concurrently, you should really do so. The Sequential approach is roughly ten times slower than the throttled approaches in every benchmark (as expected).
- Don't use the NoLimits approach.
- There is not that much of a difference between the TransformBlock and the SemaphoreSlim approach. Personally I think that the SemaphoreSlim approach is a bit easier to read.
All code can be found on GitHub.