"""Parse a single instance."""
import datetime
import io
import itertools
import json
import zipfile
from collections import Counter, defaultdict
from enum import Enum, auto
from functools import cached_property
from pathlib import Path
from typing import BinaryIO
from zoneinfo import ZoneInfo
import stringcase
from lxml import etree # nosec: B410
from lxml.etree import _Element as Element # nosec: B410
from pydantic import BaseModel, field_validator
from ferc_xbrl_extractor.helpers import get_logger
[docs]
XBRL_INSTANCE = "http://www.xbrl.org/2003/instance"
[docs]
class Period(BaseModel):
"""Pydantic model that defines an XBRL period.
A period can be instantaneous or a duration of time. Instantaneous periods will
only have the end_date field, while duration periods will have start_date, and
end_date.
"""
[docs]
start_date: str | None = None
@classmethod
[docs]
def from_xml(cls, elem: Element) -> "Period":
"""Construct Period from XML element."""
instant = elem.find(f"{{{XBRL_INSTANCE}}}instant")
if instant is not None:
return cls(instant=True, end_date=instant.text)
return cls(
instant=False,
start_date=elem.find(f"{{{XBRL_INSTANCE}}}startDate").text,
end_date=elem.find(f"{{{XBRL_INSTANCE}}}endDate").text,
)
[docs]
class DimensionType(Enum):
"""Indicate dimension type.
XBRL contains explicit (all allowable values defined in taxonomy) and typed
(dimension with dynamic values) dimensions.
"""
[docs]
class Axis(BaseModel):
"""Pydantic model that defines an XBRL Axis.
Axes (or dimensions, terms are interchangeable in XBRL) are used for identifying
individual facts when the entity id, and period are insufficient. All axes will
be turned into columns, and be a part of the primary key for the table they
belong to.
"""
[docs]
dimension_type: DimensionType
@field_validator("name", mode="before")
@classmethod
[docs]
def strip_prefix(cls, name: str) -> str:
"""Strip XML prefix from name."""
return name.split(":")[1] if ":" in name else name
@classmethod
[docs]
def from_xml(cls, elem: Element) -> "Axis":
"""Construct Axis from XML element."""
if elem.tag.endswith("explicitMember"):
return cls(
name=elem.attrib["dimension"],
value=elem.text,
dimension_type=DimensionType.EXPLICIT,
)
if elem.tag.endswith("typedMember"):
dim = elem.getchildren()[0]
return cls(
name=elem.attrib["dimension"],
value=dim.text if dim.text else "",
dimension_type=DimensionType.TYPED,
)
# If dimension is not typed or explicit raise exception
raise ValueError("XBRL dimension not formatted correctly")
[docs]
class Entity(BaseModel):
"""Pydantic model that defines an XBRL Entity.
Entities are used to identify individual XBRL facts. An Entity should
contain a unique identifier, as well as any dimensions defined for a
table.
"""
@classmethod
[docs]
def from_xml(cls, elem: Element) -> "Entity":
"""Construct Entity from XML element."""
# Segment node contains dimensions prefixed with xbrldi
segment = elem.find(f"{{{XBRL_INSTANCE}}}segment")
dims = segment.findall("*") if segment is not None else []
return cls(
identifier=elem.find(f"{{{XBRL_INSTANCE}}}identifier").text,
dimensions=[Axis.from_xml(child) for child in dims],
)
@cached_property
[docs]
def snakecase_dimensions(self) -> list[str]:
"""Return list of dimension names in snakecase."""
return [stringcase.snakecase(dim.name) for dim in self.dimensions]
[docs]
def check_dimensions(self, primary_key: list[str]) -> bool:
"""Check if Context has extra axes not defined in primary key."""
return all(snake_dim in primary_key for snake_dim in self.snakecase_dimensions)
[docs]
class Context(BaseModel):
"""Pydantic model that defines an XBRL Context.
Contexts are used to provide useful background information for facts. The
context indicates the entity, time period, and any other dimensions which apply
to the fact.
"""
@classmethod
[docs]
def from_xml(cls, elem: Element) -> "Context":
"""Construct Context from XML element."""
return cls(
**{
"c_id": elem.attrib["id"],
"entity": Entity.from_xml(elem.find(f"{{{XBRL_INSTANCE}}}entity")),
"period": Period.from_xml(elem.find(f"{{{XBRL_INSTANCE}}}period")),
}
)
[docs]
def check_dimensions(self, primary_key: list[str]) -> bool:
"""Check if Context has extra axes not defined in primary key.
Facts missing axes from primary key can be treated as totals
across that axis, but facts with extra axes would not fit in
table.
Args:
primary_key: Primary key of table.
"""
return self.entity.check_dimensions(primary_key)
[docs]
def as_primary_key(self, filing_name: str, axes: list[str]) -> dict[str, str]:
"""Return a dictionary that represents the context as composite primary key."""
# Create dictionary mapping axis (column) name to value
axes_dict = {
stringcase.snakecase(axis.name): axis.value
for axis in self.entity.dimensions
}
axes_dict |= {axis: "total" for axis in axes if axis not in axes_dict}
# Get date based on period type
if self.period.instant:
date_dict = {"date": self.period.end_date}
else:
date_dict = {
# Ignore type because start_date will always be str if duration period
"start_date": self.period.start_date,
"end_date": self.period.end_date,
}
return {
"entity_id": self.entity.identifier,
"filing_name": filing_name,
**date_dict,
**axes_dict,
}
[docs]
def __hash__(self):
"""Just hash Context ID as it uniquely identifies contexts for an instance."""
return hash(self.c_id)
[docs]
class Fact(BaseModel):
"""Pydantic model that defines an XBRL Fact.
A fact is a single "data point", which contains a name, value, and a Context to
give background information.
"""
[docs]
value: str | None = None
@classmethod
[docs]
def from_xml(cls, elem: Element) -> "Fact":
"""Construct Fact from XML element."""
# Get prefix from namespace map to strip from fact name
prefix = f"{{{elem.nsmap[elem.prefix]}}}"
return cls(
name=stringcase.snakecase(elem.tag.replace(prefix, "")), # Strip prefix
c_id=elem.attrib["contextRef"],
value=elem.text,
)
# TODO (daz): use computed_field once we upgrade to Pydantic 2.x
[docs]
def f_id(self) -> str:
"""A unique identifier for the Fact.
There is an `id` attribute on most fact entries, but there are some
facts without an `id` attribute, so we can't use that. Instead we
assume that each fact is uniquely identified by its context ID and the
concept name.
NB, this is a function, not a property. This would be a property, but a
property is not pickleable within Pydantic 1.x
"""
return f"{self.c_id}:{self.name}"
[docs]
class Instance:
"""Class to encapsulate a parsed instance.
This class should be constructed using the InstanceBuilder class. Instance wraps
the contexts and facts parsed by the InstanceBuilder, and is used to construct
dataframes from fact tables.
"""
def __init__(
self,
contexts: dict[str, Context],
instant_facts: dict[str, list[Fact]],
duration_facts: dict[str, list[Fact]],
filing_name: str,
publication_time: datetime.datetime,
):
"""Construct Instance from parsed contexts and facts.
This will use a dictionary to map contexts and facts to the Axes defined for
the relevant contexts. This makes it easy to identify which facts might belong
in a specific fact table.
Args:
contexts: Dictionary mapping context ID to contexts.
instant_facts: Dictionary mapping concept name to list of instant facts.
duration_facts: Dictionary mapping concept name to list of duration facts.
filing_name: Name of parsed filing.
publication_time: the time at which the filing was made available online.
"""
self.logger = get_logger(__name__)
self.instant_facts = instant_facts
self.duration_facts = duration_facts
self.fact_id_counts = Counter(
f.f_id()
for f in itertools.chain.from_iterable(
(instant_facts | duration_facts).values()
)
)
self.total_facts = len(self.fact_id_counts)
self.duplicated_fact_ids = [
f_id
for f_id, _ in itertools.takewhile(
lambda c: c[1] >= 2, self.fact_id_counts.most_common()
)
]
if self.duplicated_fact_ids:
self.logger.debug(
f"Duplicated facts in {filing_name}: {self.duplicated_fact_ids}"
)
self.used_fact_ids: set[str] = set()
self.filing_name = filing_name
self.contexts = contexts
if "report_date" in duration_facts:
self.report_date = datetime.date.fromisoformat(
duration_facts["report_date"][0].value
)
else:
# FERC 714 workaround - though sometimes reports with different
# publish dates have the same certifying official date.
self.report_date = datetime.date.fromisoformat(
duration_facts["certifying_official_date"][0].value
)
self.publication_time = publication_time
[docs]
def get_facts(
self, instant: bool, concept_names: list[str], primary_key: list[str]
) -> dict[str, list[Fact]]:
"""Return a dictionary that maps Context ID's to a list of facts for each context.
Args:
instant: Get facts with instant or duration period.
concept_names: Name of concepts which map to a column name and name of facts.
primary_key: Name of columns in primary_key used to filter facts.
"""
period_fact_dict = self.instant_facts if instant else self.duration_facts
all_facts_for_concepts = itertools.chain.from_iterable(
period_fact_dict[concept_name] for concept_name in concept_names
)
return (
fact
for fact in all_facts_for_concepts
if self.contexts[fact.c_id].check_dimensions(primary_key)
)
[docs]
class InstanceBuilder:
"""Class to manage parsing XBRL filings."""
def __init__(
self,
file_info: str | BinaryIO,
name: str,
publication_time: datetime.datetime,
):
"""Construct InstanceBuilder class.
Args:
file_info: Either path to filing, or file data.
name: Name of filing.
publication_time: Time this filing was published.
"""
self.name = name
self.file = file_info
self.publication_time = publication_time
[docs]
def parse(self, fact_prefix: str = "ferc") -> Instance:
"""Parse a single XBRL instance using XML library directly.
This will return an Instance class which wraps the data parsed from the
filing in question.
Args:
fact_prefix: Prefix to identify facts in filing (defaults to 'ferc').
Returns:
context_dict: Dictionary of contexts in filing.
fact_dict: Dictionary of facts in filing.
filing_name: Name of filing.
"""
# Create parser to enable parsing 'huge' xml files
parser = etree.XMLParser(huge_tree=True)
# Check if instance contains path to file or file data and parse accordingly
tree = etree.parse(self.file, parser=parser) # noqa: S320
root = tree.getroot()
# Dictionary mapping context ID's to context structures
context_dict = {}
# Dictionary mapping context ID's to fact structures
# Allows looking up all facts with a specific context ID
instant_facts: dict[str, list[Fact]] = defaultdict(list)
duration_facts: dict[str, list[Fact]] = defaultdict(list)
# Find all contexts in XML file
contexts = root.findall(f"{{{XBRL_INSTANCE}}}context")
# Find all facts in XML file
facts = root.findall(f"{fact_prefix}:*", root.nsmap)
# Loop through contexts and parse into pydantic structures
for context in contexts:
new_context = Context.from_xml(context)
context_dict[new_context.c_id] = new_context
# Loop through facts and parse into pydantic structures
for fact in facts:
new_fact = Fact.from_xml(fact)
# Sort facts by period type
if new_fact.value is not None:
if context_dict[new_fact.c_id].period.instant:
instant_facts[new_fact.name].append(new_fact)
else:
duration_facts[new_fact.name].append(new_fact)
return Instance(
context_dict,
instant_facts,
duration_facts,
self.name,
publication_time=self.publication_time,
)
[docs]
def instances_from_zip(instance_path: Path | io.BytesIO) -> list[InstanceBuilder]:
"""Get list of instances from specified path to zipfile.
Args:
instance_path: Path to zipfile containing XBRL filings.
"""
allowable_suffixes = [".xbrl"]
archive = zipfile.ZipFile(instance_path)
with archive.open("rssfeed") as f:
filings_metadata = json.loads(f.read())
publication_times = {
get_filing_name(metadata): datetime.datetime.fromisoformat(
metadata["published_parsed"]
)
for metadata in itertools.chain.from_iterable(
e.values() for e in filings_metadata.values()
)
}
# Read files into in memory buffers to parse
return [
InstanceBuilder(
io.BytesIO(archive.open(filename).read()),
Path(filename).stem,
publication_time=publication_times[filename],
)
for filename in archive.namelist()
if Path(filename).suffix in allowable_suffixes
]
[docs]
def get_filing_name(filing_metadata: dict[str, str | int]) -> str:
"""Generate the filing filename based on its metadata, as seen in `rssfeed`.
This uses the same logic as `pudl_archiver.archivers.ferc.xbrl.archive_year`.
NOTE: the published time appears to be in America/New_York. We need to make the
archivers explictly use UTC everywhere, but until then we will force America/New_York
in this function.
"""
# TODO (daz): just put the expected filename in rssfeed also, so we don't
# have to reconstruct the name generation logic.
published_time = datetime.datetime.fromisoformat(
filing_metadata["published_parsed"]
).replace(tzinfo=ZoneInfo("America/New_York"))
return (
f"{filing_metadata['title']}_"
f"form{filing_metadata['ferc_formname'].split('_')[-1]}_"
f"{filing_metadata['ferc_period']}_"
f"{round(published_time.timestamp())}.xbrl".replace(" ", "_")
)
[docs]
def get_instances(instance_path: Path | io.BytesIO) -> list[InstanceBuilder]:
"""Get list of instances from specified path.
Args:
instance_path: Path to one or more XBRL filings.
"""
allowable_suffixes = [".xbrl"]
if isinstance(instance_path, io.BytesIO):
return instances_from_zip(instance_path)
if not instance_path.exists():
raise ValueError(f"Could not find XBRL instances at {instance_path}.")
if instance_path.suffix == ".zip":
return instances_from_zip(instance_path)
# Single instance
if instance_path.is_file():
instances = [instance_path]
# Directory of instances
else:
# Must be either a directory or file
assert instance_path.is_dir() # nosec: B101
instances = sorted(instance_path.iterdir())
return [
InstanceBuilder(str(instance), instance.name.rstrip(instance.suffix))
for instance in sorted(instances)
if instance.suffix in allowable_suffixes
]