"""Managers and Querysets for EveEntity models."""
from __future__ import annotations
import logging
from collections import defaultdict
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Iterable, Optional, Set, Tuple
from django.db import models
from django.db.utils import IntegrityError
from esi.exceptions import HTTPClientError
from typing_extensions import deprecated
from eveuniverse.app_settings import EVEUNIVERSE_BULK_METHODS_BATCH_SIZE
from eveuniverse.constants import POST_UNIVERSE_NAMES_MAX_ITEMS
from eveuniverse.helpers import EveEntityNameResolver
from eveuniverse.providers import esi
from eveuniverse.utils import chunks
from .universe import EveUniverseEntityModelManager
if TYPE_CHECKING:
from eveuniverse.models import EveEntity
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# IDs that are treated as invalid: they are excluded from querysets handed to
# ESI and are never sent to ESI for resolution.
_ESI_INVALID_IDS = [1]
# Maximum number of names sent in a single request when looking up IDs by name.
_ESI_MAX_NAMES_PER_REQUEST = 500
[docs]
class EveEntityQuerySet(models.QuerySet):
    """QuerySet for EveEntity with helpers for refreshing objects from ESI."""

    def update_from_esi(self) -> int:
        """Refresh all Eve entity objects contained in this queryset from ESI.

        Only the IDs reported by :meth:`valid_ids` are sent to ESI.

        Returns:
            Count of updated objects.
        """
        # Runtime import inside the method; presumably avoids a circular
        # import with the models module -- TODO confirm.
        from eveuniverse.models import EveEntity

        ids_to_update = self.valid_ids()
        manager = EveEntity.objects
        return manager.update_from_esi_by_id(ids_to_update)  # type: ignore

    def valid_ids(self) -> Set[int]:
        """Return the IDs in this queryset, without the known invalid ones."""
        candidates = self.exclude(id__in=_ESI_INVALID_IDS)
        return {entity_id for entity_id in candidates.values_list("id", flat=True)}
[docs]
class EveEntityManagerBase(EveUniverseEntityModelManager):
    """Custom manager for EveEntity.

    Provides helpers for resolving entity IDs to names (and names to IDs)
    through ESI and for storing the results as EveEntity objects.
    """

    _MAX_DEPTH = 5  # max recursion depth when resolving IDs

    @deprecated("Replaced by `bulk_resolve_ids()`")
    def bulk_create_esi(self, ids: Iterable[int]) -> int:
        """Resolve given IDs from ESI and update or create corresponding objects.

        Args:
            ids: List of valid EveEntity IDs

        Returns:
            Count of updated entities

        .. deprecated:: 1.5.0
            Use :func:`bulk_resolve_ids` instead.
        """
        return self.bulk_resolve_ids(ids)

    def bulk_resolve_ids(self, ids: Iterable[int]) -> int:
        """Resolve given IDs from ESI and update or create corresponding objects.

        Missing objects are created first (without a name). Afterwards all
        requested objects which still have an empty name are updated from ESI.

        Args:
            ids: IDs to be resolved

        Returns:
            Count of updated entities
        """
        ids = set(map(int, ids))
        self._create_missing_objs(ids)
        to_update_qs: EveEntityQuerySet = self.filter(id__in=ids, name="")
        return to_update_qs.update_from_esi()

    def _create_missing_objs(self, ids: Set[int]) -> Set[int]:
        """Create missing objs and return their IDs."""
        existing_ids = set(self.filter(id__in=ids).values_list("id", flat=True))
        new_ids = ids.difference(existing_ids)
        if new_ids:
            objects = [self.model(id=entity_id) for entity_id in new_ids]
            # ignore_conflicts: do not fail when one of the IDs already exists
            # by the time the insert is executed.
            self.bulk_create(
                objects,
                batch_size=EVEUNIVERSE_BULK_METHODS_BATCH_SIZE,
                ignore_conflicts=True,
            )  # type: ignore
        return new_ids

    def bulk_resolve_names(self, ids: Iterable[int]) -> EveEntityNameResolver:
        """Resolve given IDs to names and return them.

        Args:
            ids: List of valid EveEntity IDs

        Returns:
            EveEntityNameResolver object helpful for quick resolving a large amount
            of IDs
        """
        ids = set(map(int, ids))
        self.bulk_resolve_ids(ids)
        return EveEntityNameResolver(
            {
                entity_id: name
                for entity_id, name in self.filter(id__in=ids).values_list(
                    "id", "name"
                )
            }
        )

    def bulk_update_all_esi(self) -> int:
        """Update all EveEntity objects in the database from ESI.

        Returns:
            Count of updated entities.
        """
        return self.all().update_from_esi()  # type: ignore

    def bulk_update_new_esi(self) -> int:
        """Update all unresolved EveEntity objects in the database from ESI.

        An object counts as unresolved when its name is empty.

        Returns:
            Count of updated entities.
        """
        return self.filter(name="").update_from_esi()  # type: ignore

    def fetch_by_names_esi(
        self, names: Iterable[str], update: bool = False
    ) -> models.QuerySet[EveEntity]:
        """Fetch entities matching given names.

        Will fetch missing entities from ESI if needed or requested.

        Note that names that are not found by ESI are ignored.

        Args:
            names: Names of entities to fetch
            update: When True will always update from ESI

        Returns:
            query with matching entities.
        """
        names = set(names)
        if update:
            names_to_fetch = names
        else:
            existing_names = set(
                self.filter(name__in=names).values_list("name", flat=True)
            )
            names_to_fetch = names - existing_names
        if names_to_fetch:
            esi_result = self._fetch_names_from_esi(names_to_fetch)
            if esi_result:
                self._update_or_create_entities(esi_result)
        return self.filter(name__in=names)

    def _fetch_names_from_esi(self, names: Iterable[str]) -> dict:
        """Look up the given names in ESI and return the matches by category key.

        Names are sent in sorted order and in chunks of at most
        ``_ESI_MAX_NAMES_PER_REQUEST`` names per request. Categories without
        any match are not part of the result.
        """
        logger.info("Trying to fetch EveEntities from ESI by name")
        result = defaultdict(list)
        for chunk_names in chunks(sorted(names), _ESI_MAX_NAMES_PER_REQUEST):
            result_chunk = esi.client.Universe.PostUniverseIds(body=chunk_names).result(
                use_etag=False
            )
            for category, entities in result_chunk.model_dump().items():
                if entities:
                    result[category] += entities
        # Keys are only ever created for non-empty entity lists,
        # so no further filtering is needed here.
        return dict(result)

    def _update_or_create_entities(self, esi_result):
        """Store entities from an ESI result, which is grouped by category key.

        Entities belonging to an unknown category key are skipped with a warning.
        """
        for category_key, entities in esi_result.items():
            try:
                category = self._map_category_key_to_category(category_key)
            except ValueError:
                logger.warning(
                    "Ignoring entities with unknown category %s: %s",
                    category_key,
                    entities,
                )
                continue
            for entity in entities:
                self.update_or_create(
                    id=entity["id"],
                    defaults={"name": entity["name"], "category": category},
                )

    def _map_category_key_to_category(self, category_key: str) -> str:
        """Map category keys from ESI result to categories.

        Raises:
            ValueError: When the category key is not known
        """
        my_map = {
            "alliances": self.model.CATEGORY_ALLIANCE,
            "characters": self.model.CATEGORY_CHARACTER,
            "constellations": self.model.CATEGORY_CONSTELLATION,
            "corporations": self.model.CATEGORY_CORPORATION,
            "factions": self.model.CATEGORY_FACTION,
            "inventory_types": self.model.CATEGORY_INVENTORY_TYPE,
            "regions": self.model.CATEGORY_REGION,
            "systems": self.model.CATEGORY_SOLAR_SYSTEM,
            "stations": self.model.CATEGORY_STATION,
        }
        try:
            return my_map[category_key]
        except KeyError:
            raise ValueError(f"Invalid category: {category_key}") from None

    def get_queryset(self) -> models.QuerySet:
        """:meta private:"""
        return EveEntityQuerySet(self.model, using=self._db)

    def get_or_create_esi(
        self,
        *,
        id: int,
        include_children: bool = False,
        wait_for_children: bool = True,
        enabled_sections: Optional[Iterable[str]] = None,
        task_priority: Optional[int] = None,
    ) -> Tuple[Any, bool]:
        """Get or create an EveEntity object.

        The object is automatically fetched from ESI if it does not exist (blocking)
        or if it has not yet been resolved, i.e. its name is still empty.

        Args:
            id: Eve Online ID of object
            include_children: (no effect)
            wait_for_children: (no effect)
            enabled_sections: (no effect)
            task_priority: (no effect)

        Returns:
            A tuple consisting of the requested EveEntity object and a created flag.
            The returned object is None if the ID is invalid.
        """
        id = int(id)
        try:
            # Objects with an empty name are not yet resolved
            # and are therefore treated like missing objects.
            obj = self.exclude(name="").get(id=id)
        except self.model.DoesNotExist:
            return self.update_or_create_esi(
                id=id,
                include_children=include_children,
                wait_for_children=wait_for_children,
                enabled_sections=enabled_sections,
                task_priority=task_priority,
            )
        return obj, False

    def resolve_name(self, id: int) -> str:
        """Return the name for the given Eve entity ID
        or an empty string if ID is not valid.
        """
        if id is None:
            return ""
        obj, _ = self.get_or_create_esi(id=int(id))
        return obj.name if obj else ""

    def update_or_create_esi(
        self,
        *,
        id: int,
        include_children: bool = False,
        wait_for_children: bool = True,
        enabled_sections: Optional[Iterable[str]] = None,
        task_priority: Optional[int] = None,
    ) -> Tuple[Any, bool]:
        """Update or create an EveEntity object by fetching it from ESI (blocking).

        Args:
            id: Eve Online ID of object
            include_children: (no effect)
            wait_for_children: (no effect)
            enabled_sections: (no effect)
            task_priority: (no effect)

        Returns:
            A tuple consisting of the requested object and a created flag.
            When the ID is invalid the returned object will be None.

        Raises:
            HTTPClientError: For any client error returned by ESI, except 404
        """
        id = int(id)
        logger.info("%s: Trying to resolve ID to EveEntity with ESI", id)
        if id in _ESI_INVALID_IDS:
            logger.info("%s: ID is not valid", id)
            return None, False
        try:
            result = esi.client.Universe.PostUniverseNames(body=[id]).result(
                use_etag=False
            )
        except HTTPClientError as ex:
            if ex.status_code == HTTPStatus.NOT_FOUND:
                logger.info("%s: ID is not valid", id)
                return None, False
            raise
        if not result:
            # An empty response means the ID could not be resolved.
            # Treat it like an invalid ID instead of failing on result[0].
            logger.info("%s: ID is not valid", id)
            return None, False
        item = result[0]
        return self.update_or_create(
            id=item.id,
            defaults={"name": item.name, "category": item.category},
        )

    def update_or_create_all_esi(
        self,
        *,
        include_children: bool = False,
        wait_for_children: bool = True,
        enabled_sections: Optional[Iterable[str]] = None,
        task_priority: Optional[int] = None,
    ) -> None:
        """not implemented - do not use"""
        raise NotImplementedError()

    def update_from_esi_by_id(self, ids: Iterable[int]) -> int:
        """Update all Eve entity objects by id from ESI.

        Known invalid IDs are skipped and duplicates are removed.
        The remaining IDs are resolved in chunks of at most
        ``POST_UNIVERSE_NAMES_MAX_ITEMS`` IDs per request.

        Args:
            ids: IDs of the entities to update

        Returns:
            Count of entities resolved by ESI, summed over all chunks
        """
        if not ids:
            return 0
        # Convert to int before filtering, so that IDs given as strings
        # are also matched against the invalid IDs.
        valid_ids = sorted({int(id_) for id_ in ids}.difference(_ESI_INVALID_IDS))
        if not valid_ids:
            return 0
        logger.info("Updating %d entities from ESI", len(valid_ids))
        resolved_counter = 0
        for chunk_ids in chunks(valid_ids, POST_UNIVERSE_NAMES_MAX_ITEMS):
            logger.debug("Trying to resolve the following IDs from ESI:\n%s", chunk_ids)
            # Accumulate, so that the result covers all chunks and not just the last.
            resolved_counter += self._resolve_entities_from_esi(chunk_ids)
        return resolved_counter

    def _resolve_entities_from_esi(self, ids: list, depth: int = 1) -> int:
        """Resolve the given IDs from ESI and store them as objects.

        When ESI answers with 404 the IDs are split into two groups (every
        other ID), which are then resolved separately. Splitting stops once
        ``_MAX_DEPTH`` is reached or only one ID is left; the remaining IDs are
        then reported as invalid and skipped.

        Args:
            ids: IDs to resolve with one request
            depth: Current recursion depth, starting at 1

        Returns:
            Count of entities returned by ESI

        Raises:
            HTTPClientError: For any client error returned by ESI, except 404
        """
        try:
            items = esi.client.Universe.PostUniverseNames(body=ids).result(
                use_etag=False
            )
        except HTTPClientError as ex:
            if ex.status_code != HTTPStatus.NOT_FOUND:
                raise
            # if API fails to resolve all IDs, we divide and conquer,
            # trying to resolve each half of the ids separately
            if len(ids) > 1 and depth < self._MAX_DEPTH:
                return self._resolve_entities_from_esi(
                    ids[::2], depth + 1
                ) + self._resolve_entities_from_esi(ids[1::2], depth + 1)
            logger.warning("Failed to resolve invalid IDs: %s", ids)
            return 0

        for item in items:
            try:
                self.update_or_create(
                    id=item.id,
                    defaults={"name": item.name, "category": item.category},
                )
            except IntegrityError:
                # Best effort: one failing entity must not abort the whole
                # batch, but the failure should be visible in the logs.
                logger.warning(
                    "%s: Failed to store resolved entity", item.id, exc_info=True
                )
        return len(items)
# Manager class built from EveEntityManagerBase and EveEntityQuerySet via
# from_queryset(), so it combines the manager with the custom queryset.
EveEntityManager = EveEntityManagerBase.from_queryset(EveEntityQuerySet)