from fastapi import Request, APIRouter
from fastapi.responses import JSONResponse
import pytz
import os
from datetime import datetime, timedelta
from connectors.firebase.firebase import Firebase
import utility.function as func
from connectors.bigquery.bq import BigQuery
import concurrent.futures

from models.batch import Property

import logging
logging.basicConfig(level=logging.INFO)
timezone_utc = pytz.utc
timezone_bkk = pytz.timezone('Asia/Bangkok')

fb = Firebase(host=os.environ.get("FIREBASE_HOST"))

router = APIRouter()

LOGGING_PREFIX = "api_job_batch"

@router.post("/data/user", description="Batch job to update user data")
async def post_api_job_batch_user(default_request: Request):
    import pandas as pd
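    # Flow: list account IDs (shallow fetch returns keys only), pull each
    # account's profiles, keep those updated recently, flatten each channel's
    # IDs into key/value user properties, then upsert the batch into BigQuery.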
    try:
        allAccount = fb.db.reference().child("account").get(shallow=True) or {}
        for acc in allAccount:
            allProfile = fb.db.reference().child(f"account/{acc}/profile").get()
            if not allProfile:
                continue
            transformedData = []
            for pro in allProfile:
                update_at = allProfile[pro]['updated_at']
                update_at_datetime = datetime.fromisoformat(update_at)
                # Compare by calendar date only, ignoring time and timezone
                if func.Function.recent_by_date_only(update_at_datetime):
                    userProperty = []
                    for prop in allProfile[pro]:
                        if prop not in ['created_at', 'user_pseudo_id', 'updated_at']:
                            propContext = allProfile[pro][prop]
                            # Get ID context
                            listId = []
                            for log in propContext:
                                listId.append(str(propContext[log]['id']))

                            channel_context = {
                                "key": prop,
                                "value": list(set(listId))
                            }
                            userProperty.append(channel_context)
                    record = {
                        "user_pseudo_id": allProfile[pro]['user_pseudo_id'],
                        "created_at": allProfile[pro].get('created_at', allProfile[pro]['updated_at']),
                        "lastupdate": allProfile[pro]['updated_at'],
                        "userProperty": userProperty,
                    }
                    transformedData.append(record)
            
            df = pd.DataFrame(transformedData)
            if not df.empty:
                df['created_at']  = func.Function.parse_mixed_to_utc(df['created_at'])
                df['lastupdate']  = func.Function.parse_mixed_to_utc(df['lastupdate'])
                df.drop_duplicates(subset='user_pseudo_id', inplace=True)
                df["userProperty"] = df["userProperty"].apply(lambda x: [x] if isinstance(x, dict) else x)
                
                # BigQuery upsert: stage the batch in a temp table, delete matching
                # rows from the main table, then append the fresh rows.
                bq = BigQuery()
                bq.delete_data("customer-360-profile", f"client_{acc}", "user_temp")
                bq.load_data_df(f"client_{acc}", "user_temp", df)
                condition = "ON (ori.user_pseudo_id = temp.user_pseudo_id) "
                bq.delete_when_match("customer-360-profile", f"client_{acc}", "user", f"client_{acc}", "user_temp", condition)
                bq.load_data_df(f"client_{acc}", "user", df)

                logging.info(f"Property {acc} update user successfully")

        return JSONResponse(status_code=200, content={"status": "success", "message": "User profiles updated in BigQuery successfully"})
    except Exception as e:
        logging.error(f"Error parsing {LOGGING_PREFIX}_{default_request.path_params}: {e}")
        return JSONResponse(status_code=500, content={'status': 'error', 'message': str(e)})

@router.post("/data/dimension/suggestion", description="Batch job to update dimension suggestions")
async def post_api_job_batch_dimension_suggestion(default_request: Request):
    try:
        bq = BigQuery()
        accounts = fb.db.reference().child("account").get(shallow=True) or {}
        for acc in accounts:
            # Event property suggestions from the last 30 days
            query = f"""
            SELECT 
                main.eventName as event_name, 
                ep.key as key,
                ep.value as value 
            FROM `{os.environ.get("GCP_PROJECT")}.client_{acc}.event` as main
                LEFT JOIN UNNEST(eventProperty) as ep
            WHERE main.eventTimeStamp >=  DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
                AND ep.key NOT IN ("message", "event_timestamp", "facebook_name", "ps_id","target_id")
            GROUP BY 1,2,3
            ORDER BY 1,2
            """
            df = bq.get_query_df(query)
            if df.empty:
                continue

            nested = {}

            for _, row in df.iterrows():
                event = row['event_name']
                key = row['key']
                value = row['value']

                # Ensure nesting
                if event not in nested:
                    nested[event] = {}
                if key not in nested[event]:
                    nested[event][key] = set()
                
                # Add value
                nested[event][key].add(value)

            # Convert sets to lists for JSON serializability
            nested_json = {e: {k: list(v) for k, v in keys.items()} for e, keys in nested.items()}
            
            fb.db.reference().child(f"account/{acc}/cache/suggestion/event").set(nested_json)

            # Ad referral suggestions from the last 30 days
            queryAd = f"""
            SELECT main.referral.ad_id, main.referral.ads_context_data.ad_title
            FROM `{os.environ.get("GCP_PROJECT")}.client_{acc}.event` as main
            WHERE main.eventTimeStamp >=  DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
                AND main.referral.ad_id IS NOT NULL
            GROUP BY 1, 2
            ORDER BY 1;
            """
            df = bq.get_query_df(queryAd)
            if df.empty:
                continue
            
            dictData = {
                "ad_id": list(df['ad_id'].unique()),
                "ad_title": list(df['ad_title'].unique()),
            }

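            # NOTE: the "ad_refferal" spelling below is kept as-is; it is the
            # existing Firebase cache path and renaming it could break readers.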
            fb.db.reference().child(f"account/{acc}/cache/suggestion/ad_refferal").set(dictData)
        return JSONResponse(status_code=200, content={"status": "success", "message": "Dimension suggestions updated in Firebase successfully"})
    except Exception as e:
        logging.error(f"Error parsing {LOGGING_PREFIX}_{default_request.path_params}: {e}")
        return JSONResponse(status_code=500, content={'status': 'error', 'message': str(e)})

@router.post("/data/profile_explore/looker", description="Batch job to update profile explore from looker")
async def post_api_job_batch_profile_explore(default_request: Request):
    try:
        from connectors.looker.lookerSDK import LookerSDK
        import json
        import ast
        import pandas as pd
        from concurrent.futures import ThreadPoolExecutor, as_completed
        bq = BigQuery()
        func_key = func.Key()
        private_key = func_key.load_key_from_env(env_var_name="private_key")
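        # Helpers: enrich LINE identities with display names and picture URLs
        # from Firebase, and decrypt PGP-encrypted property values.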
        def get_line_name(fb, property_id, user_pseudo_id, uid):
            lines = fb.db.reference().child(f"account/{property_id}/profile/{user_pseudo_id}/line").get() or {}
            line_name = '-'
            for l in lines:
                context = lines[l]
                if context['id'] == uid:
                    line_name = context.get('displayName', '-')
                    break

            return {
                "user.user_pseudo_id": user_pseudo_id,
                "user__user_property.key": "line_name",
                "user__user_property.value": [line_name]
            }

        def get_line_profile(fb, property_id, user_pseudo_id, uid):
            lines = fb.db.reference().child(f"account/{property_id}/profile/{user_pseudo_id}/line").get() or {}
            line_profile = '-'
            for l in lines:
                context = lines[l]
                if context['id'] == uid:
                    line_profile = context.get('pictureUrl', '-')
                    break

            return {
                "user.user_pseudo_id": user_pseudo_id,
                "user__user_property.key": "line_profile_url",
                "user__user_property.value": [line_profile]
            }
        
        def get_pgp_value(val, user_pseudo_id, key: str, private_key):
            pack = []
            if val and isinstance(val, list):
                for v in val:
                    decode_id = func.Key.pgp_decrypt(v, private_key)
                    pack.append(decode_id)

            return {
                "user.user_pseudo_id": user_pseudo_id,
                "user__user_property.key": key.replace("_PGP", ""),
                "user__user_property.value": pack
            }
        
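        # Coerce a cell to a list; strings may contain Python-literal lists, so
        # try ast.literal_eval before falling back to wrapping the raw value.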
        def safe_to_list(x):
            if isinstance(x, list):
                return x
            if isinstance(x, str):
                try:
                    parsed = ast.literal_eval(x)
                    if isinstance(parsed, list):
                        return parsed
                    return [parsed]
                except Exception:
                    return [x]
            return [x]
            
        def process_account(acc, fb, bq, private_key):
            """Process a single account end-to-end."""

            try:
                looker = LookerSDK(property_id=acc)

                # Query definitions
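                # (filters are Looker filter expressions; "4 hour" presumably
                # limits results to rows updated within the last 4 hours)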
                query = {
                    "view": "user",
                    "fields": [
                        "user.user_pseudo_id",
                        "user__user_property.key",
                        "user__user_property.value",
                        "user.lastupdate_date"
                    ],
                    "filters": {
                        "user__user_property.key": "",
                        "user.lastupdate_date": "4 hour",
                        "user__user_property.value": ""
                    },
                    "model": f"c360_client_{acc}",
                    "limit": "-1"
                }

                query_keyword = {
                    "view": "user",
                    "fields": [
                        "message_labeling.user_pseudo_id",
                        "single_grouping.list_value"
                    ],
                    "filters": {"message_labeling.user_pseudo_id": "-NULL"},
                    "model": f"c360_client_{acc}",
                    "limit": "-1"
                }

                # --- Run Looker Queries
                data = looker.sdk.run_query(looker.sdk.create_query(query).id, result_format="json")
                data_kw = looker.sdk.run_query(looker.sdk.create_query(query_keyword).id, result_format="json")

                df = pd.DataFrame(json.loads(data))
                if df.empty:
                    return acc, "empty", None
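                # Property values arrive as JSON-encoded arrays; decode and dedupe.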
                df['user__user_property.value'] = df['user__user_property.value'].apply(json.loads)
                df['user__user_property.value'] = df['user__user_property.value'].apply(lambda x: list(set(x)))

                df_line = df[df['user__user_property.key'] == 'line']
                df_pgp = df[df['user__user_property.key'].str.contains('PGP')]
                df_else = df[df['user__user_property.key'].isin(['facebook', 'CustomerID'])]

                # --- Line name enrichment
                line_name = [
                    get_line_name(fb, acc, r['user.user_pseudo_id'], r['user__user_property.value'][0])
                    for _, r in df_line.iterrows()
                ]
                df_line_name = pd.DataFrame(line_name)

                # --- Line profile picture enrichment
                line_profile_url = [
                    get_line_profile(fb, acc, r['user.user_pseudo_id'], r['user__user_property.value'][0])
                    for _, r in df_line.iterrows()
                ]
                df_line_profile = pd.DataFrame(line_profile_url)

                # --- PGP decryption
                pgp_value = [
                    get_pgp_value(r['user__user_property.value'], r['user.user_pseudo_id'], r['user__user_property.key'], private_key)
                    for _, r in df_pgp.iterrows()
                ]
                df_pgp_val = pd.DataFrame(pgp_value)

                # --- Combine dataframes
                df_concat = pd.concat([df_else, df_line, df_line_name, df_line_profile, df_pgp_val], ignore_index=True)
                df_concat.rename(columns={
                    "user.lastupdate_date": "lastupdate",
                    "user.user_pseudo_id": "user_pseudo_id",
                    "user__user_property.key": "key",
                    "user__user_property.value": "value"
                }, inplace=True)
                df_concat["value"] = df_concat["value"].apply(safe_to_list)
                # df_concat["lastupdate"] = date_now
                df_concat["lastupdate"] = pd.to_datetime(df_concat["lastupdate"])

                # --- Nest user properties
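                # (one output row per user; properties collapse into a list of
                # {key, value} structs, presumably matching the nested BigQuery schema)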
                df_nested = (
                    df_concat.groupby(["lastupdate", "user_pseudo_id"], as_index=False)
                    .apply(lambda g: pd.Series({
                        "lastupdate": g["lastupdate"].iloc[0],
                        "user_pseudo_id": g["user_pseudo_id"].iloc[0],
                        "user_property": g.apply(lambda r: {"key": r["key"], "value": r["value"]}, axis=1).tolist()
                    }))
                )

                # --- Keyword
                df_kw = pd.DataFrame(json.loads(data_kw)).rename(columns={
                    "message_labeling.user_pseudo_id": "user_pseudo_id",
                    "single_grouping.list_value": "keywords"
                })

                df_final = df_nested.merge(df_kw, how="left", on="user_pseudo_id")
                df_final.drop_duplicates(subset='user_pseudo_id', inplace=True)

                # --- BigQuery Upsert
                bq.delete_data("customer-360-profile", f"client_{acc}", "profile_explore_temp")
                bq.load_data_df(f"client_{acc}", "profile_explore_temp", df_final)
                bq.delete_when_match(
                    "customer-360-profile", f"client_{acc}", "profile_explore",
                    f"client_{acc}", "profile_explore_temp", "ON ori.user_pseudo_id = temp.user_pseudo_id "
                )
                bq.load_data_df(f"client_{acc}", "profile_explore", df_final)

                logging.info(f"[{acc}] profile_explore: Done")
                return acc, "ok", None

            except Exception as e:
                logging.error(f"[{acc}] profile_explore: {e}")
                return acc, "error", str(e)

        all_accounts = fb.db.reference().child("account").get(shallow=True) or {}
        results = []
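        # Fan out one worker per account; per-account failures are logged and
        # returned in the results without aborting the whole batch.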
        with ThreadPoolExecutor(max_workers=5) as executor:
            future_to_acc = {executor.submit(process_account, acc, fb, bq, private_key): acc for acc in all_accounts}

            for future in as_completed(future_to_acc):
                acc = future_to_acc[future]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    logging.error(f"[{acc}] Unexpected error: {e}")

        logging.info("All properties processed.")
        return JSONResponse(status_code=200, content={"status": "ok", "message": "Profile explore updated in BigQuery successfully", "data": results})
    except Exception as e:
        logging.error(f"Error parsing {LOGGING_PREFIX}_{default_request.path_params}: {e}")
        return JSONResponse(status_code=500, content={'status': 'error', 'message': str(e)})

@router.post("/data/profile_explore/looker/manual", description="Batch job to update profile explore from looker manually")
async def post_api_job_batch_profile_explore_manual(request: Property, default_request: Request):
    try:
        from connectors.looker.lookerSDK import LookerSDK
        import json
        import ast
        import time
        import pandas as pd
        from concurrent.futures import ThreadPoolExecutor, as_completed
        bq = BigQuery()
        func_key = func.Key()
        private_key = func_key.load_key_from_env(env_var_name="private_key")
        property_id = request.property_id
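        # Manual flow: first refresh this property's user table (same transform
        # as the hourly batch), then rebuild profile_explore from Looker.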
        
        def transform_profile(pro, profile):
            update_at = profile['updated_at']
            update_at_datetime = datetime.fromisoformat(update_at)

            # Only run if recent
            if not func.Function.recent_by_date_only(update_at_datetime, time_type='day', time_num=1):
                return None

            # User property transform
            userProperty = []
            for prop in profile:
                if prop not in ['created_at', 'user_pseudo_id', 'updated_at']:
                    propContext = profile[prop]

                    # Extract ID list
                    listId = [str(propContext[log]['id']) for log in propContext]

                    userProperty.append({
                        "key": prop,
                        "value": list(set(listId)),
                    })

            return {
                "user_pseudo_id": profile['user_pseudo_id'],
                "created_at": profile.get('created_at', profile['updated_at']),
                "lastupdate": profile['updated_at'],
                "userProperty": userProperty,
            }


        # Refresh the user table for a single property (mirrors the hourly batch)
        def batch_user_hourly_manual(property_id):

            try:
                allAccount = [property_id]
                cutoff_date = datetime.now() - timedelta(days=1)
                cutoff_str = cutoff_date.isoformat() 
                for acc in allAccount:
                    ref = fb.db.reference().child(f"account/{acc}/profile")
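                    # NOTE: order_by_child requires an ".indexOn": "updated_at"
                    # rule on this path in the Realtime Database rules.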
                    allProfile = ref.order_by_child('updated_at').start_at(cutoff_str).get()
                    if not allProfile:
                        continue
                    logging.info(f"MANUAL PROFILE TOTAL: {len(allProfile)}")
                    transformedData = []
                    
                    with concurrent.futures.ThreadPoolExecutor() as executor:
                        # Submit tasks
                        futures = [
                            executor.submit(transform_profile, pro, allProfile[pro])
                            for pro in allProfile
                        ]

                        # Collect results
                        for future in concurrent.futures.as_completed(futures):
                            result = future.result()
                            if result:
                                transformedData.append(result)
                    
                    df = pd.DataFrame(transformedData)
                    if not df.empty:
                        df['created_at']  = func.Function.parse_mixed_to_utc(df['created_at'])
                        df['lastupdate']  = func.Function.parse_mixed_to_utc(df['lastupdate'])
                        df.drop_duplicates(subset='user_pseudo_id', inplace=True)
                        df["userProperty"] = df["userProperty"].apply(lambda x: [x] if isinstance(x, dict) else x)
                        
                        # BigQuery upsert via temp table (same pattern as the hourly job)
                        bq = BigQuery()
                        bq.delete_data("customer-360-profile", f"client_{acc}", "user_temp")
                        bq.load_data_df(f"client_{acc}", "user_temp", df)
                        condition = "ON (ori.user_pseudo_id = temp.user_pseudo_id) "
                        bq.delete_when_match("customer-360-profile", f"client_{acc}", "user", f"client_{acc}", "user_temp", condition)
                        bq.load_data_df(f"client_{acc}", "user", df)

                        logging.info(f"MANUAL: Property {acc} update user successfully")
                    else:
                        logging.info(f"MANUAL: Property {acc} df.empty")
            except Exception as e:
                logging.error(f"MANUAL: batch_user_hourly_manual failed: {e}")
        

        batch_user_hourly_manual(property_id)

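        # Brief pause, presumably to let the user-table upsert settle before
        # the Looker-backed queries read from it.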
        time.sleep(5)

        try:
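            # TODO: these helpers mirror the ones in the scheduled endpoint
            # above and could be hoisted to module scope.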
            def get_line_name(fb, property_id, user_pseudo_id, uid):
                lines = fb.db.reference().child(f"account/{property_id}/profile/{user_pseudo_id}/line").get() or {}
                line_name = '-'
                for l in lines:
                    context = lines[l]
                    if context['id'] == uid:
                        line_name = context.get('displayName', '-')
                        break

                return {
                    "user.user_pseudo_id": user_pseudo_id,
                    "user__user_property.key": "line_name",
                    "user__user_property.value": [line_name]
                }

            def get_line_profile(fb, property_id, user_pseudo_id, uid):
                lines = fb.db.reference().child(f"account/{property_id}/profile/{user_pseudo_id}/line").get() or {}
                line_profile = '-'
                for l in lines:
                    context = lines[l]
                    if context['id'] == uid:
                        line_profile = context.get('pictureUrl', '-')
                        break

                return {
                    "user.user_pseudo_id": user_pseudo_id,
                    "user__user_property.key": "line_profile_url",
                    "user__user_property.value": [line_profile]
                }
            
            def get_pgp_value(val, user_pseudo_id, key: str, private_key):
                pack = []
                if val and isinstance(val, list):
                    for v in val:
                        decode_id = func.Key.pgp_decrypt(v, private_key)
                        pack.append(decode_id)

                return {
                    "user.user_pseudo_id": user_pseudo_id,
                    "user__user_property.key": key.replace("_PGP", ""),
                    "user__user_property.value": pack
                }
            
            def safe_to_list(x):
                if isinstance(x, list):
                    return x
                if isinstance(x, str):
                    try:
                        parsed = ast.literal_eval(x)
                        if isinstance(parsed, list):
                            return parsed
                        return [parsed]
                    except Exception:
                        return [x]
                return [x]
                
            def process_account(acc, fb, bq, private_key):
                """Process a single account end-to-end."""

                try:
                    looker = LookerSDK(property_id=acc)

                    # Query definitions
                    query = {
                        "view": "user",
                        "fields": [
                            "user.user_pseudo_id",
                            "user__user_property.key",
                            "user__user_property.value",
                            "user.lastupdate_time"
                        ],
                        "filters": {
                            "user__user_property.key": "",
                            "user.lastupdate_time": "3 hour",
                            "user__user_property.value": ""
                        },
                        "model": f"c360_client_{acc}",
                        "limit": "-1"
                    }

                    query_keyword = {
                        "view": "user",
                        "fields": [
                            "message_labeling.user_pseudo_id",
                            "single_grouping.list_value"
                        ],
                        "filters": {"message_labeling.user_pseudo_id": "-NULL"},
                        "model": f"c360_client_{acc}",
                        "limit": "-1"
                    }

                    # --- Run Looker Queries
                    data = looker.sdk.run_query(looker.sdk.create_query(query).id, result_format="json")
                    data_kw = looker.sdk.run_query(looker.sdk.create_query(query_keyword).id, result_format="json")

                    df = pd.DataFrame(json.loads(data))
                    if df.empty:
                        return acc, "empty", None
                    df['user__user_property.value'] = df['user__user_property.value'].apply(json.loads)
                    df['user__user_property.value'] = df['user__user_property.value'].apply(lambda x: list(set(x)))

                    df_line = df[df['user__user_property.key'] == 'line']
                    df_pgp = df[df['user__user_property.key'].str.contains('PGP')]
                    df_else = df[df['user__user_property.key'].isin(['facebook', 'CustomerID'])]

                    # --- Line name enrichment
                    line_name = [
                        get_line_name(fb, acc, r['user.user_pseudo_id'], r['user__user_property.value'][0])
                        for _, r in df_line.iterrows()
                    ]
                    df_line_name = pd.DataFrame(line_name)

                    # --- Line profile picture enrichment
                    line_profile_url = [
                        get_line_profile(fb, acc, r['user.user_pseudo_id'], r['user__user_property.value'][0])
                        for _, r in df_line.iterrows()
                    ]
                    df_line_profile = pd.DataFrame(line_profile_url)

                    # --- PGP decryption
                    pgp_value = [
                        get_pgp_value(r['user__user_property.value'], r['user.user_pseudo_id'], r['user__user_property.key'], private_key)
                        for _, r in df_pgp.iterrows()
                    ]
                    df_pgp_val = pd.DataFrame(pgp_value)

                    # --- Combine dataframes
                    df_concat = pd.concat([df_else, df_line, df_line_name, df_line_profile, df_pgp_val], ignore_index=True)
                    df_concat.rename(columns={
                        "user.lastupdate_time": "lastupdate",
                        "user.user_pseudo_id": "user_pseudo_id",
                        "user__user_property.key": "key",
                        "user__user_property.value": "value"
                    }, inplace=True)
                    df_concat["value"] = df_concat["value"].apply(safe_to_list)
                    # df_concat["lastupdate"] = date_now
                    df_concat["lastupdate"] = pd.to_datetime(df_concat["lastupdate"])

                    # --- Nest user properties
                    df_nested = (
                        df_concat.groupby(["lastupdate", "user_pseudo_id"], as_index=False)
                        .apply(lambda g: pd.Series({
                            "lastupdate": g["lastupdate"].iloc[0],
                            "user_pseudo_id": g["user_pseudo_id"].iloc[0],
                            "user_property": g.apply(lambda r: {"key": r["key"], "value": r["value"]}, axis=1).tolist()
                        }))
                    )

                    # --- Keyword
                    df_kw = pd.DataFrame(json.loads(data_kw)).rename(columns={
                        "message_labeling.user_pseudo_id": "user_pseudo_id",
                        "single_grouping.list_value": "keywords"
                    })

                    df_final = df_nested.merge(df_kw, how="left", on="user_pseudo_id")
                    df_final.drop_duplicates(subset='user_pseudo_id', inplace=True)

                    # --- BigQuery Upsert
                    bq.delete_data("customer-360-profile", f"client_{acc}", "profile_explore_temp")
                    bq.load_data_df(f"client_{acc}", "profile_explore_temp", df_final)
                    bq.delete_when_match(
                        "customer-360-profile", f"client_{acc}", "profile_explore",
                        f"client_{acc}", "profile_explore_temp", "ON ori.user_pseudo_id = temp.user_pseudo_id "
                    )
                    bq.load_data_df(f"client_{acc}", "profile_explore", df_final)

                    logging.info(f"[{acc}] profile_explore: Done")
                    return acc, "ok", None

                except Exception as e:
                    logging.error(f"[{acc}] profile_explore: {e}")
                    return acc, "error", str(e)

            all_accounts = [property_id]

            results = []
            with ThreadPoolExecutor(max_workers=5) as executor:
                future_to_acc = {executor.submit(process_account, acc, fb, bq, private_key): acc for acc in all_accounts}

                for future in as_completed(future_to_acc):
                    acc = future_to_acc[future]
                    try:
                        result = future.result()
                        results.append(result)
                    except Exception as e:
                        logging.error(f"[{acc}] Unexpected error: {e}")

            logging.info("All properties processed.")
            return JSONResponse(status_code=200, content={"status": "ok", "message": "Profile explore updated in BigQuery successfully", "data": results})

        except Exception as e:
            logging.error(f"Error parsing {LOGGING_PREFIX}_{default_request.path_params}: {e}")
            return JSONResponse(status_code=500, content={'status': 'error', 'message': str(e)})
    
    except Exception as e:
        logging.error(f"Error parsing {LOGGING_PREFIX}_{default_request.path_params}: {e}")
        return JSONResponse(status_code=500, content={'status': 'error', 'message': str(e)})

@router.post("/data/user/profile_mapping", description="Batch job to update user profile mapping")
async def post_api_job_batch_user_profile_mapping(default_request: Request):
    try:
        bq = BigQuery()
        import pandas as pd
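        # Flow: read each property's user -> unified-user mapping from Firebase
        # and upsert it into the profile_mapping table.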
        properties = fb.db.reference().child("property").get(shallow=True)
        for prop in properties:
            allMappingProfile = fb.db.reference().child(f"account/{prop}/mapping").get()
            if not allMappingProfile:
                logging.info(f"No mapping profile found for property {prop}")
                continue
            output_list = [
                {"user_pseudo_id": user_pseudo_id, "unified_user_pseudo_id": unified_user_pseudo_id}
                for user_pseudo_id, unified_user_pseudo_id in allMappingProfile.items()
            ]
            df = pd.DataFrame(output_list)
            df['lastupdate'] = pd.Timestamp.now()
            
            # BigQuery upsert via temp table
            bq.delete_data("customer-360-profile", f"client_{prop}", "profile_mapping_temp")
            bq.load_data_df(f"client_{prop}", "profile_mapping_temp", df)
            condition = "ON (ori.user_pseudo_id = temp.user_pseudo_id) "
            bq.delete_when_match("customer-360-profile", f"client_{prop}", "profile_mapping", f"client_{prop}", "profile_mapping_temp", condition)
            bq.load_data_df(f"client_{prop}", "profile_mapping", df)
        
        return JSONResponse(status_code=200, content={"status": "success", "message": "User mapping updated in BigQuery successfully"})
    except Exception as e:
        logging.error(f"Error parsing {LOGGING_PREFIX}_{default_request.path_params}: {e}")
        return JSONResponse(status_code=500, content={'status': 'error', 'message': str(e)})

@router.post("/data/facebook/get_msg_summary", description="Batch job to get Facebook message summary")
async def post_api_job_batch_facebook_get_msg_summary(default_request: Request):
    from connectors.facebook.facebook_media import Facebook
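    # Flow: pair each property with its Facebook pages, attach page access
    # tokens, then refresh conversation summaries via the Graph API.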
    try:
        property_ref = fb.db.reference('property')
        all_property = property_ref.get(shallow=True) or {}

        # Collect (property_id, facebook_page_id) pairs
        property_facebook_pairs = []
        for property_id in all_property:
            facebook_channels = fb.db.reference(f'property/{property_id}/channel/facebook').get()
            if not facebook_channels:
                continue
            for page_id in facebook_channels.keys():
                property_facebook_pairs.append({
                    "property_id": property_id,
                    "facebook_page_id": page_id
                })

        pages = Facebook.get_all_page()
        access_token_map = {page['id']: page['access_token'] for page in pages}

        # Keep only pages with a known access token, attaching the token to each pair
        property_facebook_pairs = [
            {**entry, 'access_token': access_token_map[entry['facebook_page_id']]}
            for entry in property_facebook_pairs
            if entry['facebook_page_id'] in access_token_map
        ]
                
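        # Graph API conversation fields and page size for the summary pull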
        FIELDS = "message_count,name,unread_count,updated_time,can_reply,id,snippet,former_participants,participants,messages.limit(10){message,attachments{image_data,video_data,id,file_url},from,created_time}"
        LIMIT = 100
        Facebook.update_msg_summary(property_facebook_pairs, FIELDS, LIMIT, fb)
        return JSONResponse(status_code=200, content={"status": "success", "message": "Facebook message summary updated successfully"})
    except Exception as e:
        logging.error(f"Error parsing {LOGGING_PREFIX}_{default_request.path_params}: {e}")
        return JSONResponse(status_code=500, content={'status': 'error', 'message': str(e)})

@router.post("/data/facebook/get_fb_ads_insight", description="Batch job to get Facebook ads insight")
async def post_api_job_batch_facebook_get_ads_insight(default_request: Request):
    try:
        from connectors.bigquery.bq import Table
        from connectors.bigquery.media import MediaBigQuery
        property_ref = fb.db.reference('property')
        all_property = property_ref.get(shallow=True) or {}
        bq = BigQuery()
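        # For each property with linked Facebook ad accounts, pull the last 15
        # days of ad insights and upsert them into media_single_data.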
        for property_id in all_property:

            fb_adaccounts = fb.db.reference(f'property/{property_id}/adaccount/facebook').get()
            if fb_adaccounts is not None:

                start = (datetime.now() - timedelta(days=15)).strftime('%Y-%m-%d')
                end = datetime.now().strftime('%Y-%m-%d')
                fb_adaccount_list = list(fb_adaccounts.keys())
                fb_adaccount_where = ', '.join([f'"{x}"' for x in fb_adaccount_list])
                
                facebook_query = MediaBigQuery.facebook_single_view_query(fb_adaccount_where,start,end)
                
                # Ensure target tables exist before loading
                media_single_schema = Table.tableMediaSingleData()
                bq.ensure_table(f"customer-360-profile.client_{property_id}.media_single_data",media_single_schema)
                bq.ensure_table(f"customer-360-profile.client_{property_id}.media_single_data_temp",media_single_schema)
                
                fb_media_df = bq.get_query_df(facebook_query)
                if fb_media_df.empty:
                    continue
                obj_numeric = [
                    "cost","impression","clicks","view","video_play_action",
                    "video_view_25","video_view_50","video_view_75","video_view_100"
                ]
                fb_media_df[obj_numeric] = fb_media_df[obj_numeric].astype('float')
                
                bq.delete_data("customer-360-profile",f"client_{property_id}","media_single_data_temp")
                bq.load_data_df(f"client_{property_id}","media_single_data_temp",fb_media_df)
                bq.delete_when_match("customer-360-profile",f"client_{property_id}","media_single_data",f"client_{property_id}","media_single_data_temp",
                                    "ON ori.date = temp.date AND ori.account_id = temp.account_id AND ori.ad_id = temp.ad_id ")
                bq.load_data_df(f"client_{property_id}","media_single_data",fb_media_df)
        
        return JSONResponse(status_code=200, content={"status": "success", "message": "Facebook ads insight updated successfully"})
    except Exception as e:
        logging.error(f"Error parsing {LOGGING_PREFIX}_{default_request.path_params}: {e}")
        return JSONResponse(status_code=500, content={'status': 'error', 'message': str(e)})