12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- from datetime import timedelta, datetime
- import pytz
- import pandas as pd
- from sqlalchemy import select
- from flexmeasures.data.models.data_sources import DataSource
- from flexmeasures.data.models.time_series import TimedBelief
- from flexmeasures.data.services.scheduling import create_scheduling_job
- from flexmeasures.data.tests.utils import work_on_rq, exception_reporter
- def test_scheduling_a_charging_station(
- db, app, add_charging_station_assets, setup_test_data
- ):
- """Test one clean run of one scheduling job:
- - data source was made,
- - schedule has been made
- Starting with a state of charge 1 kWh, within 2 hours we should be able to reach 5 kWh.
- """
- soc_at_start = 1
- target_soc = 5
- duration_until_target = timedelta(hours=2)
- charging_station = add_charging_station_assets["Test charging station"].sensors[0]
- tz = pytz.timezone("Europe/Amsterdam")
- start = tz.localize(datetime(2015, 1, 2))
- end = tz.localize(datetime(2015, 1, 3))
- target_datetime = start + duration_until_target
- resolution = timedelta(minutes=15)
- soc_targets = [dict(datetime=target_datetime.isoformat(), value=target_soc)]
- assert (
- db.session.execute(
- select(DataSource).filter_by(name="Seita", type="scheduler")
- ).scalar_one_or_none()
- is None
- ) # Make sure the scheduler data source isn't there
- job = create_scheduling_job(
- asset_or_sensor=charging_station,
- start=start,
- end=end,
- belief_time=start,
- resolution=resolution,
- flex_model={
- "soc-at-start": soc_at_start,
- "soc-targets": soc_targets,
- "roundtrip-efficiency": "100%",
- "storage-efficiency": 1,
- },
- )
- print("Job: %s" % job.id)
- work_on_rq(app.queues["scheduling"], exc_handler=exception_reporter)
- scheduler_source = db.session.execute(
- select(DataSource).filter_by(name="Seita", type="scheduler")
- ).scalar_one_or_none()
- assert (
- scheduler_source is not None
- ) # Make sure the scheduler data source is now there
- power_values = db.session.scalars(
- select(TimedBelief)
- .filter(TimedBelief.sensor_id == charging_station.id)
- .filter(TimedBelief.source_id == scheduler_source.id)
- ).all()
- consumption_schedule = pd.Series(
- [-v.event_value for v in power_values],
- index=pd.DatetimeIndex([v.event_start for v in power_values]),
- ) # For consumption schedules, positive values denote consumption. For the db, consumption is negative
- assert len(consumption_schedule) == 96
- print(consumption_schedule.head(12))
- assert (
- consumption_schedule.head(8).sum() * (resolution / timedelta(hours=1)) == 4.0
- ) # The first 2 hours should consume 4 kWh to charge from 1 to 5 kWh
|