"""Rule-based keyword extraction and normalization for Pokemon card generation."""

from __future__ import annotations

import math
import re
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Set, Tuple

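# Note: "shadow" appears under both "ghost" and "dark" below; the inverted
# lookup built by _invert_normalization_map is written in insertion order, so
# the later entry wins and "shadow" normalizes to "dark".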
DEFAULT_NORMALIZATION_MAP: Dict[str, List[str]] = {
    "normal": ["basic", "common", "regular", "plain", "normaltype"],
    "fire": ["flame", "flames", "burn", "burning", "blaze", "fiery", "heat", "inferno"],
    "water": ["wave", "ocean", "sea", "river", "aqua", "splash", "tidal"],
    "grass": ["plant", "leaf", "forest", "nature", "vine", "seed", "flora"],
    "flying": ["air", "wind", "sky", "wing", "wings", "flight", "soar"],
    "fighting": ["punch", "kick", "strike", "martial", "combat", "brawl"],
    "poison": ["toxic", "venom", "acid", "poisonous", "toxin"],
    "electric": ["lightning", "thunder", "shock", "volt", "spark", "electricity"],
    "ground": ["earth", "soil", "sand", "mud", "quake", "dust"],
    "rock": ["stone", "boulder", "crystal", "rocky", "pebble"],
    "psychic": ["mind", "mental", "telepathy", "psionic", "brain", "illusion"],
    "ice": ["freeze", "frozen", "snow", "frost", "blizzard", "icy"],
    "bug": ["insect", "ant", "beetle", "spider", "crawler"],
    "ghost": ["spirit", "phantom", "haunt", "shadow", "specter"],
    "steel": ["metal", "iron", "armor", "blade", "alloy"],
    "dragon": ["drake", "wyrm", "serpent", "legendary"],
    "dark": ["shadow", "evil", "night", "doom", "darkness"],
    "fairy": ["magic", "magical", "sparkle", "light", "charm"],
    "explosion": ["explosive", "explode", "blast"],
}

DEFAULT_ALLOWED_POS: Tuple[str, ...] = ("NOUN", "ADJ", "VERB")
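# "pokmon" is assumed to be "Pokémon" with the accent stripped by upstream
# text cleaning, which is why the unaccented form is ignored here.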
DEFAULT_IGNORED_KEYWORDS: Set[str] = {"preevolution", "pokmon"}
DEFAULT_POS_WEIGHTS: Dict[str, float] = {
    "NOUN": 3.0,
    "ADJ": 2.0,
    "VERB": 1.0,
}
DEFAULT_KEEP_RATIO = 0.8
DEFAULT_MIN_KEYWORDS = 12
DEFAULT_MAX_KEYWORDS = 30


def _invert_normalization_map(normalization_map: Mapping[str, Iterable[str]]) -> Dict[str, str]:
    """Build synonym -> canonical mapping for O(1) normalization lookup."""
    inverse: Dict[str, str] = {}
    for canonical, synonyms in normalization_map.items():
        canonical_normalized = canonical.strip().lower()
        inverse[canonical_normalized] = canonical_normalized
        for synonym in synonyms:
            synonym_normalized = synonym.strip().lower()
            if synonym_normalized:
                inverse[synonym_normalized] = canonical_normalized
    return inverse


def _tokenize_keyword_phrase(value: str) -> List[str]:
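    # Lowercase, then split on non-alphanumeric runs: "Tidal-Wave!" -> ["tidal", "wave"].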
    return re.findall(r"[a-z0-9]+", value.lower())


@dataclass
class KeywordExtractor:
    """Deterministic spaCy + YAKE + rule-based normalization pipeline."""

    nlp: Any
    normalization_map: Mapping[str, Iterable[str]] = field(default_factory=lambda: DEFAULT_NORMALIZATION_MAP)
    allowed_pos: Sequence[str] = field(default_factory=lambda: DEFAULT_ALLOWED_POS)
    ignored_keywords: Set[str] = field(default_factory=lambda: set(DEFAULT_IGNORED_KEYWORDS))
    pos_weights: Mapping[str, float] = field(default_factory=lambda: DEFAULT_POS_WEIGHTS)
    keep_ratio: float = DEFAULT_KEEP_RATIO
    min_keywords: int = DEFAULT_MIN_KEYWORDS
    max_keywords: int = DEFAULT_MAX_KEYWORDS
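    # A max_keywords of 0 or less disables the upper cap (see _compute_target_count).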
    use_yake: bool = True

    def __post_init__(self) -> None:
        self._normalization_lookup = _invert_normalization_map(self.normalization_map)
        self._allowed_pos_set = set(self.allowed_pos)
        self._ignored_keywords = {keyword.lower().strip() for keyword in self.ignored_keywords}
        self._pos_weight_lookup = {k.upper(): float(v) for k, v in self.pos_weights.items()}

    @classmethod
    def from_default_model(
        cls,
        model_name: str = "en_core_web_sm",
        normalization_map: Optional[Mapping[str, Iterable[str]]] = None,
        allowed_pos: Sequence[str] = DEFAULT_ALLOWED_POS,
        ignored_keywords: Optional[Set[str]] = None,
        pos_weights: Mapping[str, float] = DEFAULT_POS_WEIGHTS,
        keep_ratio: float = DEFAULT_KEEP_RATIO,
        min_keywords: int = DEFAULT_MIN_KEYWORDS,
        max_keywords: int = DEFAULT_MAX_KEYWORDS,
        use_yake: bool = True,
    ) -> "KeywordExtractor":
        """Initialize extractor with a spaCy English pipeline."""
        try:
            import spacy

            nlp = spacy.load(model_name)
        except OSError as exc:
            raise OSError(
                f"spaCy model '{model_name}' is not installed. "
                "Run: python -m spacy download en_core_web_sm"
            ) from exc
        except Exception as exc:
            raise RuntimeError(
                "spaCy could not be loaded in this Python environment. "
                "Try Python 3.13 or lower, then install spaCy and en_core_web_sm."
            ) from exc

        return cls(
            nlp=nlp,
            normalization_map=normalization_map or DEFAULT_NORMALIZATION_MAP,
            allowed_pos=allowed_pos,
            ignored_keywords=ignored_keywords or set(DEFAULT_IGNORED_KEYWORDS),
            pos_weights=pos_weights,
            keep_ratio=keep_ratio,
            min_keywords=min_keywords,
            max_keywords=max_keywords,
            use_yake=use_yake,
        )

    def extract(self, text: str) -> List[str]:
        """Extract, normalize and rank keywords from already-cleaned text."""
        if not text or not text.strip():
            return []

        doc = self.nlp(text)

        # Step 1: POS filtering + lowercase lemma/token extraction.
        raw_keywords: List[Tuple[str, str]] = []
        for token in doc:
            if token.is_stop or token.is_punct or token.pos_ not in self._allowed_pos_set:
                continue

            if token.lemma_ and token.lemma_ != "-PRON-":
                base = token.lemma_.lower().strip()
            else:
                base = token.text.lower().strip()
            if base and base not in self._ignored_keywords:
                raw_keywords.append((base, token.pos_))

        # Step 2: Deduplicate before domain normalization.
        deduplicated: List[Tuple[str, str]] = []
        seen_raw: Set[str] = set()
        for keyword, pos in raw_keywords:
            if keyword in seen_raw:
                continue
            seen_raw.add(keyword)
            deduplicated.append((keyword, pos))

        # Step 3: Normalize and deduplicate canonical forms.
        unique_entries: List[Tuple[str, str, str, int]] = []
        seen_normalized: Set[str] = set()
        for index, (original_keyword, pos) in enumerate(deduplicated):
            normalized_keyword = self._normalize_keyword(original_keyword)
            if normalized_keyword in seen_normalized:
                continue
            seen_normalized.add(normalized_keyword)
            unique_entries.append((original_keyword, normalized_keyword, pos, index))

        if not unique_entries:
            return []

        if not self.use_yake:
            return [normalized_keyword for _, normalized_keyword, _, _ in unique_entries]

        # Step 4: YAKE scoring + conservative selection to preserve detail.
        yake_scores = self._extract_yake_scores(text)
        if not yake_scores:
            return [normalized_keyword for _, normalized_keyword, _, _ in unique_entries]

        ranked: List[Tuple[float, int, str]] = []
        for original_keyword, normalized_keyword, pos, index in unique_entries:
            score_candidates: List[float] = []
            if original_keyword in yake_scores:
                score_candidates.append(yake_scores[original_keyword])
            if normalized_keyword in yake_scores:
                score_candidates.append(yake_scores[normalized_keyword])

            # Missing score is treated as moderately relevant to avoid over-pruning.
            yake_penalty = min(score_candidates) if score_candidates else 0.45
            pos_weight = self._pos_weight_lookup.get(pos.upper(), 1.0)
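            # yake_penalty is min-max normalized (0.0 = most relevant), so
            # (1 - penalty) is a relevance value that the POS weight then
            # scales: nouns count 3x, adjectives 2x, verbs 1x by default.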
            combined_score = (1.0 - yake_penalty) * pos_weight
            ranked.append((combined_score, index, normalized_keyword))

        target_count = self._compute_target_count(len(ranked))
        ranked.sort(key=lambda item: (-item[0], item[1]))
        selected = ranked[:target_count]
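        # Re-sort survivors by original token index so the output preserves
        # document order rather than score order.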
        selected.sort(key=lambda item: item[1])

        return [keyword for _, _, keyword in selected]

    def _compute_target_count(self, total_keywords: int) -> int:
        if total_keywords <= 0:
            return 0

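        # Keep most keywords: with the defaults (keep_ratio=0.8,
        # min_keywords=12, max_keywords=30), 20 unique keywords yield
        # ceil(20 * 0.8) = 16, while 8 unique keywords yield min(12, 8) = 8.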
        target = max(self.min_keywords, math.ceil(total_keywords * self.keep_ratio))
        if self.max_keywords > 0:
            target = min(target, self.max_keywords)
        return min(target, total_keywords)

    def _extract_yake_scores(self, text: str) -> Dict[str, float]:
        try:
            import yake
        except Exception:
            return {}

        text_token_count = len(text.split())
        top_n = max(20, min(80, text_token_count * 2))

        try:
            extractor = yake.KeywordExtractor(lan="en", n=2, dedupLim=0.9, top=top_n)
            phrase_scores = extractor.extract_keywords(text)
        except Exception:
            return {}

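        # YAKE scores are "lower is better"; keep the best (lowest) phrase
        # score seen for each constituent token.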
        token_scores: Dict[str, float] = {}
        for phrase, score in phrase_scores:
            for token in _tokenize_keyword_phrase(phrase):
                existing = token_scores.get(token)
                if existing is None or score < existing:
                    token_scores[token] = score

        if not token_scores:
            return {}

        values = list(token_scores.values())
        min_score = min(values)
        max_score = max(values)

        if math.isclose(min_score, max_score):
            return {token: 0.5 for token in token_scores}

        # Normalize so 0.0 = most important and 1.0 = least important.
        return {
            token: (score - min_score) / (max_score - min_score)
            for token, score in token_scores.items()
        }

    def _normalize_keyword(self, keyword: str) -> str:
        keyword_lower = keyword.lower()
        return self._normalization_lookup.get(keyword_lower, keyword_lower)


def extract_keywords(
    text: str,
    extractor: Optional[KeywordExtractor] = None,
) -> List[str]:
    """Convenience API to extract keywords with the default extractor config.

    Note: when ``extractor`` is None, a fresh spaCy pipeline is loaded on
    every call; pass a reusable KeywordExtractor instance in hot paths.
    """
    active_extractor = extractor or KeywordExtractor.from_default_model()
    return active_extractor.extract(text)