import json
from statistics import mean, median


def _text_has_surrogates(value):
    """Return True if *value* contains any UTF-16 surrogate code point."""
    return any(0xD800 <= ord(char) <= 0xDFFF for char in value)


def normalize_unicode_text(value):
    """Repair surrogate code points inside a string.

    Valid high/low surrogate pairs are combined into the astral code point
    they encode; unpaired surrogates are replaced with U+FFFD (the Unicode
    replacement character). Non-strings, empty strings, and strings with no
    surrogates are returned unchanged (the very same object), so callers can
    use an identity check to detect modification.
    """
    if not isinstance(value, str) or not value:
        return value
    if not _text_has_surrogates(value):
        return value
    normalized = []
    index = 0
    length = len(value)
    while index < length:
        codepoint = ord(value[index])
        if 0xD800 <= codepoint <= 0xDBFF:
            # High surrogate: try to pair it with an immediately following
            # low surrogate and emit the combined astral character.
            if index + 1 < length:
                next_codepoint = ord(value[index + 1])
                if 0xDC00 <= next_codepoint <= 0xDFFF:
                    combined = 0x10000 + ((codepoint - 0xD800) << 10) + (next_codepoint - 0xDC00)
                    normalized.append(chr(combined))
                    index += 2
                    continue
            normalized.append("\uFFFD")  # unpaired high surrogate
            index += 1
            continue
        if 0xDC00 <= codepoint <= 0xDFFF:
            normalized.append("\uFFFD")  # unpaired low surrogate
            index += 1
            continue
        normalized.append(value[index])
        index += 1
    return "".join(normalized)


def _normalize_items(value):
    """Normalize every item of a sequence via normalize_unicode_value.

    Returns a new list when at least one item changed, otherwise None —
    the caller then reuses the original sequence object unchanged.
    """
    normalized = None
    for index, item in enumerate(value):
        normalized_item = normalize_unicode_value(item)
        if normalized is not None:
            normalized.append(normalized_item)
        elif normalized_item is not item:
            # First change detected: copy the untouched prefix, then append.
            normalized = list(value[:index])
            normalized.append(normalized_item)
    return normalized


def normalize_unicode_value(value):
    """Recursively apply normalize_unicode_text throughout *value*.

    Strings, lists, tuples, and dicts (including string dict keys) are
    handled; any other type is returned as-is. Containers that required no
    change are returned as the same object to avoid needless copies.
    """
    if isinstance(value, str):
        return normalize_unicode_text(value)
    if isinstance(value, list):
        normalized = _normalize_items(value)
        return normalized if normalized is not None else value
    if isinstance(value, tuple):
        normalized = _normalize_items(value)
        return tuple(normalized) if normalized is not None else value
    if isinstance(value, dict):
        normalized = None
        for key, item in value.items():
            normalized_key = normalize_unicode_text(key) if isinstance(key, str) else key
            normalized_item = normalize_unicode_value(item)
            if normalized is not None:
                normalized[normalized_key] = normalized_item
                continue
            if normalized_key is not key or normalized_item is not item:
                # First change detected: copy the already-seen (unchanged)
                # prefix entries, then store the normalized pair.
                normalized = {}
                for original_key, original_item in value.items():
                    if original_key == key:
                        break
                    normalized[original_key] = original_item
                normalized[normalized_key] = normalized_item
        return normalized if normalized is not None else value
    return value


def parse_jsonish(value):
    """Parse nested JSON strings until a non-string value is reached.

    Non-string inputs are returned after recursive surrogate normalization.
    Strings that are empty/whitespace-only or not valid JSON are returned
    with surrogate normalization applied. Otherwise the string is parsed and
    the result re-examined, so a doubly-encoded payload unwraps fully.
    """
    if not isinstance(value, str):
        return normalize_unicode_value(value)
    current = value
    while isinstance(current, str):
        text = current.strip()
        if not text:
            return normalize_unicode_text(current)
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            return normalize_unicode_text(current)
        # Always normalize the parsed value: json.loads can materialize lone
        # surrogates from \uXXXX escapes in either hex case ("\ud800" or
        # "\uD800"). The previous lowercase-only '\\ud' substring test missed
        # uppercase escapes and leaked unpaired surrogates to callers.
        current = normalize_unicode_value(parsed)
    return current


def safe_int(value, default=0):
    """Convert *value* to int, returning *default* for None/empty/invalid."""
    if value is None or value == "":
        return default
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def safe_float(value, default=0.0):
    """Convert *value* to float, returning *default* for None/empty/invalid."""
    if value is None or value == "":
        return default
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def percentile(values, pct):
    """Linear-interpolated percentile of *values*, with *pct* in [0, 1].

    Returns 0.0 for empty input. *pct* is clamped to [0, 1] so out-of-range
    requests can no longer index past the ends of the sorted data (the old
    code raised IndexError for pct > 1).
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    if len(ordered) == 1:
        return float(ordered[0])
    pct = min(max(pct, 0.0), 1.0)
    rank = pct * (len(ordered) - 1)
    low = int(rank)
    high = min(low + 1, len(ordered) - 1)
    fraction = rank - low
    return ordered[low] + (ordered[high] - ordered[low]) * fraction


def series_stats(values):
    """Summary statistics (count/min/max/mean/median/p90) for a series.

    None entries are dropped first; an all-None or empty series yields a
    zeroed-out summary rather than raising.
    """
    cleaned = [v for v in values if v is not None]
    if not cleaned:
        return {
            "count": 0,
            "min": 0,
            "max": 0,
            "mean": 0.0,
            "median": 0.0,
            "p90": 0.0,
        }
    return {
        "count": len(cleaned),
        "min": min(cleaned),
        "max": max(cleaned),
        "mean": mean(cleaned),
        "median": median(cleaned),
        "p90": percentile(cleaned, 0.9),
    }


def safe_div(numerator, denominator):
    """Divide, returning 0.0 instead of raising when denominator is falsy."""
    if not denominator:
        return 0.0
    return numerator / denominator


def compact_json(data):
    """Serialize *data* to minimal JSON (no spaces), surrogate-normalized.

    ensure_ascii=False keeps non-ASCII text readable; any lone surrogates the
    dump produces are replaced so the result is safely UTF-8 encodable.
    """
    return normalize_unicode_text(json.dumps(data, ensure_ascii=False, separators=(",", ":")))