1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
|
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Config.Versioning do
@moduledoc """
Module that manages versions of database configs.
"""
import Ecto.Query, only: [from: 2]
alias Ecto.Multi
alias Pleroma.Config.Version
alias Pleroma.ConfigDB
alias Pleroma.Repo
@typedoc """
A single requested config change.

`:group` and `:key` identify the setting, `:value` carries the new value and
`delete: true` marks the change as a removal.
"""
@type change :: %{
        optional(:delete) => boolean(),
        optional(:value) => any(),
        group: atom(),
        key: atom() | nil
      }

@doc """
Creates a new config version from a single change or a list of changes.

All steps are collected into one `Ecto.Multi` and executed in a transaction:
  - changes with `delete: true` are passed to `ConfigDB.delete_or_update/1`
  - any other change is passed to `ConfigDB.update_or_create/1` when its value is a
    keyword list, or when it belongs to the `:pleroma` group and its key is listed in
    `ConfigDB.pleroma_not_keyword_values/0`; otherwise the step fails with
    `{:value_must_be_keyword, change}`
  - sets all pointers to false
  - gets all rows from `config` table and inserts them as keyword in `backup` field

Returns the result of `Repo.transaction/1`, `{:error, :empty_changes}` for an empty
list, and `{:error, :bad_format}` for an argument that is neither a map nor a list.
"""
@spec new_version([change()] | change()) ::
        {:ok, map()}
        | {:error, :empty_changes}
        | {:error, :bad_format}
        | {:error, atom() | tuple(), any(), any()}
def new_version([]), do: {:error, :empty_changes}
def new_version(change) when is_map(change), do: new_version([change])

def new_version(changes) when is_list(changes) do
  changes
  |> Enum.reduce(Multi.new(), &append_change_step/2)
  |> set_current_flag_false_for_all_versions()
  |> insert_new_version()
  |> Repo.transaction()
end

def new_version(_), do: {:error, :bad_format}

# Appends the Multi step for one change. Step names are `{action, group, key}` tuples.
defp append_change_step(%{delete: true} = deletion, multi) do
  Multi.run(multi, {:delete_or_update, deletion[:group], deletion[:key]}, fn _, _ ->
    ConfigDB.delete_or_update(deletion)
  end)
end

defp append_change_step(operation, multi) do
  {name, fun} =
    if Keyword.keyword?(operation[:value]) or
         (operation[:group] == :pleroma and
            operation[:key] in ConfigDB.pleroma_not_keyword_values()) do
      {:insert_or_update, fn _, _ -> ConfigDB.update_or_create(operation) end}
    else
      # The failure is deferred into the Multi so that it surfaces as a regular
      # transaction error instead of being raised while the steps are assembled.
      {:error, fn _, _ -> {:error, {:value_must_be_keyword, operation}} end}
    end

  Multi.run(multi, {name, operation[:group], operation[:key]}, fun)
end
# Queues the `:update_all_versions` step, which sets `current: false` on every version row.
defp set_current_flag_false_for_all_versions(multi) do
  updates = [set: [current: false]]

  Multi.update_all(multi, :update_all_versions, Version, updates)
end
# Queues the `:insert_version` step. The backup is read via `ConfigDB.all_as_keyword/0`
# when the step runs, not when the Multi is assembled.
defp insert_new_version(multi) do
  store_snapshot = fn repo, _changes ->
    repo.insert(%Version{backup: ConfigDB.all_as_keyword()})
  end

  Multi.run(multi, :insert_version, store_snapshot)
end
@doc """
Rolls the config back by `steps` versions (default: 1):
  - checks that a current version exists and that a version `steps` positions behind
    the newest one (ordered by id) exists
  - truncates config table and restarts pk
  - inserts config settings from backup
  - sets all pointers to false
  - sets current pointer to true for rollback version
  - deletes versions after current

Returns `{:error, :steps_format}` when `steps` is not a positive integer.
"""
@spec rollback(pos_integer()) ::
        {:ok, map()}
        | {:error, atom() | tuple(), any(), any()}
        | {:error, :steps_format}
        | {:error, :no_current_version}
        | {:error, :rollback_not_possible}
def rollback(steps \\ 1)

def rollback(steps) when is_integer(steps) and steps > 0 do
  case get_current_version_id() do
    current_id when is_integer(current_id) ->
      case get_version_by_steps(steps) do
        %Version{} = target -> do_rollback(target)
        not_possible -> not_possible
      end

    no_current ->
      no_current
  end
end

def rollback(_), do: {:error, :steps_format}
@doc """
Same as `rollback/1`, but rolls back to the version with the given id.

Returns `{:error, :version_is_already_current}` when that version is flagged as
current, and `{:error, :not_found}` when no version has this id.
"""
@spec rollback_by_id(pos_integer()) ::
        {:ok, map()}
        | {:error, atom() | tuple(), any(), any()}
        | {:error, :not_found}
        | {:error, :version_is_already_current}
def rollback_by_id(id) when is_integer(id) do
  case get_version_by_id(id) do
    %Version{current: false} = target -> do_rollback(target)
    %Version{current: true} -> {:error, :version_is_already_current}
    error -> error
  end
end
# Returns the highest id among versions flagged as current, or
# `{:error, :no_current_version}` when the aggregate yields nil.
defp get_current_version_id do
  current_versions = from(v in Version, where: v.current == true)

  case Repo.aggregate(current_versions, :max, :id) do
    nil -> {:error, :no_current_version}
    id -> id
  end
end
# Fetches a version by primary key, or returns `{:error, :not_found}`.
defp get_version_by_id(id) do
  version = Repo.get(Version, id)

  if is_nil(version), do: {:error, :not_found}, else: version
end
# Fetches the version that sits `steps` rows behind the newest one (ordered by id,
# descending), or returns `{:error, :rollback_not_possible}` when there is no such row.
defp get_version_by_steps(steps) do
  target_query = from(v in Version, order_by: [desc: v.id], limit: 1, offset: ^steps)

  case Repo.one(target_query) do
    nil -> {:error, :rollback_not_possible}
    found -> found
  end
end
# Restores the config table from `version.backup` in one transaction: wipes the table,
# re-inserts the saved settings, makes `version` the only current one and deletes every
# version with a higher id.
defp do_rollback(version) do
  wipe_steps = reset_pk_in_config_table(truncate_config_table())
  saved_settings = ConfigDB.from_keyword_to_maps(version.backup)
  newer_versions = from(v in Version, where: v.id > ^version.id)

  saved_settings
  |> add_insert_commands(wipe_steps)
  |> set_current_flag_false_for_all_versions()
  |> Multi.update(:move_current_pointer, Ecto.Changeset.change(version, current: true))
  |> Multi.delete_all(:delete_next_versions, newer_versions)
  |> Repo.transaction()
end
# Queues the `:truncate_config_table` step, which runs a raw `TRUNCATE` on `config`.
# Starts a fresh Multi when none is given.
defp truncate_config_table(multi \\ Multi.new()) do
  run_truncate = fn repo, _changes -> repo.query("TRUNCATE config;") end

  Multi.run(multi, :truncate_config_table, run_truncate)
end
# Queues the `:reset_pk` step, which restarts the `config_id_seq` sequence.
defp reset_pk_in_config_table(multi) do
  restart_sequence = fn repo, _changes ->
    repo.query("ALTER SEQUENCE config_id_seq RESTART;")
  end

  Multi.run(multi, :reset_pk, restart_sequence)
end
# Queues one `{:insert, group, key}` step per change; each step calls
# `ConfigDB.update_or_create/1` with that change.
defp add_insert_commands(changes, multi) do
  Enum.reduce(changes, multi, fn setting, steps ->
    step_name = {:insert, setting[:group], setting[:key]}

    Multi.run(steps, step_name, fn _repo, _changes ->
      ConfigDB.update_or_create(setting)
    end)
  end)
end
@doc """
Resets the config table and creates a new version from the now-empty table:
  - truncates config table and restarts pk
  - sets all pointers to false
  - inserts a new version
"""
@spec reset() :: {:ok, map()} | {:error, atom() | tuple(), any(), any()}
def reset do
  wipe_steps =
    Multi.new()
    |> truncate_config_table()
    |> reset_pk_in_config_table()

  wipe_steps
  |> set_current_flag_false_for_all_versions()
  |> insert_new_version()
  |> Repo.transaction()
end
@doc """
Migrates settings from the config file at `config_path` into the database:
  - truncates config table and restarts pk
  - reads the file with `Pleroma.Config.Loader.read!/1`, filters it and inserts the result
  - sets all pointers to false
  - gets all rows from `config` table and inserts them as keyword in `backup` field
"""
@spec migrate(Path.t()) :: {:ok, map()} | {:error, atom() | tuple(), any(), any()}
def migrate(config_path) do
  alias Pleroma.Config.Loader

  wipe_steps =
    Multi.new()
    |> truncate_config_table()
    |> reset_pk_in_config_table()

  file_settings =
    config_path
    |> Loader.read!()
    |> Loader.filter()
    |> ConfigDB.from_keyword_to_maps()

  file_settings
  |> add_insert_commands(wipe_steps)
  |> set_current_flag_false_for_all_versions()
  |> insert_new_version()
  |> Repo.transaction()
end
@doc """
Moves a setting from an old `{group, key}` namespace to a new one, keeping its value.

The row in the `config` table (if there is one) is re-keyed, and every version whose
backup holds a non-nil value under the old namespace gets that value moved under the
new one. Both parts are run through `migrate_configs_and_versions/2`.
"""
@spec migrate_namespace({atom(), atom()}, {atom(), atom()}) ::
        {:ok, map()} | {:error, atom() | tuple(), any(), any()}
def migrate_namespace({o_group, o_key}, {n_group, n_key}) do
  existing = ConfigDB.get_by_params(%{group: o_group, key: o_key})

  # Re-keys the stored row; yields `{:ok, nil}` when the old namespace has no row.
  rename_config = fn ->
    if existing do
      Repo.update(Ecto.Changeset.change(existing, group: n_group, key: n_key))
    else
      {:ok, nil}
    end
  end

  move_in_backup = fn %{backup: backup} = version ->
    case pop_in(backup[o_group][o_key]) do
      {value, remainder} when not is_nil(value) ->
        # Drop the old group entirely once its last key has been moved out.
        pruned =
          if remainder[o_group] == [] do
            Keyword.delete(remainder, o_group)
          else
            remainder
          end

        relocated =
          if Keyword.has_key?(pruned, n_group) do
            put_in(pruned[n_group][n_key], value)
          else
            Keyword.put(pruned, n_group, [{n_key, value}])
          end

        Repo.update(Ecto.Changeset.change(version, backup: relocated))

      _ ->
        {:ok, nil}
    end
  end

  migrate_configs_and_versions(rename_config, move_in_backup)
end
@doc """
Runs a config migration and the matching changes to version backups in one transaction.

Accepts two functions:
  - `configs_changes_fun` (arity 0) makes the changes to the configs; it runs as the
    `:configs_changes` step
  - `version_change_fun` (arity 1) is called once per version, as the
    `{:version_change, id}` step, to change that version's backup

The list of versions is loaded before the transaction is started.
"""
@spec migrate_configs_and_versions(function(), function()) ::
        {:ok, map()} | {:error, atom() | tuple(), any(), any()}
def migrate_configs_and_versions(configs_changes_fun, version_change_fun)
    when is_function(configs_changes_fun, 0) and
           is_function(version_change_fun, 1) do
  versions = Repo.all(Version)

  config_step =
    Multi.run(Multi.new(), :configs_changes, fn _repo, _changes ->
      configs_changes_fun.()
    end)

  all_steps =
    Enum.reduce(versions, config_step, fn version, steps ->
      Multi.run(steps, {:version_change, version.id}, fn _repo, _changes ->
        version_change_fun.(version)
      end)
    end)

  Repo.transaction(all_steps)
end
end
|