Files
Juicepyter/clean-text-to-keywords/keyword_extractor.py
Louis Labeyrie 3cfb18be11 Narrow bare except handlers in keyword_extractor.py
Replace broad `except Exception` with `except ImportError` for the yake
import and `except (ValueError, TypeError)` for YAKE extraction. Add
logging so failures are no longer silently swallowed.

Fixes #4
2026-03-19 19:32:06 +01:00

254 lines
10 KiB
Python

"""Rule-based keyword extraction and normalization for Pokemon card generation."""
from __future__ import annotations
import logging
import math
import re
logger = logging.getLogger(__name__)
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Set, Tuple
# Canonical keyword -> synonyms that are collapsed onto it. The mapping is
# inverted by _invert_normalization_map into a synonym -> canonical lookup.
# NOTE(review): "shadow" is listed under both "ghost" and "dark". The inversion
# assigns entries in dict order, so the later entry ("dark") wins.
# NOTE(review): "psyonic" looks like a misspelling of "psionic" - confirm
# before changing, since these strings are matched against runtime tokens.
DEFAULT_NORMALIZATION_MAP: Dict[str, List[str]] = {
"normal": ["basic", "common", "regular", "plain", "normaltype"],
"fire": ["flame", "flames", "burn", "burning", "blaze", "fiery", "heat", "inferno"],
"water": ["wave", "ocean", "sea", "river", "aqua", "splash", "tidal"],
"grass": ["plant", "leaf", "forest", "nature", "vine", "seed", "flora"],
"flying": ["air", "wind", "sky", "wing", "wings", "flight", "soar"],
"fighting": ["punch", "kick", "strike", "martial", "combat", "brawl"],
"poison": ["toxic", "venom", "acid", "poisonous", "toxin"],
"electric": ["lightning", "thunder", "shock", "volt", "spark", "electricity"],
"ground": ["earth", "soil", "sand", "mud", "quake", "dust"],
"rock": ["stone", "boulder", "crystal", "rocky", "pebble"],
"psychic": ["mind", "mental", "telepathy", "psyonic", "brain", "illusion"],
"ice": ["freeze", "frozen", "snow", "frost", "blizzard", "icy"],
"bug": ["insect", "ant", "beetle", "spider", "crawler"],
"ghost": ["spirit", "phantom", "haunt", "shadow", "specter"],
"steel": ["metal", "iron", "armor", "blade", "alloy"],
"dragon": ["drake", "wyrm", "serpent", "legendary"],
"dark": ["shadow", "evil", "night", "doom", "darkness"],
"fairy": ["magic", "magical", "sparkle", "light", "charm"],
"explosion": ["explosive", "explode", "blast"],
}
# Part-of-speech tags a token must carry to be considered a keyword candidate.
DEFAULT_ALLOWED_POS: Tuple[str, ...] = ("NOUN", "ADJ", "VERB")
# Keywords dropped before normalization (compared against the lowercased lemma).
# NOTE(review): "pokmon" is presumably "pokemon" with its accented character
# stripped by upstream text cleaning - confirm against the cleaning step.
DEFAULT_IGNORED_KEYWORDS: Set[str] = {"preevolution", "pokmon"}
# Multiplier applied to (1 - YAKE penalty) when ranking keywords; tags that are
# missing from this table are weighted 1.0 by KeywordExtractor.extract.
DEFAULT_POS_WEIGHTS: Dict[str, float] = {
"NOUN": 3.0,
"ADJ": 2.0,
"VERB": 1.0,
}
# Selection budget used by KeywordExtractor._compute_target_count:
# keep ceil(total * DEFAULT_KEEP_RATIO) keywords, but at least
# DEFAULT_MIN_KEYWORDS, at most DEFAULT_MAX_KEYWORDS (a value <= 0 disables
# the upper cap) and never more than the number of candidates available.
DEFAULT_KEEP_RATIO = 0.8
DEFAULT_MIN_KEYWORDS = 12
DEFAULT_MAX_KEYWORDS = 30
def _invert_normalization_map(normalization_map: Mapping[str, Iterable[str]]) -> Dict[str, str]:
"""Build synonym -> canonical mapping for O(1) normalization lookup."""
inverse: Dict[str, str] = {}
for canonical, synonyms in normalization_map.items():
canonical_normalized = canonical.strip().lower()
inverse[canonical_normalized] = canonical_normalized
for synonym in synonyms:
synonym_normalized = synonym.strip().lower()
if synonym_normalized:
inverse[synonym_normalized] = canonical_normalized
return inverse
def _tokenize_keyword_phrase(value: str) -> List[str]:
return re.findall(r"[a-z0-9]+", value.lower())
@dataclass
class KeywordExtractor:
    """Deterministic spaCy + YAKE + rule-based normalization pipeline.

    ``extract`` runs the text through ``nlp``, keeps non-stopword,
    non-punctuation tokens whose part-of-speech tag is allowed, maps each
    lemma to its canonical keyword and, when YAKE scoring is enabled and
    available, keeps only the best-scoring share of the keywords. The result
    always follows the order in which keywords first appear in the text.
    """

    # Callable pipeline: ``nlp(text)`` must yield tokens exposing ``is_stop``,
    # ``is_punct``, ``pos_``, ``lemma_`` and ``text`` (e.g. a loaded spaCy model).
    nlp: Any
    # Canonical keyword -> synonyms collapsed onto it.
    normalization_map: Mapping[str, Iterable[str]] = field(default_factory=lambda: DEFAULT_NORMALIZATION_MAP)
    # Part-of-speech tags kept as keyword candidates (matched case-insensitively).
    allowed_pos: Sequence[str] = field(default_factory=lambda: DEFAULT_ALLOWED_POS)
    # Keywords discarded before normalization (matched lowercased and stripped).
    ignored_keywords: Set[str] = field(default_factory=lambda: set(DEFAULT_IGNORED_KEYWORDS))
    # Per-tag ranking multiplier (matched case-insensitively; default 1.0).
    pos_weights: Mapping[str, float] = field(default_factory=lambda: DEFAULT_POS_WEIGHTS)
    # Share of the candidates kept after ranking.
    keep_ratio: float = DEFAULT_KEEP_RATIO
    # Lower bound on the number of keywords kept after ranking.
    min_keywords: int = DEFAULT_MIN_KEYWORDS
    # Upper bound on the number of keywords kept; a value <= 0 disables the cap.
    max_keywords: int = DEFAULT_MAX_KEYWORDS
    # When False, ranking is skipped and every normalized keyword is returned.
    use_yake: bool = True

    def __post_init__(self) -> None:
        """Precompute the lookup tables used on every ``extract`` call."""
        self._normalization_lookup = _invert_normalization_map(self.normalization_map)
        # Upper-cased so tag matching is case-insensitive, consistent with
        # how ``pos_weights`` keys are handled below.
        self._allowed_pos_set = {pos.upper() for pos in self.allowed_pos}
        self._ignored_keywords = {keyword.lower().strip() for keyword in self.ignored_keywords}
        self._pos_weight_lookup = {k.upper(): float(v) for k, v in self.pos_weights.items()}

    @classmethod
    def from_default_model(
        cls,
        model_name: str = "en_core_web_sm",
        normalization_map: Optional[Mapping[str, Iterable[str]]] = None,
        allowed_pos: Sequence[str] = DEFAULT_ALLOWED_POS,
        ignored_keywords: Optional[Set[str]] = None,
        pos_weights: Mapping[str, float] = DEFAULT_POS_WEIGHTS,
        keep_ratio: float = DEFAULT_KEEP_RATIO,
        min_keywords: int = DEFAULT_MIN_KEYWORDS,
        max_keywords: int = DEFAULT_MAX_KEYWORDS,
        use_yake: bool = True,
    ) -> "KeywordExtractor":
        """Initialize extractor with a spaCy English pipeline.

        Args:
            model_name: Name of the spaCy model passed to ``spacy.load``.
            normalization_map: Canonical -> synonyms mapping; ``None`` selects
                ``DEFAULT_NORMALIZATION_MAP``. An explicitly empty mapping is
                honoured and disables normalization.
            allowed_pos: Part-of-speech tags kept as candidates.
            ignored_keywords: Keywords to drop; ``None`` selects a copy of
                ``DEFAULT_IGNORED_KEYWORDS``. An explicitly empty set is
                honoured and ignores nothing.
            pos_weights: Per-tag ranking multipliers.
            keep_ratio: Share of the candidates kept after ranking.
            min_keywords: Lower bound on the number of keywords kept.
            max_keywords: Upper bound on the number of keywords kept.
            use_yake: Whether to rank and prune keywords with YAKE.

        Raises:
            OSError: If loading the model raises ``OSError`` (reported as the
                model not being installed).
            RuntimeError: If importing spaCy or loading the model fails for
                any other reason.
        """
        try:
            import spacy

            nlp = spacy.load(model_name)
        except OSError as exc:
            raise OSError(
                f"spaCy model '{model_name}' is not installed. "
                f"Run: python -m spacy download {model_name}"
            ) from exc
        except Exception as exc:
            raise RuntimeError(
                "spaCy could not be loaded in this Python environment. "
                "Try Python 3.13 or lower, then install spaCy and en_core_web_sm."
            ) from exc

        # Compare against None rather than relying on truthiness, so callers
        # can deliberately pass an empty mapping / empty set.
        if normalization_map is None:
            normalization_map = DEFAULT_NORMALIZATION_MAP
        if ignored_keywords is None:
            ignored_keywords = set(DEFAULT_IGNORED_KEYWORDS)

        return cls(
            nlp=nlp,
            normalization_map=normalization_map,
            allowed_pos=allowed_pos,
            ignored_keywords=ignored_keywords,
            pos_weights=pos_weights,
            keep_ratio=keep_ratio,
            min_keywords=min_keywords,
            max_keywords=max_keywords,
            use_yake=use_yake,
        )

    def extract(self, text: str) -> List[str]:
        """Extract, normalize and rank keywords from already-cleaned text.

        Args:
            text: Input text; empty or whitespace-only text yields ``[]``.

        Returns:
            Normalized keywords without duplicates, in order of first
            appearance. All of them are returned when ``use_yake`` is False or
            no YAKE scores are available; otherwise only the top
            ``_compute_target_count`` keywords by combined score are kept.
        """
        if not text or not text.strip():
            return []

        doc = self.nlp(text)

        # Steps 1 + 2: POS filtering, lowercase lemma extraction and
        # deduplication on the lemma. The lowercased surface forms seen for
        # each lemma are remembered for the YAKE lookup in step 4.
        deduplicated: List[Tuple[str, str]] = []
        surface_forms: Dict[str, Set[str]] = {}
        for token in doc:
            if token.is_stop or token.is_punct:
                continue
            if token.pos_.upper() not in self._allowed_pos_set:
                continue
            surface = token.text.lower().strip()
            lemma = token.lemma_
            # Fall back to the surface text when there is no usable lemma
            # ("-PRON-" is a placeholder lemma, not a real word).
            base = lemma.lower().strip() if lemma and lemma != "-PRON-" else surface
            if not base or base in self._ignored_keywords:
                continue
            known_forms = surface_forms.get(base)
            if known_forms is None:
                surface_forms[base] = {surface}
                deduplicated.append((base, token.pos_))
            else:
                known_forms.add(surface)

        # Step 3: Normalize and deduplicate canonical forms. ``index`` is the
        # position among deduplicated lemmas and fixes the output order.
        unique_entries: List[Tuple[str, str, str, int]] = []
        seen_normalized: Set[str] = set()
        for index, (original_keyword, pos) in enumerate(deduplicated):
            normalized_keyword = self._normalize_keyword(original_keyword)
            if normalized_keyword in seen_normalized:
                continue
            seen_normalized.add(normalized_keyword)
            unique_entries.append((original_keyword, normalized_keyword, pos, index))

        if not unique_entries:
            return []

        all_keywords = [normalized_keyword for _, normalized_keyword, _, _ in unique_entries]
        if not self.use_yake:
            return all_keywords

        # Step 4: YAKE scoring + conservative selection to preserve detail.
        yake_scores = self._extract_yake_scores(text)
        if not yake_scores:
            return all_keywords

        ranked: List[Tuple[float, int, str]] = []
        for original_keyword, normalized_keyword, pos, index in unique_entries:
            # YAKE scores are keyed by the words as written in the text, so an
            # inflected word ("wings") is only found through its surface form,
            # not through its lemma ("wing") or canonical keyword ("flying").
            lookup_keys = {original_keyword, normalized_keyword}
            lookup_keys.update(surface_forms.get(original_keyword, ()))
            score_candidates = [yake_scores[key] for key in lookup_keys if key in yake_scores]
            # Missing score is treated as moderately relevant to avoid over-pruning.
            yake_penalty = min(score_candidates) if score_candidates else 0.45
            pos_weight = self._pos_weight_lookup.get(pos.upper(), 1.0)
            combined_score = (1.0 - yake_penalty) * pos_weight
            ranked.append((combined_score, index, normalized_keyword))

        target_count = self._compute_target_count(len(ranked))
        # Best score first; ties are broken by position in the text.
        ranked.sort(key=lambda item: (-item[0], item[1]))
        selected = ranked[:target_count]
        # Restore text order for the final output.
        selected.sort(key=lambda item: item[1])
        return [keyword for _, _, keyword in selected]

    def _compute_target_count(self, total_keywords: int) -> int:
        """Return how many of ``total_keywords`` ranked keywords to keep.

        The target is ``ceil(total_keywords * keep_ratio)``, raised to
        ``min_keywords``, capped by ``max_keywords`` when that is positive,
        and finally limited to the range ``0..total_keywords``.
        """
        if total_keywords <= 0:
            return 0
        target = max(self.min_keywords, math.ceil(total_keywords * self.keep_ratio))
        if self.max_keywords > 0:
            target = min(target, self.max_keywords)
        # Clamp at zero: a negative count used as a slice bound would drop
        # items from the end of the ranking instead of selecting none.
        return max(0, min(target, total_keywords))

    def _extract_yake_scores(self, text: str) -> Dict[str, float]:
        """Return per-token YAKE scores rescaled to the range 0.0..1.0.

        Each token takes the lowest (best) score of any YAKE phrase that
        contains it. Returns an empty dict when yake is not installed, when
        extraction raises ``ValueError``/``TypeError``, or when YAKE yields no
        tokens; callers treat an empty dict as "no ranking available".
        """
        try:
            import yake
        except ImportError:
            logger.warning("yake not installed, skipping relevance scoring")
            return {}

        # Ask for more phrases on longer texts, bounded to 20..80.
        text_token_count = len(text.split())
        top_n = max(20, min(80, text_token_count * 2))
        try:
            extractor = yake.KeywordExtractor(lan="en", n=2, dedupLim=0.9, top=top_n)
            phrase_scores = extractor.extract_keywords(text)
        except (ValueError, TypeError) as exc:
            logger.warning("YAKE extraction failed: %s", exc)
            return {}

        token_scores: Dict[str, float] = {}
        for phrase, score in phrase_scores:
            for token in _tokenize_keyword_phrase(phrase):
                existing = token_scores.get(token)
                if existing is None or score < existing:
                    token_scores[token] = score

        if not token_scores:
            return {}

        min_score = min(token_scores.values())
        max_score = max(token_scores.values())
        # All scores (nearly) equal: avoid dividing by ~0 and use a neutral value.
        if math.isclose(min_score, max_score):
            return {token: 0.5 for token in token_scores}

        # Normalize so 0.0=most important and 1.0=least important.
        score_range = max_score - min_score
        return {
            token: (score - min_score) / score_range
            for token, score in token_scores.items()
        }

    def _normalize_keyword(self, keyword: str) -> str:
        """Map a keyword to its canonical form, or to itself lowercased if unmapped."""
        keyword_lower = keyword.lower()
        return self._normalization_lookup.get(keyword_lower, keyword_lower)
def extract_keywords(
    text: str,
    extractor: Optional[KeywordExtractor] = None,
) -> List[str]:
    """Convenience API to extract keywords with default extractor config.

    Args:
        text: Already-cleaned input text.
        extractor: Extractor to use. When ``None``, a new one is built with
            ``KeywordExtractor.from_default_model()``, which loads the spaCy
            model on every call; pass a reusable extractor when calling this
            repeatedly.

    Returns:
        The keywords returned by ``extractor.extract(text)``.
    """
    # Test identity, not truthiness: a supplied extractor must never be
    # swapped for the default just because it evaluates as falsy.
    if extractor is None:
        extractor = KeywordExtractor.from_default_model()
    return extractor.extract(text)