ReAG

Sólo voy a sumergirme en esto: Digamos que quieres mover datos de A a B usando un servicio de streaming de eventos como Snowplow, pero una vez que aterriza, quieres automatizar la prueba de los datos en sí también.

Claro, Snowplow puede probar que el esquema de eventos tiene sentido. Pero, ¿qué pasa si de repente tienes 1 millón de eventos en lugar de 10.000? Es probable que desees hacer una pausa para que tus costes de computación no se multipliquen por 100.

Nunca he utilizado Snowplow pero este es el enfoque:

Tabla de contenidos

Paso 1 – Acabar con la interfaz de usuario

Queremos automatizar las cosas de forma masiva. Esto significa que alguien debería ser capaz de hacer un cambio de configuración y empezar a mover datos de A a B usando Snowplow muy fácilmente.

Chat GPT dice:

GPT START – – – – – – – – – – – – – – – –

Qué hará esta configuración

Construyamos un flujo basado en Terraform donde los datos de seguimiento de marketing de los sitios web (a través de Snowplow JS Tracker) aterricen en:

  • S3 – para eventos sin procesar y enriquecidos
  • Snowflake – datos de eventos estructurados, listos para BI/análisis

Esto supone:

  • Eres un cliente de Snowplow Cloud.
  • Snowplow Cloud está entregando eventos a través de su proveedor de nube preferido (por ejemplo, AWS).
  • Tiene credenciales de Snowflake y una base de datos/esquema para utilizar.

Terraform Infraestructura: Visión general

Vamos a cubrir:

  1. Configuración de buckets S3 para eventos enriquecidos (fallback o archivo opcional)
  2. Configuración de un destino Snowflake a través de Snowplow (normalmente lo haces a través de Snowplow Console, pero la config de conexión al almacén también se puede gestionar a través de Terraform)
  3. Configuración de roles IAM para que Snowplow pueda escribir en S3 (si es necesario)
  4. Snowplow no ofrece módulos Terraform para todo, pero podemos automatizar la infra de soporte a su alrededor.

Snowplow Cloud + S3 (Código Terraform)

1. Crear S3 Buckets

resource "aws_s3_bucket" "snowplow_enriched_good" {   bucket = "marketing-data-snowplow-enriched"   force_destroy = true }
resource "aws_s3_bucket" "snowplow_enriched_bad" {
bucket = "marketing-data-snowplow-bad"
force_destroy = true
}

2. Rol IAM que Snowplow Cloud puede asumir

Snowplow Cloud necesitará escribir en su cubo. Definirá una relación de confianza y permisos

data "aws_iam_policy_document" "assume_role_policy" {
statement {
actions = ["sts:AssumeRole"]
principals {
type = "AWS"
identifiers = ["arn:aws:iam::<SNOWPLOW_CLOUD_ACCOUNT_ID>:root"]
}
}
}

resource "aws_iam_role" "snowplow_s3_delivery" {
name = "snowplow-s3-delivery-role"
assume_role_policy = data.aws_iam_policy_document.assume_role_policy.json
}resource "aws_iam_policy" "s3_access" {
name = "SnowplowS3AccessPolicy" policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = [
"s3:PutObject",
"s3:PutObjectAcl"
],
Resource = [
"${aws_s3_bucket.snowplow_enriched_good.arn}/*",
"${aws_s3_bucket.snowplow_enriched_bad.arn}/*"
]
}
]
})
}resource "aws_iam_role_policy_attachment" "attach_s3_access" {
role = aws_iam_role.snowplow_s3_delivery.name
policy_arn = aws_iam_policy.s3_access.arn
}

Proporcionarás este ARN de rol a Snowplow a través de su interfaz de usuario o a través de su equipo de soporte para finalizar la integración S3.

Snowplow to Snowflake (Overview + Terraform-Able Elements)

Snowplow gestionará la mayor parte de la canalización y carga de datos para Snowflake a través de su cargador. Sólo tiene que proporcionar:

  • Credenciales del almacén Snowflake
  • Esquema/base de datos
  • Y opcionalmente, un objeto de configuración Snowplow (normalmente vía UI o API)

Si quieres automatizar esto, esto es lo que puedes hacer:

3. Configuración de Snowflake mediante Terraform

provider "snowflake" {   username = var.snowflake_username   password = var.snowflake_password   account  = var.snowflake_account   role     = "SYSADMIN"   region   = "eu-west-1"  # Or wherever your Snowflake instance is }  resource "snowflake_database" "marketing_data" {   name = "MARKETING_DATA" } resource "snowflake_schema" "snowplow_events" {   database = snowflake_database.marketing_data.name   name     = "SNOWPLOW_EVENTS" } resource "snowflake_warehouse" "analytics" {   name         = "ANALYTICS_WH"   size         = "XSMALL"   auto_suspend = 60 }

Crear un usuario para Snowplow

resource "snowflake_user" "snowplow_loader" {
name = "SNOWPLOW_LOADER"
password = var.snowplow_loader_password
default_warehouse = snowflake_warehouse.analytics.name
default_role = "SYSADMIN"
default_schema = snowflake_schema.snowplow_events.name

A continuación, concedería los privilegios necesarios sobre las tablas y el esquema.

4. Conectándolo todo

Una vez que tengas:

✅ Rol IAM que Snowplow Cloud puede asumir

✅ Cubos creados

✅ Usuario Snowflake y esquema creados.

A continuación, dentro de Snowplow Console,

  1. Vaya a Destinos → Añadir nuevo → Elija S3 o Snowflake
  2. Para S3:
  • Pega el ARN del rol IAM
  • Define la partición de eventos si es necesario.
  1. Para Snowflake:
  • Proporcione la cadena de conexión JDBC
  • Proporcione el usuario/contraseña, esquema/base de datos de Snowflake.

Nota final

  • Snowplow Cloud abstrae gran parte de la complejidad: no es necesario desplegar colectores, enriquecedores, etc.
  • Utiliza Terraform para aprovisionar las partes que controla: S3, IAM y Snowflake.
  • Utiliza la consola o la API de Snowplow para configurar las canalizaciones/destinos.
  • También puede utilizar la API SaaS privada de Snowplow para programar estos destinos si desea automatizar aún más (póngase en contacto con su equipo para obtener documentación sobre la API).

CHAT GPT END – – – – – – – – – – – – – – – – – –

Ahora puedes ver que no hay código terraform aquí para automatizar el despliegue de recursos en Snowplow Private Cloud. No hay un proveedor de terraformación, pero hay una API, por lo que para esa parte necesitarías:

resource "null_resource" "create_snowflake_destination" {
provisioner "local-exec" {
command = <<EOT
curl -X POST https://<your-snowplow-api-endpoint>/destinations \
-H "Authorization: Bearer ${var.snowplow_api_token}" \
-H "Content-Type: application/json" \
-d '{
"type": "snowflake",
"name": "Marketing Warehouse",
"config": {
"host": "youraccount.region.snowflakecomputing.com",
"database": "MARKETING_DATA",
"schema": "SNOWPLOW_EVENTS",
"username": "SNOWPLOW_LOADER",
"password": "${var.snowflake_password}",
"warehouse": "ANALYTICS_WH",
"role": "SYSADMIN"
}
}'
EOT
}
}

Creo que esto es un poco anti-patrón. El objetivo de terraformar es facilitar las cosas. Si el objetivo es que alguien sea capaz de hacer fácilmente un cambio en una interfaz fácil de usar para mover datos de A a B usando Snowplow, se siente extraño que descuidemos la interfaz de usuario, en su lugar, controlando todo a través de terraform y backolving un proveedor de terraform usando la API, y obligar a los usuarios finales a añadir básicamente nuevos bits de configuración en el código bruto de terraform.

Inevitablemente entonces querrías construir una UI para hacer PRs que es la UI de Snowplow.

Conclusión – no matar a la interfaz de usuario

En realidad no deberíamos estar usando la interfaz de usuario en absoluto. Obviamente, si lo que queremos es controlar la versión de las cosas que suceden en Snowplow, entonces ese es un problema de Snowplow, no de nosotros (los usuarios). Estoy seguro de que pronto tendrán una integración git como con Orchestra (ver aquí).

Pruebas de calidad

Si hemos establecido que los usuarios van a utilizar la interfaz de usuario, supongamos que van a hacerlo y van a empezar a introducir datos en Snowflake.

Tenemos que ser capaces de controlar fácilmente lo que está sucediendo en Snowflake, para asegurar algunas cosas:

  1. El aterrizaje de datos es de buena calidad
  2. No hay un montón de aterrizaje de datos al azar.

Por desgracia, no hay una buena respuesta a [2]. Claro, podemos identificar proactivamente el # de tablas en una capa sin procesar en Snowflake, pero ¿cómo sabrás qué esperar?

Digamos que un usuario es responsable de establecer la configuración de Snowplow y de añadirla en otro lugar para decir «Hey, acabo de configurar Snowplow, estoy esperando el evento A, el evento B, el evento C como Tablas A, B y C» – inevitablemente este patrón falla ya que el usuario no tiene ningún incentivo para mantener esa lista actualizada.

La única solución a [2] es revisar los cambios en Snowplow y asegurarse de que cualquier cambio adicional está ahí.

Comprobación de datos aleatorios utilizando Orchestra

Esto es bastante fácil en Orchestra. Sólo tiene que añadir una prueba DQ que se ejecuta en su propio:

  6222388f-e2da-427e-a4cf-04c8854568ff:
tasks:
4cbfdd5d-3e1e-4108-8470-d5cd3bb1b74e:
integration: SNOWFLAKE
integrationJob: SNOWFLAKE_RUN_TEST
parameters:
statement: |
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'YOUR_SCHEMA_NAME'
AND table_type = 'BASE TABLE';
AND table_name in ['my','list']
error_threshold_expression: '> 0'
warn_threshold_expression: '> 0'
dependsOn: []
name: Check Tables
tags: []
dependsOn:
- dc18b9c2-b776-4ce6-a5b6-08965465a65c
name: ''

Sólo tienes que asegurarte de que cada vez que alguien añade una tabla actualizas la lista. Este paso tiene que ser manual. Alguien tiene que hacerlo. Automatizarlo no tiene sentido. El objetivo de la gobernanza es limitar lo que se crea, por lo que necesita que la lista (arriba) sea controlada en código en alguna parte.

Garantizar que los datos aterrizados son de buena calidad

Ahora bien, esta parte deberíamos automatizarla.

Digamos que tiene un buen proceso manual de trabajo, donde los analistas entran en la interfaz de usuario de Snowplow, crean nuevos productos de datos, y luego están actualizando muy diligentemente la lista de tablas a utilizar en Orchestra. Espléndido. Ya no tenemos aterrizajes de datos aleatorios.

El siguiente paso es automatizar la prueba de estos datos, y moverlos a un escenario.

Esto es bastante simple; puede crear las pruebas en Orchestra para que se ejecuten, y luego si tienen éxito, ejecutar una sentencia clone. Esto también debe tener un calendario en función del SLA requerido para los datos.

image
En este caso, usamos Fivetran como ejemplo – los datos fivetran aterrizan datos en un esquema snowlfake «raw». A continuación, probamos los datos para nulos, el número de filas, etc. Si los datos son correctos, los clonamos en un esquema «de confianza».

Esto son 88 líneas de código. En realidad, si lo escribieras tú, serían muchas menos, ya que perderías una línea por todas las líneas innecesarias (muchos de los campos que aparecen a continuación son opcionales, pero la interfaz de usuario de Orchestra los incluye todos para completar).

version: v1
name: Fivetran Tasks and snowflake tests
pipeline:
790e6d29-0f05-4f7b-a9a8-9342389a8a53:
tasks:
33a0be38-cec8-4660-9142-900230db9c06:
integration: FIVETRAN
integration_job: FIVETRAN_SYNC_ALL
parameters:
connector_id: manual_carrier
depends_on: []
condition: null
name: Fivetran
tags: []
connection: null
operation_metadata: null
treat_failure_as_warning: null
configuration: null
depends_on: []
condition: null
name: ''
6fa4a77c-9c49-442a-804e-c6574527e3e7:
tasks:
dc4ced09-16c0-45dd-9f55-55bc6160c391:
integration: SNOWFLAKE
integration_job: SNOWFLAKE_RUN_TEST
parameters:
statement: '

select * from bigquery_sample.public.hubspot_contacts where vid is null;

'
error_threshold_expression: '> 0'
warn_threshold_expression: '> 0'
role: ORCHESTRA_ROLE
database: BIGQUERY_SAMPLE
schema: PUBLIC
depends_on: []
condition: null
name: Test Nulls
tags: []
connection: snowflake_tables_24182
operation_metadata: null
treat_failure_as_warning: null
configuration: null
depends_on:
- 790e6d29-0f05-4f7b-a9a8-9342389a8a53
condition: null
name: ''
fcf487ef-539d-45b2-8119-a305a4a9bd9d:
tasks:
917c386b-4893-479f-a0a3-a3db877cb6d5:
integration: SNOWFLAKE
integration_job: SNOWFLAKE_RUN_TEST
parameters:
statement: '

select * from bigquery_sample.public.hubspot_contacts where Firstname
in (''James'',''Hugo'')'
error_threshold_expression: '> 0'
warn_threshold_expression: '> 0'
role: ORCHESTRA_ROLE
database: BIGQUERY_SAMPLE
schema: PUBLIC
depends_on: []
condition: null
name: Check Values
tags: []
connection: snowflake_tables_24182
operation_metadata: null
treat_failure_as_warning: null
configuration: null
depends_on:
- 790e6d29-0f05-4f7b-a9a8-9342389a8a53
condition: null
name: ''
0232d8db-2f31-4e17-a512-1505a9728198:
tasks:
004c9095-7207-4f0f-aa2a-7eec0dff1c00:
integration: SNOWFLAKE
integration_job: SNOWFLAKE_RUN_TEST
parameters:
statement: select * from bigquery_sample.public.hubspot_contacts;
error_threshold_expression: < 5
warn_threshold_expression: '>= 5'
role: ORCHESTRA_ROLE
database: BIGQUERY_SAMPLE
schema: PUBLIC
depends_on: []
condition: null
name: Check Rows
tags: []
connection: snowflake_tables_24182
operation_metadata: null
treat_failure_as_warning: null
configuration: null
depends_on:
- 790e6d29-0f05-4f7b-a9a8-9342389a8a53
condition: null
name: ''
86101df3-5503-4c77-ba4b-a7ddd35ff6a7:
tasks:
c58a86b5-55c6-4609-a72a-09515efe4c87:
integration: SNOWFLAKE
integration_job: SNOWFLAKE_RUN_QUERY
parameters:
statement: create or replace table bigquery_sample.trusted.hubspot_contracts_trusted
clone bigquery_sample.public.hubspot_contacts;
role: ORCHESTRA_ROLE
set_outputs: false
depends_on: []
condition: null
name: 'Move to trusted '
tags: []
connection: null
operation_metadata: null
treat_failure_as_warning: null
configuration: null
depends_on:
- fcf487ef-539d-45b2-8119-a305a4a9bd9d
- 0232d8db-2f31-4e17-a512-1505a9728198
- 6fa4a77c-9c49-442a-804e-c6574527e3e7
condition: null
name: ''
schedule: []
sensors: {}
trigger_events: []
webhook:
enabled: false
operation_metadata: null
run_inputs: null
configuration: null
inputs:
LoadDate:
type: string
default: ${{ ORCHESTRA.CURRENT_TIME }}
optional: null

Opción 1: crear el pipeline manualmente

Cada vez que se crea una nueva fuente, el usuario debe clonar un pipeline y volver a crearlo. Funciona bien, pero es manual. Es bueno para que el usuario se haga cargo de las cosas de principio a fin.

El usuario probablemente acabaría ignorando las pruebas.

Opción 2: forzar un PR automáticamente

Algo que podrías hacer es crear un PR para orquestar automáticamente dependiendo de lo que esté pasando en tu esquema.

Ejecutas una tarea Snowflake para obtener los datos más recientes. Digamos que encuentras una nueva tabla:

select table_name from tables where table_name not in <list>`

Devuelve:

[“new” , “new”]

Ahora, puedes establecer esto como salida y pasarlo a una tarea python predefinida. Esta tarea python va a crear un PR con una especie de estructura vacía:

import os
import tempfile
from github import Github
from git import Repo
import shutil
import yaml

# Load environment variables
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
REPO_NAME = os.getenv("GITHUB_REPO") # Format: "org/repo"
TABLES = os.getenv("TABLE_NAMES", "")
TABLE_LIST = [t.strip() for t in TABLES.split(",") if t.strip()]

# YAML content (same for each file)
def generate_yaml_content(table_name):
return f"""version: v1
name: Fivetran Tasks and snowflake tests
pipeline:
790e6d29-0f05-4f7b-a9a8-9342389a8a53:
tasks:
dq_test_1_nulls_{table_name}:
tasks:
dc4ced09-16c0-45dd-9f55-55bc6160c391:
integration: SNOWFLAKE
integration_job: SNOWFLAKE_RUN_TEST
parameters:
statement: '

select * from
bigquery_sample.public.hubspot_contacts where {column_name} is null;
'
error_threshold_expression: '> 0'
warn_threshold_expression: '> 0'
role: ORCHESTRA_ROLE
database: BIGQUERY_SAMPLE
schema: PUBLIC
depends_on: []
condition: null
name: Test Nulls
tags: []
connection: snowflake_tables_24182
operation_metadata: null
treat_failure_as_warning: null
configuration: null
depends_on:
- 790e6d29-0f05-4f7b-a9a8-9342389a8a53
condition: null
name: ''
fcf487ef-539d-45b2-8119-a305a4a9bd9d:
tasks:
917c386b-4893-479f-a0a3-a3db877cb6d5:
integration: SNOWFLAKE
integration_job: SNOWFLAKE_RUN_TEST
parameters:
statement: '

select * from bigquery_sample.public.hubspot_contacts where Firstname
in (''James'',''Hugo'')'
error_threshold_expression: '> 0'
warn_threshold_expression: '> 0'
role: ORCHESTRA_ROLE
database: BIGQUERY_SAMPLE
schema: PUBLIC
depends_on: []
condition: null
name: Check Values
tags: []
connection: snowflake_tables_24182
operation_metadata: null
treat_failure_as_warning: null
configuration: null
depends_on:
- 790e6d29-0f05-4f7b-a9a8-9342389a8a53
condition: null
name: ''
0232d8db-2f31-4e17-a512-1505a9728198:
tasks:
004c9095-7207-4f0f-aa2a-7eec0dff1c00:
integration: SNOWFLAKE
integration_job: SNOWFLAKE_RUN_TEST
parameters:
statement: select * from bigquery_sample.public.hubspot_contacts;
error_threshold_expression: < 5
warn_threshold_expression: '>= 5'
role: ORCHESTRA_ROLE
database: BIGQUERY_SAMPLE
schema: PUBLIC
depends_on: []
condition: null
name: Check Rows
tags: []
connection: snowflake_tables_24182
operation_metadata: null
treat_failure_as_warning: null
configuration: null
depends_on:
- 790e6d29-0f05-4f7b-a9a8-9342389a8a53
condition: null
name: ''
86101df3-5503-4c77-ba4b-a7ddd35ff6a7:
tasks:
c58a86b5-55c6-4609-a72a-09515efe4c87:
integration: SNOWFLAKE
integration_job: SNOWFLAKE_RUN_QUERY
parameters:
statement: create or replace table bigquery_sample.trusted.hubspot_contracts_trusted
clone bigquery_sample.public.hubspot_contacts;
role: ORCHESTRA_ROLE
set_outputs: false
depends_on: []
condition: null
name: 'Move to trusted '
tags: []
connection: null
operation_metadata: null
treat_failure_as_warning: null
configuration: null
depends_on:
- fcf487ef-539d-45b2-8119-a305a4a9bd9d
- 0232d8db-2f31-4e17-a512-1505a9728198
- 6fa4a77c-9c49-442a-804e-c6574527e3e7
condition: null
name: ''
schedule: []
sensors: {}
trigger_events: []
webhook:
enabled: false
operation_metadata: null
run_inputs: null
configuration: null
inputs:
LoadDate:
type: string
default: ${{ ORCHESTRA.CURRENT_TIME }}
optional: null
"""

# Authenticate with GitHub
g = Github(GITHUB_TOKEN)
repo = g.get_repo(REPO_NAME)

for table in TABLE_LIST:
branch_name = f"add-orchestra-job-{table.lower().replace('.', '-')}"
file_path = f"Orchestra/{table}.yml"

with tempfile.TemporaryDirectory() as tmp_dir:
local_repo = Repo.clone_from(repo.clone_url.replace("https://", f"https://{GITHUB_TOKEN}@"), tmp_dir, branch='main')
git = local_repo.git

# Create new branch
git.checkout('HEAD', b=branch_name)

# Write YAML file
orchestra_dir = os.path.join(tmp_dir, "Orchestra")
os.makedirs(orchestra_dir, exist_ok=True)
full_file_path = os.path.join(orchestra_dir, f"{table}.yml")
with open(full_file_path, "w") as f:
f.write(generate_yaml_content(table))

# Commit and push
local_repo.index.add([full_file_path])
local_repo.index.commit(f"Add Orchestra job for {table}")
origin = local_repo.remote(name='origin')
origin.push(refspec=f"{branch_name}:{branch_name}")

# Create PR
repo.create_pull(
title=f"Add Orchestra job for {table}",
body=f"This adds an Orchestra pipeline config for the `{table}` table.",
head=branch_name,
base="main"
)

print("All PRs created.")

Aquí podemos ser muy inteligentes. En realidad podemos pre-poblar toda la pipeline con valores ficticios. No he hecho esto para cada tarea, pero imagina que quieres, como mínimo, sugerir pruebas para:

  • Nulos
  • Singularidad
  • Asegurarse de que las cosas no suben 10 veces

Puedes escribir esas pruebas una vez y luego crear automáticamente el PR con esas tres pruebas.

image 1
Parametrización de las pruebas

Mira aquí – vamos a terminar añadiendo valores ficticios que podrían ser parametrizados sobre la marcha como nombre_columna. Pero lo ideal es que el nombre de la columna para, por ejemplo, una comprobación de unicidad sea estático (porque entonces puedes rechazar cualquier cosa que no tenga el nombre de columna de unicidad que deseas, obligando así a todo el mundo a aterrizar datos con la clave unqiue en una columna con el mismo nombre, lo que crea consistencia).

Aquí es donde entra git-bridge. Con Orchestra, puede tirar en una pipeline de cualquier rama. El usuario puede entonces importar el pipeline desde git (habremos creado un PR en una nueva rama, recuerde) y editarlo en la UI

image 2
Creación de un nuevo pipeline a partir de una nueva rama
Snowplow
Mirando el pipeline que se creó automáticamente

Lo ideal sería que este RP se creara automáticamente cuando se añadiera el código del Quitanieves. Pero dada la Parte 1, esto parece poco práctico. En su lugar, optamos por la comprobación de datos en el lugar y, a continuación, la creación de Orquesta pipelines de forma dinámica, pero utilizando git – para evitar todo lo dispersa-gunning.

Conclusión – ¿Orchestra para DevOps?

Lo bueno de usar Orchestra aquí es que estamos usando Orchestra para:

  • Comprobar los datos una vez que llegan
  • Crear un PR en Github que a su vez crea un nuevo pipeline de Orchestra para la tabla recién creada
  • Usar Orchestra para procesar automática e incrementalmente esos datos una vez que el PR se fusiona y los datos fluyen a raw desde Snowplow y luego a Trusted vía Orchestra usando DQ Checks y una sentencia clone/upsert

Como he dicho, normalmente se manejarían [1] y [2] al mismo tiempo que el aprovisionamiento de infra en Snowplow utilizando código, pero en ausencia de eso estamos aprovechando la integración de Orchestra Python para hacerlo en su lugar. Obviamente, no tienes que hacer [2] en Orquesta, podrías hacer eso como una acción Git y luego desencadenar que de Orquesta.

Hugo Lu

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *