Quando falamos de streaming de dados, a primeira coisa que vem a cabeça são streaming mídia: áudio e mídia. Mas em algumas situações, pode ser necessário a implementação de streaming de dados textuais.
Utilizado para a entrega de dados em tempo real, o streaming de dados em uma API é uma técnica utilizada principalmente por redes sociais, como Twitter e Facebook.
No .NET Core este tipo de recurso pode ser implementado via SignalR, mas neste artigo exemplificaremos isso utilizando a estrutura tradicional do ASP.NET Core. Futuramente abordarei o uso do SignalR.
Curso C# (C Sharp) - APIs REST com ASP.NET Web API
Conhecer o cursoCriando uma Web API Simples
Para exemplificar a API Streaming, inicialmente iremos criar uma API simples:
dotnet new web -n ApiSimples
Que conterá apenas um model:
namespace ApiSimples.Models
{
public class Item
{
public long Id { get; set; }
public string Name { get; set; }
public bool IsComplete { get; set; }
public override string ToString() => $"{Id} - {Name} - {IsComplete}";
}
}
E um controller:
namespace ApiSimples.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class TodoController : ControllerBase
{
private static List<Item> _itens = new List<Item>();
[HttpGet]
public ActionResult<List<Item>> Get() => _itens;
[HttpPost]
public async Task<ActionResult<Item>> Post([FromBody] Item value)
{
if(value == null)
return BadRequest();
if(value.Id == 0)
{
var max = _itens.Max(i => i.Id);
value.Id = max+1;
}
_itens.Add(value);
return value;
}
[HttpPut("{id}")]
public async Task<ActionResult<Item>> Put(long id, [FromBody] Item value)
{
var item = _itens.SingleOrDefault(i => i.Id == id);
if(item != null)
{
_itens.Remove(item);
value.Id = id;
_itens.Add(value);
return item;
}
return BadRequest();
}
[HttpDelete("{id}")]
public async Task<ActionResult> Delete(long id)
{
var item = _itens.SingleOrDefault(i => i.Id == id);
if(item != null)
{
_itens.Remove(item);
return Ok(new { Description = "Item removed" });
}
return BadRequest();
}
}
}
À frente este controller será modificado com adição do streaming.
Não se esqueça de adicionar o suporte para o controller em Startup
:
public class Startup
{
public void ConfigureServices(IServiceCollection services) => services.AddControllers();
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
}
Adicionando o streaming de dados
No ASP.NET Core, ao acessar uma rota, a solicitação é processada e o seu resultado retornado para o usuário. Ao definir um streaming de dados, a solicitação se manterá em um processamento contínuo, retornando dados de um evento, enquanto a conexão se mantiver ativa.
Por não seguir o comportamento padrão, o streaming não pode retornar um objeto ActionResult
, que é o tipo de retorno mais comum. Assim, a primeira coisa a ser feita é definir um ActionResult
customizado:
public class StreamResult : IActionResult
{
private readonly CancellationToken _requestAborted;
private readonly Action<Stream, CancellationToken> _onStreaming;
public StreamResult(Action<Stream, CancellationToken> onStreaming, CancellationToken requestAborted)
{
_requestAborted = requestAborted;
_onStreaming = onStreaming;
}
public Task ExecuteResultAsync(ActionContext context)
{
var stream = context.HttpContext.Response.Body;
context.HttpContext.Response.GetTypedHeaders().ContentType = new MediaTypeHeaderValue("text/event-stream");
_onStreaming(stream, _requestAborted);
return Task.CompletedTask;
}
}
Note que esta classe implementa a interface IActionResult
. Assim, ela poderá ser utilizada no lugar de ActionResult
.
O ponto mais importante dela é a definição do callback onStreaming
, que é recebido por parâmetro:
public StreamResult(Action<Stream, CancellationToken> onStreaming, CancellationToken requestAborted)
{
_requestAborted = requestAborted;
_onStreaming = onStreaming;
}
Este callback será utilizado para salvar os clients que estiverem “ouvindo” o streaming. E o CancellationToken
será utilizado para finalizar o streaming caso a solicitação seja interrompida/cancelada.
Voltando ao controller, é necessário definir uma coleção para salvar os clients:
private static ConcurrentBag<StreamWriter> _clients = new ConcurrentBag<StreamWriter>();
Esta coleção precisa ser thread-safe, por isso que foi utilizada uma coleção do namespace System.Collections.Concurrent
.
Agora é possível definir o endpoint do streaming de dados:
[HttpGet]
[Route("streaming")]
public IActionResult Streaming()
{
return new StreamResult(
(stream, cancelToken) => {
var wait = cancelToken.WaitHandle;
var client = new StreamWriter(stream);
_clients.Add(client);
wait.WaitOne();
StreamWriter ignore;
_clients.TryTake(out ignore);
},
HttpContext.RequestAborted);
}
Note que na função callback da classe StreamResult
estamos adicionando o client na coleção e aguarda-se que a solicitação seja cancelada. Com isso, ao acessar o endpoint, a solicitação se manterá ativa.
No momento já temos um streaming de dados, mas nada está sendo enviado para os clients. Para isso, definiremos um método que irá percorrê-los e escreverá dados no stream:
private async Task WriteOnStream(Item data, string action)
{
foreach (var client in _clients)
{
string jsonData = string.Format("{0}\n", JsonSerializer.Serialize(new { data, action }));
await client.WriteAsync(jsonData);
await client.FlushAsync();
}
}
Os dados escritos, serão as ações do nosso controller:
public async Task<ActionResult<Item>> Post([FromBody] Item value)
{
if(value == null)
return BadRequest();
if(value.Id == 0)
{
var max = _itens.Max(i => i.Id);
value.Id = max+1;
}
_itens.Add(value);
await WriteOnStream(value, "Item added");
return value;
}
[HttpPut("{id}")]
public async Task<ActionResult<Item>> Put(long id, [FromBody] Item value)
{
var item = _itens.SingleOrDefault(i => i.Id == id);
if(item != null)
{
_itens.Remove(item);
value.Id = id;
_itens.Add(value);
await WriteOnStream(value, "Item updated");
return item;
}
return BadRequest();
}
[HttpDelete("{id}")]
public async Task<ActionResult> Delete(long id)
{
var item = _itens.SingleOrDefault(i => i.Id == id);
if(item != null)
{
_itens.Remove(item);
await WriteOnStream(item, "Item removed");
return Ok(new { Description = "Item removed" });
}
return BadRequest();
}
Pronto, o nosso streaming de dados já está completo.
Curso F# (F Sharp) - Fundamentos
Conhecer o curso123 Testando…
Para testar a API, inicialmente é necessário acessar o endpoint do streaming de dados:
Note que o navegador indicará que a página está sendo carregada. Este é o comportamento padrão. Ao enviar uma solicitação para o controller, ela será mostrada nesta página:
A página sempre mostrará que está sendo carregada, mas todas as ações que forem geradas do nosso controller serão exibidas nela.
Simples, não é? Você pode ver esta aplicação completa no meu Github.
No próximo artigo mostrarei como obter isso utilizando o SignalR.