[docs]defcreate_alembic_config(url:Union[str,URL,Url,MultiHostUrl]):"""Create our alembic configuration object, to run alembic commands."""fromalembic.configimportConfigasAlembicConfigifisinstance(url,(Url,MultiHostUrl)):url=str(url)url=make_url(url)alembic_cfg=AlembicConfig()alembic_cfg.set_main_option("script_location",os.path.join(storage.__path__[0],"migrations"))alembic_cfg.set_main_option("file_template","%%(year)d%%(month).2d%%(day).2d%%(hour).2d%%(minute).2d%%(second).2d_%%(rev)s_%%(slug)s",)alembic_cfg.set_main_option("truncate_slug_length","40")alembic_cfg.set_main_option("output_encoding","utf-8")alembic_cfg.set_main_option("sqlalchemy.url",url.render_as_string(hide_password=False))alembic_cfg.set_main_option("configure_logger","false")returnalembic_cfg
[docs]defcreate_harp_settings_with_storage_from_command_line_options(kwargs):"""Create a Harp configuration object using common server command line options (--set...) with the sqlalchemy storage application installed so that the storage is properly configured."""fromharp.commandline.options.serverimportCommonServerOptionsoptions=CommonServerOptions(**kwargs)builder=ConfigurationBuilder.from_commandline_options(options)builder.applications.add("storage")returnbuilder.build()
[docs]asyncdefdo_reset(engine):logger.info("🛢 [db:reset] dropping all tables.")asyncwithengine.begin()asconn:awaitconn.run_sync(Base.metadata.drop_all)awaitconn.execute(text("DROP TABLE IF EXISTS alembic_version;"))
asyncdef_do_migrate(engine,*,migrator,reset=False):logger.info(f"🛢 Starting database migrations... (dialect={engine.dialect.name}, reset={reset}).")ifreset:awaitdo_reset(engine)ifengine.dialect.name=="sqlite":logger.debug("🛢 [db:migrate dialect=sqlite] creating all tables (without alembic).")asyncwithengine.begin()asconn:awaitconn.run_sync(Base.metadata.create_all)awaitconn.commit()elifmigrator:# alembic manages migrations except for sqlite, because it's not trivial to make them work and an env using# sqlite does not really need to support upgrades (drop/recreate is fine when harp is upgraded).logger.info("🛢 [db:migrate] Running database migrations...")withThreadPoolExecutor()asexecutor:awaitasyncio.get_event_loop().run_in_executor(executor,migrator)ifengine.dialect.name=="mysql":logger.debug("🛢 [db:migrate dialect=mysql] creating fulltext indexes.")try:asyncwithengine.begin()asconn:awaitconn.execute(text(f"CREATE FULLTEXT INDEX endpoint_ft_index ON {Transaction.__tablename__} (endpoint);"))# Create the full text index for messages.summaryawaitconn.execute(text(f"CREATE FULLTEXT INDEX summary_ft_index ON {Message.__tablename__} (summary);"))awaitconn.commit()exceptOperationalErrorase:# check for duplicate key errorife.origande.orig.args[0]==1061:passelse:raiseelogger.debug("🛢 [db:migrate] Done.")
[docs]asyncdefdo_migrate(engine,*,migrator,reset=False):try:await_do_migrate(engine,migrator=migrator,reset=reset)exceptExceptionase:logger.error(f"🛢 [db:migrate] Migrations failed: {e}")raiseRuntimeError(f"Could not run migrations ({e}).")frome