class moto.timestreamwrite.models.TimestreamWriteBackend(region_name, account_id)

When using the decorators, you can use the following internal API to verify records have arrived:

from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from moto.timestreamwrite.models import timestreamwrite_backends

backend = timestreamwrite_backends[ACCOUNT_ID]["us-east-1"]
records = backend.databases["mydatabase"].tables["mytable"].records

Example usage

def test_timestreamwrite_behaviour:

Implemented features for this service

  • [X] create_database

  • [X] create_table

  • [X] delete_database

  • [X] delete_table

  • [X] describe_database

  • [X] describe_endpoints

  • [X] describe_table

  • [X] list_databases

  • [X] list_tables

  • [X] list_tags_for_resource

  • [X] tag_resource

  • [X] untag_resource

  • [X] update_database

  • [X] update_table

  • [X] write_records