retenção de dados Snowflake
A Braze anonimiza, ou seja, remove informações pessoalmente identificáveis (IPI), de todos os dados de eventos com mais de dois anos. Se você usar o compartilhamento de dados do Snowflake, poderá optar por reter os dados completos dos eventos em seu ambiente armazenando uma cópia em sua conta do Snowflake antes que a política de retenção seja aplicada.
Esta página apresenta duas maneiras de reter dados não anonimizados:
- Copie seus dados para outro banco de dados Snowflake
- Descarregue seus dados para um estágio
Braze anonimamente os dados de eventos para usuários que são excluídos do Braze, conforme descrito em Assistência Técnica de Proteção de Dados. Nenhum dado copiado fora do banco de dados compartilhado será incluído neste processo, pois a Braze não o gerencia mais.
Copie todos os dados para outro banco de dados Snowflake
Você pode reter dados não anonimizados copiando seus dados do esquema compartilhado BRAZE_RAW_EVENTS
para outro banco de dados e esquema no Snowflake. Para fazer isso, siga estas etapas:
- Na sua conta Snowflake, crie o procedimento
COPY_BRAZE_SHARE
, que será usado para copiar todos os dados compartilhados pela Braze para outro banco de dados e esquema dentro do Snowflake.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
CREATE PROCEDURE COPY_BRAZE_SHARE(
SOURCE_DATABASE STRING, -- Database name of the braze data share
SOURCE_SCHEMA STRING, -- Schema name of the braze data share
DESTINATION_DATABASE STRING, -- Name of the database to which you want to copy shared the data
DESTINATION_SCHEMA STRING, -- Name of the schema to which you want to copy shared the data
MAX_DATE DATE default DATEADD(year, -2, CURRENT_DATE()), -- Copy data on or before the maximum date default DATEADD(year, -2, CURRENT_DATE())
TABLE_NAME_FILTER STRING default 'USERS_%' -- Filter to select table that will be unloaded, default to 'USER_%'
)
RETURNS TABLE (TABLE_NAME STRING, SUCCESS BOOLEAN, INFO STRING)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'run'
AS
$$
import snowflake.snowpark as snowpark
from snowflake.snowpark.exceptions import SnowparkSQLException
def run(session: snowpark.Session, SOURCE_DATABASE: str, SOURCE_SCHEMA: str, DESTINATION_DATABASE: str, DESTINATION_SCHEMA: str, MAX_DATE: str, TABLE_NAME_FILTER: str):
result = []
-- Get the list of filtered table names
table_query = f"""
SELECT table_name
FROM {SOURCE_DATABASE}.INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '{SOURCE_SCHEMA}' AND table_name LIKE '{TABLE_NAME_FILTER}'
"""
tables = session.sql(table_query).collect()
-- Iterate through each table and copy data
for row in tables:
table_name = row['TABLE_NAME']
-- Skip archive tables
if table_name.endswith('_ARCHIVED'):
continue
-- Check if the destination table exists
check_table_query = f"""
SELECT COUNT(*) as count
FROM {DESTINATION_DATABASE}.INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '{DESTINATION_SCHEMA}' AND TABLE_NAME = '{table_name}'
"""
table_exists = session.sql(check_table_query).collect()[0]['COUNT'] > 0
if table_exists:
-- Find the current, most recent `SF_CREATED_AT` in the existing table
cur_max_date = None
date_query = f"""
SELECT MAX(SF_CREATED_AT) as CUR_MAX_DATE
FROM {DESTINATION_DATABASE}.{DESTINATION_SCHEMA}.{table_name}
"""
date_result = session.sql(date_query).collect()
if date_result:
cur_max_date = date_result[0]['CUR_MAX_DATE']
if cur_max_date:
-- If the destination table is not empty, only add data that is newer than `cur_max_date` and older than`MAX_DATE`
copy_query = f"""
INSERT INTO {DESTINATION_DATABASE}.{DESTINATION_SCHEMA}.{table_name}
SELECT * FROM {SOURCE_DATABASE}.{SOURCE_SCHEMA}.{table_name}
WHERE SF_CREATED_AT <= '{MAX_DATE}'
AND SF_CREATED_AT > '{cur_max_date}'
"""
else:
-- If the destination table is empty, copy all data before `MAX_DATE`
copy_query = f"""
INSERT INTO {DESTINATION_DATABASE}.{DESTINATION_SCHEMA}.{table_name}
SELECT * FROM {SOURCE_DATABASE}.{SOURCE_SCHEMA}.{table_name}
WHERE SF_CREATED_AT <= '{MAX_DATE}'
"""
else:
-- If the table doesn't exist, create it and copy data
copy_query = f"""
CREATE TABLE {DESTINATION_DATABASE}.{DESTINATION_SCHEMA}.{table_name} AS
SELECT * FROM {SOURCE_DATABASE}.{SOURCE_SCHEMA}.{table_name}
WHERE SF_CREATED_AT <= '{MAX_DATE}'
"""
try:
session.sql(copy_query).collect()
result.append([table_name, True, ""])
except SnowparkSQLException as e:
result.append([table_name, False, str(e)])
-- Return the results
return session.create_dataframe(result, schema=['TABLE_NAME', 'SUCCESS', 'INFO'])
$$;
2. Execute um dos comandos abaixo na sua conta do Snowflake para executar o procedimento.
Por padrão, o procedimento fará backup dos dados com mais de dois anos para todos os tipos de eventos USERS_*
.
1
2
3
4
-- Copy all the rows that are two years or older in all the 'USERS_*' tables
-- from 'SOURCE_DB'.'SOURCE_SCHEMA' to 'DEST_DB'.'DEST_SCHEMA'
CALL COPY_BRAZE_SHARE('SOURCE_DB', 'SOURCE_SCHEMA', 'DEST_DB', 'DEST_SCHEMA')
Especifique um filtro para escolher quais dados de idade fazer backup e especifique um filtro de nome de tabela para fazer backup apenas das tabelas de eventos selecionadas.
1
2
3
4
-- Copy all the rows that are one year or older in all the 'USERS_BEHAVIORS_*' tables
-- from 'SOURCE_DB'.'SOURCE_SCHEMA' to 'DEST_DB'.'DEST_SCHEMA'
CALL COPY_BRAZE_SHARE('SOURCE_DB', 'SOURCE_SCHEMA', 'DEST_DB', 'DEST_SCHEMA', DATEADD(year, -1, CURRENT_DATE()), 'USERS_BEHAVIORS_%')
Executar repetidamente o procedimento não criará registros duplicados porque ele verifica o SF_CREATED_AT
mais recente e só faz backup de dados posteriores.
Descarregar dados para o estágio
Você pode reter dados não anonimizados descarregando dados do esquema compartilhado BRAZE_RAW_EVENTS
para um estágio. Para fazer isso, siga estas etapas:
- Crie o procedimento
UNLOAD_BRAZE_SHARE
, que será usado para copiar todos os dados compartilhados pela Braze para o estágio especificado.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
CREATE PROCEDURE UNLOAD_BRAZE_SHARE(
SOURCE_DATABASE STRING, -- Database name of the braze data share
SOURCE_SCHEMA STRING, -- Schema name of the braze data share
STAGE_NAME STRING, -- Snowflake stage where the data will be unloaded
MIN_DATE DATE, -- Copy data from this date (inclusive)
MAX_DATE DATE, -- Copy data till this date (exclusive)
TABLE_NAME_FILTER STRING default 'USERS_%' -- Filter to select table that will be unloaded, default to 'USER_%'
)
RETURNS TABLE (TABLE_NAME STRING, SUCCESS BOOLEAN, INFO STRING)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'run'
AS
$$
import snowflake.snowpark as snowpark
from snowflake.snowpark.exceptions import SnowparkSQLException
def run(session: snowpark.Session, DATABASE_NAME: str, SCHEMA_NAME: str, STAGE_NAME: str, MIN_DATE: str, MAX_DATE: str, TABLE_NAME_FILTER: str):
result = []
if MIN_DATE >= MAX_DATE:
result.append(["MIN_DATE cannot be more recent than MAX_DATE", False, ""])
return session.create_dataframe(result, schema=['TABLE_NAME', 'SUCCESS', 'INFO'])
-- Get list of tables
table_query = f"""
SELECT TABLE_NAME
FROM {DATABASE_NAME}.INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '{SCHEMA_NAME}' AND TABLE_NAME LIKE '{TABLE_NAME_FILTER}'
"""
tables = session.sql(table_query).collect()
for table in tables:
table_name = table['TABLE_NAME']
-- Skip archive tables
if table_name.endswith('_ARCHIVED'):
continue
-- Create CSV file name
csv_file_name = f"{table_name}_{MIN_DATE}_{MAX_DATE}.csv"
-- Construct `COPY INTO` command with date filter
copy_cmd = f"""
COPY INTO @{STAGE_NAME}/{csv_file_name}
FROM (
SELECT *
FROM {DATABASE_NAME}.{SCHEMA_NAME}.{table_name}
WHERE SF_CREATED_AT >= TO_DATE('{MIN_DATE}') and SF_CREATED_AT < TO_DATE('{MAX_DATE}')
)
FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"')
HEADER = TRUE
OVERWRITE = FALSE
"""
-- Execute COPY INTO command
try:
session.sql(copy_cmd).collect()
result.append([table_name, True, csv_file_name])
except SnowparkSQLException as e:
result.append([table_name, False, str(e)])
return session.create_dataframe(result, schema=['TABLE_NAME', 'SUCCESS', 'INFO'])
$$;
2. Execute um dos comandos abaixo para executar o procedimento.
Por padrão, o procedimento copiará todas as tabelas com o prefixo USERS_
.
1
2
3
4
5
6
7
8
9
10
-- Create a Snowflake stage to store the file
create stage MY_EXPORT_STAGE;
-- Call the procedure
-- to unload date between '2020-01-01' and '2021-01-01'
-- from tables with 'USERS_' prefix in 'DATABASE_NAME'.'SCHEMA'
CALL UNLOAD_BRAZE_SHARE('DATABASE_NAME', 'SCHEMA', 'MY_EXPORT_STAGE', '2020-01-01', 2021-01-01');
-- List the files that are unloaded
LIST @MY_EXPORT_STAGE;
Especifique um filtro no procedimento para descarregar apenas as tabelas especificadas.
1
2
3
4
5
6
7
8
9
-- Create a Snowflake stage to store the file
create stage MY_EXPORT_STAGE;
-- Unload date between '2020-01-01' and '2021-01-01'
-- from tables with 'USERS_BEHAVIORS_' prefix in 'DATABASE_NAME'.'SCHEMA'
CALL EXPORT_BRAZE_SHARE_TO_STAGE('DATABASE_NAME', 'SCHEMA', 'MY_EXPORT_STAGE', '2020-01-01', 2021-01-01', 'USERS_BEHAVIORS_%');
-- List the files that are unloaded
LIST @MY_EXPORT_STAGE;