Source code for speasy.core.impex.parser

"""Base inventory tree management
"""
import xml.etree.ElementTree as Et

from ...core import fix_name
from ...core.inventory.indexes import (CatalogIndex, ComponentIndex,
                                       DatasetIndex, ParameterIndex,
                                       ArgumentListIndex, ArgumentIndex,
                                       TemplatedParameterIndex, SpeasyIndex,
                                       TimetableIndex)


[docs] def to_xmlid(index_or_str) -> str: if type(index_or_str) is str: return index_or_str if type(index_or_str) is dict and "xmlid" in index_or_str: return index_or_str['xmlid'] if hasattr(index_or_str, 'xmlid'): return index_or_str.xmlid else: raise TypeError(f"given parameter {index_or_str} of type {type(index_or_str)} is not a compatible index")
[docs] class ImpexXMLParser:
[docs] @staticmethod def fix_names(**kwargs): return {fix_name(key): value for key, value in kwargs.items()}
[docs] @staticmethod def fix_xmlid(**kwargs): def clean(key): if '}id' in key and key.startswith('{'): return 'xmlid' return key return {clean(key): value for key, value in kwargs.items()}
[docs] @staticmethod def index_ctor_args(node, provider_name, is_public: bool = True): meta = ImpexXMLParser.fix_names(**ImpexXMLParser.fix_xmlid(**node.attrib)) meta['is_public'] = is_public if 'dataStart' in meta: meta['start_date'] = meta.pop('dataStart') if 'dataStop' in meta: meta['stop_date'] = meta.pop('dataStop') uid = meta.get('xmlid', node.tag) return {"name": meta.get('name', node.tag), "provider": provider_name, 'uid': uid, "meta": meta}
[docs] @staticmethod def make_any_node(parent, node, provider_name, ctor, name_key='xmlid', is_public: bool = True): new = ctor(**ImpexXMLParser.index_ctor_args(node, provider_name, is_public=is_public)) name = fix_name(new.__dict__.get(name_key, node.tag)) parent.__dict__[name] = new return new
[docs] @staticmethod def make_dataset_node(parent, node, provider_name, name_key, is_public: bool = True): ds = ImpexXMLParser.make_any_node(parent, node, provider_name, DatasetIndex, name_key=name_key, is_public=is_public) return ds
[docs] @staticmethod def make_parameter_node(parent, node, provider_name, name_key, is_public: bool = True): if arguments:=node.find('.//arguments'): arguments.set('name', '__spz_arguments__') param = ImpexXMLParser.make_any_node(parent, node, provider_name, TemplatedParameterIndex, name_key=name_key, is_public=is_public) else: param = ImpexXMLParser.make_any_node(parent, node, provider_name, ParameterIndex, name_key=name_key, is_public=is_public) if isinstance(parent, DatasetIndex): param.start_date = parent.start_date param.stop_date = parent.stop_date param.dataset = parent.spz_uid() return param
[docs] @staticmethod def make_user_parameter_node(parent, node, provider_name, name_key, is_public: bool = True): # It seems that AMDA prevents users from using incompatible names here param = ImpexXMLParser.make_any_node(parent, node, provider_name, ParameterIndex, name_key=name_key, is_public=is_public) return param
[docs] @staticmethod def make_component_node(parent, node, provider_name, name_key, is_public: bool = True): component = ImpexXMLParser.make_any_node(parent, node, provider_name, ComponentIndex, name_key=name_key, is_public=is_public) if isinstance(parent, ParameterIndex): component.start_date = parent.start_date component.stop_date = parent.stop_date component.dataset = parent.dataset component.parameter = parent.spz_uid() return component
[docs] @staticmethod def make_timetable_node(parent, node, provider_name, name_key, is_public: bool = True): tt = ImpexXMLParser.make_any_node(parent, node, provider_name, TimetableIndex, name_key=name_key, is_public=is_public) return tt
[docs] @staticmethod def make_catalog_node(parent, node, provider_name, name_key, is_public: bool = True): cat = ImpexXMLParser.make_any_node(parent, node, provider_name, CatalogIndex, name_key=name_key, is_public=is_public) return cat
[docs] @staticmethod def make_path_node(parent, node, provider_name, name_key, is_public: bool = True): return ImpexXMLParser.make_any_node(parent, node, provider_name, SpeasyIndex, name_key=name_key, is_public=is_public)
[docs] @staticmethod def parse_template_arguments(parent, node, provider_name, name_key, is_public: bool = True): return ImpexXMLParser.make_any_node(parent, node, provider_name, ArgumentListIndex, name_key=name_key, is_public=is_public)
[docs] @staticmethod def parse_template_argument(parent, node, provider_name, name_key, is_public: bool = True): if node.get('type') == 'list': choices = [] for item in node.findall('.//item'): choices.append((item.get('name'), item.get('key'))) node.remove(item) node.set('choices', choices) elif node.get('type') == 'generated-list': node.set('type', 'list') choices = [] for k in range(int(node.get('minkey')), int(node.get('maxkey'))): choices.append((node.get('nametpl').replace('##key##', str(k), 1), str(k))) node.set('choices', choices) return ImpexXMLParser.make_any_node(parent, node, provider_name, ArgumentIndex, name_key=name_key, is_public=is_public)
[docs] @staticmethod def parse(xml, provider_name, name_mapping=None, is_public: bool = True): handlers = { 'mission': ImpexXMLParser.make_path_node, 'observatory': ImpexXMLParser.make_path_node, 'datasetGroup': ImpexXMLParser.make_path_node, 'instrument': ImpexXMLParser.make_path_node, 'dataset': ImpexXMLParser.make_dataset_node, 'parameter': ImpexXMLParser.make_parameter_node, 'component': ImpexXMLParser.make_component_node, 'timeTable': ImpexXMLParser.make_timetable_node, 'timetab': ImpexXMLParser.make_timetable_node, 'catalog': ImpexXMLParser.make_catalog_node, 'param': ImpexXMLParser.make_user_parameter_node, 'arguments': ImpexXMLParser.parse_template_arguments, 'argument': ImpexXMLParser.parse_template_argument } def _recursive_parser(parent, node, is_node_public): if name_mapping and node.tag in name_mapping: name_key = name_mapping[node.tag] elif node.tag not in handlers.keys(): name_key = '' else: name_key = 'name' new = handlers.get(node.tag, ImpexXMLParser.make_path_node)(parent, node, provider_name, name_key, is_node_public) for subnode in node: _recursive_parser(new, subnode, is_node_public) root = SpeasyIndex("root", provider_name, f"{provider_name}_root_node") if xml is not None: tree = Et.fromstring(xml) _recursive_parser(root, tree, is_node_public=is_public) return root