Google Cloud Workflow a partir de eventos
A tarefa mais comum de um software é automatizar fluxos de trabalho.
Muitas vezes temos diferentes fluxos usando diferentes combinações de etapas comuns que acabam tornando nossos sistemas verdadeiros spaghettis. Além disso, não é incomum que um fluxo atravesse diferentes softwares/serviços.
Utilizar uma ferramenta de automação de workflow pode economizar um grande tempo em desenvolvimento: podemos nos preocupar em desenvolver nossas regras de negócio e não desperdiçamos tempo reinventando a roda, cuja versão caseira nem sempre é tão… redonda.
Nesse artigo irei mostrar como utilizar a ferramenta de Workflow do serviço de Cloud da Google, o "Workflows" (irei chamar de GCP Workflows quando estiver me referenciando ao serviço ao invés do fluxo de trabalho em si) .
O GCP Workflows é um serviço completamente gerenciado pela GCP que você paga somente quando usa e o custo é baixíssimo. A cobrança é por cada passo de execução e você paga, em valores atuais, US$ 0.01 a cada mil passos internos executados. Passos internos são chamadas a serviços hospedados dentro da GCP. Passos que envolvam chamados a APIs externas são tarifados em US$ 0.025 a cada 1000 passos.
Você precisa apenas observar que qualquer execução é cobrada, incluindo atribuições de variáveis, avaliação de condicionais, etc. Mesmo assim o custo é muito baixo. Nessa página você pode encontrar detalhes sobre a cobrança e dicas para otimizar os custos.
É possível chamar qualquer serviço da GCP diretamente de um Workflow, mas provavelmente o mais comum será executar regras de negócio dentro de Cloud Functions ou containers hospedados no serviço Cloud Run.
Este post não será uma análise profunda das possibilidades do GCP Workflows (que são muitas). O objetivo é trazer um exemplo prático para demonstrar algumas funcionalidades.
Definição do domínio do fluxo de exemplo
O nosso fluxo de exemplo simulará o processamento de uma solicitação de aluguel de uma casa. Em um aplicativo/página da internet uma pessoa faz a solicitação e o backend a enfilera em um tópico do Google Pub/Sub. Isso dispara um gatilho que inicia o processamento do fluxo. É a partir daqui que iremos trabalhar.
Observação: nossos serviços serão apenas mocks para focarmos no GCP Workflows em si.
As regras de negócio do nosso Workflow são as abaixo: (lembrando, é apenas um fluxo imaginário, não há a pretensão de simular um serviço real)
- cliente confirmou intenção de alugar uma casa (post/publicação no pub/sub)
- verifica se é um cliente conhecido
se não for, vai para etapa de "identificação do cliente"
se for, vai para scoragem do cliente
- identificação do cliente (verificação documentos com OCR ou alguma biometria)
se documentos ok, vai para scoragem do cliente
senão, encerra o fluxo
- scoragem do cliente:
se score >= 80, vai para reserva de imóvel
senão, se >=40, manda email para o cliente pedindo fiador
senão (<40), encerra fluxo
- reserva de imóvel
se reserva com sucesso, adiciona no tópico de sucesso ( para confecção do
contrato, etc)
senão, adiciona na fila de insucesso (para avisar o cliente, etc)
Produtos utilizados e Setup inicial
Serão utilizados os seguintes produtos:
a) Cloud Functions — será criada uma cloud function para cada etapa;
b) Pub/Sub — um tópico para recebimento de solicitações de aluguel, um para resultado ok da solicitação e um para as solicitações que foram negadas por algum motivo qualquer;
c) GCP Eventarc — plataforma para envio de eventos da GCP. Aqui ela será utilizada para disparar a execução do Workflow assim que uma mensagem chegar no tópico de entrada;
d) e, obviamente, Google Workflows!
Se você ainda não for cliente da GCP pode criar uma conta para testes com uma boa quantidade de créditos grátis. No entanto mesmo para clientes antigos há uma quantidade de créditos gratuitos mensais e nossos testes só utilizarão uma parte deles. Ou seja, essa demonstração é totalmente 0800 :)
A sua conta na GCP pode ser organizada em projetos. Sugiro que você crie um projeto específico para os códigos abaixo. Assim bastará apagar o projeto que todos os recursos na GCP serão liberados assim que você terminar seus testes.
Será necessário que você associe uma conta de Billing ao projeto (não se preocupe, nada será cobrado se você não exceder a cota). Veja nesse tutorial como fazer isso, mas basicamente basta você entrar na opção de Billing do Console da GCP. Se ainda não estiver associada estará disponível uma opção para fazer a operação. O projeto utilizado em nosso exemplo foi nomeado como real-state-wf. Caso você crie com outro nome faça os ajustes necessários.
Setup
Iremos rodar nosso exemplo utilizando a cli da GCP. Nessa página você tem informações de como instalá-la caso ainda não possua.
Após instalada abra um terminal e cole os comandos abaixo para criar algumas variáveis de ambiente e habilitar as APIs necessárias:
export PROJECT_ID=real-state-wf #altere para o nome desejado
export SERVICE_ACCOUNT=workflows-sa
export INCOMING_TOPIC_ID=rent-request
export SUCCES_TOPIC_ID=sucess-topic
export NOT_SUCESS_TOPIC_ID=not-sucess-topic
export REGION=us-east1 #você pode alterar pela sua preferência
export WORKFLOW_NAME=house-rent
gcloud config set project ${PROJECT_ID}
gcloud config set run/region ${REGION}
gcloud services enable cloudbuild.googleapis.com cloudfunctions.googleapis.com \
run.googleapis.com storage.googleapis.com containerregistry.googleapis.com \
workflows.googleapis.com eventarc.googleapis.com
Os comandos abaixo definem a região padrão para os workflows e criam a conta de serviço necessária:
gcloud config set workflows/location ${REGION}
gcloud config set eventarc/location ${REGION}
gcloud iam service-accounts create ${SERVICE_ACCOUNT}
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member "serviceAccount:${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com" \
--role "roles/run.invoker"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member="serviceAccount:${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com" \
--role="roles/workflows.invoker"
Agora iremos criar os tópicos e subscriptions do Pub/Sub:
#create incoming topic for incominf rent requests
gcloud pubsub topics create ${INCOMING_TOPIC_ID} --project=${PROJECT_ID}
#result topics
gcloud pubsub topics create ${SUCCES_TOPIC_ID} --project=${PROJECT_ID}
gcloud pubsub topics create ${NOT_SUCESS_TOPIC_ID} --project=${PROJECT_ID}
#subscriptions
gcloud pubsub subscriptions create ${SUCCES_TOPIC_ID}"-subscription" --topic=${SUCCES_TOPIC_ID} --project=${PROJECT_ID}
gcloud pubsub subscriptions create ${NOT_SUCESS_TOPIC_ID}"-subscription" --topic=${NOT_SUCESS_TOPIC_ID} --project=${PROJECT_ID}
Código de Exemplo
Todo código do exemplo está disponível nesse repositório do GitHub:
git clone git@github.com:cassioeskelsen/gcp_workflow.git
A primeira coisa que iremos fazer é criar as Cloud Functions (check_person_exists, identify_person, score_person e reserve_house). Entre em cada diretório e digite o comando respectivo para criar as cloud functions, conforme demonstrado abaixo:
cd gcp_workflow/check_person_exists
gcloud functions deploy check_person_exists \
--runtime python38 \
--trigger-http \
--allow-unauthenticated
cd ../identify_person
gcloud functions deploy identify_person \
--runtime python38 \
--trigger-http \
--allow-unauthenticated
cd ../score_person
gcloud functions deploy score_person \
--runtime python38 \
--trigger-http \
--allow-unauthenticated
cd ../reserve_house
gcloud functions deploy reserve_house \
--runtime python38 \
--trigger-http \
--allow-unauthenticated
cd ..
Testando as Cloud Functions
Antes de prosseguir com a criação do Workflow é interessante testarmos se todas as cloud functions estão funcionando corretamente. Isso pode ser feito com os comandos abaixo. Cada um deverá retornar um Json.
curl $(gcloud functions describe check_person_exists --format='value(httpsTrigger.url)') \
-X POST \
-H "content-type: application/json" \
-d '{"person_id":"123.123.123-13"}'
curl $(gcloud functions describe identify_person --format='value(httpsTrigger.url)') \
-X POST \
-H "content-type: application/json" \
-d '{"person_id":"123.123.123-13","photo_id":"a8ec3803-d2c5-4e0a-8c38-629840ad212"}'
curl $(gcloud functions describe score_person --format='value(httpsTrigger.url)') \
-X POST \
-H "content-type: application/json" \
-d '{"person_id":"123.123.123-13","photo_id":"a8ec3803-d2c5-4e0a-8c38-629840ad212"}'
curl $(gcloud functions describe reserve_house --format='value(httpsTrigger.url)') \
-X POST \
-H "content-type: application/json" \
-d '{"person_id":"123.123.123-13","photo_id":"a8ec3803-d2c5-4e0a-8c38-629840ad212", "house_id": "1"}'
Os resultados esperados são, respectivamente:
- {“person_exists”:true}
- {“identified”:true}
- {“score”:90}
- {“message”:”House reserved or already rented”,”result”:false}
Criando o nosso Workflow
Um Workflow nada mais é do que um arquivo yaml ou json contendo a descrição de todos os passos.
Essa página traz um overview sobre a estrutura de um workflow.
O workflow completo está na raíz do repositório (arquivo workflow.yaml). Abaixo irei comentar cada trecho dele:
main:
params: [ event ]
steps:
- init:
assign:
- project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- sucess_topic: "sucess-topic"
- not_sucess_topic: "not-sucess-topic"
- decode_pubsub_message:
assign:
- event_base64: ${base64.decode(event.data.message.data)}
- rent_request: ${json.decode(text.decode(event_base64))}
O trecho acima recebe o evento como parâmetro, vindo do gatilho do Eventarc. Em seguida define algumas variáveis globais bem como decodifica a mensagem do PubSub. (Para fins didáticos isso foi separado em dois blocos assign, mas se quisermos economizar custos podemos juntar em um bloco assign só para que seja tarifado como um step apenas).
- check_person_exists:
steps:
- check:
call: http.post
args:
url: https://us-central1-real-state-wf.cloudfunctions.net/check_person_exists
body:
person_id: ${rent_request.person_id}
result: check_person_exists_result
- switch_check:
switch:
- condition: ${check_person_exists_result.body.person_exists == true}
next: score_person
Esse é o primeiro passo "de negócio". Ele tentará identificar o cliente chamando a nossa Cloud Function "check_person_exists".
Perceba que logo abaixo da url temos uma seção chamada body. Aqui informaremos os valores que serão passados para a Cloud Function como argumento. Apenas os valores aqui listados estarão disponíveis dentro da Cloud Function. Veja abaixo no código Python da Cloud Function como o valor é recuperado:
@functions_framework.http
def check_person_exists(request):
request_json = request.get_json()
if request_json['person_id'] in persons:
output = {'person_exists': True}
else:
output = {'person_exists': False}
return jsonify(output)
A linha result: check_person_exists_result faz com que o retorno da Cloud Function (o json) seja atribuído à variável check_person_exists_result. Dessa forma o resultado poderá ser usado em qualquer passo seguinte.
O step switch_check contém um desvio condicional (switch). Ele verifica dentro do retorno da Cloud Function check_person_exists se o field person_exists está com o valor True. Se estiver, desvia para o step score_person. Senão, seguirá o fluxo normalmente, que no caso é o step identify_person.
Cabe mencionar que os steps check e switch_check foram colocados dentro de outro step chamado check_person_exists. Isso foi feito apenas por questão de organização de código, não é um procedimento obrigatório. Check e switch_check poderiam estar logo abaixo do step main.
- identify_person:
steps:
- identify:
call: http.post
args:
url: https://us-central1-real-state-wf.cloudfunctions.net/identify_person
body:
person_id: ${rent_request.person_id}
photo_id: ${rent_request.photo_id}
result: identify_person_exists_result
- switch_identify:
switch:
- condition: ${not(identify_person_exists_result.body.identified)}
next: cant_identified
A etapa seguinte será executada apenas se na primeira etapa não foi possível encontrar o cliente. Nesse caso tentaremos "cadastrar" o cliente a partir de um documento ou biometria que está dentro de um bucket identificado por photo_id (use a imaginação aqui, a Cloud Funcion respectiva é apenas um mock!).
Caso ainda não tenha sido possível identificar o cliente a Cloud Function retornará False e nessa caso a condition ${not(identify_person_exists_result.body.identified)} fará com que o fluxo seja desviado para um step de erro, o "cant_identified".
Uma vez identificado o cliente, podemos ir para a etapa de Score:
- score_person:
steps:
- calc_score:
call: http.post
args:
url: https://us-central1-real-state-wf.cloudfunctions.net/score_person
body:
person_id: ${rent_request.person_id}
result: score_person_result
- switch_score:
switch:
- condition: ${score_person_result.body.score >= 80}
next: reserve_house
- condition: ${score_person_result.body.score >= 40 and score_person_result.body.score < 80}
next: request_guarantor
- condition: ${score_person_result.body.score < 40}
next: not_approved
Essa etapa desvia o fluxo da seguinte forma:
- Se o score retornado for maior ou igual a 80, chama a etapa que tenta reservar a casa.
- Se o score retornado estiver entre 40 e 80, chama a etapa que irá solicitar um fiador.
- Se o score for abaixo de 40, interrompe totalmente o processo chamando a etapa de não aprovação.
Importante notar que o GCP Workflows avalia todas as etapas, por isso precisamos definir bem as faixas e não podemos usar a forma simplifica que usaríamos normalmente em uma linguagem de programação (≥=80, ≥40, ≥0 ).
A última Cloud Function irá tentar reservar a casa:
- reserve_house:
steps:
- try_reserve:
call: http.post
args:
url: https://us-central1-real-state-wf.cloudfunctions.net/reserve_house
body:
person_id: ${rent_request.person_id}
house_id: ${rent_request.house_id}
result: reserve_house_result
- switch_reserve:
switch:
- condition: ${reserve_house_result.body.result == true}
next: rental_approved
- condition: ${reserve_house_result.body.result == false}
next: rental_not_possible
Em caso de algum problema (casa já reservada ou já alugada), irá chamar a etapa "rental_not_possible", caso contrário irá para a etapa de sucesso "rental_approved".
As próximas etapas são as chamadas em caso de algum problema:
- cant_identified:
assign:
- final_result: "Can't identified person"
next: not_sucess
- not_approved:
assign:
- final_result: "Can't approve - low score"
next: not_sucess
- request_guarantor:
assign:
- final_result: "Request guarantor"
next: not_sucess
- rental_not_possible:
assign:
- final_result: "House reserved or already rented"
next: not_sucess
Tendo em vista a simplificação do artigo, apenas iremos setar uma mensagem apropriada para cada caso de negativação e invocar a etapa not_sucess que irá postar no tópico de insucesso.
- not_sucess:
steps:
- prep_vars_not_sucess:
assign:
- msg: ${"{ 'person_id':"+ rent_request.person_id + ",'person_email':"+rent_request.person_email+",'house_id':"+ rent_request.house_id+", 'not_sucess_reason':" +final_result + "}"}
- base64_msg: ${base64.encode(json.encode(msg))}
- publish_not_sucess:
call: googleapis.pubsub.v1.projects.topics.publish
args:
topic: ${"projects/" + project + "/topics/" + not_sucess_topic}
body:
messages:
- data: ${base64_msg}
- finish:
next: return_result
Essa etapa posta no tópico de insucesso do PubSub.
No bloco de assign criamos uma mensagem com todos os dados que temos: código cliente, código da casa, email e motivo da negativação. Depois convertemos para base64, que é o que a API do PubSub(googleapis.pubsub.v1.projects.topics.publish) espera.
Por fim, fazemos um jump para o passo return_result para evitar que a etapa seguinte (de sucesso) não seja chamada também.
- rental_approved:
steps:
- prep_vars_sucess:
assign:
- msg: { "person_id": rent_request.person_id,"person_email": rent_request.person_email, "house_id": rent_request.house_id }
- base64_msg: ${base64.encode(json.encode(msg))}
- final_result: "rental approved"
- publish_sucess:
call: googleapis.pubsub.v1.projects.topics.publish
args:
topic: ${"projects/" + project + "/topics/" + sucess_topic}
body:
messages:
- data: ${base64_msg}
A etapa que posta no tópico de sucesso é muito semelhante à anterior. O que muda é o tópico onde postamos e a mensagem que não possui o motivo de insucesso.
Considerando as etapas anteriores podemos concluir que essa etapa só será chamada se o cliente for identificado, o score for maior que 80 e a casa estiver disponível.
A etapa final apenas faz um log e retorna o resultado final:
- return_result:
steps:
- log:
call: sys.log
args:
text: ${final_result}
severity: INFO
- return:
return: ${final_result}
sys.log, assim como base64.encode, json.encode, etc, fazem parte da Standard Library do GCP Workflows que possui uma série de funções muito úteis.
O log gerado acima pode ser consultado na Console GCP clicando no ID da respectiva execução:
Devido ao tamanho não irei colar aqui todo o workflow. Ele está disponível no repositório do GitHub.
Precisamos agora fazer o deploy do Workflow. Isso pode ser feito, na raíz do diretório do nosso projeto, com o comando:
gcloud workflows deploy ${WORKFLOW_NAME} --source=workflow.yaml
Esse mesmo comando é utilizado para atualizar o Workflow em caso de alteração do seu conteúdo.
Por fim, precisamos criar o gatilho com o Eventarc. Isso é feito apenas uma vez com o comando:
gcloud eventarc triggers create new-house-rent-request \
--location=${REGION} \
--service-account="${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com" \
--transport-topic=projects/real-state-wf/topics/${INCOMING_TOPIC_ID} \
--destination-workflow=${WORKFLOW_NAME} \
--destination-workflow-location=${REGION} \
--event-filters="type=google.cloud.pubsub.topic.v1.messagePublished"
Esse trigger irá ouvir todas as mensagens que são publicadas (“type=google.cloud.pubsub.topic.v1.messagePublished”) no tópico de entrada ( — transport-topic=projects/real-state-wf/topics/${INCOMING_TOPIC_ID}) disparando a execução do nosso Workflow ( — destination-workflow=${WORKFLOW_NAME} )
Testando nosso Workflow
Iremos usar a gcloud cli para postar mensagens no tópico de entrada com vários cenários diferentes. Os mocks possuem valores "chumbados" que permitem simularmos os comportamentos.
Cenários de insucesso:
#pesssoa não existe
gcloud pubsub topics publish ${INCOMING_TOPIC_ID} --message '{"person_id":"123.123.123-16", "person_email": "email@email.com", "photo_id":"a8ec3803-d2c5-4e0a-8c38-629840ad212", "house_id":"2"}'
#pesssoa não existe mas foi identificada por OCR/Biometria mas o score é baixo
gcloud pubsub topics publish ${INCOMING_TOPIC_ID} --message '{"person_id":"125.125.125-15", "person_email": "email@email.com", "photo_id":"a8ec3803-d2c5-4e0a-8c38-629840ad212", "house_id":"2"}'
#pesssoa existe mas o score exige fiador
gcloud pubsub topics publish ${INCOMING_TOPIC_ID} --message '{"person_id":"124.124.124-14", "person_email": "email@email.com", "photo_id":"a8ec3803-d2c5-4e0a-8c38-629840ad212", "house_id":"2"}'
#pesssoa existe, o score é bom mas a casa não está mais disponível
gcloud pubsub topics publish ${INCOMING_TOPIC_ID} --message '{"person_id":"123.123.123-13", "person_email": "email@email.com", "photo_id":"a8ec3803-d2c5-4e0a-8c38-629840ad212", "house_id":"1"}'
Colando e executando os comandos acima você terá um retorno como esse:
Isso indica que as mensagens foram postadas com sucesso.
Para verificar o resultado vamos consultar a subscrição do tópico de insucesso com o comando:
gcloud pubsub subscriptions pull not-sucess-topic-subscription --auto-ack --limit 10
O resultado também pode ser visto no Console da GCP, no meu de subscriptions do PubSub: https://console.cloud.google.com/cloudpubsub/subscription/list?project=real-state-wf
Por fim, vamos testar o caminho feliz:
#caminho feliz: pessoa encontrada, score bom e casa disponível
gcloud pubsub topics publish ${INCOMING_TOPIC_ID} --message '{"person_id":"123.123.123-13", "person_email": "email@email.com", "photo_id":"a8ec3803-d2c5-4e0a-8c38-629840ad212", "house_id":"2"}'
Consultando o resultado no tópico de saída de Sucesso:
#Consultando resultado
gcloud pubsub subscriptions pull sucess-topic-subscription --auto-ack --limit 10
No Console da GCP, na página do workflow (https://console.cloud.google.com/workflows/workflow/us-east1/house-rent/executions?project=real-state-wf) você pode ver todas as execuções do Workflow:
Clicando em cada ID você pode ver mais detalhes sobre cada execução, bem como tentar executá-la novamente com os mesmos parâmetros. Além disso encontra várias informações para troubleshooting:
Finalizando (por hora)…
Esse artigo apenas dá um overview sobre algumas das possibilidades do GCP Workflows. Existem vários outros comandos, steps paralelos, deploy com terraform, etc, que podem ser cobertos com outros artigos.
Por fim, você pode encontrar um Gist com todos os comandos utilizados aqui nesse link: https://gist.github.com/cassioeskelsen/bce24f51a9d02e9478c5daeabde2f114
Bons fluxos!