diff --git a/backend/database/mongo_database_connector.py b/backend/database/mongo_database_connector.py
index e4baea6..8fc9397 100644
--- a/backend/database/mongo_database_connector.py
+++ b/backend/database/mongo_database_connector.py
@@ -35,7 +35,6 @@ def get(self, key):
         raise Exception(f"Unknown platform: {document['platform']} found in database for key: {key}!")
 
     def set(self, key, value):
-        # logger.debug(f"[{__name__}]: Setting to mongo: {value.to_dict()}")
         self._collection.update_one(
             {"_id": key},
             {"$set": value.to_dict()},
diff --git a/backend/dataspace/cdse_connector.py b/backend/dataspace/cdse_connector.py
index 45c851c..5aafa8a 100644
--- a/backend/dataspace/cdse_connector.py
+++ b/backend/dataspace/cdse_connector.py
@@ -17,16 +17,17 @@ class CDSEConnector(DataspaceConnector):
     def __init__(
             self,
             feature_id=None, workdir=None,
-            logger: logging.Logger = logging.getLogger(__name__)
     ):
-        super().__init__(root_url=CDSE_CATALOG_ROOT, feature_id=feature_id, workdir=workdir, logger=logger)
-        self._cdse_s3_client = S3Client(config=CDSE_S3_CREDENTIALS, logger=self._logger)
+        super().__init__(root_url=CDSE_CATALOG_ROOT, feature_id=feature_id, workdir=workdir)
+        self._cdse_s3_client = S3Client(config=CDSE_S3_CREDENTIALS)
+        self._logger = logging.getLogger(__name__)
 
     def _get_feature(self) -> dict:
         if self._feature is None:
             endpoint = f"Products({self._feature_id})"
+            self._logger.debug(f"CDSE connector calling search endpoint for feature {self._feature_id}")
             response: httpx.Response = self._send_request(endpoint=endpoint)
 
             if response.status_code != 200:
                 raise CDSEConnectorCouldNotFetchFeature(feature_id=self._feature_id)
diff --git a/backend/dataspace/dataspace_connector.py b/backend/dataspace/dataspace_connector.py
index b130606..d051719 100644
--- a/backend/dataspace/dataspace_connector.py
+++ b/backend/dataspace/dataspace_connector.py
@@ -15,8 +15,7 @@ class DataspaceConnector(HTTPRequestableObject):
     def __init__(
             self,
-            root_url=None, feature_id=None, workdir=None,
-            logger: logging.Logger = logging.getLogger(__name__)
+            root_url=None, feature_id=None, workdir=None
     ):
         if root_url is None:
             raise DataspaceConnectorRootURLNotProvided()
@@ -27,12 +26,12 @@ def __init__(
         if workdir is None:
             raise DataspaceConnectorWorkdirNotSpecified()
 
-        self._workdir = Path(workdir.name)
+        self._workdir = workdir
 
         super().__init__(
-            root_url=root_url,
-            logger=logger,
+            root_url=root_url
         )
+        self._logger = logging.getLogger(__name__)
 
         try:
             self._get_feature()
diff --git a/backend/dataspace/dhr_connector.py b/backend/dataspace/dhr_connector.py
index 007982a..e464a30 100644
--- a/backend/dataspace/dhr_connector.py
+++ b/backend/dataspace/dhr_connector.py
@@ -1,5 +1,6 @@
 import logging
 import re
+from urllib.parse import urljoin
 
 import httpx
 
@@ -13,34 +14,28 @@ class DHRConnector(DataspaceConnector):
 
     _dhr_http_client: HTTPClient | None = None
-    _resto_id: str | None = None
 
     def __init__(
             self,
-            feature_id=None, workdir=None,
-            logger: logging.Logger = logging.getLogger(__name__)
+            feature_id=None, workdir=None
     ):
         if not DHR_USE_DHR:
             raise DHRConnectorIsNotRequestedByUser()
 
-        super().__init__(root_url=DHR_CATALOG_ROOT, feature_id=feature_id, workdir=workdir, logger=logger)
-        self._dhr_http_client = HTTPClient(config=DHR_CONNECTOR_CREDENTIALS, logger=self._logger)
-
-    def _get_resto_id(self) -> str:
-        if self._resto_id is None:
-            import uuid
-            resto_uuid_namespace = b'\x92\x70\x80\x59\x20\x77\x45\xa3\xa4\xf3\x1e\xb4\x28\x78\x9c\xff'
-            self._resto_id = str(uuid.uuid5(uuid.UUID(bytes=resto_uuid_namespace), f"dhr1{self._feature_id}"))
-
-        return self._resto_id
+        self._logger = logging.getLogger(__name__)
+        self._dhr_http_client = HTTPClient(config=DHR_CONNECTOR_CREDENTIALS)
+        super().__init__(root_url=DHR_CATALOG_ROOT, feature_id=feature_id, workdir=workdir)
 
     def _get_feature(self) -> dict:
         if self._feature is None:
-            response: httpx.Response = self._send_request(endpoint="search", payload_dict={"ids": self._get_resto_id()})
-
+            url = urljoin(DHR_CATALOG_ROOT, "search")
+            self._logger.debug(f"DHR connector calling search endpoint for feature {self._feature_id}")
+            response: httpx.Response = httpx.get(url, params={"ids": self._feature_id})
+            # TODO: switch to the authenticated client once its token flow is verified:
+            # response: httpx.Response = self._dhr_http_client.get(url, params={"ids": self._feature_id})
             if response.status_code != 200:
                 raise DHRConnectorCouldNotFetchFeature(feature_id=self._feature_id)
 
             response_data = response.json()
+            self._logger.debug(f"DHR connector response: {response_data}")
 
             if response_data['numberReturned'] < 1:
                 raise DHRConnectorCouldNotFetchFeature(feature_id=self._feature_id)
@@ -55,10 +50,12 @@ def _get_asset_path(self, full_path: str | None = None) -> str:
         if full_path is None:
             return ""
 
-        re_matches = re.findall(r"Nodes\('([^']+)'\)", full_path)
-        asset_path = "/".join(re_matches)
-
-        return asset_path
+        self._logger.debug(f"Resolving asset path from: {full_path}")
+        parts = full_path.split('/product/')
+        if len(parts) > 1:
+            return parts[1]
+        return ""
 
     def get_available_files(self) -> list[tuple[str, str]]:
         self._get_feature()
@@ -66,6 +63,7 @@
         available_files = [
             (self._get_asset_path(asset['href']), asset['href']) for asset in self._feature['assets'].values()
         ]
+        self._logger.debug(f"Available files: {available_files}")
 
         return available_files
 
@@ -81,6 +79,4 @@ def download_selected_files(self, files_to_download: list[tuple[str, str]]) -> l
         return downloaded_files
 
     def get_polygon(self) -> list[list[float]]:
-        # TODO implement after Sentinel available in STAC
-        raise Exception("Not implemented")
-        return [[0.0]]
+        return self._feature['geometry']['coordinates'][0]
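# Illustrative sketch, not part of the patch: the reworked DHRConnector._get_feature()
# now queries the catalog's `search` endpoint directly by product id instead of
# deriving a deterministic RESTO uuid5. The snippet below mimics that lookup with
# plain httpx to show which response fields the connector relies on; the catalog
# root, the feature id and the `features` key are assumptions for illustration.
from urllib.parse import urljoin

import httpx

DHR_CATALOG_ROOT = "https://dhr.example.org/stac/"   # placeholder root URL
feature_id = "b43f0086-6fec-4dcd-a3f8-a0661327c59f"  # placeholder product id

response = httpx.get(urljoin(DHR_CATALOG_ROOT, "search"), params={"ids": feature_id})
response.raise_for_status()
data = response.json()

if data["numberReturned"] < 1:
    raise RuntimeError(f"Feature {feature_id} not found")

# presumably the first entry of the returned feature collection
feature = data["features"][0]
# asset hrefs feed get_available_files(); the first polygon ring feeds get_polygon()
hrefs = [asset["href"] for asset in feature["assets"].values()]
polygon = feature["geometry"]["coordinates"][0]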
f"dhr1{self._feature_id}")) - - return self._resto_id + self._logger = logging.getLogger(__name__) + self._dhr_http_client = HTTPClient(config=DHR_CONNECTOR_CREDENTIALS) + super().__init__(root_url=DHR_CATALOG_ROOT, feature_id=feature_id, workdir=workdir) def _get_feature(self) -> dict: if self._feature is None: - response: httpx.Response = self._send_request(endpoint="search", payload_dict={"ids": self._get_resto_id()}) - + url = urljoin(DHR_CATALOG_ROOT, "search") + response: httpx.Response = httpx.get(url, params={"ids": self._feature_id}) + # response: httpx.Response = self._dhr_http_client.get(url, params={"ids": self._feature_id}) + self._logger.warning(f"DHR connector calling search endpoint for feature {self._feature_id}") if response.status_code != 200: raise DHRConnectorCouldNotFetchFeature(feature_id=self._feature_id) response_data = response.json() + self._logger.warning(f"DHR connector response: {response_data}") if response_data['numberReturned'] < 1: raise DHRConnectorCouldNotFetchFeature(feature_id=self._feature_id) @@ -55,10 +50,12 @@ def _get_asset_path(self, full_path: str | None = None) -> str: if full_path is None: return "" - re_matches = re.findall(r"Nodes\('([^']+)'\)", full_path) - asset_path = "/".join(re_matches) - - return asset_path + self._logger.info(f"full path: {full_path}") + self._logger.info(f"product: {re.search(r'/product/(.+?)/', full_path)}") + parts = full_path.split('/product/') + if len(parts) > 1: + return parts[1] + return "" def get_available_files(self) -> list[tuple[str, str]]: self._get_feature() @@ -66,6 +63,7 @@ def get_available_files(self) -> list[tuple[str, str]]: available_files = [ (self._get_asset_path(asset['href']), asset['href']) for asset in self._feature['assets'].values() ] + self._logger.debug(f"Available files: {available_files}") return available_files @@ -81,6 +79,4 @@ def download_selected_files(self, files_to_download: list[tuple[str, str]]) -> l return downloaded_files def get_polygon(self) -> list[list[float]]: - # TODO implement after Sentinel available in STAC - raise Exception("Not implemented") - return [[0.0]] + return self._feature['geometry']['coordinates'][0] diff --git a/backend/dataspace/http_client.py b/backend/dataspace/http_client.py index 9e1e704..1d69586 100644 --- a/backend/dataspace/http_client.py +++ b/backend/dataspace/http_client.py @@ -1,9 +1,7 @@ import logging - import httpx from pathlib import Path - from dataspace.exceptions.http_client import * @@ -14,152 +12,57 @@ class HTTPClient: def __init__( self, - config: dict = None, - logger=logging.getLogger(__name__) + config: dict = None ): - self._logger = logger - + self._logger = logging.getLogger(name=__name__) if config is None: raise HTTPClientConfigNotProvided() - self._http_client = httpx.Client(auth=httpx.BasicAuth(config['username'], config['password'])) + self._logger.info(f"Keys in config: {config.keys()}") + self._logger.info(f"Token url: {config['token_url']}") + if "token_url" in config and config["token_url"] is not None: + self._logger.info("Using token authentication") + token = self.obtain_token(config["token_url"], config["client_id"], config["username"], config["password"]) + self._http_client = httpx.Client(headers={"Authorization": f"Bearer {token}"}) + else: + self._http_client = httpx.Client(auth=httpx.BasicAuth(config['username'], config['password'])) def download_file(self, url, path_to_download: str | Path) -> str: - print(f"Downloading {url} into {str(path_to_download)}") # Todo logging + self._logger.info(f"Downloading 
{url} into {str(path_to_download)}") path_to_download = Path(path_to_download) - # TODO V TOMLHE MISTE TO NEKDY PADNE: - """ - REQUEST_VISUALIZATION>>>feature_id='b43f0086-6fec-4dcd-a3f8-a0661327c59f' platform='SENTINEL-2' filters={'cloud_cover': '100', 'levels': ['S2MSI2A'], 'bands': ['TCI']}<<>>feature_id='b43f0086-6fec-4dcd-a3f8-a0661327c59f' platform='SENTINEL-2' filters={'cloud_cover': '100', 'levels': ['S2MSI2A'], 'bands': ['TCI']}<< httpx.Response: + return self._http_client.get(url, params=params, headers=headers) + + def post(self, url: str, data: dict | None = None, headers: dict | None = None) -> httpx.Response: + return self._http_client.post(url, json=data, headers=headers) + #return self._http_client.post(url, json=json, data=data, headers=headers) diff --git a/backend/dataspace/http_requestable_object.py b/backend/dataspace/http_requestable_object.py index b91f59c..9d4e00d 100644 --- a/backend/dataspace/http_requestable_object.py +++ b/backend/dataspace/http_requestable_object.py @@ -13,11 +13,11 @@ class HTTPRequestableObject: _root_url: str = None _logger: logging.Logger = None - def __init__(self, root_url=None, logger=logging.getLogger(__name__)): + def __init__(self, root_url=None,): if not root_url: raise HTTPRequestableObjectBaseURLNotSpecified() self._root_url = self._normalize_url(url=root_url) - self._logger = logger + self._logger = logging.getLogger(__name__) @staticmethod def _normalize_url(url: str) -> str: diff --git a/backend/dataspace/s3_client.py b/backend/dataspace/s3_client.py index 024b598..f317e6c 100644 --- a/backend/dataspace/s3_client.py +++ b/backend/dataspace/s3_client.py @@ -20,10 +20,9 @@ class S3Client: def __init__( self, - config: dict = None, - logger=logging.getLogger(__name__) + config: dict = None ): - self._logger = logger + self._logger = logging.getLogger(name=__name__) if config is None: raise S3ClientConfigNotProvided() diff --git a/backend/fastapi_server/routes/get_tile.py b/backend/fastapi_server/routes/get_tile.py index 8cad16c..884b0e2 100644 --- a/backend/fastapi_server/routes/get_tile.py +++ b/backend/fastapi_server/routes/get_tile.py @@ -30,8 +30,7 @@ def get_tile( tiling_worker = TilingWorker( processed_feature=fastapi_shared.database.get(request_hash), selected_file=selected_file, - z=z, x=x, y=y, - logger=logger + z=z, x=x, y=y ) tile_path = tiling_worker.save_tile() diff --git a/backend/fastapi_server/routes/request_download.py b/backend/fastapi_server/routes/request_download.py index 8dbd710..906c826 100644 --- a/backend/fastapi_server/routes/request_download.py +++ b/backend/fastapi_server/routes/request_download.py @@ -10,11 +10,11 @@ router = APIRouter() -@router.get(variables.UVICORN_SERVER_PREFIX + "/download_image/{request_hash}/{filename}") -async def download_image(request_hash: str, filename: str): - logger.debug(f"[{__name__}]: Requesting file download for hash: {request_hash}, file: {filename}") +@router.get(variables.UVICORN_SERVER_PREFIX + "/download_image/{featureid}/{filename}") +async def download_image(featureid: str, filename: str): + logger.debug(f"[{__name__}]: Requesting file download: {featureid}, file: {filename}") - return_entry = fastapi_shared.database.get(request_hash) + return_entry = fastapi_shared.database.get(featureid) if return_entry is None: raise HTTPException(status_code=404, detail="Product not found in database!") @@ -23,14 +23,14 @@ async def download_image(request_hash: str, filename: str): raise HTTPException(status_code=400, detail="Product processing is not completed!") 
diff --git a/backend/fastapi_server/routes/request_processing.py b/backend/fastapi_server/routes/request_processing.py
index 20c4c80..295d473 100644
--- a/backend/fastapi_server/routes/request_processing.py
+++ b/backend/fastapi_server/routes/request_processing.py
@@ -21,61 +21,57 @@ async def request_processing(
         processed_feature_model: ProcessedFeatureModel = ProcessedFeatureModel()
 ):
     logger.debug(f"[{__name__}]: request_feature_model: {processed_feature_model}")
-
-    request_hash = processed_feature_model.hash_myself()
-    logger.debug(f"[{__name__}]: request_hash: {request_hash}")
+    feature_id = processed_feature_model.feature_id
 
     # return failed reason once and prepare for recomputation
-    db_feature = fastapi_shared.database.get(request_hash)
+    db_feature = fastapi_shared.database.get(feature_id)
     if db_feature is not None and db_feature.get_status() == RequestStatuses.FAILED:
-        fastapi_shared.database.delete(request_hash)
+        fastapi_shared.database.delete(feature_id)
         raise HTTPException(
             status_code=500,
             detail=f"Feature processing failed! Reason: {db_feature.get_fail_reason()}"
         )
 
-    # TODO: the whole request should rather be hashed here and stored in the DB
-    if fastapi_shared.database.get(request_hash) is None:
+    if fastapi_shared.database.get(feature_id) is None:
         processed_feature: ProcessedFeature | None = None
 
         match Platforms(processed_feature_model.platform):
             case Platforms.SENTINEL_1:
                 processed_feature = Sentinel1Feature(
-                    feature_id=processed_feature_model.feature_id,
+                    feature_id=feature_id,
                     platform=processed_feature_model.platform,
-                    filters=processed_feature_model.filters,
-                    request_hash=request_hash
+                    filters=processed_feature_model.filters
                 )
             case Platforms.SENTINEL_2:
                 processed_feature = Sentinel2Feature(
-                    feature_id=processed_feature_model.feature_id,
+                    feature_id=feature_id,
                     platform=processed_feature_model.platform,
-                    filters=processed_feature_model.filters,
-                    request_hash=request_hash
+                    filters=processed_feature_model.filters
                 )
             case _:
                 raise HTTPException(status_code=400, detail="Unknown platform!")
 
         fastapi_shared.database.set(
-            key=request_hash,
+            key=feature_id,
             value=processed_feature
         )
 
-        fastapi_shared.celery_queue.send_task('tasks.data_tasks.process_feature_task', args=[request_hash])
+        fastapi_shared.celery_queue.send_task('tasks.data_tasks.download_feature_task', args=[feature_id])
 
-    return_entry: ProcessedFeature | None = fastapi_shared.database.get(request_hash)
+    return_entry: ProcessedFeature | None = fastapi_shared.database.get(feature_id)
 
     if return_entry is None:
         raise HTTPException(status_code=404, detail="Feature not found in database!")
 
     if return_entry.get_status == RequestStatuses.NON_EXISTING:
-        fastapi_shared.database.delete(request_hash)
+        fastapi_shared.database.delete(feature_id)
         raise HTTPException(status_code=404, detail="Feature not found in database!")
 
     if return_entry.get_status == RequestStatuses.FAILED:
-        fastapi_shared.database.delete(request_hash)
+        fastapi_shared.database.delete(feature_id)
+        logger.error(f"[{__name__}]: Feature processing failed! Reason: {return_entry.get_fail_reason()}")
         raise HTTPException(
             status_code=500,
             detail=f"Feature processing failed! Reason: {return_entry.get_fail_reason()}"
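# Illustrative, not part of the patch: requests are now keyed by the product's
# feature_id instead of a hash of the whole request model. The removed hash_myself()
# (see reqeusted_feature_model.py further below) produced a key like this, so the same
# product requested with different filters used to occupy separate database entries:
import hashlib
import json

model_dict = {
    "feature_id": "b43f0086-6fec-4dcd-a3f8-a0661327c59f",  # example values
    "platform": "SENTINEL-2",
    "filters": {"bands": ["TCI"]},
}
request_hash = hashlib.sha256(json.dumps(model_dict, sort_keys=True).encode()).hexdigest()

# With this patch the Mongo "_id", the Celery task arguments, the shared-data
# directory name and the download/tile URLs all use feature_id directly.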
diff --git a/backend/feature/processing/processed_feature.py b/backend/feature/processing/processed_feature.py
index 76a3ec6..a7f50b3 100644
--- a/backend/feature/processing/processed_feature.py
+++ b/backend/feature/processing/processed_feature.py
@@ -1,14 +1,10 @@
 import json
 import logging
 import re
-import shutil
-
 import docker
 
 from abc import ABC, abstractmethod
-from datetime import datetime, timezone
 from pathlib import Path
-from tempfile import TemporaryDirectory
 from typing import Dict, Any
 
 from resources.enums import RequestStatuses
@@ -26,8 +22,6 @@ class ProcessedFeature(ABC):
 
     _logger: logging.Logger = None
 
-    _request_hash: str = None
-
     _dataspace_connector: DataspaceConnector | None = None
 
     _feature_id: str = None
@@ -37,6 +31,7 @@ class ProcessedFeature(ABC):
     _status: RequestStatuses = RequestStatuses.NON_EXISTING
     _fail_reason: str = None
 
+    _downloaded_files: list[str] = None
     _output_directory: Path = None
     _output_files: list[str] = None  # TODO: maybe a url, Path, or something like that..?
 
@@ -44,23 +39,18 @@ class ProcessedFeature(ABC):
     _zoom_levels = {"min_zoom": 8, "max_zoom": 15}  # todo should be like 8 to 15 but gjtiff crashes on low memory
 
-    _workdir: TemporaryDirectory = None
+    _workdir: Path = None
 
     def __init__(
-            self, logger: logging.Logger = logging.getLogger(name=__name__),
-            feature_id: str = None, platform: str = None, filters: Dict[str, Any] = None,
-            request_hash: str = None
+            self, feature_id: str = None, platform: str = None, filters: Dict[str, Any] = None
     ):
-        self._logger = logger
+        self._logger = logging.getLogger(name=__name__)
         self._logger.debug(f"[{__name__}]: Initializing Requested feature for platform: {platform}")
 
-        self._request_hash = request_hash
-
-        self._workdir = TemporaryDirectory()
-
         if feature_id is None:
             raise ProcessedFeatureIDNotSpecified()
         self._feature_id = feature_id
+        self._workdir = Path(variables.DOCKER_SHARED_DATA_DIRECTORY, feature_id)
 
         self._assign_connector()
 
@@ -81,11 +71,6 @@ def _remove_path_tree(self, path: Path):
                 self._remove_path_tree(child)
         path.rmdir()
 
-    def __del__(self):
-        self._workdir.cleanup()
-        # if self._output_directory is not None:
-        #     self._remove_path_tree(self._output_directory)
-
     def _assign_connector(self):
         self._logger.debug(f"[{__name__}]: Assigning dataspace connector")
 
@@ -93,17 +78,16 @@ def _assign_connector(self):
         try:
             self._dataspace_connector = DHRConnector(
                 feature_id=self._feature_id,
-                workdir=self._workdir,
-                logger=self._logger
+                workdir=self._workdir
             )
             return
-        except DataspaceConnectorCouldNotFetchFeature:
+        except DataspaceConnectorCouldNotFetchFeature as e:
+            self._logger.debug(f"DHR connector failed, trying CDSE connector: {e}")
             pass
 
         self._dataspace_connector = CDSEConnector(
             feature_id=self._feature_id,
-            workdir=self._workdir,
-            logger=self._logger
+            workdir=self._workdir
         )
{e.message}") pass self._dataspace_connector = CDSEConnector( feature_id=self._feature_id, - workdir=self._workdir, - logger=self._logger + workdir=self._workdir ) def get_feature_id(self) -> str: @@ -132,7 +116,7 @@ def get_processed_files(self) -> dict[str, list[str]]: # file = file.replace(str(self._output_directory).replace("\\", "/"), '') file = file.split('/')[-1] - processed_files.setdefault(self._request_hash, []).append(file) + processed_files.setdefault(self._feature_id, []).append(file) return processed_files @@ -144,19 +128,17 @@ def get_bbox(self) -> list[float]: raise ProcessedFeatureBboxNotSet(feature_id=self._feature_id) return self._bbox - def get_request_hash(self) -> str: - return self._request_hash - def to_dict(self) -> dict: return { - "_id": self._request_hash, + "_id": self._feature_id, "feature_id": self._feature_id, "platform": self._platform, "filters": self._filters, "status": self._status.value, "fail_reason": self._fail_reason, "output_files": self._output_files, - "bbox": self._bbox + "bbox": self._bbox, + "downloaded_files": self._downloaded_files } @classmethod @@ -164,13 +146,13 @@ def from_dict(cls, data: dict): instance = cls( feature_id=data.get('feature_id'), platform=data.get('platform'), - filters=data.get('filters'), - request_hash=data.get('_id') + filters=data.get('filters') ) instance._status = RequestStatuses(data.get('status')) instance._fail_reason = data.get('fail_reason') instance._output_files = data.get('output_files') instance._bbox = data.get('bbox') + instance._downloaded_files = data.get('downloaded_files') return instance def get_output_directory(self) -> Path: @@ -183,79 +165,36 @@ def get_output_directory(self) -> Path: def _filter_available_files(self, available_files: list[tuple[str, str]] = None) -> list[tuple[str, str]]: pass - def _download_feature(self) -> list[str]: + def _download_feature(self): available_files = self._dataspace_connector.get_available_files() filtered_files = self._filter_available_files(available_files=available_files) - downloaded_files = self._dataspace_connector.download_selected_files(files_to_download=filtered_files) - - return downloaded_files + self._downloaded_files = self._dataspace_connector.download_selected_files(files_to_download=filtered_files) - def process_feature(self): - """ - Stažení tily identifikované pomocí feature_id z copernicus dataspace - Pravděpodobně z jejich s3 na loklání uložiště. 
+    def download_feature(self):
         try:
             self._set_status(status=RequestStatuses.PROCESSING)
-
             self._set_bbox(self._dataspace_connector.get_rectangular_bbox())
 
-            time_download_start = datetime.now(tz=timezone.utc)
-            print(
-                f"[{__name__}]: Downloading feature {self._feature_id} started at {time_download_start}"
-            )
-
-            downloaded_feature_files_paths = self._download_feature()
-
-            time_download_finish = datetime.now(tz=timezone.utc)
-            time_download_elapsed = time_download_finish - time_download_start
-            print(
-                f"[{__name__}]: Downloading feature {self._feature_id} finished at {time_download_finish},"
-                f" elapsed {time_download_elapsed}"
-            )
-
-            print(f"[{__name__}]: Feature ID {self._feature_id} downloaded into {str(self._workdir.name)}")
-
-            time_process_start = datetime.now(tz=timezone.utc)
-            print(
-                f"[{__name__}]: Processing feature {self._feature_id} started at {time_process_start}"
-            )
-
-            self._output_files = self._process_feature_files(feature_files=downloaded_feature_files_paths)
-
-            time_process_finish = datetime.now(tz=timezone.utc)
-            time_process_elapsed = time_process_finish - time_process_start
-            print(
-                f"[{__name__}]: Processing feature {self._feature_id} finished at {time_process_finish},"
-                f" elapsed {time_process_elapsed}"
-            )
-
-            # After the image is created, temporarily copy it to some storage
-            # TODO: for now it is stored in the webserver directory with the frontend (config/variables.py --- FRONTEND_ROOT_DIR)
-            # build a list of webserver links from the files in that directory and store it in self._hrefs: [str]
+            self._logger.info(f"[{__name__}]: Downloading feature {self._feature_id} started.")
+            self._download_feature()
+            self._logger.info(f"[{__name__}]: Feature {self._feature_id} downloaded.")
+        except Exception as e:
+            self._fail_reason = str(e)
+            self._set_status(status=RequestStatuses.FAILED)
+
+    def process_feature(self, files_to_process: list[str]):
+        try:
+            self._logger.info(f"[{__name__}]: Processing feature {self._feature_id} started, files {files_to_process}")
+            self._output_files = self._process_feature_files(feature_files=files_to_process)
+            self._logger.info(f"[{__name__}]: Processing feature {self._feature_id} completed.")
             self._set_status(status=RequestStatuses.COMPLETED)
 
-            time_total_elapsed = time_process_finish - time_download_start
-            print(
-                f"[{__name__}]: Total time elapsed {time_total_elapsed}"
-            )
-
         except Exception as e:
             self._fail_reason = str(e)
             self._set_status(status=RequestStatuses.FAILED)
 
     def _process_feature_files(self, feature_files: list[str]) -> list[str] | None:
-        self._output_directory = Path(variables.DOCKER_SHARED_DATA_DIRECTORY, self._request_hash)
-        self._output_directory.mkdir(parents=True, exist_ok=True)
-
-        for item in self._output_directory.iterdir():
-            if item.is_file() or item.is_symlink():
-                item.unlink()
-            elif item.is_dir():
-                shutil.rmtree(item)
+        self._output_directory = Path(variables.DOCKER_SHARED_DATA_DIRECTORY, self._feature_id)
 
         gjtiff_stdout = self._run_gjtiff_docker(
             input_files=feature_files,
@@ -299,3 +238,13 @@ def _run_gjtiff_docker(
             self._logger.error(f"[{__name__}]: gjtiff stderr: {stderr.decode('utf-8')}")
 
         return stdout.decode('utf-8')
+
+    @abstractmethod
+    def get_default_product_files(self):
+        """ Filter the downloaded files to those that match the default products (bands). """
+        pass
+
+    @abstractmethod
+    def get_product_files(self, products):
+        """ Filter the downloaded files to those that match the given products (bands) list, in order. """
+        pass
diff --git a/backend/feature/processing/sentinel1_feature.py b/backend/feature/processing/sentinel1_feature.py
index 9c7698b..1a03f91 100644
--- a/backend/feature/processing/sentinel1_feature.py
+++ b/backend/feature/processing/sentinel1_feature.py
@@ -10,14 +10,12 @@ class Sentinel1Feature(ProcessedFeature):
     def __init__(
             self,
-            feature_id: str = None, platform: str = None, filters: Dict[str, Any] = None,
-            request_hash: str = None
+            feature_id: str = None, platform: str = None, filters: Dict[str, Any] = None
     ):
         super().__init__(
             feature_id=feature_id,
             platform=platform,
-            filters=filters,
-            request_hash=request_hash
+            filters=filters
         )
 
         self._filters_polarisation_channels_availability = {
@@ -59,3 +57,9 @@ def _filter_available_files(self, available_files: list[tuple[str, str]] = None)
                     break
 
         return filtered_files
+
+    def get_default_product_files(self):
+        return []
+
+    def get_product_files(self, products):
+        return []
diff --git a/backend/feature/processing/sentinel2_feature.py b/backend/feature/processing/sentinel2_feature.py
index c88c3a9..4efbbbc 100644
--- a/backend/feature/processing/sentinel2_feature.py
+++ b/backend/feature/processing/sentinel2_feature.py
@@ -1,22 +1,19 @@
 import re
 
 from typing import Dict, Any
 
-from fastapi.logger import logger
 from feature.processing.processed_feature import ProcessedFeature
 
+DEFAULT_PRODUCTS = ['TCI']
 
 class Sentinel2Feature(ProcessedFeature):
     def __init__(
             self,
-            feature_id: str = None, platform: str = None, filters: Dict[str, Any] = None,
-            request_hash: str = None
+            feature_id: str = None, platform: str = None, filters: Dict[str, Any] = None
     ):
         super().__init__(
-            logger=logger,
             feature_id=feature_id,
             platform=platform,
-            filters=filters,
-            request_hash=request_hash
+            filters=filters
         )
 
         self._logger.debug(f"[{__name__}]: Sentinel-2 feature initialized")
@@ -27,10 +24,11 @@ def _filter_available_files(self, available_files: list[tuple[str, str]] = None)
 
         filtered_files = []
 
-        selected_bands_pattern = "|".join(self._get_selected_bands())
+        # selected_bands_pattern = "|".join(self._get_selected_bands())
         extensions = ['jp2', 'j2k', 'jpf', 'jpm', 'jpg2', 'j2c', 'jpc', 'jpx', 'mj2']
         extensions_pattern = "|".join(extensions)
-        regex_pattern = rf"([^/]+)/GRANULE/([^/]+)/IMG_DATA/(?:R\d{{2}}m/)?([^/]+_({selected_bands_pattern})(?:_\d{{2}}m)?\.({extensions_pattern}))"
+        regex_pattern = rf"(?:.*/)?(?!.*MSK)[^/]+\.({extensions_pattern})$"  # anything ending with a valid extension that is not a mask (MSK) file
+        # cdse_regex_pattern = rf"([^/]+)/GRANULE/([^/]+)/IMG_DATA/(?:R\d{{2}}m/)?([^/]+_({selected_bands_pattern})(?:_\d{{2}}m)?\.({extensions_pattern}))"
 
         for available_file in available_files:
             if re.match(regex_pattern, available_file[0].strip()):
@@ -40,17 +38,17 @@ def _filter_available_files(self, available_files: list[tuple[str, str]] = None)
 
         return filtered_files
 
-    def _get_selected_bands(self):
-        selected_bands = []
-
-        for band in self._filters['bands']:
-            band = band.upper()
-            if band == 'B8A' or band == 'TCI':
-                selected_bands.append(band)
-            else:
-                selected_bands.append(f'B{int(band[1:]):02}')
-
-        return selected_bands
+    # def _get_selected_bands(self):
+    #     selected_bands = []
+    #
+    #     for band in self._filters['bands']:
+    #         band = band.upper()
+    #         if band == 'B8A' or band == 'TCI':
+    #             selected_bands.append(band)
+    #         else:
+    #             selected_bands.append(f'B{int(band[1:]):02}')
+    #
+    #     return selected_bands
 
     def _prune_low_resolution_files(self, files: list[tuple[str, str]]):
         """
@@ -80,3 +78,25 @@ def _prune_low_resolution_files(self, files: list[tuple[str, str]]):
                 best_resolution[band] = (resolution, i)
 
         return [files[i] for r, i in best_resolution.values()]
+
+    def get_default_product_files(self):
+        products = self.get_product_files(DEFAULT_PRODUCTS)
+        if not products:
+            products = [self._downloaded_files[0]]
+        return products
+
+    def get_product_files(self, products):
+        if not self._downloaded_files:
+            raise Exception("No downloaded files found for this feature")
+        if len(products) != 1 and len(products) != 3:
+            raise Exception(f"Only 1 or 3 bands are supported as processable, defined: {products}")
+
+        files = []
+        for product in products:
+            pattern = re.compile(rf'_{re.escape(product)}_.*\.[^.]+$')
+            for file in self._downloaded_files:
+                if pattern.search(file):
+                    # take the first matching file for each requested product, keeping the products order
+                    files.append(file)
+                    break
+
+        return files
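# Illustrative, not part of the patch: get_product_files() selects downloaded files by
# matching an `_<PRODUCT>_` token in the file name. A quick, self-contained check of
# that behaviour on hypothetical Sentinel-2 L2A file names (the paths below are made up):
import re

downloaded_files = [
    "/data/feat1/T33UWR_20240101T100319_TCI_10m.jp2",
    "/data/feat1/T33UWR_20240101T100319_B04_10m.jp2",
    "/data/feat1/T33UWR_20240101T100319_SCL_20m.jp2",
]


def product_files(products, files):
    selected = []
    for product in products:
        pattern = re.compile(rf'_{re.escape(product)}_.*\.[^.]+$')
        for file in files:
            if pattern.search(file):
                selected.append(file)
                break  # one file per requested product
    return selected


print(product_files(["TCI"], downloaded_files))
# ['/data/feat1/T33UWR_20240101T100319_TCI_10m.jp2']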
diff --git a/backend/feature/tiling/tiling_worker.py b/backend/feature/tiling/tiling_worker.py
index 923e730..e0483c6 100644
--- a/backend/feature/tiling/tiling_worker.py
+++ b/backend/feature/tiling/tiling_worker.py
@@ -30,16 +30,15 @@ def __init__(
             self,
             processed_feature: ProcessedFeature,
             selected_file: str,
-            z: int, x: int, y: int,
-            logger: logging.Logger = logging.getLogger(__name__),
+            z: int, x: int, y: int
     ):
-        self._logger = logger
+        self._logger = logging.getLogger(__name__)
 
         self._processed_feature = processed_feature
 
         self._selected_file = (
                 Path(variables.DOCKER_SHARED_DATA_DIRECTORY) /
-                self._processed_feature.get_request_hash() /
+                self._processed_feature.get_feature_id() /
                 selected_file
         )
         self._image_file = Image.open(self._selected_file)
@@ -86,7 +85,7 @@ def save_tile(self) -> Path:
 
         if left > self._image_width or top > self._image_height or right < 0 or bottom < 0:
             raise TilingWorkerTileOutOfBounds(
-                request_hash=self._processed_feature.get_request_hash(),
+                request_hash=self._processed_feature.get_feature_id(),
                 z=self._z, x=self._x, y=self._y
             )
diff --git a/backend/resources/enums.py b/backend/resources/enums.py
index 4938a8c..d6ecbb2 100644
--- a/backend/resources/enums.py
+++ b/backend/resources/enums.py
@@ -4,6 +4,7 @@ class RequestStatuses(Enum):
     NON_EXISTING = "non_existing"
     ACCEPTED = "accepted"
+    DOWNLOADING = "downloading"
     PROCESSING = "processing"
     COMPLETED = "completed"
     FAILED = "failed"
diff --git a/backend/resources/reqeusted_feature_model.py b/backend/resources/reqeusted_feature_model.py
index 56ecc8b..10233a5 100644
--- a/backend/resources/reqeusted_feature_model.py
+++ b/backend/resources/reqeusted_feature_model.py
@@ -2,15 +2,11 @@
 import json
 
 from pydantic import BaseModel
-from typing import Dict, Any
+from typing import Dict, Any, List
 
 
 class ProcessedFeatureModel(BaseModel):
     feature_id: str = None
     platform: str = None
     filters: Dict[str, Any] = None
-
-    def hash_myself(self) -> str:
-        model_dict = self.model_dump(by_alias=True, exclude_none=True)
-        model_json = json.dumps(model_dict, sort_keys=True)
-        return hashlib.sha256(model_json.encode()).hexdigest()
+    products: List[str] = None
diff --git a/backend/tasks/data_tasks.py b/backend/tasks/data_tasks.py
index 39c3668..2a0a933 100644
--- a/backend/tasks/data_tasks.py
+++ b/backend/tasks/data_tasks.py
@@ -1,6 +1,11 @@
+import shutil
+from pathlib import Path
+
+import variables
 from celery_app import celery_app
 from database.mongo_database_connector import MongoDatabaseConnector
 from celery.utils.log import get_task_logger
+from resources.enums import RequestStatuses
 
 _db: MongoDatabaseConnector | None = None
 logger = get_task_logger("tasks")
@@ -11,14 +16,37 @@ def init_db():
         _db = MongoDatabaseConnector()
         _db.connect()
 
+
+@celery_app.task(ignore_result=True)
+def download_feature_task(feature_id: str):
+    init_db()
+    logger.info(f"Download task for {feature_id}")
+    feature = _db.get(feature_id)
+    feature._logger = logger
+    feature._status = RequestStatuses.DOWNLOADING
+    _db.set(key=feature_id, value=feature)
+
+    feature.download_feature()
+    _db.set(key=feature_id, value=feature)
+    if feature.get_status() == RequestStatuses.PROCESSING:
+        process_feature_task.delay(feature_id)
+    delete_feature_task.apply_async(args=[feature_id], countdown=60*60)
+
 
 @celery_app.task(ignore_result=True)
-def process_feature_task(hash: str):
+def process_feature_task(feature_id: str, files: list[str] = None):
     init_db()
-    # will have more complex payload once we implement additional bands processing for existing files
-    logger.info(f"Task {hash}")
-    feature = _db.get(hash)
+    logger.info(f"Processing task for {feature_id}, requested files: {files}")
+    feature = _db.get(feature_id)
     feature._logger = logger
-    logger.info(f"Processed feature: {feature}")
-    feature.process_feature()
-    _db.set(key=feature.get_request_hash(), value=feature)
+    files_to_process = feature.get_default_product_files() if not files else feature.get_product_files(files)
+    feature.process_feature(files_to_process)
+    _db.set(key=feature_id, value=feature)
+
+
+@celery_app.task(ignore_result=True)
+def delete_feature_task(feature_id: str):
+    """ Removes all downloaded and processed files related to the feature. """
+    init_db()
+    logger.info(f"Delete task for {feature_id}")
+    _db.delete(feature_id)
+    # TODO: handle repeated requests (e.g. newest product) - add a last_accessed field in the DB and check it?
+    feature_dir = Path(variables.DOCKER_SHARED_DATA_DIRECTORY, feature_id)
+    shutil.rmtree(feature_dir)
diff --git a/backend/variables.py b/backend/variables.py
index 8303a56..db591e1 100644
--- a/backend/variables.py
+++ b/backend/variables.py
@@ -20,6 +20,8 @@
     'host_base': os.getenv("DHR_CONNECTOR_HOST_BASE"),
     'username': os.getenv("DHR_CONNECTOR_USERNAME"),
     'password': os.getenv("DHR_CONNECTOR_PASSWORD"),
+    'token_url': os.getenv("DHR_CONNECTOR_TOKEN_URL"),
+    'client_id': os.getenv("DHR_CONNECTOR_CLIENT_ID"),
 }
 
 CDSE_CATALOG_ROOT: str = os.getenv("CDSE_CATALOG_ROOT")
diff --git a/docs/docs.md b/docs/docs.md
index cf13eda..103afc4 100644
--- a/docs/docs.md
+++ b/docs/docs.md
@@ -92,7 +92,7 @@ The backend is a Python service built with FastAPI and designed to process satel
 ## API endpoints
 - `POST /api/request_processing` - queue visualization request for a product, get status.
 - `GET /api/get_tile/{z}/{x}/{x}` - retrieve WebMercator tile
-- `GET /api/download_image/{request_hash}/{filename}` - download entire processed product image file.
+- `GET /api/download_image/{feature_id}/{filename}` - download entire processed product image file.
 
 ## Workflow
 1. **Enqueue** - frontend posts a visualization request containing product ID + parameters.
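# Illustrative, not part of the patch: with this change the visualization request is
# keyed by the product id and may carry an optional `products` (bands) list, mirroring
# ProcessedFeatureModel above. The host and field values below are placeholders taken
# from examples elsewhere in this diff:
import httpx

backend_host = "http://localhost:8000"  # placeholder

payload = {
    "feature_id": "b43f0086-6fec-4dcd-a3f8-a0661327c59f",
    "platform": "SENTINEL-2",
    "filters": {"cloud_cover": "100", "levels": ["S2MSI2A"], "bands": ["TCI"]},
    "products": ["TCI"],
}

response = httpx.post(f"{backend_host}/api/request_processing", json=payload)
print(response.status_code, response.json())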
diff --git a/frontend/html/index.js b/frontend/html/index.js
index 10d4046..d8ea039 100644
--- a/frontend/html/index.js
+++ b/frontend/html/index.js
@@ -598,17 +598,15 @@ const openFeature = () => {
 
     if (selectedValue) {
         const selectedValueJSON = JSON.parse(selectedValue);
-        const requestHash = encodeURIComponent(selectedValueJSON.requestHash);
+        const featureId = encodeURIComponent(selectedValueJSON.featureId);
         let file = encodeURIComponent(selectedValueJSON.file);
 
         if (window.currentSatelliteTiles) {
             leafletMap.removeLayer(window.currentSatelliteTiles);
        }
 
-        //const tileUrlTemplate = `${backendHost}/api/get_tile/{z}/{x}/{y}.jpg?request_hash=${requestHash}&selected_file=${file}`;
-
         file = file.split('.').slice(0,-1).join('.');
-        const tileUrlTemplate = `${backendHost}/data/${requestHash}/${file}/{z}/{x}/{y}.jpg`;
+        const tileUrlTemplate = `${backendHost}/data/${featureId}/${file}/{z}/{x}/{y}.jpg`;
 
         const satelliteTiles = L.tileLayer(tileUrlTemplate, {
             attribution: 'Satellite imagery (c) Copernicus programme',
@@ -629,14 +627,13 @@ const visualize = async (featureId) => {
 
     let processedProductsSelect = document.querySelector("#processed-products-select");
     processedProductsSelect.innerHTML = '';
+    console.log(visualizationRequest.getProcessedFiles())
 
-    for (const requestHash in visualizationRequest.getProcessedFiles()) {
-        for (const file of visualizationRequest.getProcessedFiles()[requestHash]) {
-            let option = document.createElement("option");
-            option.value = `{"requestHash":"${requestHash}", "file":"${file}"}`;
-            option.textContent = file;
-            processedProductsSelect.appendChild(option);
-        }
+    for (const file of visualizationRequest.getProcessedFiles()[featureId]) {
+        let option = document.createElement("option");
+        option.value = `{"featureId":"${featureId}", "file":"${file}"}`;
+        option.textContent = file;
+        processedProductsSelect.appendChild(option);
     }
 
     openFeature();
@@ -651,10 +648,10 @@ const downloadImage = async () => {
 
     try {
         const selectedValueJSON = JSON.parse(selectedValue);
-        const requestHash = encodeURIComponent(selectedValueJSON.requestHash);
+        const featureId = encodeURIComponent(selectedValueJSON.featureId);
         const filename = encodeURIComponent(selectedValueJSON.file);
 
-        const response = await fetch(`${backendHost}/api/download_image/${requestHash}/${filename}`);
+        const response = await fetch(`${backendHost}/api/download_image/${featureId}/${filename}`);
 
         if (!response.ok) {
             showAlert(status.ERROR, `Download failed with status ${response.status}`);