Source code for insupdel4stac.updater

# SPDX-FileCopyrightText: 2023 Karlsruher Institut für Technologie
#
# SPDX-License-Identifier: EUPL-1.2
import json
import os
import sys
from typing import Optional, Union

import pystac
import requests
from pypgstac.db import PgstacDB
from pypgstac.load import Loader, Methods
from requests.exceptions import RequestException

from .analysers.table_details import TableDetails
from .logger import Logger


class Updater:
    """
    This is a class for updating items and collections in pgSTAC and STAC-API.

    Instantiating the class performs the update immediately: every collection
    in ``collections_list`` is passed to :meth:`update`.

    Args:
        collections_list (list): A list of collections to be updated.
        service_type (str): The service type to be used for updating. It can
            be `pgstac` or `stacapi`.
        table_details (list, optional): A list of dictionaries that contains
            the details of the tables. Entries are read through the keys
            ``"table"``, ``"item-id"`` and ``"collection-id"``; only entries
            whose ``"table"`` is ``"item"`` select items. Defaults to None,
            in which case every item of a collection is updated.
        loader (Loader, optional): The loader object used to upsert
            collections into pgSTAC.
        pgstacdb (PgstacDB, optional): The pgstacdb object used to upsert
            items into pgSTAC.
        pgstac_properties (dict, optional): The properties of pgstac.
            Defaults to None.
        stacapi_properties (dict, optional): The properties of stacapi; the
            keys ``"stacapi_url"``, ``"timeout"`` and ``"verify"`` are read.
            Defaults to None.
        logger_properties (dict, optional): The properties of logger. The
            dict is updated in place with ``"logger_level"`` and
            ``"logger_msg"`` before every call to ``Logger``.
            Defaults to None.
    """

    def __init__(
        self,
        collections_list: list,
        service_type: str,
        table_details: Union[list, None] = None,
        loader: Optional[Loader] = None,
        pgstacdb: Optional[PgstacDB] = None,
        pgstac_properties: Optional[dict] = None,
        stacapi_properties: Optional[dict] = None,
        logger_properties: Optional[dict] = None,
    ):
        # Every attribute is always defined. Previously they were only set
        # when a valid value was passed, so e.g. the first log call or a
        # pgSTAC item upsert failed with AttributeError instead of running.
        # The logger dict is kept by reference (not copied), as before.
        self.logger_properties = (
            logger_properties if isinstance(logger_properties, dict) else {}
        )
        self.collections_list = collections_list
        self.stacapi_properties = (
            stacapi_properties if isinstance(stacapi_properties, dict) else {}
        )
        self.pgstac_properties = (
            pgstac_properties if isinstance(pgstac_properties, dict) else {}
        )
        # None makes pgstac_item_update log an ERROR instead of crashing.
        self.PgstacDB = pgstacdb if isinstance(pgstacdb, PgstacDB) else None
        self.loader = loader if isinstance(loader, Loader) else None
        for collection in self.collections_list:
            # The raw `loader` argument is forwarded, exactly as before.
            self.update(collection, service_type, loader, table_details)

    def update(
        self,
        collection: pystac.Collection,
        service: str,
        loader: Optional[Loader] = None,
        table_details: Union[list, None] = None,
    ):
        """
        This function updates the items and collections in pgSTAC and
        STAC-API according to the `service_type` and `table_details`.

        The collection itself is updated first (for `pgstac` only when a
        `loader` is given). Then every object reachable through an ``item``
        or ``child`` link is looked up with ``collection.get_item`` and
        updated, either unconditionally when `table_details` is None or only
        when a `table_details` entry selects it. Service values other than
        `pgstac` and `stacapi` update nothing.

        Args:
            collection: The collection whose metadata and items are updated.
            service: `pgstac` or `stacapi`.
            loader: Loader used for the pgSTAC collection upsert.
            table_details: Optional list of selection dictionaries.
        """
        # HREF of the first "self" link with a non-empty target.
        collection_self_path: str = next(
            (
                str(link.target)
                for link in collection.links
                if link.rel == "self" and str(link.target)
            ),
            "",
        )
        if service == "pgstac" and loader is not None:
            self.pgstac_collection_update(collection_self_path, loader)
        elif service == "stacapi":
            self.stacapi_collection_update(collection)

        for link in collection.links:
            if link.rel not in ("item", "child"):
                continue
            item_id = self._link_item_id(link)
            item = collection.get_item(item_id)
            if table_details is None:
                self._update_item(item, service)
                continue
            # An item selected by several entries is updated once per entry.
            for table_detail in table_details:
                self._apply_table_detail(
                    table_detail, item, item_id, collection, service
                )

    @staticmethod
    def _link_item_id(link: pystac.Link) -> str:
        """Return the item id that an ``item``/``child`` link points to."""
        target = link.target
        if isinstance(target, pystac.Item):
            # A resolved link holds the Item itself; str() of it is the
            # object's repr, not an HREF, so the id must be read directly.
            return target.id
        # Unresolved link: derive the id from the HREF's file name,
        # e.g. "./item-1/item-1.json" -> "item-1".
        return os.path.splitext(os.path.basename(str(target)))[0]

    def _apply_table_detail(
        self,
        table_detail: dict,
        item: Optional[pystac.Item],
        item_id: str,
        collection: pystac.Collection,
        service: str,
    ) -> None:
        """Update `item` if the `table_details` entry selects it."""
        # Only `item` tables whose item-id is a `str` or `list` are supported.
        if table_detail.get("table") != "item" or not isinstance(
            table_detail.get("item-id"), (list, str)
        ):
            self._log(
                "ERROR",
                "Please ensure that the `table` is `item` and the `item-id` is a `list` or `str` in the "
                + str(table_detail),
            )
            return

        # Index 0: "star" item ids (substring match); index 1: other item
        # ids (exact match).
        item_details = TableDetails().item_table_details(table_detail)
        if not self._ids_match(item_details[0], item_details[1], item_id):
            return

        if table_detail.get("collection-id") is None:
            self._log(
                "WARNING",
                "The collection ID has been left blank in table_details. In order to enhance the efficiency of the procedure, please furnish the collection-id.",
            )
            self._update_item(item, service)
            return

        # Same layout as above: star collection ids, then other ids.
        collection_details = TableDetails().collection_table_details(
            table_detail
        )
        if self._ids_match(
            collection_details[0], collection_details[1], collection.id
        ):
            self._update_item(item, service)
        else:
            self._log(
                "WARNING",
                "Please ensure that the collection-id is provided accurately.",
            )

    @staticmethod
    def _ids_match(star_ids, other_ids, candidate: str) -> bool:
        """Return True if `candidate` contains a star id or equals another id."""
        if star_ids is not None and any(s in candidate for s in star_ids):
            return True
        return other_ids is not None and any(o == candidate for o in other_ids)

    def _update_item(self, item: Optional[pystac.Item], service: str) -> None:
        """Send `item` to the selected service; a missing item is skipped."""
        if item is None:
            return
        if service == "pgstac":
            self.pgstac_item_update(item, self.PgstacDB)
        elif service == "stacapi":
            self.stacapi_item_update(item)

    def _log(self, level: str, msg: str) -> None:
        """Store level and message in the logger properties and emit them."""
        self.logger_properties["logger_level"] = level
        self.logger_properties["logger_msg"] = msg
        Logger(self.logger_properties)

    def pgstac_collection_update(
        self, collection_self_path: str, loader: Loader
    ):
        """
        This function updates the collections in pgSTAC.

        Upserts the collection found at `collection_self_path` through
        `loader`. Any failure is logged at ERROR level and not re-raised, so
        the remaining objects are still processed.
        """
        try:
            loader.load_collections(
                str(collection_self_path),
                Methods.upsert,
            )
        except Exception as exc:  # best-effort batch: log and carry on
            self._log(
                "ERROR",
                "The updating of collection in pgSTAC database could not be done. %s : %s."
                % (type(exc).__name__, exc),
            )
        else:
            self._log(
                "INFO",
                "The updating of collection in pgSTAC database has been done successfully.",
            )

    def stacapi_collection_update(self, collection: pystac.Collection):
        """
        This function updates the collections in STAC-API via PUT method.
        """
        # NOTE(review): this PUTs to ".../collections"; the STAC API
        # Transaction extension addresses ".../collections/{collectionId}"
        # for updates — confirm which form the target server expects.
        url = self.stacapi_properties["stacapi_url"] + "/collections"
        self._stacapi_put(url, collection.to_dict(), "collection")

    def pgstac_item_update(self, item: pystac.Item, PgstacDB: PgstacDB):
        """
        This function updates the items in pgSTAC.

        Calls the ``upsert_item`` database function with the item's JSON.
        Any failure is logged at ERROR level and not re-raised.
        """
        try:
            # The result of `func` is iterated to completion so the upsert
            # actually runs (it is treated as a lazy iterable here).
            list(PgstacDB.func("upsert_item", json.dumps(item.to_dict())))
        except Exception as exc:  # best-effort batch: log and carry on
            self._log(
                "ERROR",
                "The updating of item in pgSTAC database could not be done. %s : %s."
                % (type(exc).__name__, exc),
            )
        else:
            self._log(
                "INFO",
                "The updating of item in pgSTAC database has been done successfully.",
            )

    def stacapi_item_update(self, item: pystac.Item):
        """
        This function updates the items in STAC-API via PUT method.
        """
        if item.collection_id is None:
            # Previously a TypeError from the URL concatenation below.
            self._log(
                "ERROR",
                "The updating of item in STAC-API database could not be done. The item has no collection id.",
            )
            return
        url = (
            self.stacapi_properties["stacapi_url"]
            + "/collections/"
            + item.collection_id
            + "/items/"
            + str(item.id)
        )
        self._stacapi_put(url, item.to_dict(), "item")

    def _stacapi_put(self, url: str, payload: dict, kind: str) -> None:
        """PUT `payload` as JSON to `url` and log the outcome for `kind`."""
        headers = {
            "accept": "application/json",
            "Content-Type": "application/json",
        }
        try:
            req = requests.put(
                url,
                headers=headers,
                data=json.dumps(payload),
                timeout=self.stacapi_properties["timeout"],
                verify=self.stacapi_properties["verify"],
            )
        except RequestException as e:
            # Timeouts, connection errors, ...: log and continue.
            self._log(
                "ERROR",
                "The updating of %s in STAC-API database could not be done. %s"
                % (kind, e),
            )
            return
        # A successful PUT may answer 200 (OK) or 204 (No Content); accept
        # any 2xx rather than only 200.
        if 200 <= req.status_code < 300:
            self._log(
                "INFO",
                "The updating of %s in STAC-API database has been done successfully."
                % kind,
            )
        else:
            self._log(
                "ERROR",
                "The updating of %s in STAC-API database could not be done. %s: %s"
                % (kind, req.status_code, req.reason),
            )