sqly.migration¶
Implementation of the sqly migration commands. See the CLI Usage document for more information about usage.
Migration
dataclass
¶
Represents a single migration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app |
str |
The name of the app (module) that owns the Migration. |
required |
ts |
int |
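(`YYYYmmddHHMMSSfff`) An integer representing the timestamp when the migration was created (millisecond resolution). |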
<factory> |
name |
str |
The (optional) name of the migration provides a short description. |
<factory> |
depends |
list[str] |
A list of migrations (keys) that this migration depends on. |
<factory> |
applied |
Optional[datetime] |
If the migration has been applied, the datetime. |
None |
doc |
Optional[str] |
A document string describing the migration. |
None |
up |
list[str] |
a list of SQL statements implementing the “forward” migration. |
<factory> |
dn |
list[str] |
a list of SQL statements implementing the “reverse” migration. |
<factory> |
Source code in sqly/migration.py
@dataclass
class Migration:
    """
    Represents a single migration.

    Arguments:
        app (str): The name of the app (module) that owns the Migration.
        ts (int): (`YYYYmmddHHMMSSfff`) An integer representing the timestamp when the
            migration was created (millisecond resolution).
        name (str): The (optional) name of the migration provides a short description.
        depends (list[str]): A list of migrations (keys) that this migration depends on.
        applied (Optional[datetime]): If the migration has been applied, the datetime.
        doc (Optional[str]): A document string describing the migration.
        up (list[str]): a list of SQL statements implementing the "forward" migration.
        dn (list[str]): a list of SQL statements implementing the "reverse" migration.
        data (dict[str, list[dict[str, Any]]]): Records to insert when the migration
            is applied "up", keyed by table name.
    """

    app: str
    ts: int = field(default_factory=migration_timestamp)
    name: str = field(default_factory=str)
    depends: list[str] = field(default_factory=list)
    applied: Optional[datetime] = None
    doc: Optional[str] = None
    up: list[str] = field(default_factory=list)
    dn: list[str] = field(default_factory=list)
    data: dict[str, list[dict[str, Any]]] = field(default_factory=dict)

    def __post_init__(self):
        """Normalize the name and the list-valued fields after construction."""
        # replace non-word characters in the name with an underscore
        self.name = re.sub(r"[\W_]+", "_", self.name or "")
        # ensure that list attributes are lists (such as when loaded from sqlite3)
        for key in ["depends", "up", "dn"]:
            value = getattr(self, key)
            if not value:
                value = []
            elif isinstance(value, str):
                # list fields are stored as JSON text by insert_query()
                value = json.loads(value)
            else:
                value = list(value)
            setattr(self, key, value)

    def __repr__(self) -> str:
        """A compact representation showing only the key, depends, and applied."""
        shown = ", ".join(
            f"{key}={getattr(self, key)!r}" for key in ["key", "depends", "applied"]
        )
        return f"{self.__class__.__name__}({shown})"

    def __str__(self) -> str:
        """The string representation of the Migration is the instance as YAML."""
        return self.yaml()

    def __hash__(self) -> int:
        """The unique hash is based on the Migration.key."""
        return uuid.uuid3(SQLY_UUID_NAMESPACE, self.key).int

    def dict(
        self, exclude: Optional[list] = None, exclude_none: bool = False
    ) -> Dict[str, Any]:
        """
        The Migration serialized as a dict.

        Arguments:
            exclude (Optional[list]): A list of fields to exclude.
            exclude_none (bool): Whether to exclude fields with value None.

        Returns:
            data (dict[str, Any]): The field values by field name.
        """
        return {
            key: val
            for key, val in asdict(self).items()
            if key not in (exclude or []) and (exclude_none is False or val is not None)
        }

    @property
    def key(self) -> str:
        """
        The Migration.key uniquely identifies the migration.

        Format = `{app}:{ts}_{name}`
        """
        return f"{self.app}:{self.ts}_{self.name}"

    @property
    def filename(self) -> str:
        """The filename (without path) for the Migration"""
        return f"{self.ts}_{self.name}.yaml"

    @classmethod
    def load(cls, filepath: Path) -> Migration:
        """
        Load the migration at the given file path.

        Arguments:
            filepath (Path): The path of the YAML file to load.

        Returns:
            migration (Migration): The Migration built from the file's contents.
        """
        # save() writes UTF-8 bytes, so read with the same encoding rather than
        # whatever the platform's locale default happens to be.
        with open(filepath, encoding="utf-8") as f:
            data = yaml.safe_load(f.read())
        return cls(**data)

    @classmethod
    def key_load(cls, migration_key: str) -> Migration:
        """Load the Migration that has the given key."""
        return cls.load(cls.key_filepath(migration_key))

    @classmethod
    def key_filepath(cls, migration_key: str) -> Path:
        """The file path of the Migration that has the given key.

        Arguments:
            migration_key (str): The Migration key

        Returns:
            file path (Path): The file path of the Migration
        """
        app, basename = migration_key.split(":")
        return app_migrations_path(app) / f"{basename}.yaml"

    @classmethod
    def app_migrations(
        cls, app: str, include_depends: bool = True
    ) -> Dict[str, Migration]:
        """
        For a given module name, get the migrations in that module. If `include_depends`
        is `True` (the default), also include depends migrations from other apps.

        Arguments:
            app (str): The name of the app (module) for which to list migrations.
            include_depends (bool): Whether to include dependency Migrations in the
                listing.

        Returns:
            migrations (dict[str, Migration]): A dict of Migrations, by key.
        """
        migration_filenames = glob(str(app_migrations_path(app) / "*.yaml"))
        migrations = {
            m.key: m
            for m in set(cls.load(filename) for filename in migration_filenames)
        }
        if include_depends is True:
            # collect into a separate dict: `migrations` is being iterated
            dependencies = {}
            for migration in migrations.values():
                dependencies |= migration.depends_migrations()
            migrations |= dependencies
        return migrations

    @classmethod
    def all_migrations(cls, *apps: str) -> Dict[str, Migration]:
        """
        Return all the migrations, including dependencies, for the given app(s).

        Arguments:
            apps (str): The app or apps for which to list Migrations.

        Returns:
            migrations (dict[str, Migration]): A dict of Migrations, by key.
        """
        # always depend on sqly
        migrations = cls.app_migrations("sqly")
        for app in apps:
            if app != "sqly":
                migrations |= cls.app_migrations(app, include_depends=True)
        return migrations

    @classmethod
    def create(
        cls, app: str, *other_apps: str, name: Optional[str] = None
    ) -> Migration:
        """
        Create a new Migration object for the given app (module) name. The new Migration
        is not saved to the filesystem: It is just a Migration instance in memory.

        Every new migration automatically depends on all the "leaf" nodes in the
        existing migration graph. Leaf nodes are those with out_degree == 0 (no edges
        pointing out). See:
        <https://networkx.org/documentation/stable/reference/classes/generated/networkx.DiGraph.out_degree.html>.
        For a worked example, see:
        <https://stackoverflow.com/questions/31946253/find-end-nodes-leaf-nodes-in-radial-tree-networkx-graph/31953001>.

        NOTE: The existing migration graph is calculated from the filesystem, not what
        is applied in any database. Migrations from other branches might currently be
        applied in the database; but for the purpose of creating a Migration graph, the
        filesystem is the source of truth.

        Arguments:
            app (str): The name of the app for which to create the new Migration.
            other_apps (str): The other apps to include in the dependency graph.
            name (Optional[str]): The name (label) for the migration. Default = `""`.

        Returns:
            migration (Migration): The Migration that has just been created.
        """
        migrations = cls.all_migrations(app, *other_apps)
        graph = cls.graph(migrations)
        leaves = [node for node in graph.nodes() if graph.out_degree(node) == 0]
        # a None name is normalized to "" by __post_init__
        return cls(app=app, name=name, depends=leaves)

    @classmethod
    def database_migrations(
        cls, connection: Any, dialect: Dialect
    ) -> Dict[str, Migration]:
        """
        Query the database with the given `connection` and return a dict of the
        Migrations in the database, by key. If no Migrations have been applied in the
        database, the result is an empty dict.

        Arguments:
            connection (Any): A database connection.
            dialect (Dialect): The SQL dialect of the database connection.

        Returns:
            migrations (dict[str, Migration]): A dict of Migrations by key.
        """
        if dialect.must_async:
            sql = ASQL(dialect=dialect)
        else:
            sql = SQL(dialect=dialect)
        try:
            records = lib.gen(sql.select(connection, "select * from sqly_migrations"))
        except Exception as exc:
            # Best effort: any failure of the query is reported and treated as
            # "no migrations applied".
            print(str(exc))
            records = []
        return {m.key: m for m in set(cls(**record) for record in records)}

    @classmethod
    def migrate(
        cls,
        connection: Any,
        dialect: Dialect,
        migration: Migration,
        dryrun: bool = False,
    ):
        """
        Migrate the database to this migration, either up or down, using the given
        database connection.

        Algorithm:

        1. Collate the list of applied migrations in the database with the list of
           migrations available in this application. Give precedence to the file
           definitions in the application.
        2. Calculate the graph path to reach this migration and whether this is an "up"
           or "down" migration.
            - if this migration has not been applied to the database, then the graph
              path is from the last applied predecessor "up" to this migration.
            - if this migration has been applied to the database, then the graph path is
              from the last applied successor "down" to this migration.
        3. Apply the sequence of migrations (either up or down).

        [_What about situations in which the path to the given Migration includes both
        "down" Migrations to back out of another branch and "up" Migrations preceding
        the given Migration on its branch? Our current solution is to ignore "other"
        branches and only migrate from the last applied predecessor._]

        Arguments:
            connection (Any): A database connection.
            dialect (Dialect): The SQL dialect of the database connection.
            migration (Migration): The Migration that we are migrating _to_.
            dryrun (bool): Whether this is a dry run.
        """
        db_migrations = cls.database_migrations(connection, dialect)
        all_migrations = cls.all_migrations(migration.app)
        # the right-hand operand wins on duplicate keys: file definitions take
        # precedence over what is recorded in the database
        migrations = db_migrations | all_migrations
        graph = cls.graph(migrations)
        if migration.key not in db_migrations:
            # apply 'up' migrations for all ancestors and this migration
            subgraph = nx.subgraph(graph, migration.ancestors(graph) | {migration.key})
            for key in nx.lexicographical_topological_sort(subgraph):
                if key not in db_migrations:
                    migrations[key].apply(
                        connection, dialect, direction="up", dryrun=dryrun
                    )
        else:
            # apply 'dn' migrations for all descendants in reverse
            subgraph = nx.subgraph(graph, migration.descendants(graph))
            for key in reversed(list(nx.lexicographical_topological_sort(subgraph))):
                if key in db_migrations:
                    migrations[key].apply(
                        connection, dialect, direction="dn", dryrun=dryrun
                    )

    def depends_migrations(self) -> Dict[str, Migration]:
        """
        All migrations that this migration depends on, recursively.

        Each dependency is loaded once, even when several migrations share it, and the
        traversal terminates if the `depends` declarations form a cycle (in which case
        this migration's own key appears in the result, and `Migration.graph()` is
        left to report the cycle).

        Returns:
            migrations (dict[str, Migration]): A dict of Migrations by key.
        """
        dependencies = {}
        # Explicit depth-first stack; children are pushed reversed so that keys are
        # visited (and inserted) in declaration order.
        pending = list(reversed(self.depends))
        while pending:
            depend = pending.pop()
            if depend in dependencies:
                continue
            migration = self.key_load(depend)
            dependencies[depend] = migration
            pending.extend(reversed(migration.depends))
        return dependencies

    @classmethod
    def graph(cls, migrations: Mapping[str, Migration]) -> nx.classes.digraph.DiGraph:
        """
        Given a mapping of Migrations, create a dependency graph of Migrations. The
        resulting graph is a DAG (directed acyclic graph) that is a [transitive
        reduction](https://en.wikipedia.org/wiki/Transitive_reduction) of the Migrations
        graph. If the graph is not a DAG (e.g., it has cycles) then a networkx.HasACycle
        exception is raised.

        Arguments:
            migrations (Mapping[str, Migration]): A mapping of Migrations by key.

        Returns:
            graph (nx.classes.digraph.DiGraph): A networkx DiGraph of the Migrations.
        """
        graph = nx.DiGraph()
        dag = {key: migrations[key].depends for key in migrations}
        for migration_key, migration_depends in dag.items():
            graph.add_node(migration_key)
            for depend in migration_depends:
                # edges point from a dependency to the migration that depends on it
                graph.add_edge(depend, migration_key)
        if not nx.is_directed_acyclic_graph(graph):
            raise nx.HasACycle(dag)
        return nx.transitive_reduction(graph)

    def ancestors(self, graph: nx.classes.digraph.DiGraph) -> AbstractSet[str]:
        """
        Given a Migration and a graph, return the set of all ancestors of this
        Migration. If this Migration is not in the given graph, a NetworkXError
        Exception is raised.

        Arguments:
            graph (nx.classes.digraph.DiGraph): A graph of Migrations including this
                one.

        Returns:
            migration keys (set): The set of migrations (keys) that are ancestors.
        """
        return nx.ancestors(graph, self.key)

    def descendants(self, graph: nx.classes.digraph.DiGraph) -> AbstractSet[str]:
        """
        Given a Migration and a graph, return the set of all descendants of this
        Migration. If this Migration is not in the given graph, a NetworkXError
        Exception is raised.

        Arguments:
            graph (nx.classes.digraph.DiGraph): A graph of Migrations including this
                one.

        Returns:
            migration keys (set): The set of migrations (keys) that are descendants.
        """
        return nx.descendants(graph, self.key)

    def yaml(self, exclude: Optional[list] = None, exclude_none: bool = False) -> str:
        """
        Serialize this Migration as a YAML string.

        Arguments:
            exclude (Optional[list]): A list of fields to exclude.
            exclude_none (bool): Whether to exclude fields with value None.

        Returns:
            yaml (str): The YAML document, with fields in declaration order.
        """
        return yaml.dump(
            self.dict(exclude=exclude, exclude_none=exclude_none),
            default_flow_style=False,
            sort_keys=False,
        )

    def save(
        self, exclude: Optional[list] = None, exclude_none: bool = False
    ) -> tuple[Path, int]:
        """
        Save this Migration to the filesystem.

        Arguments:
            exclude (Optional[list]): A list of fields to exclude.
            exclude_none (bool): Whether to exclude fields with value None.

        Returns:
            tuple (filepath, size): The filepath where the Migration was saved, and its
                size in bytes.
        """
        filepath = app_migrations_path(self.app) / self.filename
        os.makedirs(filepath.parent, exist_ok=True)
        with open(filepath, "wb") as f:
            size = f.write(
                self.yaml(exclude=exclude, exclude_none=exclude_none).encode()
            )
        return filepath, size

    def apply(
        self,
        connection: Any,
        dialect: Dialect,
        direction: str = "up",
        dryrun: bool = False,
    ):
        """
        Apply the migration (direction = 'up' or 'dn') to connection database. The
        migration's statements, its data (for "up"), and the bookkeeping change to the
        sqly_migrations table are executed in order and committed once at the end.
        (This method is called internally by `Migration.migrate()`).

        NOTE(review): no rollback is issued here if a statement fails; whether the
        whole script is atomic depends on the connection's transaction behavior.

        Arguments:
            connection (Any): A database connection.
            dialect (Dialect): The SQL dialect of the database connection.
            direction (str): Which migration to apply: "up" or "dn".
            dryrun (bool): Whether this is a dry run.

        Raises:
            ValueError: If `direction` is neither "up" nor "dn".
        """
        # Reject anything else up front: `direction` is used as an attribute name
        # below, and any non-"up" value would otherwise delete the migration record.
        if direction not in ("up", "dn"):
            raise ValueError(f"direction must be 'up' or 'dn', not {direction!r}")

        print(self.key, direction, end=" ... ")
        if dryrun:
            print("DRY RUN")
            return

        for migration_query in getattr(self, direction) or []:
            lib.run(connection.execute(migration_query))

        if direction == "up":
            sqly_migrations_query = self.insert_query(dialect)
            # if there is data, load it (a null `data` or table entry means none)
            for table, records in (self.data or {}).items():
                for record in records or []:
                    query = SQL(dialect=dialect).render(
                        queries.INSERT(table, record), record
                    )
                    lib.run(connection.execute(*query))
        else:
            # TODO? if there is data, delete it?
            sqly_migrations_query = self.delete_query(dialect)

        lib.run(connection.execute(*sqly_migrations_query))
        lib.run(connection.commit())
        print("OK")

    def insert_query(self, dialect: Dialect) -> Any:
        """
        Render a SQL query to insert this Migration into the sqly_migrations table.

        Arguments:
            dialect (Dialect): The SQL database dialect to render the query for.

        Returns:
            tuple (str, params...): The SQL query and params formatted for the database
                dialect.
        """
        # `data` is left out of the inserted row
        data = self.dict(exclude=["data"], exclude_none=True)
        # list fields are stored as JSON text
        for key in ("depends", "up", "dn"):
            data[key] = json.dumps(data.get(key) or [])
        sql = queries.INSERT("sqly_migrations", data)
        return SQL(dialect=dialect).render(sql, data)

    def delete_query(self, dialect):
        """
        Render a SQL query to delete this Migration from the sqly_migrations table.

        Arguments:
            dialect (Dialect): The SQL database dialect to render the query for.

        Returns:
            tuple (str, params...): The SQL query and params formatted for the database
                dialect.
        """
        # the row is identified by the same fields that make up Migration.key
        sql = queries.DELETE(
            "sqly_migrations", [Q.filter(key) for key in ["app", "ts", "name"]]
        )
        return SQL(dialect=dialect).render(sql, self.dict())
filename
property
readonly
¶
The filename (without path) for the Migration
key
property
readonly
¶
The Migration.key uniquely identifies the migration.
Format = {app}:{ts}_{name}
__hash__(self)
special
¶
__str__(self)
special
¶
all_migrations(*apps)
classmethod
¶
Return all the migrations, including dependencies, for the given app(s).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
apps |
list[str] |
The app or apps for which to list Migrations. |
() |
Returns:
Type | Description |
---|---|
migrations (dict[str, Migration]) |
A dict of Migrations, by key. |
Source code in sqly/migration.py
@classmethod
def all_migrations(cls, *apps: str) -> Dict[str, Migration]:
    """
    Return all the migrations, including dependencies, for the given app(s).

    The "sqly" app's own migrations are always included, whether or not "sqly" is
    named in `apps`.

    Arguments:
        apps (str): The app or apps for which to list Migrations.

    Returns:
        migrations (dict[str, Migration]): A dict of Migrations, by key.
    """
    # always depend on sqly
    migrations = cls.app_migrations("sqly")
    for app in apps:
        # sqly has already been loaded above
        if app != "sqly":
            migrations |= cls.app_migrations(app, include_depends=True)
    return migrations
ancestors(self, graph)
¶
Given a Migration and a graph, return the set of all ancestors of this Migration. If this Migration is not in the given graph, a NetworkXError Exception is raised.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
graph |
nx.classes.digraph.DiGraph |
A graph of Migrations including this one. |
required |
Returns:
Type | Description |
---|---|
migration keys (set) |
The set of migrations (keys) that are ancestors. |
Source code in sqly/migration.py
def ancestors(self, graph: nx.classes.digraph.DiGraph) -> AbstractSet[str]:
    """
    Return the keys of every Migration that this one (transitively) descends from
    in the given graph.

    Arguments:
        graph (nx.classes.digraph.DiGraph): A graph of Migrations including this
            one.

    Returns:
        migration keys (set): The set of migrations (keys) that are ancestors.

    Raises:
        NetworkXError: If this Migration is not in the given graph.
    """
    own_key = self.key
    ancestor_keys = nx.ancestors(graph, own_key)
    return ancestor_keys
app_migrations(app, include_depends=True)
classmethod
¶
For a given module name, get the migrations in that module. If include_depends
is True
(the default), also include depends migrations from other apps.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app |
str |
The name of the app (module) for which to list migrations. |
required |
include_depends |
bool |
Whether to include dependency Migrations in the listing. |
True |
Returns:
Type | Description |
---|---|
migrations (dict[str, Migration]) |
A dict of Migrations, by key. |
Source code in sqly/migration.py
@classmethod
def app_migrations(
    cls, app: str, include_depends: bool = True
) -> Dict[str, Migration]:
    """
    List the migrations found in the given app's migrations directory, optionally
    together with everything they depend on.

    Arguments:
        app (str): The name of the app (module) for which to list migrations.
        include_depends (bool): Whether to include dependency Migrations (possibly
            from other apps) in the listing. Default = `True`.

    Returns:
        migrations (dict[str, Migration]): A dict of Migrations, by key.
    """
    pattern = str(app_migrations_path(app) / "*.yaml")
    loaded = {cls.load(path) for path in glob(pattern)}
    migrations = {item.key: item for item in loaded}

    if include_depends is not True:
        return migrations

    # gather into a separate dict first, since `migrations` is being iterated
    extra = {}
    for item in migrations.values():
        extra |= item.depends_migrations()
    migrations |= extra
    return migrations
apply(self, connection, dialect, direction='up', dryrun=False)
¶
Apply the migration (direction = ‘up’ or ‘dn’) to connection database. The
entire migration script is wrapped in a transaction. (This method is called
internally by Migration.migrate()
).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection |
Any |
A database connection. |
required |
dialect |
Dialect |
The SQL dialect of the database connection. |
required |
direction |
str |
Which migration to apply: “up” or “dn”. |
'up' |
dryrun |
bool |
Whether this is a dry run. |
False |
Source code in sqly/migration.py
def apply(
    self,
    connection: Any,
    dialect: Dialect,
    direction: str = "up",
    dryrun: bool = False,
):
    """
    Apply the migration (direction = 'up' or 'dn') to connection database. The
    migration's statements, its data (for "up"), and the bookkeeping change to the
    sqly_migrations table are executed in order and committed once at the end.
    (This method is called internally by `Migration.migrate()`).

    NOTE(review): no rollback is issued here if a statement fails; whether the
    whole script is atomic depends on the connection's transaction behavior.

    Arguments:
        connection (Any): A database connection.
        dialect (Dialect): The SQL dialect of the database connection.
        direction (str): Which migration to apply: "up" or "dn".
        dryrun (bool): Whether this is a dry run.

    Raises:
        ValueError: If `direction` is neither "up" nor "dn".
    """
    # Reject anything else up front: `direction` is used as an attribute name
    # below, and any non-"up" value would otherwise delete the migration record.
    if direction not in ("up", "dn"):
        raise ValueError(f"direction must be 'up' or 'dn', not {direction!r}")

    print(self.key, direction, end=" ... ")
    if dryrun:
        print("DRY RUN")
        return

    for migration_query in getattr(self, direction) or []:
        lib.run(connection.execute(migration_query))

    if direction == "up":
        sqly_migrations_query = self.insert_query(dialect)
        # if there is data, load it (a null `data` or table entry means none)
        for table, records in (self.data or {}).items():
            for record in records or []:
                query = SQL(dialect=dialect).render(
                    queries.INSERT(table, record), record
                )
                lib.run(connection.execute(*query))
    else:
        # TODO? if there is data, delete it?
        sqly_migrations_query = self.delete_query(dialect)

    lib.run(connection.execute(*sqly_migrations_query))
    lib.run(connection.commit())
    print("OK")
create(app, *other_apps, name=None)
classmethod
¶
Create a new Migration object for the given app (module) name. The new Migration is not saved to the filesystem: It is just a Migration instance in memory.
Every new migration automatically depends on all the “leaf” nodes in the existing migration graph. Leaf nodes are those with out_degree == 0 (no edges pointing out). See: https://networkx.org/documentation/stable/reference/classes/generated/networkx.DiGraph.out_degree.html. For a worked example, see: https://stackoverflow.com/questions/31946253/find-end-nodes-leaf-nodes-in-radial-tree-networkx-graph/31953001.
NOTE: The existing migration graph is calculated from the filesystem, not what is applied in any database. Migrations from other branches might currently be applied in the database; but for the purpose of creating a Migration graph, the filesystem is the source of truth.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app |
str |
The name of the app for which to create the new Migration. |
required |
other_apps |
list[str] |
The other apps to include in the dependency graph. |
() |
name |
Optional[str] |
The name (label) for the migration. Default = `""`. |
None |
Returns:
Type | Description |
---|---|
migration (Migration) |
The Migration that has just been created. |
Source code in sqly/migration.py
@classmethod
def create(
    cls, app: str, *other_apps: str, name: Optional[str] = None
) -> Migration:
    """
    Create a new Migration object for the given app (module) name. The new Migration
    is not saved to the filesystem: It is just a Migration instance in memory.

    Every new migration automatically depends on all the "leaf" nodes in the
    existing migration graph. Leaf nodes are those with out_degree == 0 (no edges
    pointing out). See:
    <https://networkx.org/documentation/stable/reference/classes/generated/networkx.DiGraph.out_degree.html>.
    For a worked example, see:
    <https://stackoverflow.com/questions/31946253/find-end-nodes-leaf-nodes-in-radial-tree-networkx-graph/31953001>.

    NOTE: The existing migration graph is calculated from the filesystem, not what
    is applied in any database. Migrations from other branches might currently be
    applied in the database; but for the purpose of creating a Migration graph, the
    filesystem is the source of truth.

    Arguments:
        app (str): The name of the app for which to create the new Migration.
        other_apps (str): The other apps to include in the dependency graph.
        name (Optional[str]): The name (label) for the migration. Default = `""`.

    Returns:
        migration (Migration): The Migration that has just been created.
    """
    migrations = cls.all_migrations(app, *other_apps)
    graph = cls.graph(migrations)
    # leaf nodes: migrations that nothing else depends on yet
    leaves = [node for node in graph.nodes() if graph.out_degree(node) == 0]
    return cls(app=app, name=name, depends=leaves)
database_migrations(connection, dialect)
classmethod
¶
Query the database with the given connection
and return a dict of the
Migrations in the database, by key. If no Migrations have been applied in the
database, the result is an empty dict.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection |
Any |
A database connection. |
required |
dialect |
Dialect |
The SQL dialect of the database connection. |
required |
Returns:
Type | Description |
---|---|
migrations (dict[str, Migration]) |
A dict of Migrations by key. |
Source code in sqly/migration.py
@classmethod
def database_migrations(
    cls, connection: Any, dialect: Dialect
) -> Dict[str, Migration]:
    """
    Read the sqly_migrations table over the given `connection` and return the
    Migrations recorded there, by key. If the query fails for any reason, the
    error is printed and the result is an empty dict.

    Arguments:
        connection (Any): A database connection.
        dialect (Dialect): The SQL dialect of the database connection; its
            `must_async` flag selects the async or sync query interface.

    Returns:
        migrations (dict[str, Migration]): A dict of Migrations by key.
    """
    sql = ASQL(dialect=dialect) if dialect.must_async else SQL(dialect=dialect)
    try:
        records = lib.gen(sql.select(connection, "select * from sqly_migrations"))
    except Exception as exc:
        # best effort: report the failure and carry on with no applied migrations
        print(str(exc))
        records = []
    applied = {cls(**record) for record in records}
    return {item.key: item for item in applied}
delete_query(self, dialect)
¶
Render a SQL query to delete this Migration from the sqly_migrations table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dialect |
Dialect |
The SQL database dialect to render the query for. |
required |
Returns:
Type | Description |
---|---|
tuple (str, params...) |
The SQL query and params formatted for the database dialect. |
Source code in sqly/migration.py
def delete_query(self, dialect):
    """
    Build the rendered DELETE statement that removes this Migration's row from the
    sqly_migrations table. The row is matched on `app`, `ts`, and `name`.

    Arguments:
        dialect (Dialect): The SQL database dialect to render the query for.

    Returns:
        tuple (str, params...): The SQL query and params formatted for the database
            dialect.
    """
    filters = [Q.filter(field_name) for field_name in ["app", "ts", "name"]]
    statement = queries.DELETE("sqly_migrations", filters)
    renderer = SQL(dialect=dialect)
    return renderer.render(statement, self.dict())
depends_migrations(self)
¶
All migrations that this migration depends on, recursively.
Returns:
Type | Description |
---|---|
migrations (dict[str, Migration]) |
A dict of Migrations by key. |
Source code in sqly/migration.py
def depends_migrations(self) -> Dict[str, Migration]:
    """
    All migrations that this migration depends on, recursively.

    Each dependency is loaded once, even when several migrations share it, and the
    traversal terminates if the `depends` declarations form a cycle (in which case
    this migration's own key appears in the result).

    Returns:
        migrations (dict[str, Migration]): A dict of Migrations by key.
    """
    dependencies = {}
    # Explicit depth-first stack; children are pushed reversed so that keys are
    # visited (and inserted) in declaration order.
    pending = list(reversed(self.depends))
    while pending:
        depend = pending.pop()
        if depend in dependencies:
            continue
        migration = self.key_load(depend)
        dependencies[depend] = migration
        pending.extend(reversed(migration.depends))
    return dependencies
descendants(self, graph)
¶
Given a Migration and a graph, return the set of all descendants of this Migration. If this Migration is not in the given graph, a NetworkXError Exception is raised.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
graph |
nx.classes.digraph.DiGraph |
A graph of Migrations including this one. |
required |
Returns:
Type | Description |
---|---|
migration keys (set) |
The set of migrations (keys) that are descendants. |
Source code in sqly/migration.py
def descendants(self, graph: nx.classes.digraph.DiGraph) -> AbstractSet[str]:
    """
    Return the keys of every Migration that (transitively) depends on this one in
    the given graph.

    Arguments:
        graph (nx.classes.digraph.DiGraph): A graph of Migrations including this
            one.

    Returns:
        migration keys (set): The set of migrations (keys) that are descendants.

    Raises:
        NetworkXError: If this Migration is not in the given graph.
    """
    own_key = self.key
    descendant_keys = nx.descendants(graph, own_key)
    return descendant_keys
dict(self, exclude=None, exclude_none=False)
¶
The Migration serialized as a dict.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exclude |
Optional[list] |
A list of fields to exclude. |
None |
exclude_none |
bool |
Whether to exclude fields with value None. |
False |
Source code in sqly/migration.py
def dict(
    self, exclude: Optional[list] = None, exclude_none: bool = False
) -> Dict[str, Any]:
    """
    Serialize the Migration's fields into a plain dict.

    Arguments:
        exclude (Optional[list]): A list of fields to exclude.
        exclude_none (bool): Whether to exclude fields with value None.

    Returns:
        data (dict[str, Any]): The remaining field values, by field name.
    """
    skipped = exclude or []
    serialized = {}
    for name, value in asdict(self).items():
        if name in skipped:
            continue
        # None values are kept only when exclude_none is exactly False
        if exclude_none is not False and value is None:
            continue
        serialized[name] = value
    return serialized
graph(migrations)
classmethod
¶
Given a mapping of Migrations, create a dependency graph of Migrations. The resulting graph is a DAG (directed acyclic graph) that is a transitive reduction of the Migrations graph. If the graph is not a DAG (e.g., it has cycles) then a networkx.HasACycle exception is raised.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
migrations |
Mapping[str, Migration] |
A mapping of Migrations by key. |
required |
Returns:
Type | Description |
---|---|
graph (nx.classes.digraph.DiGraph) |
A networkx DiGraph of the Migrations. |
Source code in sqly/migration.py
@classmethod
def graph(cls, migrations: Mapping[str, Migration]) -> nx.classes.digraph.DiGraph:
    """
    Build the dependency graph for a mapping of Migrations. Every Migration key is
    a node, and an edge runs from each dependency to the Migration that depends on
    it. The graph returned is the [transitive
    reduction](https://en.wikipedia.org/wiki/Transitive_reduction) of that graph.

    Arguments:
        migrations (Mapping[str, Migration]): A mapping of Migrations by key.

    Returns:
        graph (nx.classes.digraph.DiGraph): A networkx DiGraph of the Migrations.

    Raises:
        networkx.HasACycle: If the dependencies do not form a DAG (directed acyclic
            graph), e.g. because they contain a cycle.
    """
    dag = {key: migrations[key].depends for key in migrations}

    full_graph = nx.DiGraph()
    for key, depends in dag.items():
        full_graph.add_node(key)
        full_graph.add_edges_from((depend, key) for depend in depends)

    if nx.is_directed_acyclic_graph(full_graph):
        return nx.transitive_reduction(full_graph)
    raise nx.HasACycle(dag)
insert_query(self, dialect)
¶
Render a SQL query to insert this Migration into the sqly_migrations table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dialect |
Dialect |
The SQL database dialect to render the query for. |
required |
Returns:
Type | Description |
---|---|
tuple (str, params...) |
The SQL query and params formatted for the database dialect. |
Source code in sqly/migration.py
def insert_query(self, dialect: Dialect) -> Any:
    """
    Build the rendered INSERT statement that records this Migration in the
    sqly_migrations table. Fields whose value is None are omitted, the list fields
    (`depends`, `up`, `dn`) are stored as JSON text, and `data` is not stored.

    Arguments:
        dialect (Dialect): The SQL database dialect to render the query for.

    Returns:
        tuple (str, params...): The SQL query and params formatted for the database
            dialect.
    """
    row = self.dict(exclude_none=True)
    for list_field in ("depends", "up", "dn"):
        row[list_field] = json.dumps(row.get(list_field) or [])
    row.pop("data", None)
    statement = queries.INSERT("sqly_migrations", row)
    return SQL(dialect=dialect).render(statement, row)
key_filepath(migration_key)
classmethod
¶
The file path of the Migration that has the given key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
migration_key |
str |
The Migration key |
required |
Returns:
Type | Description |
---|---|
file path (Path) |
The file path of the Migration |
Source code in sqly/migration.py
@classmethod
def key_filepath(cls, migration_key: str) -> Path:
    """Resolve a Migration key to the YAML file that defines it.

    The key is split at ":" into the app name and the file's base name; the file
    lives in that app's migrations directory.

    Arguments:
        migration_key (str): The Migration key

    Returns:
        file path (Path): The file path of the Migration
    """
    app, basename = migration_key.split(":")
    directory = app_migrations_path(app)
    return directory / f"{basename}.yaml"
key_load(migration_key)
classmethod
¶
load(filepath)
classmethod
¶
migrate(connection, dialect, migration, dryrun=False)
classmethod
¶
Migrate the database to this migration, either up or down, using the given database connection.
Algorithm:
1. Collate the list of applied migrations in the database with the list of migrations available in this application. Give precedence to the file definitions in the application.
2. Calculate the graph path to reach this migration and whether this is an “up” or “down” migration.
   - if this migration has not been applied to the database, then the graph path is from the last applied predecessor “up” to this migration.
   - if this migration has been applied to the database, then the graph path is from the last applied successor “down” to this migration.
3. Apply the sequence of migrations (either up or down).
[What about situations in which the path to the given Migration includes both “down” Migrations to back out of another branch and “up” Migrations preceding the given Migration on its branch? Our current solution is to ignore “other” branches and only migrate from the last applied predecessor.]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection |
Any |
A database connection. |
required |
dialect |
Dialect |
The SQL dialect of the database connection. |
required |
migration |
Migration |
The Migration that we are migrating to. |
required |
dryrun |
bool |
Whether this is a dry run. |
False |
Source code in sqly/migration.py
@classmethod
def migrate(
    cls,
    connection: Any,
    dialect: Dialect,
    migration: Migration,
    dryrun: bool = False,
):
    """
    Migrate the database to the given migration, either up or down, using the
    given database connection.

    Algorithm:

    1. Collate the migrations recorded as applied in the database with the
       migrations available in this application. File definitions from the
       application take precedence over database records.
    2. Build the dependency graph and decide the direction:
        - if the target migration is not yet applied, walk its ancestors plus
          the target itself in topological order and apply "up" every one that
          is not already in the database.
        - if the target migration is already applied, walk its descendants in
          reverse topological order and apply "dn" every one that is in the
          database. The target itself stays applied.
    3. Apply the resulting sequence of migrations.

    [_What about situations in which the path to the given Migration includes
    both "down" Migrations to back out of another branch and "up" Migrations
    preceding the given Migration on its branch? Our current solution is to
    ignore "other" branches and only migrate from the last applied
    predecessor._]

    Arguments:
        connection (Any): A database connection.
        dialect (Dialect): The SQL database dialect; forwarded to
            `database_migrations` and to each `apply` call.
        migration (Migration): The Migration that we are migrating _to_.
        dryrun (bool): Whether this is a dry run; forwarded to each `apply` call.
    """
    in_database = cls.database_migrations(connection, dialect)
    in_files = cls.all_migrations(migration.app)
    # Right-hand operand wins on duplicate keys, so file definitions override
    # whatever was loaded from the database.
    known = in_database | in_files
    dag = cls.graph(known)

    if migration.key in in_database:
        # Target already applied: back out everything that depends on it,
        # most-dependent first. The sort is materialised so it can be reversed.
        dependents = nx.subgraph(dag, migration.descendants(dag))
        ordered = list(nx.lexicographical_topological_sort(dependents))
        for key in ordered[::-1]:
            if key in in_database:
                known[key].apply(connection, dialect, direction="dn", dryrun=dryrun)
        return

    # Target not applied yet: bring up every missing ancestor, then the target.
    wanted = migration.ancestors(dag) | {migration.key}
    prerequisites = nx.subgraph(dag, wanted)
    for key in nx.lexicographical_topological_sort(prerequisites):
        if key in in_database:
            continue
        known[key].apply(connection, dialect, direction="up", dryrun=dryrun)
save(self, exclude=None, exclude_none=False)
¶
Save this Migration to the filesystem.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exclude |
Optional[list] |
A list of fields to exclude. |
None |
exclude_none |
bool |
Whether to exclude fields with value None. |
False |
Returns:
Type | Description |
---|---|
tuple (filepath, size) |
The filepath where the Migration was saved, and its size in bytes. |
Source code in sqly/migration.py
def save(
    self, exclude: Optional[list] = None, exclude_none: bool = False
) -> tuple[Path, int]:
    """
    Save this Migration to the filesystem.

    The Migration is serialized to YAML *before* the target file is opened, so a
    serialization error cannot leave behind an empty or truncated migration file
    (opening with "wb" truncates any existing file immediately).

    Arguments:
        exclude (Optional[list]): A list of fields to exclude.
        exclude_none (bool): Whether to exclude fields with value None.

    Returns:
        tuple (filepath, size): The filepath where the Migration was saved, and its
            size in bytes.
    """
    # Serialize first: if this raises, the existing file on disk is left untouched.
    payload = self.yaml(exclude=exclude, exclude_none=exclude_none).encode()

    filepath = app_migrations_path(self.app) / self.filename
    os.makedirs(filepath.parent, exist_ok=True)
    with open(filepath, "wb") as f:
        size = f.write(payload)
    return filepath, size
yaml(self, exclude=None, exclude_none=False)
¶
Serialize this Migration as a YAML string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exclude |
Optional[list] |
A list of fields to exclude. |
None |
exclude_none |
bool |
Whether to exclude fields with value None. |
False |
Source code in sqly/migration.py
def yaml(self, exclude: Optional[list] = None, exclude_none: bool = False) -> str:
    """
    Render this Migration as a YAML document and return it as a string.

    Arguments:
        exclude (Optional[list]): A list of fields to exclude.
        exclude_none (bool): Whether to exclude fields with value None.

    Returns:
        str: The YAML text produced from `self.dict(...)`.
    """
    fields = self.dict(exclude=exclude, exclude_none=exclude_none)
    # Inside the method body `yaml` resolves to the module-level name, not to
    # this method. Block style is requested and key sorting is disabled.
    return yaml.dump(fields, default_flow_style=False, sort_keys=False)
app_migrations_path(app)
¶
For a given app name, get the path to its migrations directory.
migration_timestamp()
¶
Return an integer with the UTC timestamp to millisecond resolution (17 digits => bigint)