12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- from __future__ import annotations
- from flask import current_app
- from sqlalchemy import select
- from flexmeasures import User
- from flexmeasures.data import db
- from flexmeasures.data.models.data_sources import DataSource
- from flexmeasures.data.models.user import is_user
- def get_or_create_source(
- source: User | str,
- source_type: str | None = None,
- model: str | None = None,
- version: str | None = None,
- attributes: dict | None = None,
- flush: bool = True,
- ) -> DataSource:
- if is_user(source):
- source_type = "user"
- query = select(DataSource).filter(DataSource.type == source_type)
- if model is not None:
- query = query.filter(DataSource.model == model)
- if version is not None:
- query = query.filter(DataSource.version == version)
- if attributes is not None:
- query = query.filter(
- DataSource.attributes_hash == DataSource.hash_attributes(attributes)
- )
- if is_user(source):
- query = query.filter(DataSource.user == source)
- elif isinstance(source, str):
- query = query.filter(DataSource.name == source)
- else:
- raise TypeError("source should be of type User or str")
- _source = db.session.execute(query).scalar_one_or_none()
- if not _source:
- if is_user(source):
- _source = DataSource(user=source, model=model, version=version)
- else:
- if source_type is None:
- raise TypeError("Please specify a source type")
- _source = DataSource(
- name=source,
- model=model,
- version=version,
- type=source_type,
- attributes=attributes,
- )
- current_app.logger.info(f"Setting up {_source} as new data source...")
- db.session.add(_source)
- if flush:
- # assigns id so that we can reference the new object in the current db session
- db.session.flush()
- return _source
- def get_source_or_none(
- source: int | str, source_type: str | None = None
- ) -> DataSource | None:
- """
- :param source: source id
- :param source_type: optionally, filter by source type
- """
- query = select(DataSource)
- if source_type is not None:
- query = query.filter(DataSource.type == source_type)
- query = query.filter(DataSource.id == int(source))
- return db.session.execute(query).scalar_one_or_none()
|