I appreciate your help.
I am encountering an issue with the new GraphAPI v17 update. I was using an API ("from facebook_business.adobjects.offlineconversiondataset import OfflineConversionDataSet"), but with this new version of GraphAPI, it no longer works because it is said to have been merged with the Conversions API, meaning the Offline Conversions API does not function in recent GraphAPI versions.
However, I have not found any explanation on how to continue working with Offline Data Sources. In my integration, I did the following:
Here is the code for better explanation:
from facebook_business.api import FacebookAdsApi from facebook_business.adobjects.offlineconversiondataset import OfflineConversionDataSet import json import hashlib from datetime import date, datetime import boto3
# Read the brand selector from the Databricks widget (widget id stays "marca").
dbutils.widgets.text("marca", "")
brand = dbutils.widgets.get("marca")

# Map each brand to its internal code and its Facebook offline event set id.
# NOTE(review): the original read the widget into `marca` but branched on
# `brand`, which raised NameError — unified on `brand` here.
_BRANDS = {
    "brand1": (1, 'example id 00000000000000'),
    "brand2": (2, 'example id 00000000000000'),
    "brand3": (4, 'example id 00000000000000'),
}
try:
    cd_brand, fb_id_brand = _BRANDS[brand]
except KeyError:
    raise RuntimeError("Brand Not Found")

# Distinct name so we do not shadow datetime.date imported above.
run_date = datetime.today().strftime("%Y-%m-%d")
query_midias = spark.sql(f"""SELECT * FROM your_table""")
# Working DataFrame: raw conversion rows pulled from the media table above.
df = query_midias

# Hash every PII column with SHA-256 (Facebook match keys must be hashed);
# each value is normalised first (trim/lower, strip non-alphanumerics,
# fold accented characters) so equal inputs hash identically.
df = df.selectExpr(
    "event_time",
    "event_name",
    "replace(value, ',', '.') as value",
    "currency",
    "data_processing_options",
    "sha2(trim(lower(email)), 256) as em",
    "sha2(ltrim(regexp_replace(phone, '[^0-9]', ''), '0'), 256) as ph",
    "sha2(trim(lower(fn)), 256) as fn",
    "sha2(trim(lower(ln)), 256) as ln",
    "sha2(regexp_replace(trim(dt_nascimento), '[^a-zA-Z0-9]', ''), 256) as db",
    "sha2(regexp_replace(translate(trim(lower(cidade)), 'áãâéêíóôõúÁÃÂÉÊÍÓÔÕÚ', 'aaaeeiooouAAAEEIOOOU'), ' ', ''), 256) as ct",
    "sha2(regexp_replace(CAST(venda_id AS STRING), '[^a-zA-Z0-9]', ''), 256) as order_id",
    "sha2(regexp_replace(sku, '[^a-zA-Z0-9]', ''), 256) as item_code",
    "sha2(regexp_replace(trim(lower(grupo_produto)), '[^a-zA-Z0-9]', ''), 256) as grupo_produto",
    "sha2(regexp_replace(trim(lower(subcategoria)), '[^a-zA-Z0-9]', ''), 256) as subcategoria",
    "sha2(regexp_replace(trim(lower(nm_segmento)), '[^a-zA-Z0-9]', ''), 256) as nm_segmento"
)

# Pack the match keys and the custom data into JSON strings, one event per
# row, in the shape the Offline Conversions upload expects
# (match_keys values are single-element lists).
rdd = df.rdd.map(lambda row: (
    row['event_time'],
    row['event_name'],
    row['data_processing_options'],
    json.dumps({k: [row[k]] for k in ['em', 'ph', 'fn', 'ln', 'ct', 'db']}),
    json.dumps({
        'value': row['value'],
        'currency': row['currency'],
        'order_id': row['order_id'],
        'item_code': row['item_code'],
        'grupo_produto': row['grupo_produto'],
        'subcategoria': row['subcategoria'],
        'nm_segmento': row['nm_segmento']
    })
))

# Back to a DataFrame with the five columns the upload payload needs.
df = rdd.toDF([
    'event_time', 'event_name', 'data_processing_options', 'match_keys', 'custom_data'
])

# Collect to the driver as a list of plain dicts; assumes the result fits
# in driver memory — TODO confirm expected volume.
data = df.toPandas().to_dict(orient="records")
# Pull the Facebook app credentials from AWS Secrets Manager.
session = boto3.session.Session()
client = session.client(service_name='secretsmanager', region_name='us-east-1')
response = client.get_secret_value(SecretId='your_secret_id')
database_secrets = json.loads(response['SecretString'])
app_id = database_secrets['app_id']
app_secret = database_secrets['app_secret']
access_token = database_secrets['access_token']

# Initialise the Marketing API client and bind the brand's offline event set.
fb = FacebookAdsApi.init(app_id=app_id, app_secret=app_secret, access_token=access_token)
# NOTE(review): this attaches a custom attribute onto the API object — it
# works, but a plain module-level variable would be clearer.
fb.offline_dataset = OfflineConversionDataSet(fb_id_brand)
# Upload in chunks; the Offline Conversions endpoint limits how many events
# one request may carry, so the extract is never sent in a single call.
batch_size = 1000

for i in range(0, len(data), batch_size):
    batch = data[i:i + batch_size]
    # Tag each upload with the run date and the batch's event name so it is
    # identifiable in Events Manager.
    # NOTE(review): the original interpolated an undefined `event_name`
    # global (NameError); it is now taken from the records themselves.
    batch_event_name = batch[0].get('event_name', 'Unknown') if batch else 'Unknown'
    params = {
        'upload_tag': datetime.today().strftime('%Y-%m-%d') + " - Automation " + batch_event_name + " Events",
        'data': batch
    }
    fb.offline_dataset.create_event(params=params)
Same issue here. Did you find a solution?
Yes Paolo, I created my own method using pyspark.
First of all, you will need to create a new dataset in Facebook — a regular dataset, not an offline one.
You can do it like this:
from facebook_business.api import FacebookAdsApi from facebook_business.adobjects.business import Business from facebook_business.adobjects.adaccount import AdAccount import json import hashlib from datetime import date import boto3 from datetime import datetime from pyspark.sql.functions import sha2, lower, trim, regexp_replace, translate import re import requests import time
# Fetch the Facebook app credentials from AWS Secrets Manager.
session = boto3.session.Session()
client = session.client(
    service_name='secretsmanager',
    region_name='.....'
)
response = client.get_secret_value(
    SecretId='mysecret in aws'
)
database_secrets = json.loads(response['SecretString'])
app_id = database_secrets['app_id']
app_secret = database_secrets['app_secret']
access_token = database_secrets['access_token']

# Initialise the Marketing API client once for the whole run.
FacebookAdsApi.init(app_id=app_id, app_secret=app_secret, access_token=access_token)

# NOTE(review): the original rebound `date` to a string here, shadowing
# datetime.date imported above — renamed to avoid masking the class.
run_date = datetime.today().strftime("%Y-%m-%d")
# Run the extraction query; `df` feeds the hashing step below.
df = spark.sql(f"""
SELECT * FROM MY_QUERY_SQL
""")
# Hash every PII column with SHA-256 (Conversions API user_data must be
# hashed); values are normalised first (trim/lower, strip non-alphanumerics,
# fold accented characters) so equal inputs hash identically.
# NOTE: the output columns are aliased em/ph/fn/ln/db/ct/order_id/item_code/
# group/category/segment — downstream code must read exactly these names.
hashed_df = df.selectExpr(
    "event_time",
    "event_name",
    "replace(value, ',', '.') as value",
    "currency",
    "data_processing_options",
    "sha2(trim(lower(email)), 256) as em",
    "sha2(ltrim(regexp_replace(phone, '[^0-9]', ''), '0'), 256) as ph",
    "sha2(trim(lower(fn)), 256) as fn",
    "sha2(trim(lower(ln)), 256) as ln",
    "sha2(regexp_replace(trim(dt_birth), '[^a-zA-Z0-9]', ''), 256) as db",
    "sha2(regexp_replace(translate(trim(lower(city)), 'áãâéêíóôõúÁÃÂÉÊÍÓÔÕÚ', 'aaaeeiooouAAAEEIOOOU'), ' ', ''), 256) as ct",
    "sha2(regexp_replace(CAST(id_sell AS STRING), '[^a-zA-Z0-9]', ''), 256) as order_id",
    "sha2(regexp_replace(sku, '[^a-zA-Z0-9]', ''), 256) as item_code",
    "sha2(regexp_replace(trim(lower(group)), '[^a-zA-Z0-9]', ''), 256) as group",
    "sha2(regexp_replace(trim(lower(category)), '[^a-zA-Z0-9]', ''), 256) as category",
    "sha2(regexp_replace(trim(lower(segment)), '[^a-zA-Z0-9]', ''), 256) as segment"
)
# Collect the hashed rows to the driver as plain dicts; assumes the result
# is small enough for driver memory — TODO confirm expected volume.
events_data = [row.asDict() for row in hashed_df.collect()]

# Conversions API endpoint for the target dataset.
# NOTE(review): `dataset_id` is not defined in this snippet — it must be set
# to the Events Manager dataset id before this line runs.
url = f'https://graph.facebook.com/v19.0/{dataset_id}/events'
BATCH_SIZE = 1000
def send_event_batch(batch):
    """POST one batch of events to the dataset's /events edge.

    Relies on module-level ``url``, ``dataset_id`` and ``access_token``.
    Logs the outcome instead of raising, so one failed batch does not
    abort the whole upload run.
    """
    payload = {
        'data': batch,
        'access_token': access_token
    }
    # A timeout prevents the job from hanging forever on a stalled connection.
    response = requests.post(url, json=payload, timeout=60)
    if response.ok:
        # NOTE(review): the original message also interpolated an undefined
        # `brand` variable, which raised NameError on every successful post.
        print(f"sended to dataset: {dataset_id}.")
    else:
        print(f"Error. Response: {response.status_code} - {response.text}")
# Build and send the event payloads in chunks of BATCH_SIZE, since the
# Conversions API caps the size of one request's `data` array.
for i in range(0, len(events_data), BATCH_SIZE):
    batch = events_data[i:i + BATCH_SIZE]
    events_payload = []
    for event in batch:
        # Hashed identifiers (already SHA-256'd upstream in Spark).
        user_data = {
            'em': event['em'],
            'ph': event['ph'],
            'fn': event['fn'],
            'ln': event['ln'],
            'db': event['db'],
            'ct': event['ct'],
        }
        # NOTE(review): the original indexed event['grupo_produto'],
        # event['subcategoria'] and event['nm_segmento'], but the Spark
        # select aliases these columns as `group`, `category` and `segment`,
        # so every row raised KeyError — fixed to match the aliases.
        custom_data = {
            'value': event['value'],
            'currency': event['currency'],
            'order_id': event['order_id'],
            'item_number': event['item_code'],
            'product_group': event['group'],
            'product_subcategory': event['category'],
            'segment_name': event['segment'],
        }
        events_payload.append({
            'event_name': event['event_name'],
            # assumed to be a Unix timestamp — TODO confirm; the API rejects
            # non-epoch event_time values.
            'event_time': event['event_time'],
            'action_source': 'physical_store',
            'user_data': user_data,
            'custom_data': custom_data,
        })
    send_event_batch(events_payload)