Uma jornada na construção de um Data Lakehouse —Parte 3 — Ingestão de dados com CDC e Debezium

Cassio R. Eskelsen
8 min readApr 13, 2022

--

Como visto no post anterior, optamos por utilizar stream de eventos ao invés de batch ETL para hidratar o nosso data lake.

Infelizmente não possuímos ainda um bus de eventos integrando toda a empresa, portanto tivemos que buscar os dados de outra maneira. Modificar os sistemas para que enviassem os dados para o Data Lake estava completamente fora de cogitação, restando então a alternativa de utilizar CDC (Change Data Capture) direto no banco de dados.
CDC é uma feature que a maioria dos bancos de dados implementam e que permite ter um registro completo de todas operações ocorridas com os dados armazenados, sejam estas operações de alteração de estrutura, inserção, alteração ou exclusão de dados. Você pode pensar no CDC como se fosse uma trigger adicionada em todas as tabelas que é disparada em qualquer ação efetuada na estrutura da tabela ou nos dados em si.

O CDC cria os eventos, faltando transportá-los para o Data Lake. Existem várias opções para isso: Azure Data Factory, StreamSet, Debezium, etc. Optamos pelo Debezium por sua excelente relação custo/benefício: é uma ferramenta open-source, ao contrário do StreamSet, e faz todo o trabalho quase que sozinho, ao contrário do Azure Data Factory onde é necessária muita implementação de código.

Esse post não tem como objetivo ser um tutorial completo, existem muitos artigos sobre CDC com Debezium na internet, inclusive específicos para o Databricks como esse. O objetivo é compartilhar algumas dificuldades e aprendizados que tivemos durante a implantação, bem como alguns trechos de código.

CDC no SQL Server

O CDC em bancos SQL Server está disponível tanto em instâncias on-premise quanto nas versões rodando na Azure (nós utilizamos Azure SQL Server Managed Instance).
Um detalhe importante: o CDC só está disponível na Azure nos tiers acima de "Standard 3". Os tiers Basic, S0, S1, S2 não permitem habilitar esse recurso.

Tanto nas instalações on-premise quanto nos serviços da Azure a ativação do CDC é feita da mesma maneira, através da execução de procedures do próprio SQL.
O CDC é ativado em duas etapas: uma para habilitação do CDC em si no database e outra por tabela.

Ativando o CDC no database

Ativando o CDC para cada tabela desejada

Para o exemplo vamos supor que temos um schema Faturamento e uma tabela NotasFiscais

A partir desse momento o SQL já estará registrando todas as modificações da tabela Faturamento.NotasFiscais.

O registro de alterações feito pelo CDC é salvo em um schema especial chamado "cdc" dentro do database, sendo que o nome da tabela que persistirá as alterações está especificado no parâmetro "captureinstance".

Comunicando o CDC que a estrutura da tabela foi alterada

A tabela "capture instance" tem uma estrutura que representa a estrutura da tabela de origem no momento em que o CDC foi ativado para a tabela.

Infelizmente o SQL Server não atualiza essa estrutura automaticamente quando efetuamos uma alteração na estrutura da tabela de origem, o que implica que precisamos efetuar esse processo manualmente. Caso o procedimento não seja efetuado, novas colunas do banco de dados não serão levadas para o Data Lake.

O procedimento envolve três etapas:

  1. Alteração da Estrutura da tabela de origem
  2. Criação de uma nova "Capture Instance".
  3. Exclusão da antiga "Capture Instance".

Exemplo das etapas 2 e 3:

Note que se alguma alteração for executada nos dados da tabela "NotasFiscais" entre a criação da nova capture instance e a exclusão da antiga, o agent do SQL server irá persistir a alteração nas duas instâncias e o Debezium irá enviar as duas cópias para o Event Hub. É necessário portanto ter isso em vista quando for desenvolvido o código que recebe os eventos e efetivamente os persiste no Data Lake.

Configurando o Debezium

O Debezium é uma plataforma para CDC criada inicialmente para bancos de dados open-source como MySQL e PostgreSQL utilizando como base o Apache Kafka.
Posteriormente evoluiu para suportar outros bancos de dados e outros softwares de mensageria.

Em nosso caso não pretendíamos criar uma infraestrutura de Kafka e sim utilizar o Azure Event Hub já existente em nossa cloud. Para isso foi necessário utilizar o pacote "Debezium Server" que não utiliza o Kafka como base e permite utilizar em seu lugar Azure Event Hub, Apache Pulsar, Redis, Amazon Kinesis e Google Cloud Pub/Sub.
Um tutorial bem detalhado da sua utilização pode ser encontrado neste link.

O Debezium não só envia todas as alterações do CDC para o serviço de mensageria como cria um snapshot inicial de todo o database. Ou seja, você aponta o Debezium Server para uma determinada tabela, ele busca o estado atual de toda a tabela e manda como eventos para a mensageria e uma vez concluído o snapshot ele passa a enviar a as alterações nos dados. Para isso funcionar corretamente é necessário que o CDC já esteja ativo na tabela para que o Debezium saiba em que ponto a tabela estava quando iniciou o snapshot.

O Debezium persiste em um arquivo (por default chamado offsets.dat) qual foi a última posição da tabela que ele enviou para a mensageria. Esse é um ponto fraco seu: caso você perca o arquivo ou ele for corrompido, o Debezium reinicia todo o processamento da tabela, ou seja, cria um novo snapshot inicial. Imagine isso acontecendo com uma tabela com quase 100 milhões de registros!

Por esse motivo, e para ter o controle individual de cada tabela, decidimos criar uma instância do Debezium Server para cada tabela, ou grupo de tabelas quando forem tabelas muito pequenas. Em nosso caso isso significou criar um deployment no Kubernetes por instância do Debezium.
O Debezium utiliza o Quarkus, por isso o consumo de recursos por deployment é baixíssimo e subir uma instância por tabela não teve impacto significativo em nosso cluster de Kubernetes.

Todos os deployments de Debezium utilizam uma mesma imagem base hospedada em nosso Azure Container Registry. Essa imagem é criada a partir do Dockerfile disponibilizado pela comunidade do Debezium:

Uma menção importante desse Dockerfile é a adição da linha 57 (ENV JAVA_OPTS: -Dfile.encoding=UTF-8). O objetivo dela foi resolver um problema de collation entre o SQL Server e o Debezium. Sem ela, os caracteres acentuados/especiais iriam para o Data Lake completamente desconfigurados.

O arquivo de deployment no kubernetes é o abaixo:

Alguns trechos que valem ser observados:

  • Da linha 1 à linha 13 estamos criando um storage para guardar o arquivo com o offset e demais dados mutáveis da instância.
  • Em algumas linhas como a 4,5, 18 e 19 você pode perceber algumas chaves especiais como #{deployment}#. Essas chaves são substituídas pelos valores finais pela pipeline de CI/CD (utilizamos Azure DevOps)

O Debezium espera ainda alguns parâmetros de configuração. Esses parâmetros podem ser inseridos através de um arquivo Config do k8s. Perceba que na linha 79 especificamos o nome do arquivo de configuração (debezium-server-config).
Como temos múltiplos deploys de debezium, na prática esse nome é definido pela pipeline de CI/CD e adotamos o padrão debezium-nome-da-tabela-config, mas aqui vamos simplificar.

Um exemplo de arquivo config segue abaixo:

Alguns parâmetros são auto-explicativos, outros merecem uma menção:

Linha 2, debezium.sink.eventhubs.connectionstring, string de conexão do Event Hub.

Linha 3, debezium.sink.eventhubs.hubname, nome do hub no Event Hub (abaixo veremos mais detalhes de configuração do Event Hub).

Linha 9, debezium.source.database.database.applicationIntent=ReadOnly, aqui estamos especificando para o Debezium ler da réplica do SQL Server. Se você não possui uma réplica, comente essa linha.

Linhas 10–15, configurações do SQL Server.

Linha 17, debezium.source.table.include.list=Faturamento.NotasFiscais, aqui especificamos qual tabela o Debezium essa instância do Debezium deverá processar. Pode ser mais de uma tabela, basta separar por virgula.

Esse arquivo de parâmetros pode ser aplicado para o k8s com um comando semelhante abaixo. Em nosso caso esse comando é aplicado pela pipeline de CI/CD.

kubectl create configmap debezium-server-config --from-file=application.properties --namespace <namespace_name>

Configurando o Azure Event Hub

Para termos uma garantia de não perder nenhum dado do Event Hub optamos por não ler diretamente do Event Hub e ativamos a opção de Capture. Essa opção envia todos os eventos que chegam para um blob storage no formato avro. O processamento do Databricks é feito a partir desses arquivos avro.

Para utilizar o Capture é necessário habilitar a camada Standard de preços do Event Hub.

Você precisa criar no mínimo um hub (e informar o seu nome na opção debezium.sink.eventhubs.hubname=xxxxx do arquivo de properties).
A criação do hub pode ser iniciada clicando no link "+ Event Hub"

Na página seguinte, informe o nome do Hub e coloque o Message Retention no máximo.

Avance para a opção Capture. Aqui que iremos definir o envio para o storage:

O capture não irá gerar um arquivo para cada evento. Ele irá tentar agrupá-los e irá disparar a criação do avro em uma das condições determinadas em Time Window e Size Window. A criação será feita quando a primeira condição for satisfeita.
Por exemplo: se não temos 190 Mb de arquivos mas se passou um minuto, o arquivo será criado mesmo que haja apenas um evento.
A frequência mínima é de 1 minuto ou 10 Mb. Ou seja, na prática você sempre terá um pequeno delay entre o momento em que o evento chega e o momento em que o avro está disponível para processamento no Databricks.

Em Azure Storage Container deve ser definido para qual container/blob storage os dados devem ser enviados.

Configurando o Event Hub para suportar o fluxo de eventos

O Event Hub trabalha com o conceito de Throughput Units(TU). Uma TU suporta:

Entrada de eventos: 1Mb de eventos por segundo ou 1000 eventos (o que ocorrer primeiro)
Saída de eventos: 2Mb por segundo ou 4096 eventos (o que ocorrer primeiro)

Se a sua realidade for a de muitos eventos, uma única TU não será suficiente. Outra situação em que uma TU não é suficiente é quando você tem um banco de dados enorme que irá gerar muitos eventos de snapshot inicial. Como o Debezium é MUITO rápido, irá esgotar rapidamente a capacidade.

Por esse motivo precisamos habilitar a escala automática de TUs. Isso é feito na opção marcada abaixo:

Na página que irá abrir, habilite a opção auto-inflate:

Defina o máximo que pode ser escalado (recomendo deixar em 40). Em nossos snapshot iniciais o número de TUs escaladas foi de, em média, 15.

Mas tome cuidado: o Event Hub apenas escala para cima e não diminui o número de TUs quando elas não estiverem sendo usadas e isso aumenta desnecessarimente os custos. Após concluído o snapshot inicial recomendo voltar o primeiro slide, THROUGHPUT UNITS, para 1.

Fontes adicionais para referência

Como CDC e Debezium são assuntos amplos, é impossível esgotar tudo em um post. Por isso vou deixar uma lista de fontes de informação que usei para montar a nossa solução:

  1. Enable and disable change data capture
  2. How to Analyze and Read Change Data Capture (CDC) Records
  3. Change Data Capture for auditing SQL Server
  4. Streaming de dados do SQL Server, CDC e Apache Kafka usando Debezium.
  5. Realtime analytics from SQL Server to Power BI with Debezium
  6. Azure SQL / SQL Server Change Stream with Debezium

--

--