from flask import Blueprint, request
import requests
import logging
from datetime import datetime, timedelta
from firebase.firebase import Firebase
import pytz 
import utility.function as func
import os
import time
import pandas as pd
from bigquery.bq import BigQuery
import concurrent.futures
logging.basicConfig(level=logging.INFO)

batch = Blueprint('batch', __name__, url_prefix='/batch')

fb = Firebase(host=os.environ.get("FIREBASE_HOST"))
timezone = pytz.timezone('Asia/Bangkok')

@batch.route('/user/',  methods=['POST'])
@batch.route('/user',  methods=['POST'])
def batch_user_hourly():

    #Start flow
    try:
        allAccount = fb.db.reference().child("account").get(shallow=True)
        for acc in allAccount:
            allProfile = fb.db.reference().child(f"account/{acc}/profile").get()
            transformedData = []
            for pro in allProfile:
                update_at = allProfile[pro]['updated_at']
                update_at_datetime = datetime.fromisoformat(update_at)
                # change type to check only date not timezone
                if func.Function.recent_by_date_only(update_at_datetime):
                # if update_at_datetime > (datetime.now(timezone) - timedelta(days=30)):
                    # User property  data transform
                    userProperty = []
                    for prop in allProfile[pro]:
                        if prop not in ['created_at', 'user_pseudo_id', 'updated_at']:
                            propContext = allProfile[pro][prop]
                            # Get ID context
                            listId = []
                            for log in propContext:
                                listId.append(str(propContext[log]['id']))

                            channel_context = {
                                "key": prop,
                                "value": list(set(listId))
                            }
                            userProperty.append(channel_context)
                    transfromformat = {
                        "user_pseudo_id": allProfile[pro]['user_pseudo_id'],
                        "created_at": allProfile[pro].get('created_at', allProfile[pro]['updated_at']),
                        "lastupdate": allProfile[pro]['updated_at'],
                        "userProperty": userProperty,
                    }
                    transformedData.append(transfromformat)
            
            df = pd.DataFrame(transformedData)
            if not df.empty:
                df['created_at']  = func.Function.parse_mixed_to_utc(df['created_at'])
                df['lastupdate']  = func.Function.parse_mixed_to_utc(df['lastupdate'])
                df.drop_duplicates(subset='user_pseudo_id', inplace=True)
                df["userProperty"] = df["userProperty"].apply(lambda x: [x] if isinstance(x, dict) else x)
                
                #Bigquery task delete when match
                bq = BigQuery()
                bq.delete_data("customer-360-profile", f"client_{acc}", "user_temp")
                bq.load_data_df(f"client_{acc}", "user_temp", df)
                condition = "ON (ori.user_pseudo_id = temp.user_pseudo_id) "
                bq.delete_when_match("customer-360-profile", f"client_{acc}", "user", f"client_{acc}", "user_temp", condition)
                bq.load_data_df(f"client_{acc}", "user", df)

                logging.info(f"Property {acc} update user successfully")

        return {"status": "success", "message": "User profile update to Bigquery successfully"}, 200 
    
    except Exception as e:
        logging.error(f"Batch: [User] Error {e}")
        return {'status': 'error', 'message': f"{e}"}, 500
    

@batch.route('/suggestion/',  methods=['POST'])
@batch.route('/suggestion',  methods=['POST'])
def batch_suggestion():

    #List account
    bq = BigQuery()
    accounts = fb.db.reference().child(f"account").get(shallow=True)

    try:
        for acc in accounts:
            #Event
            query = f"""
            SELECT 
                main.eventName as event_name, 
                ep.key as key,
                ep.value as value 
            FROM `{os.environ.get("GCP_PROJECT")}.client_{acc}.event` as main
                LEFT JOIN UNNEST(eventProperty) as ep
            WHERE main.eventTimeStamp >=  DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
                AND ep.key NOT IN ("message", "event_timestamp", "facebook_name", "ps_id","target_id")
            GROUP BY 1,2,3
            ORDER BY 1,2
            """
            df = bq.get_query_df(query)
            if df.empty:
                continue

            nested = {}

            for _, row in df.iterrows():
                event = row['event_name']
                key = row['key']
                value = row['value']

                # Ensure nesting
                if event not in nested:
                    nested[event] = {}
                if key not in nested[event]:
                    nested[event][key] = set()
                
                # Add value
                nested[event][key].add(value)

            # Convert sets to lists for JSON serializability
            nested_json = {e: {k: list(v) for k, v in keys.items()} for e, keys in nested.items()}
            
            fb.db.reference().child(f"account/{acc}/cache/suggestion/event").set(nested_json)

            #Add
            queryAd = f"""
            SELECT main.referral.ad_id, main.referral.ads_context_data.ad_title
            FROM {os.environ.get("GCP_PROJECT")}.client_{acc}.event as main 
            WHERE main.eventTimeStamp >=  DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
                AND main.referral.ad_id IS NOT NULL
            GROUP BY 1, 2
            ORDER BY 1;
            """
            df = bq.get_query_df(queryAd)
            if df.empty:
                continue
            
            dictData = {
                "ad_id": list(df['ad_id'].unique()),
                "ad_title": list(df['ad_title'].unique()),
            }

            fb.db.reference().child(f"account/{acc}/cache/suggestion/ad_refferal").set(dictData)


        return {"status": "success", "message": "User suggestion update to Bigquery successfully"}, 200 
    
    except Exception as e:
        logging.error(f"Batch: [suggestion] Error {e}")
        return {'status': 'error', 'message': f"{e}"}, 500

@batch.route('/profile_explore/',  methods=['POST'])
@batch.route('/profile_explore',  methods=['POST'])
def batch_sprofile_explore():
    try:
        from utility.function import Key
        import random
        from dashboard.genQuery import GenerateQuery
        from bigquery.bq import BigQuery
        bq = BigQuery()

        genders = ["Male", "Female", "Prefer not to say"]
        provinces_thailand = [
            "Bangkok", "Amnat Charoen", "Ang Thong", "Bueng Kan", "Buriram",
            "Chachoengsao", "Chai Nat", "Chaiyaphum", "Chanthaburi", "Chiang Mai",
            "Chiang Rai", "Chonburi", "Chumphon", "Kalasin", "Kamphaeng Phet",
            "Kanchanaburi", "Khon Kaen", "Krabi", "Lampang", "Lamphun", "Loei",
            "Lopburi", "Mae Hong Son", "Maha Sarakham", "Mukdahan", "Nakhon Nayok",
            "Nakhon Pathom", "Nakhon Phanom", "Nakhon Ratchasima", "Nakhon Sawan",
            "Nakhon Si Thammarat", "Nan", "Narathiwat", "Nong Bua Lamphu",
            "Nong Khai", "Nonthaburi", "Pathum Thani", "Pattani", "Phang Nga",
            "Phatthalung", "Phayao", "Phetchabun", "Phetchaburi", "Phichit",
            "Phitsanulok", "Phrae", "Phra Nakhon Si Ayutthaya", "Phuket",
            "Prachinburi", "Prachuap Khiri Khan", "Ranong", "Ratchaburi",
            "Rayong", "Roi Et", "Sa Kaeo", "Sakon Nakhon", "Samut Prakan",
            "Samut Sakhon", "Samut Songkhram", "Saraburi", "Satun", "Sing Buri",
            "Sisaket", "Songkhla", "Sukhothai", "Suphan Buri", "Surat Thani",
            "Surin", "Tak", "Trang", "Trat", "Ubon Ratchathani", "Udon Thani",
            "Uthai Thani", "Uttaradit", "Yala", "Yasothon"
        ]
        interests = [
            "Technology", "Travel", "Music", "Sports", "Art",
            "Cooking", "Reading", "Gaming", "Fitness", "Photography",
            "Fashion", "Movies", "Nature", "Science", "History",
            "Crafts", "Finance", "Education", "Languages", "Volunteering"
        ]

        private_key = Key.load_key_from_env("private_key")
        for acc in fb.db.reference().child(f"account").get(shallow=True):
            # profiles = fb.db.reference().child(f"account/{acc}/profile").get(shallow=True)
            genq = GenerateQuery(acc)
            event_offline_existed = bq.table_exists(f"client_{acc}", "event_offline")
            
            if event_offline_existed:
                query = genq.getUserLastActive7Days_includeOffline()
            else:
                query = genq.getUserLastActive7Days()
                
            df = bq.get_query_df(query)
            
            if not df.empty:
                distinctUser = list(df['user_pseudo_id'].unique())
                acc_profile_enrich = []
                for pro in distinctUser:
                    profile_context = fb.db.reference().child(f"account/{acc}/profile/{pro}").get(shallow=True)
                    
                    random_gender = random.choice(genders)
                    random_province = random.choice(provinces_thailand)
                    random_interests = random.sample(interests, 3)

                    #Line
                    line_context, facebook_context, phoneNumber_context, phoneNumber_hashed_context, email_context, email_hashed_context, FirstName_context, LastName_context, customerID_context = [],[],[],[],[],[],[],[],[]
                    profile_image = '-'
                    if 'line' in profile_context:
                        ids = fb.db.reference().child(f"account/{acc}/profile/{pro}/line").get()
                        for id in ids:
                            try:
                                value = ids[id]['displayName']
                                profile_image = ids[id]['pictureUrl']
                                line_context.append(value)
                            except:
                                pass

                    if 'facebook' in profile_context:
                        ids = fb.db.reference().child(f"account/{acc}/profile/{pro}/facebook").get()
                        for id in ids:
                            value = ids[id]['id']
                            facebook_context.append(value)

                    if 'phoneNumber_PGP' in profile_context:
                        ids = fb.db.reference().child(f"account/{acc}/profile/{pro}/phoneNumber_PGP").get()
                        for id in ids:
                            value = ids[id]['id']
                            d_value = Key.pgp_decrypt(value,private_key)
                            phoneNumber_context.append(d_value)
                            
                    if 'phoneNumber' in profile_context:
                        ids = fb.db.reference().child(f"account/{acc}/profile/{pro}/phoneNumber").get()
                        for id in ids:
                            value = ids[id]['id']
                            phoneNumber_hashed_context.append(value)

                    if 'email_PGP' in profile_context:
                        ids = fb.db.reference().child(f"account/{acc}/profile/{pro}/email_PGP").get()
                        for id in ids:
                            value = ids[id]['id']
                            d_value = Key.pgp_decrypt(value,private_key)
                            email_context.append(d_value)
                            
                    if 'email' in profile_context:
                        ids = fb.db.reference().child(f"account/{acc}/profile/{pro}/email").get()
                        for id in ids:
                            value = ids[id]['id']
                            email_hashed_context.append(value)
                            
                    if 'FirstName_PGP' in profile_context:
                        ids = fb.db.reference().child(f"account/{acc}/profile/{pro}/FirstName_PGP").get()
                        for id in ids:
                            value = ids[id]['id']
                            d_value = Key.pgp_decrypt(value,private_key)
                            FirstName_context.append(d_value)
                            
                    if 'LastName_PGP' in profile_context:
                        ids = fb.db.reference().child(f"account/{acc}/profile/{pro}/LastName_PGP").get()
                        for id in ids:
                            value = ids[id]['id']
                            d_value = Key.pgp_decrypt(value,private_key)
                            LastName_context.append(d_value)
                            
                    if 'CustomerID' in profile_context:
                        ids = fb.db.reference().child(f"account/{acc}/profile/{pro}/CustomerID").get()
                        for id in ids:
                            value = ids[id]['id']
                            customerID_context.append(value)
                    
                    profile_data = {
                        "user_pseudo_id": pro,
                        "profile_image": profile_image,
                        "line": list(set(line_context)),
                        "facebook": facebook_context,
                        "phoneNumber": phoneNumber_context,
                        "phoneNumber_hashed": phoneNumber_hashed_context,
                        "email": email_context,
                        "email_hashed": email_hashed_context,
                        "first_name": FirstName_context,
                        "last_name": LastName_context,
                        "customer_id": customerID_context,
                        "status": random.choice(['Inactive', 'Active']),
                        "tags": {
                            "gender": [random_gender],
                            "province": [random_province],
                            "interests": random_interests
                        }
                    }
                    acc_profile_enrich.append(profile_data)
                
                #Set to cache
                fb.db.reference().child(f"account/{acc}/cache/profile_explore").set(acc_profile_enrich)

                #set tags
                from collections import defaultdict
                inverted = {
                    'gender': defaultdict(list),
                    'province': defaultdict(list),
                    'interests': defaultdict(list),
                    'status': defaultdict(list)
                }

                for user in acc_profile_enrich:
                    uid = user['user_pseudo_id']
                    tags = user['tags']
                    
                    for interest in tags['gender']:
                        inverted['gender'][interest].append(uid)
                    
                    for interest in tags['province']:
                        inverted['province'][interest].append(uid)

                    for interest in tags['interests']:
                        inverted['interests'][interest].append(uid)
                    
                    inverted['status'][user['status']].append(uid)

                # Convert defaultdicts to dicts for final output
                inverted = {
                    key: dict(value)
                    for key, value in inverted.items()
                }

                fb.db.reference().child(f"account/{acc}/tags").set(inverted)
        return {"status": "success", "message": "Profile update to Firebase successfully"}, 200 
    except Exception as e:
        logging.error(f"Batch: [profile_explore] Error {e}")
        return {'status': 'error', 'message': f"{e}"}, 500
    

@batch.route('/profile_explore/looker',  methods=['POST'])
@batch.route('/profile_explore/looker/',  methods=['POST'])
def batch_sprofile_explore_looker():
    from utility.lookerSDK import LookerSDK
    import json
    import ast
    from concurrent.futures import ThreadPoolExecutor, as_completed
    bq = BigQuery()
    private_key = func.Key.load_key_from_env("private_key")
    try:
        def get_line_name(fb,property_id, user_pseudo_id, uid):
            lines = fb.db.reference().child(f"account/{property_id}/profile/{user_pseudo_id}/line").get()
            line_name = '-'
            for l in lines:
                context = lines[l]
                if context['id'] != uid:
                    continue
                else:
                    line_name = context.get('displayName', '-')
                    break
            
            return {
                        "user.user_pseudo_id": user_pseudo_id,
                        "user__user_property.key": "line_name",
                        "user__user_property.value": [line_name]
                    }

        def get_line_profile(fb,property_id, user_pseudo_id, uid):
            lines = fb.db.reference().child(f"account/{property_id}/profile/{user_pseudo_id}/line").get()
            line_profile = '-'
            for l in lines:
                context = lines[l]
                if context['id'] != uid:
                    continue
                else:
                    line_profile = context.get('pictureUrl', '-')
                    break
            
            return {
                        "user.user_pseudo_id": user_pseudo_id,
                        "user__user_property.key": "line_profile_url",
                        "user__user_property.value": [line_profile]
                    }
        
        def get_pgp_value(val, user_pseudo_id, key:str, private_key):
            pack = []
            if val and type(val) == list:
                for v in val:
                    decode_id = func.Key.pgp_decrypt(v,private_key)
                    pack.append(decode_id)
            
            return {
                        "user.user_pseudo_id": user_pseudo_id,
                        "user__user_property.key": key.replace("_PGP", ""),
                        "user__user_property.value": pack
                    }
        
        def safe_to_list(x):
            if isinstance(x, list):
                return x
            if isinstance(x, str):
                try:
                    parsed = ast.literal_eval(x)
                    if isinstance(parsed, list):
                        return parsed
                    return [parsed]
                except Exception:
                    return [x]
            return [x]
            
        def process_account(acc, fb, bq, private_key):
            """Process a single account end-to-end."""
            date_now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            try:
                looker = LookerSDK(property_id=acc)

                # Query definitions
                query = {
                    "view": "user",
                    "fields": [
                        "user.user_pseudo_id",
                        "user__user_property.key",
                        "user__user_property.value",
                        "user.lastupdate_date"
                    ],
                    "filters": {
                        "user__user_property.key": "",
                        "user.lastupdate_date": "4 hour",
                        "user__user_property.value": ""
                    },
                    "model": f"c360_client_{acc}",
                    "limit": "-1"
                }

                query_keyword = {
                    "view": "user",
                    "fields": [
                        "message_labeling.user_pseudo_id",
                        "single_grouping.list_value"
                    ],
                    "filters": {"message_labeling.user_pseudo_id": "-NULL"},
                    "model": f"c360_client_{acc}",
                    "limit": "-1"
                }

                # --- Run Looker Queries
                data = looker.sdk.run_query(looker.sdk.create_query(query).id, result_format="json")
                data_kw = looker.sdk.run_query(looker.sdk.create_query(query_keyword).id, result_format="json")

                df = pd.DataFrame(json.loads(data))
                if df.empty:
                    return False
                df['user__user_property.value'] = df['user__user_property.value'].apply(json.loads)
                df['user__user_property.value'] = df['user__user_property.value'].apply(lambda x: list(set(x)))

                df_line = df[df['user__user_property.key'] == 'line']
                df_pgp = df[df['user__user_property.key'].str.contains('PGP')]
                df_else = df[df['user__user_property.key'].isin(['facebook', 'CustomerID'])]

                # --- Line name enrichment
                line_name = [
                    get_line_name(fb, acc, r['user.user_pseudo_id'], r['user__user_property.value'][0])
                    for _, r in df_line.iterrows()
                ]
                df_line_name = pd.DataFrame(line_name)

                # --- Line name enrichment
                line_profile_url = [
                    get_line_profile(fb, acc, r['user.user_pseudo_id'], r['user__user_property.value'][0])
                    for _, r in df_line.iterrows()
                ]
                df_line_profile = pd.DataFrame(line_profile_url)

                # --- PGP decryption
                pgp_value = [
                    get_pgp_value(r['user__user_property.value'], r['user.user_pseudo_id'], r['user__user_property.key'], private_key)
                    for _, r in df_pgp.iterrows()
                ]
                df_pgp_val = pd.DataFrame(pgp_value)

                # --- Combine dataframes
                df_concat = pd.concat([df_else, df_line, df_line_name, df_line_profile, df_pgp_val], ignore_index=True)
                df_concat.rename(columns={
                    "user.lastupdate_date": "lastupdate",
                    "user.user_pseudo_id": "user_pseudo_id",
                    "user__user_property.key": "key",
                    "user__user_property.value": "value"
                }, inplace=True)
                df_concat["value"] = df_concat["value"].apply(safe_to_list)
                # df_concat["lastupdate"] = date_now
                df_concat["lastupdate"] = pd.to_datetime(df_concat["lastupdate"])

                # --- Nest user properties
                df_nested = (
                    df_concat.groupby(["lastupdate", "user_pseudo_id"], as_index=False)
                    .apply(lambda g: pd.Series({
                        "lastupdate": g["lastupdate"].iloc[0],
                        "user_pseudo_id": g["user_pseudo_id"].iloc[0],
                        "user_property": g.apply(lambda r: {"key": r["key"], "value": r["value"]}, axis=1).tolist()
                    }))
                )

                # --- Keyword
                df_kw = pd.DataFrame(json.loads(data_kw)).rename(columns={
                    "message_labeling.user_pseudo_id": "user_pseudo_id",
                    "single_grouping.list_value": "keywords"
                })

                df_final = df_nested.merge(df_kw, how="left", on="user_pseudo_id")
                df_final.drop_duplicates(subset='user_pseudo_id', inplace=True)

                # --- BigQuery Upsert
                bq.delete_data("customer-360-profile", f"client_{acc}", "profile_explore_temp")
                bq.load_data_df(f"client_{acc}", "profile_explore_temp", df_final)
                bq.delete_when_match(
                    "customer-360-profile", f"client_{acc}", "profile_explore",
                    f"client_{acc}", "profile_explore_temp", "ON ori.user_pseudo_id = temp.user_pseudo_id "
                )
                bq.load_data_df(f"client_{acc}", "profile_explore", df_final)

                logging.info(f"[{acc}] profile_explore: Done")
                return acc, "ok", None

            except Exception as e:
                logging.error(f"[{acc}] profile_explore: {e}")
                return acc, "error", str(e)

        
        private_key = func.Key.load_key_from_env("private_key")
        all_accounts = fb.db.reference().child("account").get(shallow=True)

        results = []
        with ThreadPoolExecutor(max_workers=5) as executor:
            future_to_acc = {executor.submit(process_account, acc, fb, bq, private_key): acc for acc in all_accounts}

            for future in as_completed(future_to_acc):
                acc = future_to_acc[future]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    logging.error(f"[{acc}] Unexpected error: {e}")

        logging.info("All properties processed.")
        return {"status": "ok", "results": results}, 200

    except Exception as e:
        logging.error(f"Batch: [profile_explore] Error {e}")
        return {'status': 'error', 'message': f"{e}"}, 500
    

@batch.route('/profile_explore/looker/manual',  methods=['POST'])
@batch.route('/profile_explore/looker/manual/',  methods=['POST'])
def batch_sprofile_explore_looker_manual():
    from utility.lookerSDK import LookerSDK
    import json
    import ast
    from concurrent.futures import ThreadPoolExecutor, as_completed
    bq = BigQuery()
    private_key = func.Key.load_key_from_env("private_key")
    hook = request.get_json()
    for req in ['property_id']:
        if req not in hook:
            return {"status":"error",'message': f"{req} is required"}, 400
    
    property_id = hook.get("property_id")
    
    def transform_profile(pro, profile):
        update_at = profile['updated_at']
        update_at_datetime = datetime.fromisoformat(update_at)

        # Only run if recent
        if not func.Function.recent_by_date_only(update_at_datetime, time_type='day', time_num=1):
            return None

        # User property transform
        userProperty = []
        for prop in profile:
            if prop not in ['created_at', 'user_pseudo_id', 'updated_at']:
                propContext = profile[prop]

                # Extract ID list
                listId = [str(propContext[log]['id']) for log in propContext]

                userProperty.append({
                    "key": prop,
                    "value": list(set(listId)),
                })

        return {
            "user_pseudo_id": profile['user_pseudo_id'],
            "created_at": profile.get('created_at', profile['updated_at']),
            "lastupdate": profile['updated_at'],
            "userProperty": userProperty,
        }


    #Add context to user table
    def batch_user_hourly_manual(property_id):

        #Start flow
        try:
            # allAccount = fb.db.reference().child("account").get(shallow=True)
            allAccount = [property_id]
            cutoff_date = datetime.now() - timedelta(days=1)
            cutoff_str = cutoff_date.isoformat() 
            for acc in allAccount:
                ref = fb.db.reference().child(f"account/{acc}/profile")
                allProfile = ref.order_by_child('updated_at').start_at(cutoff_str).get()
                logging.info(f"MANUAL PROFILE TOTAL: {len(allProfile)}")
                # allProfile = fb.db.reference().child(f"account/{acc}/profile").get()
                if not allProfile:
                    continue
                transformedData = []
                
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    # Submit tasks
                    futures = [
                        executor.submit(transform_profile, pro, allProfile[pro])
                        for pro in allProfile
                    ]

                    # Collect results
                    for future in concurrent.futures.as_completed(futures):
                        result = future.result()
                        if result:
                            transformedData.append(result)
                
                df = pd.DataFrame(transformedData)
                if not df.empty:
                    df['created_at']  = func.Function.parse_mixed_to_utc(df['created_at'])
                    df['lastupdate']  = func.Function.parse_mixed_to_utc(df['lastupdate'])
                    df.drop_duplicates(subset='user_pseudo_id', inplace=True)
                    df["userProperty"] = df["userProperty"].apply(lambda x: [x] if isinstance(x, dict) else x)
                    
                    #Bigquery task delete when match
                    bq = BigQuery()
                    bq.delete_data("customer-360-profile", f"client_{acc}", "user_temp")
                    bq.load_data_df(f"client_{acc}", "user_temp", df)
                    condition = "ON (ori.user_pseudo_id = temp.user_pseudo_id) "
                    bq.delete_when_match("customer-360-profile", f"client_{acc}", "user", f"client_{acc}", "user_temp", condition)
                    bq.load_data_df(f"client_{acc}", "user", df)

                    logging.info(f"MANUAL: Property {acc} update user successfully")
                else:
                    logging.info(f"MANUAL: Property {acc} df.empty")
        except Exception as e:
            logging.error(e)
            return {'status': 'error', 'message': f"{e}"}, 500
    

    batch_user_hourly_manual(property_id)

    try:
        def get_line_name(fb,property_id, user_pseudo_id, uid):
            lines = fb.db.reference().child(f"account/{property_id}/profile/{user_pseudo_id}/line").get()
            line_name = '-'
            for l in lines:
                context = lines[l]
                if context['id'] != uid:
                    continue
                else:
                    line_name = context.get('displayName', '-')
                    break
            
            return {
                        "user.user_pseudo_id": user_pseudo_id,
                        "user__user_property.key": "line_name",
                        "user__user_property.value": [line_name]
                    }

        def get_line_profile(fb,property_id, user_pseudo_id, uid):
            lines = fb.db.reference().child(f"account/{property_id}/profile/{user_pseudo_id}/line").get()
            line_profile = '-'
            for l in lines:
                context = lines[l]
                if context['id'] != uid:
                    continue
                else:
                    line_profile = context.get('pictureUrl', '-')
                    break
            
            return {
                        "user.user_pseudo_id": user_pseudo_id,
                        "user__user_property.key": "line_profile_url",
                        "user__user_property.value": [line_profile]
                    }
        
        def get_pgp_value(val, user_pseudo_id, key:str, private_key):
            pack = []
            if val and type(val) == list:
                for v in val:
                    decode_id = func.Key.pgp_decrypt(v,private_key)
                    pack.append(decode_id)
            
            return {
                        "user.user_pseudo_id": user_pseudo_id,
                        "user__user_property.key": key.replace("_PGP", ""),
                        "user__user_property.value": pack
                    }
        
        def safe_to_list(x):
            if isinstance(x, list):
                return x
            if isinstance(x, str):
                try:
                    parsed = ast.literal_eval(x)
                    if isinstance(parsed, list):
                        return parsed
                    return [parsed]
                except Exception:
                    return [x]
            return [x]
            
        def process_account(acc, fb, bq, private_key):
            """Process a single account end-to-end."""
            date_now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            try:
                looker = LookerSDK(property_id=acc)

                # Query definitions
                query = {
                    "view": "user",
                    "fields": [
                        "user.user_pseudo_id",
                        "user__user_property.key",
                        "user__user_property.value",
                        "user.lastupdate_time"
                    ],
                    "filters": {
                        "user__user_property.key": "",
                        "user.lastupdate_time": "3 hour",
                        "user__user_property.value": ""
                    },
                    "model": f"c360_client_{acc}",
                    "limit": "-1"
                }

                query_keyword = {
                    "view": "user",
                    "fields": [
                        "message_labeling.user_pseudo_id",
                        "single_grouping.list_value"
                    ],
                    "filters": {"message_labeling.user_pseudo_id": "-NULL"},
                    "model": f"c360_client_{acc}",
                    "limit": "-1"
                }

                # --- Run Looker Queries
                data = looker.sdk.run_query(looker.sdk.create_query(query).id, result_format="json")
                data_kw = looker.sdk.run_query(looker.sdk.create_query(query_keyword).id, result_format="json")

                df = pd.DataFrame(json.loads(data))
                if df.empty:
                    return False
                df['user__user_property.value'] = df['user__user_property.value'].apply(json.loads)
                df['user__user_property.value'] = df['user__user_property.value'].apply(lambda x: list(set(x)))

                df_line = df[df['user__user_property.key'] == 'line']
                df_pgp = df[df['user__user_property.key'].str.contains('PGP')]
                df_else = df[df['user__user_property.key'].isin(['facebook', 'CustomerID'])]

                # --- Line name enrichment
                line_name = [
                    get_line_name(fb, acc, r['user.user_pseudo_id'], r['user__user_property.value'][0])
                    for _, r in df_line.iterrows()
                ]
                df_line_name = pd.DataFrame(line_name)

                # --- Line name enrichment
                line_profile_url = [
                    get_line_profile(fb, acc, r['user.user_pseudo_id'], r['user__user_property.value'][0])
                    for _, r in df_line.iterrows()
                ]
                df_line_profile = pd.DataFrame(line_profile_url)

                # --- PGP decryption
                pgp_value = [
                    get_pgp_value(r['user__user_property.value'], r['user.user_pseudo_id'], r['user__user_property.key'], private_key)
                    for _, r in df_pgp.iterrows()
                ]
                df_pgp_val = pd.DataFrame(pgp_value)

                # --- Combine dataframes
                df_concat = pd.concat([df_else, df_line, df_line_name, df_line_profile, df_pgp_val], ignore_index=True)
                df_concat.rename(columns={
                    "user.lastupdate_time": "lastupdate",
                    "user.user_pseudo_id": "user_pseudo_id",
                    "user__user_property.key": "key",
                    "user__user_property.value": "value"
                }, inplace=True)
                df_concat["value"] = df_concat["value"].apply(safe_to_list)
                # df_concat["lastupdate"] = date_now
                df_concat["lastupdate"] = pd.to_datetime(df_concat["lastupdate"])

                # --- Nest user properties
                df_nested = (
                    df_concat.groupby(["lastupdate", "user_pseudo_id"], as_index=False)
                    .apply(lambda g: pd.Series({
                        "lastupdate": g["lastupdate"].iloc[0],
                        "user_pseudo_id": g["user_pseudo_id"].iloc[0],
                        "user_property": g.apply(lambda r: {"key": r["key"], "value": r["value"]}, axis=1).tolist()
                    }))
                )

                # --- Keyword
                df_kw = pd.DataFrame(json.loads(data_kw)).rename(columns={
                    "message_labeling.user_pseudo_id": "user_pseudo_id",
                    "single_grouping.list_value": "keywords"
                })

                df_final = df_nested.merge(df_kw, how="left", on="user_pseudo_id")
                df_final.drop_duplicates(subset='user_pseudo_id', inplace=True)

                # --- BigQuery Upsert
                bq.delete_data("customer-360-profile", f"client_{acc}", "profile_explore_temp")
                bq.load_data_df(f"client_{acc}", "profile_explore_temp", df_final)
                bq.delete_when_match(
                    "customer-360-profile", f"client_{acc}", "profile_explore",
                    f"client_{acc}", "profile_explore_temp", "ON ori.user_pseudo_id = temp.user_pseudo_id "
                )
                bq.load_data_df(f"client_{acc}", "profile_explore", df_final)

                logging.info(f"[{acc}] profile_explore: Done")
                return acc, "ok", None

            except Exception as e:
                logging.error(f"[{acc}] profile_explore: {e}")
                return acc, "error", str(e)

        
        private_key = func.Key.load_key_from_env("private_key")
        # all_accounts = fb.db.reference().child("account").get(shallow=True)
        all_accounts = [property_id]

        results = []
        with ThreadPoolExecutor(max_workers=5) as executor:
            future_to_acc = {executor.submit(process_account, acc, fb, bq, private_key): acc for acc in all_accounts}

            for future in as_completed(future_to_acc):
                acc = future_to_acc[future]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    logging.error(f"[{acc}] Unexpected error: {e}")

        logging.info("All properties processed.")
        return {"status": "ok", "results": results}, 200

    except Exception as e:
        logging.error(f"Batch: [profile_explore] Error {e}")
        return {'status': 'error', 'message': f"{e}"}, 500