Source code for eveuniverse.tasks

"""Tasks for Eve Universe."""

import logging
from typing import Iterable, List, Optional

from celery import chain, shared_task
from celery_once import QueueOnce as BaseQueueOnce
from django.db.utils import OperationalError

from . import __title__
from .app_settings import EVEUNIVERSE_LOAD_TASKS_PRIORITY, EVEUNIVERSE_TASKS_TIME_LIMIT
from .constants import POST_UNIVERSE_NAMES_MAX_ITEMS, EveCategoryId
from .models import EveCategory, EveEntity, EveMarketPrice, EveRegion, EveType
from .models.base import EveUniverseEntityModel, determine_effective_sections
from .providers import esi
from .utils import LoggerAddTag, chunks

# Module-level logger, wrapped in LoggerAddTag together with the app title
# (presumably so each message is tagged with the app name - confirm in .utils).
logger = LoggerAddTag(logging.getLogger(__name__), __title__)
# logging.getLogger("esi").setLevel(logging.INFO)


# pylint: disable = abstract-method
class QueueOnce(BaseQueueOnce):
    """Make sure all redundant tasks will abort gracefully.

    The ``once`` options are built as a new dict instead of modifying the
    dict of the base class in place. Assigning ``BaseQueueOnce.once`` to a
    name and then setting a key on it would change the very same dict object
    that the base class (and every other task class deriving from it) uses.
    """

    once = {**BaseQueueOnce.once, "graceful": True}


# Keyword arguments applied to every task of this module.
_TASK_DEFAULTS = {"time_limit": EVEUNIVERSE_TASKS_TIME_LIMIT}

# Keyword arguments for tasks that make ESI calls: the common defaults plus
# automatic retries.
_TASK_ESI_DEFAULTS = {
    **_TASK_DEFAULTS,
    "autoretry_for": [OperationalError],  # TODO: Double-check: Correct exception?
    "retry_kwargs": {"max_retries": 3},
    "retry_backoff": True,
}

# Same as above, but with QueueOnce as the task base class.
_TASK_ESI_DEFAULTS_ONCE = {**_TASK_ESI_DEFAULTS, "base": QueueOnce}


# Eve Universe objects


@shared_task(**_TASK_ESI_DEFAULTS_ONCE)
def load_eve_object(
    model_name: str, id: int, include_children=False, wait_for_children=True
) -> None:
    """Load a single eve object.

    The object will only be created from ESI if it does not exist yet.

    Args:
        model_name: Name of the model, used to look up the model class
        id: ID of the object to load
        include_children: passed through to ``get_or_create_esi()``
        wait_for_children: passed through to ``get_or_create_esi()``
    """
    logger.info("Loading %s with ID %s", model_name, id)
    manager = EveUniverseEntityModel.get_model_class(model_name).objects
    lookup = {
        "id": id,
        "include_children": include_children,
        "wait_for_children": wait_for_children,
    }
    manager.get_or_create_esi(**lookup)  # type: ignore
@shared_task(**_TASK_ESI_DEFAULTS_ONCE)
def update_or_create_eve_object(
    model_name: str,
    id: int,
    include_children=False,
    wait_for_children=True,
    enabled_sections: Optional[List[str]] = None,
    task_priority: Optional[int] = None,
) -> None:
    """Update an eve object from ESI or create it.

    Args:
        model_name: Name of the respective Django model, e.g. ``"EveType"``
        id: Eve Online ID of object
        include_children: if child objects should be updated/created as well
            (only when a new object is created)
        wait_for_children: when true child objects will be updated/created
            blocking (if any), else async (only when a new object is created)
        enabled_sections: Sections to load regardless of current settings,
            e.g. `[EveType.Section.DOGMAS]` will always load dogmas for EveTypes
        task_priority: priority of started tasks
    """
    logger.info("Updating/Creating %s with ID %s", model_name, id)
    manager = EveUniverseEntityModel.get_model_class(model_name).objects
    options = {
        "id": id,
        "include_children": include_children,
        "wait_for_children": wait_for_children,
        "enabled_sections": enabled_sections,
        "task_priority": task_priority,
    }
    manager.update_or_create_esi(**options)  # type: ignore
@shared_task(**_TASK_ESI_DEFAULTS_ONCE)
def update_or_create_inline_object(
    parent_obj_id: int,
    parent_fk: str,
    eve_data_obj: dict,
    other_pk_info: dict,
    parent2_model_name: str,
    inline_model_name: str,
    parent_model_name: str,
    enabled_sections: Optional[List[str]] = None,
) -> None:
    """Update or create a single inline object from ESI.

    The work is delegated to the model class looked up by
    ``parent_model_name``. All other arguments are passed through unchanged.
    """
    # NOTE(review): "wit" looks like a typo for "with"; the message is kept
    # as-is so the emitted log text does not change.
    logger.info(
        "Updating/Creating inline object %s for %s wit ID %s",
        inline_model_name,
        parent_model_name,
        parent_obj_id,
    )
    parent_model = EveUniverseEntityModel.get_model_class(parent_model_name)
    inline_kwargs = {
        "parent_obj_id": parent_obj_id,
        "parent_fk": parent_fk,
        "eve_data_obj": eve_data_obj,
        "other_pk_info": other_pk_info,
        "parent2_model_name": parent2_model_name,
        "inline_model_name": inline_model_name,
        "enabled_sections": enabled_sections,
    }
    parent_model._update_or_create_inline_object(**inline_kwargs)  # type: ignore


# EveEntity objects
@shared_task(**_TASK_ESI_DEFAULTS)
def create_eve_entities(ids: Iterable[int]) -> None:
    """Bulk create and resolve multiple entities from ESI.

    Args:
        ids: IDs handed over to ``EveEntity.objects.bulk_resolve_ids()``
    """
    entity_manager = EveEntity.objects
    entity_manager.bulk_resolve_ids(ids)  # type: ignore
@shared_task(**_TASK_ESI_DEFAULTS_ONCE)
def update_unresolved_eve_entities() -> None:
    """Update all unresolved EveEntity objects from ESI.

    Entities with an empty name are split into chunks and one task is
    started per chunk, so large sets are resolved in parallel.
    """
    unresolved = EveEntity.objects.filter(name="")
    unresolved_ids = list(unresolved.valid_ids())  # type: ignore
    logger.info("Updating %d unresolved entities from ESI", len(unresolved_ids))
    for page_ids in chunks(unresolved_ids, POST_UNIVERSE_NAMES_MAX_ITEMS):
        _update_unresolved_eve_entities_for_page.delay(page_ids)  # type: ignore
@shared_task(**_TASK_ESI_DEFAULTS)
def _update_unresolved_eve_entities_for_page(ids: Iterable[int]) -> None:
    """Update the unresolved EveEntity objects with the given IDs from ESI."""
    entity_manager = EveEntity.objects
    entity_manager.update_from_esi_by_id(ids)  # type: ignore


# Object loaders
@shared_task(**_TASK_ESI_DEFAULTS_ONCE)
def load_map(enabled_sections: Optional[List[str]] = None) -> None:
    """Load the complete Eve map.

    This covers all regions, constellations and solar systems, plus
    additional related entities if they are enabled. One task is started
    per region ID received from ESI.

    Args:
        enabled_sections: Sections to load regardless of current settings
    """
    sections_text = ", ".join(determine_effective_sections(enabled_sections))
    logger.info(
        "Loading complete map with all regions, constellations, solar systems "
        "and the following additional entities if related to the map: %s",
        sections_text,
    )
    category, method = EveRegion._esi_path_list()
    list_regions = getattr(getattr(esi.client, category), method)
    for region_id in list_regions().results():
        update_or_create_eve_object.delay(
            model_name="EveRegion",
            id=region_id,
            include_children=True,
            wait_for_children=False,
            enabled_sections=enabled_sections,
            task_priority=EVEUNIVERSE_LOAD_TASKS_PRIORITY,
        )  # type: ignore
@shared_task(**_TASK_ESI_DEFAULTS_ONCE)
def load_all_types(enabled_sections: Optional[List[str]] = None) -> None:
    """Load all eve types.

    Fetches the category IDs from ESI and starts loading every category
    including its children, in ascending order of the category IDs.

    Args:
        enabled_sections: Sections to load regardless of current settings

    Raises:
        ValueError: When ESI returned no category IDs
    """
    sections_text = ", ".join(determine_effective_sections(enabled_sections))
    logger.info(
        "Loading all eve types from ESI including these sections: %s",
        sections_text,
    )
    category, method = EveCategory._esi_path_list()
    list_categories = getattr(getattr(esi.client, category), method)
    received_ids = list_categories().results()
    if not received_ids:
        raise ValueError("Did not receive category IDs from ESI.")

    ordered_ids = sorted(received_ids)
    logger.debug("Fetching categories for IDs: %s", ordered_ids)
    for current_id in ordered_ids:
        _load_category_with_children(
            category_id=current_id, enabled_sections=enabled_sections
        )
def _load_category_with_children(
    category_id: int,
    force_loading_dogma: bool = False,
    enabled_sections: Optional[List[str]] = None,
) -> None:
    """Start loading a category async incl. all its children from ESI.

    Args:
        category_id: ID of the category to load
        force_loading_dogma: When True the dogmas section is added to the
            sections handed over to the started task
        enabled_sections: Sections to load regardless of current settings.
            The given list is not modified.
    """
    # Work on a copy: appending to the caller's list would leak the dogmas
    # section into every later use of that same list object.
    sections = list(enabled_sections) if enabled_sections else []
    if force_loading_dogma:
        sections.append(EveType.Section.DOGMAS.value)
    update_or_create_eve_object.delay(
        model_name="EveCategory",
        id=category_id,
        include_children=True,
        wait_for_children=False,
        enabled_sections=sections,
        task_priority=EVEUNIVERSE_LOAD_TASKS_PRIORITY,
    )  # type: ignore


def _load_group_with_children(group_id: int, force_loading_dogma: bool = False) -> None:
    """Start a task for loading a group incl. all its children from ESI.

    Args:
        group_id: ID of the group to load
        force_loading_dogma: When True the dogmas section is enabled for the
            started task, else no sections are passed
    """
    enabled_sections = [EveType.Section.DOGMAS.value] if force_loading_dogma else None
    update_or_create_eve_object.delay(
        model_name="EveGroup",
        id=group_id,
        include_children=True,
        wait_for_children=False,
        enabled_sections=enabled_sections,
        task_priority=EVEUNIVERSE_LOAD_TASKS_PRIORITY,
    )  # type: ignore


def _load_type_with_children(type_id: int, force_loading_dogma: bool = False) -> None:
    """Start a task for loading a type from ESI.

    The task is started with ``include_children=False``.

    Args:
        type_id: ID of the type to load
        force_loading_dogma: When True the dogmas section is enabled for the
            started task, else no sections are passed
    """
    enabled_sections = [EveType.Section.DOGMAS.value] if force_loading_dogma else None
    update_or_create_eve_object.delay(
        model_name="EveType",
        id=type_id,
        include_children=False,
        wait_for_children=False,
        enabled_sections=enabled_sections,
        task_priority=EVEUNIVERSE_LOAD_TASKS_PRIORITY,
    )  # type: ignore


@shared_task(**_TASK_ESI_DEFAULTS_ONCE)
def load_ship_types(enabled_sections: Optional[List[str]] = None) -> None:
    """Load all ship types.

    Args:
        enabled_sections: Sections to load regardless of current settings
    """
    logger.info("Started loading all ship types into eveuniverse")
    _load_category_with_children(
        category_id=EveCategoryId.SHIP.value, enabled_sections=enabled_sections
    )


@shared_task(**_TASK_ESI_DEFAULTS_ONCE)
def load_structure_types(enabled_sections: Optional[List[str]] = None) -> None:
    """Load all structure types.

    Args:
        enabled_sections: Sections to load regardless of current settings
    """
    logger.info("Started loading all structure types into eveuniverse")
    _load_category_with_children(
        category_id=EveCategoryId.STRUCTURE.value, enabled_sections=enabled_sections
    )
@shared_task(**_TASK_ESI_DEFAULTS_ONCE)
def load_eve_types(
    category_ids: Optional[List[int]] = None,
    group_ids: Optional[List[int]] = None,
    type_ids: Optional[List[int]] = None,
    force_loading_dogma: bool = False,
) -> None:
    """Load specified eve types from ESI.

    Will always load all children except for EveType. Categories are
    started first, then groups, then types.

    Args:
        category_ids: EveCategory IDs
        group_ids: EveGroup IDs
        type_ids: EveType IDs
        force_loading_dogma: When True will load dogma for all types
    """
    logger.info("Started loading several eve types into eveuniverse")
    work_plan = (
        (category_ids, _load_category_with_children),
        (group_ids, _load_group_with_children),
        (type_ids, _load_type_with_children),
    )
    for requested_ids, start_loading in work_plan:
        # A missing or empty ID list simply contributes no work.
        for object_id in requested_ids or ():
            start_loading(object_id, force_loading_dogma)
@shared_task(**_TASK_ESI_DEFAULTS_ONCE)
def update_market_prices(minutes_until_stale: Optional[int] = None):
    """Update market prices from ESI.

    Starts a chain of two tasks: the first fetches the prices from ESI and
    the second updates the market prices from the fetched data.

    Args:
        minutes_until_stale: passed on to the task updating the prices
    """
    fetch_step = _fetch_market_prices_esi.s().set(
        priority=EVEUNIVERSE_LOAD_TASKS_PRIORITY
    )
    update_step = _update_market_prices_from_data.s(minutes_until_stale).set(
        priority=EVEUNIVERSE_LOAD_TASKS_PRIORITY
    )
    chain(fetch_step, update_step).delay()
@shared_task(**_TASK_ESI_DEFAULTS_ONCE)
def _fetch_market_prices_esi():
    """Fetch market prices from ESI and return them."""
    return EveMarketPrice.objects.fetch_data_from_esi()  # type: ignore


@shared_task(**_TASK_DEFAULTS)
def _update_market_prices_from_data(
    prices: dict, minutes_until_stale: Optional[int] = None
):
    """Update market prices from provided data.

    Does nothing when ``prices`` is empty.

    Args:
        prices: price data to update from
        minutes_until_stale: passed on to ``update_objs_from_esi_data()``
    """
    if prices:
        price_manager = EveMarketPrice.objects
        price_manager.update_objs_from_esi_data(  # type: ignore
            prices, minutes_until_stale
        )