Source code for kmlorm.core.querysets

"""
Django-style QuerySet implementation for KML elements.

This module provides the KMLQuerySet class that implements all the query
methods like filter(), exclude(), get(), etc. in a Django-compatible way.
"""

# pylint: disable=too-many-public-methods
import logging
import re
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterator,
    List,
    Optional,
    Union,
    Generic,
    TypeVar,
    overload,
)
from .exceptions import (
    KMLElementNotFound,
    KMLInvalidCoordinates,
    KMLMultipleElementsReturned,
    KMLQueryError,
    KMLValidationError,
)

if TYPE_CHECKING:
    from ..models.point import Coordinate
    from ..models.base import KMLElement


T = TypeVar("T", bound="KMLElement")


[docs] class KMLQuerySet(Generic[T]): """Typed QuerySet for KML elements. This class wraps a list of elements and implements the common query/filter operations. It is generic in the element type so callers receive precise T / Optional[T] / List[T] return types. Similar to Django's QuerySet. """
[docs] def __init__(self, elements: Optional[List[T]] = None) -> None: """ Initialize a QuerySet with a list of elements. Args: elements: List of KML elements to query. If None, starts empty. """ self._elements: List[T] = list(elements or []) self._ordered = False self._order_by_fields: List[str] = [] self._distinct = False
[docs] def __iter__(self) -> Iterator[T]: """Make QuerySet iterable.""" return iter(self._elements)
[docs] def __len__(self) -> int: """Return the number of elements in the QuerySet.""" return len(self._elements)
@overload def __getitem__(self, key: int) -> T: ... @overload def __getitem__(self, key: slice) -> "KMLQuerySet[T]": ...
[docs] def __getitem__(self, key: Union[int, slice]) -> Union[T, "KMLQuerySet[T]"]: """ Support indexing and slicing of QuerySet. Args: key: Index or slice object Returns: Single element for index, new QuerySet for slice """ if isinstance(key, int): return self._elements[key] if isinstance(key, slice): return self.__class__(self._elements[key]) raise TypeError("QuerySet indices must be integers or slices")
[docs] def __bool__(self) -> bool: """Return True if QuerySet has any elements.""" return bool(self._elements) or False
[docs] def __repr__(self) -> str: """Developer-friendly representation.""" return f"<KMLQuerySet [{len(self._elements)} elements]>"
@property def elements(self) -> List[T]: """ Returns the list of KMLElement objects associated with this queryset. Returns: Optional[List["KMLElement"]]: A list of KMLElement instances if available, otherwise None. """ return self._elements @elements.setter def elements(self, value: Optional[List[T]]) -> None: """ Sets the elements of the queryset, ensuring all items are instances of KMLElement. Args: value: An iterable containing KMLElement instances to assign, or None to do nothing. Raises: TypeError: If any item in `value` is not an instance of KMLElement. Returns: None """ # Setting None should do nothing (preserves existing elements) if value is None: return # pylint: disable=import-outside-toplevel from ..models.base import KMLElement as _KMLElement for item in value: if not isinstance(item, _KMLElement): raise TypeError("Elements assignment must contain only KMLElement instances.") self._elements = value.copy()
[docs] def all(self) -> "KMLQuerySet[T]": """ Return a copy of this QuerySet. This method exists for Django compatibility and returns a new QuerySet with the same elements. """ return self.__class__(self._elements.copy())
[docs] def filter(self, **kwargs: Any) -> "KMLQuerySet[T]": """ Filter elements based on field lookups. Supports Django-style field lookups like: - name__icontains='capital' - coordinates__latitude__gte=39.0 - visibility=True Args: **kwargs: Field lookup expressions Returns: New QuerySet with filtered elements """ filtered_elements = [] for element in self._elements: if self._matches_filters(element, kwargs): filtered_elements.append(element) new_qs = self.__class__(filtered_elements) new_qs.is_ordered = self.is_ordered new_qs.order_by_fields = self.order_by_fields return new_qs
@property def is_ordered(self) -> bool: """ Returns whether the queryset is ordered. Returns: bool: True if the queryset has an ordering applied, False otherwise. """ return self._ordered @is_ordered.setter def is_ordered(self, value: bool) -> None: """ Sets whether the queryset should be ordered. Args: value (bool): If True, the queryset will be ordered; if False, it will not be ordered. """ self._ordered = value @property def order_by_fields(self) -> List[str]: """ Returns the list of field names used to determine the ordering of query results. Returns: List[str]: A list of strings representing the field names by which the results are ordered. """ return self._order_by_fields @order_by_fields.setter def order_by_fields(self, value: List[str]) -> None: """ Sets the fields by which query results should be ordered. Args: value (List[str]): A list of field names to order the query results by. """ self._order_by_fields = value.copy() @property def is_distinct(self) -> bool: """ Returns whether the queryset is marked to return distinct results. Returns: bool: True if the queryset is set to return distinct results, False otherwise. """ return bool(self._distinct) @is_distinct.setter def is_distinct(self, value: bool) -> None: """ Sets whether the query should return distinct results. Args: value (bool): If True, the query will return only distinct results; otherwise, duplicates may be included. Returns: None """ self._distinct = bool(value)
[docs] def exclude(self, **kwargs: Any) -> "KMLQuerySet[T]": """ Exclude elements that match the given filters. This is the inverse of filter() - returns elements that do NOT match the criteria. Args: **kwargs: Field lookup expressions Returns: New QuerySet with non-matching elements """ filtered_elements = [] for element in self._elements: if not self._matches_filters(element, kwargs): filtered_elements.append(element) new_qs = self.__class__(filtered_elements) new_qs.is_ordered = self.is_ordered new_qs.order_by_fields = self.order_by_fields return new_qs
[docs] def get(self, **kwargs: Any) -> "T": """ Get a single element that matches the given criteria. Args: **kwargs: Field lookup expressions Returns: Single matching KML element Raises: KMLElementNotFound: If no elements match KMLMultipleElementsReturned: If multiple elements match """ filtered = self.filter(**kwargs) if len(filtered) == 0: element_type = self._elements[0].__class__.__name__ if self._elements else "KMLElement" raise KMLElementNotFound(element_type, kwargs) if len(filtered) > 1: element_type = filtered.elements[0].__class__.__name__ raise KMLMultipleElementsReturned(element_type, len(filtered), kwargs) return filtered.elements[0]
[docs] def first(self) -> Optional[T]: """ Get the first element in the QuerySet. Returns: First element or None if QuerySet is empty """ return self._elements[0] if self._elements else None
[docs] def last(self) -> Optional[T]: """ Get the last element in the QuerySet. Returns: Last element or None if QuerySet is empty """ return self._elements[-1] if self._elements else None
[docs] def count(self) -> int: """ Return the number of elements in the QuerySet. Returns: Number of elements """ return len(self._elements)
[docs] def exists(self) -> bool: """ Check if the QuerySet contains any elements. Returns: True if QuerySet has elements, False otherwise """ return bool(self._elements)
[docs] def none(self) -> "KMLQuerySet[T]": """ Return an empty QuerySet. Returns: Empty QuerySet of the same type """ return self.__class__([])
[docs] def order_by(self, *fields: str) -> "KMLQuerySet[T]": """ Order elements by the given fields. Supports field names with optional '-' prefix for descending order. Args: *fields: Field names to order by (e.g., 'name', '-visibility') Returns: New ordered QuerySet """ if not fields: return self.all() new_qs = self.all() new_qs.is_ordered = True new_qs.order_by_fields = list(fields) # Sort elements by each field (in reverse order for stable sorting) for field in reversed(fields): reverse = field.startswith("-") clean_field = field.lstrip("-") try: def _key_fn(elem: T, cf: str = clean_field) -> Any: return self._get_field_value(elem, cf) new_qs.elements.sort(key=_key_fn, reverse=reverse) except AttributeError as ae: raise KMLQueryError(f"Cannot order by field '{clean_field}'", clean_field) from ae return new_qs
[docs] def reverse(self) -> "KMLQuerySet[T]": """ Reverse the order of elements. Returns: New QuerySet with reversed element order """ new_qs = self.all() new_qs.elements.reverse() return new_qs
[docs] def distinct(self) -> "KMLQuerySet[T]": """ Remove duplicate elements. Returns: New QuerySet with unique elements only """ seen = set() unique_elements = [] for element in self._elements: # Use id if available, otherwise use object id key = element.id if element.id else id(element) if key not in seen: seen.add(key) unique_elements.append(element) new_qs = self.__class__(unique_elements) new_qs.is_distinct = True new_qs.is_ordered = self.is_ordered new_qs.order_by_fields = self._order_by_fields.copy() return new_qs
[docs] def values(self, *fields: str) -> List[Dict[str, Any]]: """ Return a list of dictionaries with specified field values. Args: *fields: Field names to include in dictionaries Returns: List of dictionaries with field values """ if not fields: # Return all fields return [element.to_dict() for element in self._elements] result = [] for element in self._elements: item = {} for field in fields: try: item[field] = self._get_field_value(element, field) except AttributeError: item[field] = None result.append(item) return result
[docs] def values_list(self, *fields: str, flat: bool = False) -> List[Any]: """ Return a list of tuples with specified field values. Args: *fields: Field names to include in tuples flat: If True and only one field, return flat list of values Returns: List of tuples (or flat list if flat=True and one field) """ if flat and len(fields) != 1: raise ValueError("values_list() with flat=True requires exactly one field") result = [] for element in self._elements: values = [] for field in fields: try: values.append(self._get_field_value(element, field)) except AttributeError: values.append(None) if flat: result.append(values[0]) else: result.append(tuple(values)) return result
# Geospatial-specific methods
[docs] def near( self, longitude: float, latitude: float, radius_km: Optional[float] = None ) -> "KMLQuerySet[T]": """ Filter elements within a radius of given coordinates. Args: longitude: Center longitude (-180 to 180) latitude: Center latitude (-90 to 90) radius_km: Radius in kilometers (if None, no distance filtering) Returns: New QuerySet with nearby elements """ # pylint: disable=import-outside-toplevel from ..models.point import Coordinate from ..spatial.calculations import ( SpatialCalculations, ) center = Coordinate(longitude=longitude, latitude=latitude, altitude=0.0) if radius_km is None: return self.all() filtered_elements = [] for element in self._elements: try: coords = self._point_coords(element) if coords: distance = SpatialCalculations.distance_between(center, coords) if distance is not None and distance <= radius_km: filtered_elements.append(element) except (ValueError, TypeError): continue return self.__class__(filtered_elements)
[docs] def within_bounds( self, north: float, south: float, east: float, west: float ) -> "KMLQuerySet[T]": """ Filter elements within a bounding box. Args: north: Northern boundary (max latitude) south: Southern boundary (min latitude) east: Eastern boundary (max longitude) west: Western boundary (min longitude) Returns: New QuerySet with elements in bounds """ # Validate bounds if not -90 <= south <= north <= 90: raise KMLInvalidCoordinates("Invalid latitude bounds") if not -180 <= west <= 180 and -180 <= east <= 180: raise KMLInvalidCoordinates("Invalid longitude bounds") filtered_elements = [] for element in self._elements: try: coords = self._point_coords(element) if coords: elem_lon, elem_lat = coords.longitude, coords.latitude # Handle longitude wraparound if west <= east: # Normal case lon_in_bounds = west <= elem_lon <= east else: # Crosses 180° meridian lon_in_bounds = elem_lon >= west or elem_lon <= east if south <= elem_lat <= north and lon_in_bounds: filtered_elements.append(element) except (ValueError, TypeError): continue return self.__class__(filtered_elements)
[docs] def has_coordinates(self) -> "KMLQuerySet[T]": """ Filter elements that have coordinate data. Returns: New QuerySet with elements that have coordinates """ filtered_elements = [] for element in self._elements: try: coords = self._point_coords(element) if coords: filtered_elements.append(element) except (ValueError, TypeError): continue return self.__class__(filtered_elements)
[docs] def valid_coordinates(self) -> "KMLQuerySet[T]": """ Filter elements with valid coordinate ranges. Uses the Coordinate class validation to ensure consistency with the authoritative coordinate validation logic. Returns: New QuerySet with elements having valid coordinates """ filtered_elements = [] for element in self._elements: try: coords = self._point_coords(element) if coords and coords.longitude is not None and coords.latitude is not None: # Import here to avoid circular imports from ..models.point import Coordinate # pylint: disable=import-outside-toplevel # Create a new Coordinate to validate ranges using the authoritative logic # This replaces manual range checks with the same validation used elsewhere try: altitude = getattr(coords, "altitude", 0.0) Coordinate( longitude=coords.longitude, latitude=coords.latitude, altitude=altitude ) filtered_elements.append(element) except KMLValidationError: # Invalid coordinate ranges - skip this element continue except (ValueError, TypeError, AttributeError): # Problems extracting coordinates - skip this element continue return self.__class__(filtered_elements)
# Helper methods def _matches_filters(self, element: T, filters: Dict[str, Any]) -> bool: """ Check if an element matches all the given filters. Args: element: KML element to check filters: Dictionary of field lookups Returns: True if element matches all filters """ for lookup, value in filters.items(): if not self._matches_single_filter(element, lookup, value): return False return True def _matches_single_filter(self, element: T, lookup: str, value: Any) -> bool: """ Check if an element matches a single filter. Args: element: KML element to check lookup: Field lookup string (e.g., 'name__icontains') value: Value to match against Returns: True if element matches the filter """ # Parse field lookup parts = lookup.split("__") field_name = parts[0] lookup_type = parts[1] if len(parts) > 1 else "exact" # Get field value try: field_value = self._get_field_value(element, field_name) except AttributeError: return False # Apply lookup type return self._apply_lookup(field_value, lookup_type, value) def _get_field_value(self, element: T, field_path: str) -> Any: """ Get a field value from an element, supporting nested field access. Args: element: KML element field_path: Field name or path (e.g., 'name' or 'coordinates.latitude') Returns: Field value Raises: AttributeError: If field doesn't exist """ parts = field_path.split(".") current = element for part in parts: if hasattr(current, part): current = getattr(current, part) else: raise AttributeError(f"'{element.__class__.__name__}' has no attribute '{part}'") return current def _apply_lookup(self, field_value: Any, lookup_type: str, filter_value: Any) -> bool: """ Apply a lookup type to compare field value with filter value. Args: field_value: Value from the element field lookup_type: Lookup type (e.g., 'exact', 'icontains', 'gte') filter_value: Value to compare against Returns: True if lookup matches """ # pylint: disable=too-many-return-statements, too-many-branches, too-many-public-methods if field_value is None: return lookup_type == "isnull" and filter_value # String lookups if lookup_type == "exact": return bool(field_value == filter_value) if lookup_type == "iexact": return str(field_value).lower() == str(filter_value).lower() if lookup_type == "contains": return str(filter_value) in str(field_value) if lookup_type == "icontains": return str(filter_value).lower() in str(field_value).lower() if lookup_type == "startswith": return str(field_value).startswith(str(filter_value)) if lookup_type == "istartswith": return str(field_value).lower().startswith(str(filter_value).lower()) if lookup_type == "endswith": return str(field_value).endswith(str(filter_value)) if lookup_type == "iendswith": return str(field_value).lower().endswith(str(filter_value).lower()) if lookup_type == "regex": return bool(re.search(str(filter_value), str(field_value))) if lookup_type == "iregex": return bool(re.search(str(filter_value), str(field_value), re.IGNORECASE)) # Comparison lookups if lookup_type == "gt": return bool(field_value > filter_value) if lookup_type == "gte": return bool(field_value >= filter_value) if lookup_type == "lt": return bool(field_value < filter_value) if lookup_type == "lte": return bool(field_value <= filter_value) # Range and membership lookups if lookup_type == "in": return bool(field_value in filter_value) if lookup_type == "range": return bool(filter_value[0] <= field_value <= filter_value[1]) if lookup_type == "isnull": return bool((field_value is None) == filter_value) raise KMLQueryError(f"Unsupported lookup type: {lookup_type}") def _point_coords(self, element: T) -> Optional["Coordinate"]: """ Extract coordinates from a KML element. Args: element: KML element to extract coordinates from Returns: Coordinate instance or None if no valid coordinates found """ from ..models.point import ( # pylint: disable=import-outside-toplevel Point as _Point, Coordinate, ) # For Point objects, get coordinates directly if isinstance(element, _Point): return element.coordinates # For other elements, try to find standard coordinates attribute coords = getattr(element, "coordinates", None) if not coords: return None try: return Coordinate.from_any(coords) except (ValueError, TypeError) as e: # Log warning for invalid coordinates but don't raise logger = logging.getLogger(__name__) logger.warning( "Failed to parse coordinates from element %s: %s. Skipping coordinate extraction.", getattr(element, "id", "unknown"), str(e), ) return None