import json
from datetime import datetime, date, time
from decimal import Decimal
import pytz
from google.cloud import bigquery
from google.cloud import bigquery_storage
from google.oauth2 import service_account
from google.api_core.exceptions import Conflict
from google.api_core.exceptions import NotFound
from typing import Any, Dict, List, Optional
import pandas as pd
import logging
import os
import re

logging.basicConfig(level=logging.INFO)

class BigQuery:
    def __init__(self, credential_env_name=None):
        self.project_id = os.environ.get('GCP_PROJECT')

        # Build explicit credentials when an env var name is given; otherwise
        # fall back to Application Default Credentials (credentials=None).
        credentials = None
        if credential_env_name:
            credentials = service_account.Credentials.from_service_account_info(
                json.loads(os.environ.get(credential_env_name))
            )

        self.client = bigquery.Client(project=self.project_id, credentials=credentials)
        self.bigquery_storage = bigquery_storage.BigQueryReadClient(credentials=credentials)
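
    # Example usage (a sketch; assumes GCP_PROJECT is set and, optionally, an
    # env var such as BQ_SA_JSON holding a service-account JSON blob):
    #
    #   bq = BigQuery()                                  # application default credentials
    #   bq = BigQuery(credential_env_name="BQ_SA_JSON")  # explicit service account
    #   rows = bq.get_query("SELECT 1 AS x")             # -> [{'x': 1}]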

    def get_query(self, query=''):
        data = self.client.query(query)
        obj = [dict(row) for row in data]
        return obj
    
    def get_query_df(self, query):
        data = self.client.query(query).to_dataframe(bqstorage_client=self.bigquery_storage)
        return data
    
    def load_data(self, target_table=None, data=None):
        # insert_rows_json streams rows and returns a list of per-row errors
        # (an empty list on success), not a load job.
        errors = self.client.insert_rows_json(target_table, json_rows=data or [])
        return errors
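
    # Example (hypothetical table id; insert_rows_json returns [] on success):
    #
    #   errors = bq.load_data("project.dataset.table", [{"x": 1}])
    #   if errors:
    #       logging.error(f"Streaming insert failed: {errors}")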
    
    def load_data_df(self, dataset_name, table_name, data):
        """ Load a DataFrame into a BigQuery table

        Args:
            dataset_name (str): Dataset name in BigQuery
            table_name (str): Table name in BigQuery
            data (DataFrame): DataFrame to be inserted into the table
        """
        table_id = f"{self.project_id}.{dataset_name}.{table_name}"

        job_config = bigquery.LoadJobConfig()
        load_job = self.client.load_table_from_dataframe(data, table_id, job_config=job_config)
        return load_job.result()

    def delete_data(self, project_name, dataset_name, table_name):
        """ Empty a table in BigQuery
        
        Args:
            client (client): BigQuery client
            dataset_name (str): Dataset Name in BigQuery
            table_name (str): Table Name in BigQuery
        """

        delete_statement = (
        "TRUNCATE TABLE " + project_name + "." + dataset_name + "." + table_name + " "
        )
        query_job = self.client.query(delete_statement)
        query_job.result()
        return True
    
    def delete_version(self, property_id, table_id, version_key):
        # Parameterize version_key to avoid quoting/injection issues.
        delete_statement = f"""
        DELETE FROM `customer-360-profile.client_{property_id}.offline_{table_id}`
        WHERE version_key = @version_key
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ScalarQueryParameter("version_key", "STRING", version_key)]
        )
        query_job = self.client.query(delete_statement, job_config=job_config)
        query_job.result()
        return True
    
    def delete_table(self, project_name, dataset_name, table_name):
        """ Delete a table in BigQuery
        
        Args:
            project_name (str): Project Name in BigQuery
            dataset_name (str): Dataset Name in BigQuery
            table_name (str): Table Name in BigQuery
        """

        table_id = f"{project_name}.{dataset_name}.{table_name}"
        self.client.delete_table(table_id, not_found_ok=True)
        return True
        
        
    def delete_when_match(self, project_name, dataset_name_ori, table_name_ori, dataset_name_temp, table_name_temp, condition):
        """ Delete rows from the main table that already exist in the temp table

        Args:
            project_name (str): Project name in BigQuery
            dataset_name_ori (str): Main dataset name in BigQuery
            table_name_ori (str): Main table name in BigQuery
            dataset_name_temp (str): Dataset name in BigQuery for temporary storage
            table_name_temp (str): Table name in BigQuery for temporary storage
            condition (str): Match condition (the MERGE "ON" clause)
        """

        delete_statement = (
            f"MERGE {project_name}.{dataset_name_ori}.{table_name_ori} ori "
            f"USING {project_name}.{dataset_name_temp}.{table_name_temp} temp "
            f"{condition} "
            "WHEN MATCHED THEN DELETE"
        )
        query_job = self.client.query(delete_statement)
        query_job.result()
        return True
    
    def insert_into_select_date(self, account_ids, account_id_column, project_from, project_to, table_from, table_to, date_column, start, end):
        """ Insert data from one table into another for given accounts and date range """

        insert_statement = (
            f"INSERT INTO {project_to}.rda_analytics_temp.{table_to} "
            f"SELECT * FROM {project_from}.rda_analytics.{table_from} "
            f"WHERE {account_id_column} IN {account_ids} "
            f"AND {date_column} BETWEEN '{start}' AND '{end}' "
        )

        query_job = self.client.query(insert_statement)
        query_job.result()
        return True
    
    def get_bq_columns(self, dataset: str, table: str):
        client = bigquery.Client(project="customer-360-profile")
        table_ref = f"{dataset}.{table}"
        try:
            table_obj = client.get_table(table_ref)  # API call
        except NotFound:
            return []
        return [f.name for f in table_obj.schema]

    def getJourneyEvent(self, property_id=None, user_pseudo_id=None, start_date=None, end_date=None):
        """ Get Journey Event from BigQuery
        
        Returns:
            list: List of journey events
        """
        query = f"""
        SELECT * EXCEPT(pageId, source, eventId)
        FROM `customer-360-profile.client_{property_id}.event`
        WHERE user_pseudo_id = "{user_pseudo_id}"
        AND eventTimeStamp between DATETIME("{start_date}") AND DATETIME("{end_date}")
        ORDER BY eventTimeStamp
        """
        data = self.client.query(query)
        obj = [dict(row) for row in data]
        return obj

    def getSankeyData(self, property_id=None, start_date=None, end_date=None, look_back_start_date=None, look_back_end_date=None):
        query = f"""
        WITH focus_user AS (
        SELECT DISTINCT(user_pseudo_id) as user_pseudo_id
        FROM `customer-360-profile.client_{property_id}.event`
        WHERE eventTimeStamp between DATETIME("{start_date}") AND DATETIME("{end_date}")
        )

        SELECT 
            main.eventId,
            main.user_pseudo_id,
            main.eventName,
            main.eventTimeStamp,
            main.referral.ad_id,
            main.referral.ads_context_data.ad_title
        FROM `customer-360-profile.client_{property_id}.event` as main
        INNER JOIN
          focus_user ON main.user_pseudo_id = focus_user.user_pseudo_id
        WHERE eventTimeStamp between DATETIME("{look_back_start_date}") AND DATETIME("{look_back_end_date}")
        """
        data = self.client.query(query)
        obj = [dict(row) for row in data]
        return obj
    
    def getCustomerProfile(self, dataset_name, table_name, start_date=None, end_date=None):
        if start_date and end_date:
            query = f"""
            SELECT * FROM {dataset_name}.{table_name} 
            WHERE createdate BETWEEN '{start_date}' AND '{end_date}'
            """
        else:
            query = f"""
            SELECT * FROM {dataset_name}.{table_name}
            """
        data = self.client.query(query)
        obj = [dict(row) for row in data]
        return obj
    
    def getOverviewDashboard(self, property_id=None, start_date=None, end_date=None):
        query = f"""
        SELECT
            main.eventId,
            main.user_pseudo_id,
            main.eventName,
            main.eventTimeStamp,
            main.referral.ad_id,
            ep.value as message,
            main.referral.ads_context_data.ad_title
        FROM `customer-360-profile.client_{property_id}.event` as main
        LEFT JOIN UNNEST(eventProperty) as ep
            ON ep.key = "message"
        WHERE eventTimeStamp between DATETIME("{start_date}") AND DATETIME("{end_date}")
        ORDER BY eventTimeStamp;
        """
        data = self.client.query(query)
        obj = [dict(row) for row in data]
        return obj
    
    def create_dataset(self, dataset_id, location="asia-southeast1"):
        dataset_ref = bigquery.Dataset(f"{self.project_id}.{dataset_id}")
        dataset_ref.location = location

        try:
            dataset = self.client.create_dataset(dataset_ref)
            logging.info(f"Created dataset: {dataset.dataset_id}")
            return True
        except Conflict:
            logging.error(f"Dataset {dataset_id} already exists.")
            return False
        
    def parse_schema_field(self,field):
        if field["type"] == "RECORD":
            return bigquery.SchemaField(
                name=field["name"],
                field_type=field["type"],
                mode=field.get("mode", "NULLABLE"),
                description=field.get("description"),
                fields=[self.parse_schema_field(f) for f in field.get("fields", [])]
            )
        else:
            return bigquery.SchemaField(
                name=field["name"],
                field_type=field["type"],
                mode=field.get("mode", "NULLABLE"),
                description=field.get("description")
            )

    def create_table_from_json_schema(self, dataset_id, table_id, schema_json, partition_field=None,cluster_fields=None):

        table_ref = f"{self.project_id}.{dataset_id}.{table_id}"
        schema = [bigquery.SchemaField.from_api_repr(f) for f in schema_json]

        table = bigquery.Table(table_ref, schema=schema)

        if partition_field:
            table.time_partitioning = bigquery.TimePartitioning(
                type_=bigquery.TimePartitioningType.DAY,
                field=partition_field,
            )

        if cluster_fields:
            table.clustering_fields = cluster_fields

        try:
            table = self.client.create_table(table)
            logging.info(f"✅ Created table: {table.table_id}")
        except Conflict:
            logging.warning(f"⚠️ Table {table_id} already exists.")
            
    def get_schema_dataset_offline(self, property_id):
        query = f"""
        SELECT table_name, column_name, data_type
        FROM `client_{property_id}.INFORMATION_SCHEMA.COLUMNS`
        WHERE table_name LIKE 'offline_%'
        ORDER BY table_name, ordinal_position
        """
        df = self.client.query(query).to_dataframe()
        return df
    
    def get_schema_dataset_offline_name(self, property_id, table_name):
        # Note: INFORMATION_SCHEMA.COLUMNS exposes is_nullable, not mode/description.
        query = f"""
        SELECT table_name, column_name, data_type, is_nullable
        FROM `client_{property_id}.INFORMATION_SCHEMA.COLUMNS`
        WHERE table_name = 'offline_{table_name}'
        ORDER BY table_name, ordinal_position
        """
        df = self.client.query(query).to_dataframe()
        return df
    
    def get_schema_dataset(self, property_id, table_name):
        table_ref = f"customer-360-profile.client_{property_id}.offline_{table_name}"
        schema_list = []
        try:
            table = self.client.get_table(table_ref)  # API call
            for f in table.schema:
                schema_list.append({
                    "name": f.name,
                    "type": f.field_type,
                    "mode": f.mode,
                    "description": f.description or ""
                })
        except NotFound:
            # Table doesn't exist — return empty schema_list
            pass
        return schema_list
    
    @staticmethod
    def _schemafield_to_bqjson(field: bigquery.SchemaField) -> dict:
        """
        Convert a google.cloud.bigquery.SchemaField into the JSON object that
        BigQuery shows when you copy/export schema in the UI:
        {
        "name": "...",
        "mode": "...",
        "type": "...",
        "description": "...",
        "fields": [ ...nested... ]
        }
        """
        # ensure exact key order: name, mode, type, description, fields
        obj = {
            "name": field.name,
            "mode": field.mode or "NULLABLE",
            "type": field.field_type,  # BigQuery types: STRING, RECORD, FLOAT, DATETIME, etc.
            "description": field.description or "",
            "fields": []
        }
        if field.field_type.upper() == "RECORD" and field.fields:
            obj["fields"] = [BigQuery._schemafield_to_bqjson(sub) for sub in field.fields]
        return obj
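
    # Example round-trip (a sketch; the field name is illustrative):
    #
    #   sf = bigquery.SchemaField("eventId", "STRING", mode="REQUIRED")
    #   BigQuery._schemafield_to_bqjson(sf)
    #   # -> {"name": "eventId", "mode": "REQUIRED", "type": "STRING",
    #   #     "description": "", "fields": []}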

    def get_offline_table_schema_as_bqjson(self, property_id: str, table_name: str, *, project: str = "customer-360-profile") -> List[dict]:
        """
        Return the schema of a single table (by name) in BigQuery UI's "Copy as JSON" format.
        Example output item: {"name": "...", "mode": "NULLABLE", "type": "STRING", "description": "", "fields": []}

        Args:
            property_id: e.g. "9234725065" -> dataset is client_<property_id>
            table_name:  e.g. "offline_transaction" (no dataset or project prefix)
            project:     GCP project id (default: "customer-360-profile")

        Returns:
            List[dict]: top-level schema entries in the exact BQ export shape/order.
        """
        if "." in table_name or "`" in table_name:
            raise ValueError("table_name must be an unqualified name (e.g., 'offline_transaction').")

        dataset_id = f"client_{property_id}"
        full_table_id = f"{project}.{dataset_id}.{table_name}"

        table = self.client.get_table(full_table_id)
        return [BigQuery._schemafield_to_bqjson(f) for f in table.schema]
            
    def cast_dataframe_to_bq_schema(self, df: pd.DataFrame, schema: list) -> pd.DataFrame:
        """Casts DataFrame columns to match a BigQuery schema, including REPEATED fields"""
        import ast  # local import; only needed to parse stringified lists

        for field in schema:
            col = field.name
            dtype = field.field_type.upper()
            mode = field.mode.upper() if field.mode else "NULLABLE"

            if col not in df.columns:
                continue

            # REPEATED fields (arrays/lists)
            if mode == 'REPEATED':
                def to_list_safe(x):
                    if x is None or (isinstance(x, float) and pd.isna(x)):
                        return []
                    if isinstance(x, list):
                        return x
                    if isinstance(x, str) and x.startswith('[') and x.endswith(']'):
                        try:
                            return ast.literal_eval(x)
                        except (ValueError, SyntaxError):
                            return [x]
                    return [x]

                df[col] = df[col].apply(to_list_safe)

                if dtype == 'STRING':
                    df[col] = df[col].apply(lambda x: [str(i) for i in x])
                elif dtype == 'INTEGER':
                    df[col] = df[col].apply(lambda x: [int(i) if i is not None and not pd.isna(i) else None for i in x])
                elif dtype == 'FLOAT':
                    df[col] = df[col].apply(lambda x: [float(i) if i is not None and not pd.isna(i) else None for i in x])
                continue  # Skip scalar casting for repeated fields

            # Scalar (nullable) fields
            try:
                if dtype == 'STRING':
                    df[col] = df[col].astype('string')
                elif dtype == 'INTEGER':
                    df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')
                elif dtype == 'FLOAT':
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                elif dtype == 'BOOLEAN':
                    df[col] = df[col].astype('boolean')
                elif dtype == 'TIMESTAMP':
                    df[col] = pd.to_datetime(df[col], errors='coerce')
                elif dtype == 'DATE':
                    df[col] = pd.to_datetime(df[col], errors='coerce').dt.date
                elif dtype == 'DATETIME':
                    df[col] = pd.to_datetime(df[col], errors='coerce')
                elif dtype == 'RECORD':
                    # STRUCT/Nested fields: pass for now or handle recursively
                    pass
            except Exception as e:
                print(f"⚠️ Failed to cast column '{col}' to {dtype}: {e}")

        return df
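
    # Example usage (a sketch with illustrative columns):
    #
    #   schema = [bigquery.SchemaField("age", "INTEGER"),
    #             bigquery.SchemaField("tags", "STRING", mode="REPEATED")]
    #   df = pd.DataFrame({"age": ["42", None], "tags": ["['a', 'b']", None]})
    #   df = bq.cast_dataframe_to_bq_schema(df, schema)
    #   # df["age"] -> Int64 [42, <NA>]; df["tags"] -> [['a', 'b'], []]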
    
    def get_all_channel_property(self, property_id):
        query = f"""
            SELECT distinct user_profile.channel 
            FROM `customer-360-profile.client_{property_id}.customer_profile`,
            UNNEST(user_profile) AS user_profile
        """
        data = self.client.query(query)
        channels = [row['channel'] for row in data]
        return channels

    def get_profile_by_property_id(self, property_id, existing_channel, limit=50):
        query = f"""
            SELECT * FROM (
            SELECT  
                user_pseudo_id,
                user_profile.channel,
                context.id AS context_id,
                createdate
            FROM `customer-360-profile.client_{property_id}.customer_profile`,
                UNNEST(user_profile) AS user_profile,
                UNNEST(user_profile.context) AS context
            )
            PIVOT (
            MAX(context_id) FOR channel IN ({existing_channel})
            )
            ORDER BY createdate DESC
            LIMIT {limit}
        """
        data = self.client.query(query)
        obj = [dict(row) for row in data]
        return obj
    
    def get_data_with_limit(self, property_id, table_name, limit=50):
        query = f"""
            SELECT * 
            FROM `customer-360-profile.{property_id}.{table_name}`
            LIMIT {limit}
        """
        df = self.client.query(query).to_dataframe()
        return df
    
    def get_ga4_user_data(self, ga4_property_id, date):
        query = f"""
        SELECT
            pseudo_user_id,user_info,device,geo,audiences,user_properties
        FROM `customer-360-profile.analytics_{ga4_property_id}.pseudonymous_users_{date.replace("-", "")}`
            WHERE ARRAY_LENGTH(user_properties) > 0;
        """
        try:
            data = self.client.query(query)
            obj = [dict(row) for row in data]
            return obj
        except NotFound:
            return None
    
    def get_distinct_ga4_user_property_name(self, ga4_property_id, date):
        query = f"""
        SELECT 
            distinct `user_properties`[SAFE_OFFSET(0)].value.user_property_name as user_property_name
        FROM `customer-360-profile.analytics_{ga4_property_id}.pseudonymous_users_{date.replace("-", "")}` 
        where `user_properties`[SAFE_OFFSET(0)].value.user_property_name is not null
        """
        data = self.client.query(query)
        obj = [dict(row) for row in data]
        return obj
    
    @staticmethod
    def json_to_bq_schema(fields_json):
        schema = []
        for f in fields_json:
            sub = BigQuery.json_to_bq_schema(f.get("fields", [])) if f.get("fields") else []
            schema.append(
                bigquery.SchemaField(
                    name=f["name"],
                    field_type=f.get("type", "STRING"),
                    mode=f.get("mode", "NULLABLE"),
                    description=f.get("description", ""),
                    fields=sub
                )
            )
        return schema
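
    # Example (a sketch): build SchemaField objects from one of the JSON schemas
    # defined on the Table class below:
    #
    #   fields = BigQuery.json_to_bq_schema(Table.tableProfileMapping())
    #   # -> [SchemaField('lastupdate', 'DATETIME', 'REQUIRED'), ...]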

    def ensure_dataset(self, dataset_fqid: str, location: str = None):
        try:
            self.client.get_dataset(dataset_fqid)
        except NotFound:
            ds = bigquery.Dataset(dataset_fqid)
            if location:
                ds.location = location
            self.client.create_dataset(ds)

    def ensure_table(self, table_fqid: str, schema_json):
        try:
            self.client.get_table(table_fqid)
        except NotFound:
            schema = BigQuery.json_to_bq_schema(schema_json)
            tbl = bigquery.Table(table_fqid, schema=schema)
            self.client.create_table(tbl)
            
    def ensure_table_exist(self, table_id):
        try:
            self.client.get_table(table_id)
            return True
        except NotFound:
            return False
            
    def bq_count_rows(self, table_id: str, where: str | None = None,
                        exact: bool = False, timeout: int = 60) -> int:

        # Fast path: metadata count (exact for loaded data; streaming may lag)
        if not exact and where is None:
            table = self.client.get_table(table_id)  # API call
            return int(table.num_rows or 0)

        # Exact path: SQL count (respects WHERE)
        query = f"SELECT COUNT(*) AS cnt FROM `{table_id}`"
        if where:
            query += f" WHERE {where}"
        job = self.client.query(query)
        row = next(job.result(timeout=timeout))
        return int(row["cnt"])
    
    def bq_count_rows_with_condition(self, table_id: str, where: str | None = None):

        query = f"SELECT COUNT(*) AS cnt FROM `{table_id}`"
        if where:
            query += f" WHERE {where}"
        job = self.client.query(query)
        row = next(job.result())
        return int(row["cnt"])
    
    def bq_avg_column_with_condition(self, table_id: str, column_name: str, where: str | None = None):

        query = f"SELECT AVG({column_name}) AS average_column FROM `{table_id}`"
        if where:
            query += f" WHERE {where}"
        job = self.client.query(query)
        row = next(job.result())
        # AVG returns a float (NULL over an empty set); don't truncate to int.
        avg = row["average_column"]
        return float(avg) if avg is not None else None
    
    def query_transaction_with_condition(self, table_id, where: str):
        query = f"SELECT FORMAT_DATE('%F', DATE(receipt_date)) AS date, item_name AS product, total_price AS price FROM `{table_id}`"
        if where:
            query += f" WHERE {where}"
        job = self.client.query(query)
        rows = job.result()
        return [dict(row) for row in rows]
    
    def query_bq_to_dataframe(self, query):
        df = self.client.query(query).to_dataframe()
        return df
    
    def table_exists(self, dataset: str, table: str) -> bool:
        try:
            self.client.get_table(f"customer-360-profile.{dataset}.{table}")
            return True
        except NotFound:
            return False
    
    NUMERIC_TYPES = {"INTEGER", "INT64", "FLOAT", "FLOAT64", "NUMERIC", "BIGNUMERIC"}
    
    def _dataset_for_property(self, property_id: str) -> str:
        return f"client_{property_id}"
    
    def _qualify_table_id(self, table_name: str, property_id: str) -> str:
        """
        Resolve to fully-qualified: project.client_{property_id}.table
        Accepts:
          - "table"                                 -> project.client_{pid}.table
          - "dataset.table" (must match client_{pid})
          - "project.dataset.table" (must match project + client_{pid})
        """
        clean = table_name.replace("`", "")
        parts = clean.split(".")
        ds_expected = self._dataset_for_property(property_id)

        if len(parts) == 1:
            # just "table"
            return f"{self.project_id}.{ds_expected}.{parts[0]}"

        if len(parts) == 2:
            ds, tbl = parts
            if ds != ds_expected:
                raise ValueError(f"dataset must be '{ds_expected}' for property_id={property_id}")
            return f"{self.project_id}.{ds}.{tbl}"

        if len(parts) == 3:
            prj, ds, tbl = parts
            if prj != self.project_id:
                # Cross-project references are rejected; enforce the client's project.
                raise ValueError(f"project must be '{self.project_id}'")
            if ds != ds_expected:
                raise ValueError(f"dataset must be '{ds_expected}' for property_id={property_id}")
            return f"{prj}.{ds}.{tbl}"

        raise ValueError("table_name must be 'table', 'dataset.table', or 'project.dataset.table'")

    def _get_table(self, table_name: str, property_id: str):
        fq = self._qualify_table_id(table_name, property_id)
        return self.client.get_table(fq)

    @staticmethod
    def _walk_schema(fields, path_parts):
        nodes = []
        field_map = {f.name: f for f in fields}
        for i, part in enumerate(path_parts):
            f = field_map.get(part)
            if not f:
                prev = ".".join(path_parts[:i]) or "<root>"
                raise KeyError(f"Field '{part}' not found under '{prev}'")
            node = {
                "name": f.name,
                "type": (f.field_type or "").upper(),
                "mode": (f.mode or "NULLABLE").upper(),
                "fields": list(f.fields) if (f.field_type or "").upper() == "RECORD" else [],
                "is_repeated": (f.mode or "NULLABLE").upper() == "REPEATED",
            }
            nodes.append(node)
            if i < len(path_parts) - 1:
                if node["type"] != "RECORD":
                    raise TypeError(f"Path '{'.'.join(path_parts[:i+1])}' is not a RECORD (got {node['type']}).")
                field_map = {sf.name: sf for sf in f.fields}
        return nodes, nodes[-1]

    @staticmethod
    def _list_record_subfields(record_node, prefix):
        return [f"{prefix}.{sf.name}" for sf in record_node["fields"]]

    @staticmethod
    def _build_from_and_leaf_expr(fq_table_id: str, nodes):
        """
        Builds:
          FROM `fq_table_id` t [CROSS JOIN UNNEST(...) AS aN]*
        And returns (from_sql, leaf_expr) for the final scalar.
        """
        from_clauses = [f"`{fq_table_id}` t"]
        current_ref = "t"  # can be alias (t/aN) or a dotted chain (t.field.subfield)
        leaf_expr = None

        for i, n in enumerate(nodes):
            name = n["name"]
            # If this segment is repeated:
            if n["is_repeated"]:
                # If it's a repeated scalar: UNNEST(...) AS aN and that's the leaf value if scalar
                if n["type"] != "RECORD":
                    alias = f"a{i}"
                    from_clauses.append(f"UNNEST({current_ref}.{name}) AS {alias}")
                    leaf_expr = alias  # array<scalar> element
                    current_ref = alias  # for completeness
                else:
                    # repeated RECORD: UNNEST and keep alias to reach its children
                    alias = f"a{i}"
                    from_clauses.append(f"UNNEST({current_ref}.{name}) AS {alias}")
                    current_ref = alias
            else:
                # not repeated
                if n["type"] == "RECORD":
                    current_ref = f"{current_ref}.{name}"
                else:
                    # scalar field
                    leaf_expr = f"{current_ref}.{name}"
                    current_ref = leaf_expr

        if leaf_expr is None:
            # Shouldn't happen unless last node was a RECORD (handled earlier)
            leaf_expr = current_ref

        return ", ".join(from_clauses), leaf_expr

    def get_field_values(self, *, property_id: str, table_name: str, field_path: str, distinct_limit: int = 1000):
        """
        STRING -> distinct values
        NUMERIC -> min/max
        RECORD -> error + subfields
        """
        fq_table = self._qualify_table_id(table_name, property_id)
        table = self._get_table(table_name, property_id)

        parts = [p for p in field_path.split(".") if p]
        nodes, leaf = self._walk_schema(table.schema, parts)

        if leaf["type"] == "RECORD":
            return {
                "status": "error",
                "message": f"'{field_path}' is a RECORD. Please choose a subfield.",
                "available_subfields": self._list_record_subfields(leaf, field_path),
            }

        from_sql, leaf_expr = self._build_from_and_leaf_expr(fq_table, nodes)
        ftype = leaf["type"]

        try:
            if ftype == "STRING":
                sql = f"""
                    SELECT DISTINCT {leaf_expr} AS value
                    FROM {from_sql}
                    WHERE {leaf_expr} IS NOT NULL AND CAST({leaf_expr} AS STRING) != ''
                    ORDER BY value
                    LIMIT {int(distinct_limit)}
                """
                rows = self.get_query(sql)
                return {"status":"success","field":field_path,"type":"STRING",
                        "values":[r["value"] for r in rows],"count":len(rows)}

            elif ftype in self.NUMERIC_TYPES:
                sql = f"""
                    SELECT MIN({leaf_expr}) AS min_value, MAX({leaf_expr}) AS max_value
                    FROM {from_sql}
                    WHERE {leaf_expr} IS NOT NULL
                """
                rows = self.get_query(sql)
                row = rows[0] if rows else {"min_value": None, "max_value": None}
                return {"status":"success","field":field_path,"type":"NUMERIC",
                        "min":row["min_value"],"max":row["max_value"]}

            else:
                return {"status":"error",
                        "message": f"Unsupported field type '{ftype}' for '{field_path}'. "
                                   "Supported: STRING (distinct), numeric (min/max), or RECORD with subfield."}
        except Exception as e:
            logging.exception("BigQuery get_field_values failed")
            return {"status":"error","message":f"Query failed: {e}"}

class Table:
    @staticmethod
    def tableEvent():
        schema = [
            {
                "name": "eventId",
                "mode": "REQUIRED",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "eventTimeStamp",
                "mode": "REQUIRED",
                "type": "DATETIME",
                "description": "",
                "fields": []
            },
            {
                "name": "eventName",
                "mode": "REQUIRED",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "id",
                "mode": "NULLABLE",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "eventProperty",
                "mode": "REPEATED",
                "type": "RECORD",
                "description": "",
                "fields": [
                {
                    "name": "key",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                },
                {
                    "name": "value",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                }
                ]
            },
            {
                "name": "userProperty",
                "mode": "REPEATED",
                "type": "RECORD",
                "description": "",
                "fields": [
                {
                    "name": "key",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                },
                {
                    "name": "value",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                }
                ]
            },
            {
                "name": "referral",
                "mode": "NULLABLE",
                "type": "RECORD",
                "description": "Ad referral",
                "fields": [
                {
                    "name": "ref",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                },
                {
                    "name": "ad_id",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                },
                {
                    "name": "source",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                },
                {
                    "name": "type",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                },
                {
                    "name": "ads_context_data",
                    "mode": "NULLABLE",
                    "type": "RECORD",
                    "description": "",
                    "fields": [
                    {
                        "name": "ad_title",
                        "mode": "NULLABLE",
                        "type": "STRING",
                        "description": "",
                        "fields": []
                    },
                    {
                        "name": "photo_url",
                        "mode": "NULLABLE",
                        "type": "STRING",
                        "description": "",
                        "fields": []
                    },
                    {
                        "name": "video_url",
                        "mode": "NULLABLE",
                        "type": "STRING",
                        "description": "",
                        "fields": []
                    },
                    {
                        "name": "post_id",
                        "mode": "NULLABLE",
                        "type": "STRING",
                        "description": "",
                        "fields": []
                    },
                    {
                        "name": "product_id",
                        "mode": "NULLABLE",
                        "type": "STRING",
                        "description": "",
                        "fields": []
                    },
                    {
                        "name": "flow_id",
                        "mode": "NULLABLE",
                        "type": "STRING",
                        "description": "",
                        "fields": []
                    }
                    ]
                }
                ]
            },
            {
                "name": "pageId",
                "mode": "NULLABLE",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "user_pseudo_id",
                "mode": "REQUIRED",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "ref_user_pseudo_id",
                "mode": "NULLABLE",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "source",
                "mode": "NULLABLE",
                "type": "STRING",
                "description": "",
                "fields": []
            }
            ]
        
        return schema
    
    @staticmethod
    def tableEventOffline():
        schema = [
            {
                "name": "eventId",
                "mode": "REQUIRED",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "eventTimeStamp",
                "mode": "REQUIRED",
                "type": "DATETIME",
                "description": "",
                "fields": []
            },
            {
                "name": "eventName",
                "mode": "REQUIRED",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "id",
                "mode": "NULLABLE",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "eventProperty",
                "mode": "REPEATED",
                "type": "RECORD",
                "description": "",
                "fields": [
                {
                    "name": "key",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                },
                {
                    "name": "value",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                }
                ]
            },
            {
                "name": "userProperty",
                "mode": "REPEATED",
                "type": "RECORD",
                "description": "",
                "fields": [
                {
                    "name": "key",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                },
                {
                    "name": "value",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                }
                ]
            },
            {
                "name": "pageId",
                "mode": "NULLABLE",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "user_pseudo_id",
                "mode": "REQUIRED",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "ref_user_pseudo_id",
                "mode": "NULLABLE",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "source",
                "mode": "NULLABLE",
                "type": "STRING",
                "description": "",
                "fields": []
            }
        ]
        return schema
    
    @staticmethod
    def tableCustomerProfile():
        schema = [
            {
                "name": "createdate",
                "mode": "REQUIRED",
                "type": "DATETIME",
                "description": "",
                "fields": []
            },
            {
                "name": "lastupdate",
                "mode": "REQUIRED",
                "type": "DATETIME",
                "description": "",
                "fields": []
            },
            {
                "name": "user_pseudo_id",
                "mode": "REQUIRED",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "user_profile",
                "mode": "REPEATED",
                "type": "RECORD",
                "description": "",
                "fields": [
                {
                    "name": "channel",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                },
                {
                    "name": "context",
                    "mode": "REPEATED",
                    "type": "RECORD",
                    "description": "",
                    "fields": [
                    {
                        "name": "created_at",
                        "mode": "NULLABLE",
                        "type": "DATETIME",
                        "description": "",
                        "fields": []
                    },
                    {
                        "name": "updated_at",
                        "mode": "NULLABLE",
                        "type": "DATETIME",
                        "description": "",
                        "fields": []
                    },
                    {
                        "name": "id",
                        "mode": "NULLABLE",
                        "type": "STRING",
                        "description": "",
                        "fields": []
                    },
                    {
                        "name": "source",
                        "mode": "NULLABLE",
                        "type": "RECORD",
                        "description": "",
                        "fields": [
                        {
                            "name": "from",
                            "mode": "NULLABLE",
                            "type": "STRING",
                            "description": "",
                            "fields": []
                        },
                        {
                            "name": "id",
                            "mode": "NULLABLE",
                            "type": "STRING",
                            "description": "",
                            "fields": []
                        },
                        {
                            "name": "page_id",
                            "mode": "NULLABLE",
                            "type": "STRING",
                            "description": "",
                            "fields": []
                        }
                        ]
                    }
                    ]
                }
                ]
            }
        ]
        return schema
    
    @staticmethod
    def tableProfileMapping():
        schema = [
            {
                "name": "lastupdate",
                "mode": "REQUIRED",
                "type": "DATETIME",
                "description": "",
                "fields": []
            },
            {
                "name": "user_pseudo_id",
                "mode": "REQUIRED",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "unified_user_pseudo_id",
                "mode": "REQUIRED",
                "type": "STRING",
                "description": "",
                "fields": []
            }
            ]
        return schema
    
    @staticmethod
    def tableUserProfile():
        schema = [
            {
                "name": "lastupdate",
                "mode": "NULLABLE",
                "type": "DATETIME",
                "description": "Last Update User Profile",
                "fields": []
            },
            {
                "name": "user_pseudo_id",
                "mode": "NULLABLE",
                "type": "STRING",
                "description": "User ID",
                "fields": []
            },
            {
                "name": "userProperty",
                "mode": "REPEATED",
                "type": "RECORD",
                "description": "User Property Context",
                "fields": [
                {
                    "name": "key",
                    "mode": "NULLABLE",
                    "type": "STRING",
                    "description": "User Property Key",
                    "fields": []
                },
                {
                    "name": "value",
                    "mode": "REPEATED",
                    "type": "STRING",
                    "description": "List of User Property Value",
                    "fields": []
                }
                ]
            },
            {
                "name": "created_at",
                "mode": "NULLABLE",
                "type": "DATETIME",
                "description": "",
                "fields": []
            }
            ]
        return schema
    
    @staticmethod
    def tableMessageLabeling():
        schema = [
            {"name":"eventId","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
            {"name":"eventTimeStamp","mode":"NULLABLE","type":"TIMESTAMP","description":"","fields":[]},
            {"name":"eventName","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
            {"name":"pageId","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
            {"name":"pageName","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
            {"name":"user_pseudo_id","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
            {"name":"source","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
            {"name":"user_id","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
            {"name":"comment_id","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
            {"name":"parent_comment_id","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
            {"name":"post_id","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
            {"name":"message","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
            {
                "name":"labeling","mode":"REPEATED","type":"RECORD","description":"",
                "fields":[
                {
                    "name":"category","mode":"NULLABLE","type":"RECORD","description":"",
                    "fields":[
                    {"name":"category_name","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
                    {
                        "name":"exclude","mode":"REPEATED","type":"RECORD","description":"",
                        "fields":[
                        {"name":"keyword","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
                        {"name":"value","mode":"NULLABLE","type":"INTEGER","description":"","fields":[]}
                        ]
                    },
                    {
                        "name":"include","mode":"REPEATED","type":"RECORD","description":"",
                        "fields":[
                        {"name":"keyword","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
                        {"name":"value","mode":"NULLABLE","type":"INTEGER","description":"","fields":[]}
                        ]
                    }
                    ]
                }
                ]
            },
            {
                "name":"single_grouping","mode":"REPEATED","type":"RECORD","description":"",
                "fields":[
                {"name":"key","mode":"NULLABLE","type":"STRING","description":"","fields":[]},
                {"name":"value","mode":"NULLABLE","type":"STRING","description":"","fields":[]}
                ]
            }
            ]
        return schema
    
    @staticmethod
    def tableMediaSingleData():
        schema = [
            {"name": "media_channel","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "date","mode": "NULLABLE","type": "DATE","description": "","fields": []},
            {"name": "week","mode": "NULLABLE","type": "INTEGER","description": "","fields": []},
            {"name": "month","mode": "NULLABLE","type": "INTEGER","description": "","fields": []},
            {"name": "year","mode": "NULLABLE","type": "INTEGER","description": "","fields": []},
            {"name": "account_name","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "account_id","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "campaign_name","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "campaign_id","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "adset_name","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "adset_id","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "ad_name","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "ad_id","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "match_type","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "ad_label","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "objectives","mode": "NULLABLE","type": "STRING","description": "","fields": []},
            {"name": "cost","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "impression","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "reach","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "frequency","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "clicks","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "link_clicks","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "view","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "video_play_action","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "video_view","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "video_view_25","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "video_view_50","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "video_view_75","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "video_view_100","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "post_engagement","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "post_reaction","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "post_comment","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "post_share","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "page_like","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "messages","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "meta_leads","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "meta_purchase","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "conversions","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "conversions_value","mode": "NULLABLE","type": "FLOAT","description": "","fields": []},
            {"name": "view_15sec_tiktok","mode": "NULLABLE","type": "FLOAT","description": "","fields": []}
        ]
        return schema
    
    @staticmethod
    def tableProfileExplore():
        return [
            {
                "name": "lastupdate",
                "mode": "REQUIRED",
                "type": "DATETIME",
                "description": "",
                "fields": []
            },
            {
                "name": "user_pseudo_id",
                "mode": "REQUIRED",
                "type": "STRING",
                "description": "",
                "fields": []
            },
            {
                "name": "user_property",
                "mode": "REPEATED",
                "type": "RECORD",
                "description": "",
                "fields": [
                {
                    "name": "key",
                    "mode": "REQUIRED",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                },
                {
                    "name": "value",
                    "mode": "REPEATED",
                    "type": "STRING",
                    "description": "",
                    "fields": []
                }
                ]
            },
            {
                "name": "keywords",
                "mode": "REPEATED",
                "type": "STRING",
                "description": "",
                "fields": []
            }
        ]
    
class Schema:
    # Allowed BQ types
    BQ_STRING  = "STRING"
    BQ_INTEGER = "INTEGER"
    BQ_FLOAT   = "FLOAT"
    BQ_DATETIME= "DATETIME"

    _INT_RE   = re.compile(r"^[+-]?\d+$")
    _FLOAT_RE = re.compile(r"^[+-]?(?:\d+\.\d*|\.\d+|\d+)(?:[eE][+-]?\d+)?$")
    _ISO_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
    _ISO_DT_RE   = re.compile(r"^\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})?$")

    @staticmethod
    def _looks_datetime_str(s: str) -> bool:
        s = s.strip()
        return bool(Schema._ISO_DT_RE.match(s) or Schema._ISO_DATE_RE.match(s))

    @staticmethod
    def _infer_scalar_type(v: Any, parse_string_numbers=True, parse_string_datetimes=True) -> str:
        if v is None:
            return Schema.BQ_STRING
        if isinstance(v, bool):
            return Schema.BQ_STRING  # we only allow STRING/INTEGER/FLOAT/DATETIME
        if isinstance(v, int) and not isinstance(v, bool):
            return Schema.BQ_INTEGER
        if isinstance(v, float) or isinstance(v, Decimal):
            return Schema.BQ_FLOAT
        if isinstance(v, (datetime, date, time)):
            return Schema.BQ_DATETIME
        if isinstance(v, (bytes, bytearray)):
            return Schema.BQ_STRING
        if isinstance(v, str):
            s = v.strip()
            if parse_string_datetimes and Schema._looks_datetime_str(s):
                return Schema.BQ_DATETIME
            if parse_string_numbers and Schema._INT_RE.match(s):
                return Schema.BQ_INTEGER
            if parse_string_numbers and Schema._FLOAT_RE.match(s):
                return Schema.BQ_FLOAT
            return Schema.BQ_STRING
        if isinstance(v, dict):
            return Schema.BQ_STRING  # RECORD is not an allowed type; nested dicts map to STRING
        if isinstance(v, (list, tuple)):
            return Schema.BQ_STRING  # handled at list level
        return Schema.BQ_STRING

    @staticmethod
    def _widen_types(a: str, b: str) -> str:
        if a == b:
            return a
        # numeric widening (INTEGER + FLOAT -> FLOAT)
        if {a, b} <= {Schema.BQ_INTEGER, Schema.BQ_FLOAT}:
            return Schema.BQ_FLOAT
        # any other conflict -> STRING
        return Schema.BQ_STRING

    @staticmethod
    def _infer_field_schema_from_values(
        name: str,
        values: List[Any],
        parse_string_numbers=True,
        parse_string_datetimes=True,
    ) -> Dict[str, Any]:
        mode = "NULLABLE"
        field_type: Optional[str] = None
        is_repeated = False
        elem_type: Optional[str] = None

        for v in values:
            if v is None:
                mode = "NULLABLE"
                continue

            if isinstance(v, (list, tuple)):
                is_repeated = True
                for el in v:
                    if el is None:
                        mode = "NULLABLE"
                        continue
                    if isinstance(el, (list, tuple, dict)):
                        et = Schema.BQ_STRING
                    else:
                        et = Schema._infer_scalar_type(el, parse_string_numbers, parse_string_datetimes)
                    elem_type = et if elem_type is None else Schema._widen_types(elem_type, et)
                continue

            if isinstance(v, dict):
                vt = Schema.BQ_STRING
            else:
                vt = Schema._infer_scalar_type(v, parse_string_numbers, parse_string_datetimes)

            field_type = vt if field_type is None else Schema._widen_types(field_type, vt)

        if is_repeated:
            final_type = elem_type or Schema.BQ_STRING
            return {"name": name, "type": final_type, "mode": "REPEATED"}

        final_type = field_type or Schema.BQ_STRING
        return {"name": name, "type": final_type, "mode": mode}

    @staticmethod
    def infer_bq_schema_lite(
        records: List[Dict[str, Any]],
        *,
        parse_string_numbers: bool = True,
        parse_string_datetimes: bool = True,
        sample_size: Optional[int] = None,
        force_string_fields: Optional[List[str]] = None,  # e.g. ["tel","phone"]
    ) -> List[Dict[str, Any]]:
        """
        Infer schema using ONLY: STRING, INTEGER, FLOAT, DATETIME.
        Arrays -> REPEATED of inferred element type (or STRING if mixed).
        Nested dicts -> STRING.
        """
        if not records:
            return []

        sample = records[:sample_size] if (sample_size and sample_size > 0) else records

        all_keys = set()
        for r in sample:
            if isinstance(r, dict):
                all_keys.update(r.keys())

        schema: List[Dict[str, Any]] = []
        for key in sorted(all_keys):
            values = [(r.get(key) if isinstance(r, dict) else None) for r in sample]
            field = Schema._infer_field_schema_from_values(
                key, values,
                parse_string_numbers=parse_string_numbers,
                parse_string_datetimes=parse_string_datetimes,
            )
            
            # Heuristic: identifier-like columns ("phone", "tel", or "id" in the
            # name) are forced to STRING to preserve leading zeros and formatting.
            lname = (key or "").lower()
            if any(sub in lname for sub in ("phone", "tel", "id")):
                field["type"] = Schema.BQ_STRING
                
            if force_string_fields and key in force_string_fields:
                field["type"] = Schema.BQ_STRING
              
            schema.append(field)
        return schema
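
    # Example (a sketch):
    #
    #   Schema.infer_bq_schema_lite([
    #       {"age": "42", "signup": "2024-01-01", "tel": "0812345678"},
    #       {"age": 40.5, "signup": None, "tel": "0898765432"},
    #   ])
    #   # -> [{"name": "age", "type": "FLOAT", "mode": "NULLABLE"},
    #   #     {"name": "signup", "type": "DATETIME", "mode": "NULLABLE"},
    #   #     {"name": "tel", "type": "STRING", "mode": "NULLABLE"}]   # "tel" forced to STRING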
    
    @staticmethod
    def process_schema(
        schema: List[Dict[str, Any]],
        offline_mapping: Dict[str, List[str]],
    ) -> Dict[str, Any]:
        unique_names = {f.get("name") for f in schema if f.get("unique") is True}

        contact_vals = set(offline_mapping.get("contact_key", []))
        demo_vals    = set(offline_mapping.get("demographic_col", []))
        sens_vals    = set(offline_mapping.get("sensitive_data", []))
        time_vals    = set(offline_mapping.get("time_col", []))
        other_vals   = set(offline_mapping.get("internal_key", [])) | set(offline_mapping.get("other_key", []))

        sens_name_lc = {s.lower() for s in sens_vals}
        
        contact_key: Dict[str, str] = {}
        demographic_col: Dict[str, str] = {}
        sensitive_cols_set: set = set()
        other_key: Dict[str, str] = {}
        partition: Optional[str] = None

        def classify(mapping: str, col_name: str):
            nonlocal partition
            
            name_is_sensitive = col_name.lower() in sens_name_lc
            mapping_is_sensitive = mapping in sens_vals if mapping is not None else False

            if name_is_sensitive or mapping_is_sensitive:
                sensitive_cols_set.add(col_name)
            
            if mapping in contact_vals:
                contact_key[mapping] = col_name
            elif mapping in demo_vals:
                demographic_col[mapping] = col_name
            elif mapping in time_vals or mapping == "Datetime":
                partition = col_name
            elif mapping in other_vals:
                other_key[mapping] = col_name
            # else: unmapped

        def clean_field(f: Dict[str, Any]) -> Dict[str, Any]:
            out: Dict[str, Any] = {
                "name": f["name"],
                "type": f["type"],
                "mode": f.get("mode", "NULLABLE"),
            }
            desc = f.get("description")
            if desc:
                out["description"] = desc

            if f["type"].upper() == "RECORD":
                kids = f.get("fields", [])
                out["fields"] = [clean_field(c) for c in kids]
                if not out["fields"]:
                    raise ValueError(f"RECORD field '{f['name']}' has no 'fields' defined.")
            else:
                for k in ("precision", "scale", "maxLength", "policyTags"):
                    if k in f:
                        out[k] = f[k]
            return out

        final_schema: List[Dict[str, Any]] = []
        for f in schema:
            name = f.get("name")
            if not name:
                continue
            mapping = f.get("mapping")
            if mapping:
                classify(mapping, name)
            final_schema.append(clean_field(f))

        result: Dict[str, Any] = {
            "schema": final_schema,
            "unique_col": sorted(unique_names),
        }
        if contact_key:
            result["contact_key"] = contact_key
        if demographic_col:
            result["demographic_col"] = demographic_col
        if sensitive_cols_set:
            result["sensitive_data"] = sorted(sensitive_cols_set)
        if other_key:
            result["other_key"] = other_key
        if partition:
            result["partition"] = partition

        return result
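
    # Example (a sketch; the mapping labels are illustrative):
    #
    #   Schema.process_schema(
    #       [{"name": "tel", "type": "STRING", "mapping": "Phone", "unique": True},
    #        {"name": "created", "type": "DATETIME", "mapping": "Datetime"}],
    #       {"contact_key": ["Phone"]},
    #   )
    #   # -> {"schema": [...], "unique_col": ["tel"],
    #   #     "contact_key": {"Phone": "tel"}, "partition": "created"}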


    
    @staticmethod
    def augment_schema_with_mapping_and_unique(
        bq_schema: List[Dict[str, Any]],
        fb: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """
        bq_schema: list of dicts with keys: name, type, mode, description
        fb: firebase doc like the one you showed
        returns: new schema list with optional 'mapping' and 'unique'
        """
        # 1) Build reverse map: column_name -> mapping_label
        reverse_map: Dict[str, str] = {}

        # known structural keys we should not treat as label→column
        reserved = {
            "createdate", "lastupdate", "status", "row_count",
            "date_column", "demographic_col", "other_key",
            "sensitive_data", "unique_col"
        }

        # a) top-level label→column pairs (e.g., "Phone": "tel", "Email": "email")
        for k, v in fb.items():
            if k in reserved:
                continue
            if isinstance(v, str) and v:      # looks like a mapping label -> column
                reverse_map[v] = k

        # b) nested maps: demographic_col, other_key
        for bucket in ("demographic_col", "other_key"):
            m = fb.get(bucket, {})
            if isinstance(m, dict):
                for label, col in m.items():
                    if isinstance(col, str) and col:
                        reverse_map[col] = label

        # c) date_column ⇒ "Datetime"
        date_col = fb.get("date_column")
        if isinstance(date_col, str) and date_col:
            reverse_map[date_col] = "Datetime"

        # 2) Unique columns
        unique_set = set(fb.get("unique_col", []) or [])

        # 3) Build augmented schema
        out: List[Dict[str, Any]] = []
        for f in bq_schema:
            name = f.get("name")
            item = {
                "name": name,
                "type": f.get("type"),
                "mode": f.get("mode"),
                "description": f.get("description", "") or "",
            }
            # add mapping if known
            if name in reverse_map:
                item["mapping"] = reverse_map[name]
            # add unique if it was flagged in Firebase
            if name in unique_set:
                item["unique"] = True
            out.append(item)

        return out
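
    # Example (a sketch of the Firebase-style doc shape):
    #
    #   Schema.augment_schema_with_mapping_and_unique(
    #       [{"name": "tel", "type": "STRING", "mode": "NULLABLE", "description": ""}],
    #       {"Phone": "tel", "unique_col": ["tel"], "date_column": "created"},
    #   )
    #   # -> [{"name": "tel", "type": "STRING", "mode": "NULLABLE",
    #   #      "description": "", "mapping": "Phone", "unique": True}]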
    
    