# NOTE(review): file-listing metadata was pasted into the source here
# ("Files / 165 lines / 5.0 KiB / Python"); commented out so the module
# parses as valid Python.

import json
from statistics import mean, median
def _text_has_surrogates(value):
for char in value:
codepoint = ord(char)
if 0xD800 <= codepoint <= 0xDFFF:
return True
return False
def normalize_unicode_text(value):
    """Repair surrogate code points embedded in *value*.

    A valid high/low surrogate pair is recombined into the astral character
    it encodes; any unpaired surrogate is replaced with U+FFFD (the Unicode
    replacement character).  Non-strings, empty strings, and strings with no
    surrogates are returned unchanged (same object identity).
    """
    if not isinstance(value, str) or not value:
        return value
    if not _text_has_surrogates(value):
        return value
    repaired = []
    position = 0
    length = len(value)
    while position < length:
        code = ord(value[position])
        # High surrogate: try to pair it with the following code unit.
        if 0xD800 <= code <= 0xDBFF and position + 1 < length:
            follower = ord(value[position + 1])
            if 0xDC00 <= follower <= 0xDFFF:
                astral = 0x10000 + ((code - 0xD800) << 10) + (follower - 0xDC00)
                repaired.append(chr(astral))
                position += 2
                continue
        if 0xD800 <= code <= 0xDFFF:
            # Lone high or low surrogate: not representable, substitute.
            repaired.append("\ufffd")
        else:
            repaired.append(value[position])
        position += 1
    return "".join(repaired)
def normalize_unicode_value(value):
    """Recursively normalize surrogate-bearing strings inside *value*.

    Strings are passed through normalize_unicode_text; lists, tuples and
    dicts are walked recursively.  Containers are copy-on-write: when no
    member needed normalization the original object is returned unchanged
    (same identity), otherwise a shallow copy with normalized members is
    returned.  Every other type passes through untouched.
    """
    if isinstance(value, str):
        return normalize_unicode_text(value)
    if isinstance(value, (list, tuple)):
        # One shared walk for both sequence types (the two branches were
        # previously duplicated line-for-line).
        copied = _normalized_sequence_copy(value)
        if copied is None:
            return value
        return tuple(copied) if isinstance(value, tuple) else copied
    if isinstance(value, dict):
        copied = None
        for key, item in value.items():
            normalized_key = normalize_unicode_text(key) if isinstance(key, str) else key
            normalized_item = normalize_unicode_value(item)
            if copied is not None:
                copied[normalized_key] = normalized_item
            elif normalized_key is not key or normalized_item is not item:
                # First change found: copy the already-visited entries
                # verbatim, then append the normalized entry.  Identity
                # comparison (not ==) locates the current key reliably even
                # for non-reflexive keys such as NaN.
                copied = {}
                for original_key, original_item in value.items():
                    if original_key is key:
                        break
                    copied[original_key] = original_item
                copied[normalized_key] = normalized_item
        return copied if copied is not None else value
    return value


def _normalized_sequence_copy(items):
    """Return a normalized list copy of *items*, or None when nothing changed.

    Copy-on-write helper for normalize_unicode_value: allocation happens only
    at the first member whose normalized form differs (by identity).
    """
    copied = None
    for index, item in enumerate(items):
        normalized_item = normalize_unicode_value(item)
        if copied is not None:
            copied.append(normalized_item)
        elif normalized_item is not item:
            copied = list(items[:index])
            copied.append(normalized_item)
    return copied
def parse_jsonish(value):
    """Parse nested JSON strings until a non-string value is reached.

    A string that itself decodes to a JSON string is decoded again, so
    double-encoded payloads unwrap fully.  Empty/whitespace-only strings and
    strings that are not valid JSON are returned as-is (surrogate-repaired).
    Non-string inputs are only surrogate-normalized.
    """
    if not isinstance(value, str):
        return normalize_unicode_value(value)
    current = value
    while isinstance(current, str):
        text = current.strip()
        if not text:
            return normalize_unicode_text(current)
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            return normalize_unicode_text(current)
        # Always normalize the decoded value.  The previous '"\\ud" in text'
        # gate was case-sensitive, so uppercase escapes (\uD83D) and raw
        # surrogate characters in the source text bypassed repair entirely.
        # Normalization is copy-on-write, so this is a cheap no-op when the
        # decoded value contains no surrogates.
        current = normalize_unicode_value(parsed)
    return current
def safe_int(value, default=0):
    """Best-effort conversion of *value* to int.

    None and the empty string yield *default*, as does anything that int()
    rejects with TypeError or ValueError.
    """
    if value is not None and value != "":
        try:
            return int(value)
        except (TypeError, ValueError):
            pass
    return default
def safe_float(value, default=0.0):
    """Best-effort conversion of *value* to float.

    None and the empty string yield *default*, as does anything that float()
    rejects with TypeError or ValueError.
    """
    if value is not None and value != "":
        try:
            return float(value)
        except (TypeError, ValueError):
            pass
    return default
def percentile(values, pct):
    """Linear-interpolated percentile of *values* for pct in [0, 1].

    Returns 0.0 for an empty sequence.  Uses the "linear" method: the rank
    pct * (n - 1) is split into its integer and fractional parts and the two
    neighboring order statistics are interpolated.
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    count = len(ordered)
    if count == 1:
        return float(ordered[0])
    rank = pct * (count - 1)
    lower_index = int(rank)
    upper_index = min(lower_index + 1, count - 1)
    weight = rank - lower_index
    lower_value = ordered[lower_index]
    upper_value = ordered[upper_index]
    return lower_value + (upper_value - lower_value) * weight
def series_stats(values):
    """Summary statistics for *values*, ignoring None entries.

    Returns a dict with keys count/min/max/mean/median/p90.  When nothing
    remains after filtering, all fields are zero.
    """
    data = [item for item in values if item is not None]
    if not data:
        return {"count": 0, "min": 0, "max": 0, "mean": 0.0, "median": 0.0, "p90": 0.0}
    return {
        "count": len(data),
        "min": min(data),
        "max": max(data),
        "mean": mean(data),
        "median": median(data),
        "p90": percentile(data, 0.9),
    }
def safe_div(numerator, denominator):
    """Divide *numerator* by *denominator*; return 0.0 when the denominator is falsy."""
    return numerator / denominator if denominator else 0.0
def compact_json(data):
    """Serialize *data* to minimal JSON (no separator spaces, raw Unicode),
    repairing any surrogates the dump produces."""
    encoded = json.dumps(data, ensure_ascii=False, separators=(",", ":"))
    return normalize_unicode_text(encoded)