Support bidirectional storage conversion.
This commit is contained in:
parent
0bc025e5f8
commit
a487a8dadf
5 changed files with 171 additions and 53 deletions
SharpChat.MariaDB
SharpChat.SQLite
SharpChat
SharpChatCommon
|
@ -10,6 +10,34 @@ using ZLogger;
|
|||
namespace SharpChat.MariaDB;
|
||||
|
||||
public class MariaDBMessageStorage(MariaDBStorage storage, ILogger logger) : MessageStorage {
|
||||
public async Task LogMessage(Message msg) {
|
||||
try {
|
||||
using MariaDBConnection conn = await storage.CreateConnection();
|
||||
await conn.RunCommand(
|
||||
"INSERT IGNORE INTO sqc_events (event_id, event_type, event_channel, event_data"
|
||||
+ ", event_sender, event_sender_name, event_sender_colour, event_sender_rank, event_sender_nick, event_sender_perms"
|
||||
+ ", event_created, event_deleted)"
|
||||
+ " VALUES (@id, @type, @channel, @data"
|
||||
+ ", @sender, @sender_name, @sender_colour, @sender_rank, @sender_nick, @sender_perms"
|
||||
+ ", FROM_UNIXTIME(@created), FROM_UNIXTIME(@deleted))",
|
||||
new MySqlParameter("id", msg.Id),
|
||||
new MySqlParameter("type", msg.Type),
|
||||
new MySqlParameter("channel", string.IsNullOrWhiteSpace(msg.ChannelName) ? null : msg.ChannelName),
|
||||
new MySqlParameter("data", JsonSerializer.SerializeToUtf8Bytes(msg.Data)),
|
||||
new MySqlParameter("sender", long.TryParse(msg.SenderId, out long senderId64) && senderId64 > 0 ? senderId64 : null),
|
||||
new MySqlParameter("sender_name", msg.SenderName),
|
||||
new MySqlParameter("sender_colour", msg.SenderColour.ToMisuzu()),
|
||||
new MySqlParameter("sender_rank", msg.SenderRank),
|
||||
new MySqlParameter("sender_nick", string.IsNullOrWhiteSpace(msg.SenderNickName) ? null : msg.SenderNickName),
|
||||
new MySqlParameter("sender_perms", MariaDBUserPermissionsConverter.To(msg.SenderPermissions)),
|
||||
new MySqlParameter("created", msg.Created.ToUnixTimeSeconds()),
|
||||
new MySqlParameter("deleted", msg.Deleted?.ToUnixTimeSeconds())
|
||||
);
|
||||
} catch(MySqlException ex) {
|
||||
logger.ZLogError($"Error in LogMessage(Message): {ex}");
|
||||
}
|
||||
}
|
||||
|
||||
public async Task LogMessage(
|
||||
long id,
|
||||
string type,
|
||||
|
@ -41,7 +69,7 @@ public class MariaDBMessageStorage(MariaDBStorage storage, ILogger logger) : Mes
|
|||
new MySqlParameter("sender_perms", MariaDBUserPermissionsConverter.To(senderPerms))
|
||||
);
|
||||
} catch(MySqlException ex) {
|
||||
logger.ZLogError($"Error in LogMessage(): {ex}");
|
||||
logger.ZLogError($"Error in LogMessage(long, ...): {ex}");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,6 +131,34 @@ public class MariaDBMessageStorage(MariaDBStorage storage, ILogger logger) : Mes
|
|||
return null;
|
||||
}
|
||||
|
||||
public async Task<long> CountMessages(
|
||||
string? channelName = null,
|
||||
bool includeDeleted = false
|
||||
) {
|
||||
List<MySqlParameter> parameters = [];
|
||||
bool firstParam = true;
|
||||
StringBuilder qb = new();
|
||||
qb.Append("SELECT COUNT(*) FROM sqc_events");
|
||||
|
||||
if(!includeDeleted) {
|
||||
firstParam = false;
|
||||
qb.Append(" WHERE event_deleted IS NULL");
|
||||
}
|
||||
|
||||
if(!string.IsNullOrEmpty(channelName)) {
|
||||
qb.AppendFormat(" {0} (event_channel = @channel OR event_channel IS NULL)", firstParam ? "WHERE" : "AND");
|
||||
parameters.Add(new MySqlParameter("channel", channelName));
|
||||
}
|
||||
|
||||
try {
|
||||
using MariaDBConnection conn = await storage.CreateConnection();
|
||||
return await conn.RunQueryValue<long>(qb.ToString(), [.. parameters]);
|
||||
} catch(MySqlException ex) {
|
||||
logger.ZLogError($"Error in CountMessages({channelName}, {includeDeleted}): {ex}");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<IEnumerable<Message>> GetMessages(
|
||||
string? channelName = null,
|
||||
int? take = 20,
|
||||
|
|
|
@ -11,6 +11,29 @@ using ZLogger;
|
|||
namespace SharpChat.SQLite;
|
||||
|
||||
public class SQLiteMessageStorage(ILogger logger, SQLiteConnection conn) : MessageStorage {
|
||||
public async Task LogMessage(Message msg) {
|
||||
try {
|
||||
await conn.RunCommand(
|
||||
"INSERT OR IGNORE INTO messages (msg_id, msg_type, msg_created, msg_channel, msg_sender, msg_sender_name, msg_sender_colour, msg_sender_rank, msg_sender_nick, msg_sender_perms, msg_data)"
|
||||
+ " VALUES (@id, @type, @created, @channel, @sender, @sender_name, @sender_colour, @sender_rank, @sender_nick, @sender_perms, @data)",
|
||||
new SQLiteParameter("id", msg.Id),
|
||||
new SQLiteParameter("type", msg.Type),
|
||||
new SQLiteParameter("channel", string.IsNullOrWhiteSpace(msg.ChannelName) ? null : msg.ChannelName),
|
||||
new SQLiteParameter("data", JsonSerializer.SerializeToUtf8Bytes(msg.Data)),
|
||||
new SQLiteParameter("sender", long.TryParse(msg.SenderId, out long senderId64) && senderId64 > 0 ? senderId64 : null),
|
||||
new SQLiteParameter("sender_name", msg.SenderName),
|
||||
new SQLiteParameter("sender_colour", msg.SenderColour.Rgb.HasValue ? msg.SenderColour.Rgb.Value.Raw : null),
|
||||
new SQLiteParameter("sender_rank", msg.SenderRank),
|
||||
new SQLiteParameter("sender_nick", string.IsNullOrWhiteSpace(msg.SenderNickName) ? null : msg.SenderNickName),
|
||||
new SQLiteParameter("sender_perms", SQLiteUserPermissionsConverter.To(msg.SenderPermissions)),
|
||||
new SQLiteParameter("created", $"{msg.Created:s}Z"),
|
||||
new SQLiteParameter("deleted", msg.Deleted is null ? null : $"{msg.Deleted:s}Z")
|
||||
);
|
||||
} catch(SQLiteException ex) {
|
||||
logger.ZLogError($"Error in LogMessage(Message): {ex}");
|
||||
}
|
||||
}
|
||||
|
||||
public async Task LogMessage(
|
||||
long id,
|
||||
string type,
|
||||
|
@ -40,7 +63,7 @@ public class SQLiteMessageStorage(ILogger logger, SQLiteConnection conn) : Messa
|
|||
new SQLiteParameter("data", data == null ? "{}" : JsonSerializer.SerializeToUtf8Bytes(data))
|
||||
);
|
||||
} catch(SQLiteException ex) {
|
||||
logger.ZLogError($"Error in LogMessage(): {ex}");
|
||||
logger.ZLogError($"Error in LogMessage(long, ...): {ex}");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -61,7 +84,7 @@ public class SQLiteMessageStorage(ILogger logger, SQLiteConnection conn) : Messa
|
|||
reader.GetString("msg_type"),
|
||||
reader.IsDBNull(reader.GetOrdinal("msg_sender")) ? null : reader.GetString("msg_sender"),
|
||||
reader.IsDBNull(reader.GetOrdinal("msg_sender_name")) ? string.Empty : reader.GetString("msg_sender_name"),
|
||||
ColourInheritable.FromMisuzu((int)reader.GetInt64("msg_sender_colour")),
|
||||
reader.IsDBNull(reader.GetOrdinal("msg_sender_colour")) ? ColourInheritable.None : ColourInheritable.FromRaw((int)reader.GetInt64("msg_sender_colour")),
|
||||
(int)reader.GetInt64("msg_sender_rank"),
|
||||
SQLiteUserPermissionsConverter.From((SQLiteUserPermissions)reader.GetInt64("msg_sender_perms")),
|
||||
reader.IsDBNull(reader.GetOrdinal("msg_sender_nick")) ? string.Empty : reader.GetString("msg_sender_nick"),
|
||||
|
@ -87,6 +110,33 @@ public class SQLiteMessageStorage(ILogger logger, SQLiteConnection conn) : Messa
|
|||
}
|
||||
}
|
||||
|
||||
public async Task<long> CountMessages(
|
||||
string? channelName = null,
|
||||
bool includeDeleted = false
|
||||
) {
|
||||
List<SQLiteParameter> parameters = [];
|
||||
bool firstParam = true;
|
||||
StringBuilder qb = new();
|
||||
qb.Append("SELECT COUNT(*) FROM messages");
|
||||
|
||||
if(!includeDeleted) {
|
||||
firstParam = false;
|
||||
qb.Append(" WHERE msg_deleted IS NULL");
|
||||
}
|
||||
|
||||
if(!string.IsNullOrEmpty(channelName)) {
|
||||
qb.AppendFormat(" {0} (msg_channel = @channel OR msg_channel IS NULL)", firstParam ? "WHERE" : "AND");
|
||||
parameters.Add(new SQLiteParameter("channel", channelName));
|
||||
}
|
||||
|
||||
try {
|
||||
return await conn.RunQueryValue<long>(qb.ToString(), [.. parameters]);
|
||||
} catch(SQLiteException ex) {
|
||||
logger.ZLogError($"Error in CountMessages({channelName}, {includeDeleted}): {ex}");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<IEnumerable<Message>> GetMessages(
|
||||
string? channelName = null,
|
||||
int? take = 20,
|
||||
|
|
|
@ -154,57 +154,10 @@ using StreamConfig config = StreamConfig.FromPath(configFile);
|
|||
|
||||
if(cts.IsCancellationRequested) return;
|
||||
|
||||
if(args.Contains("--convert-db")) {
|
||||
logger.ZLogInformation($"Converting MariaDB storage to SQLite");
|
||||
|
||||
MariaDBStorage mariadbStorage = new(logFactory.CreateLogger("mariadb"), MariaDBStorage.BuildConnectionString(config.ScopeTo("mariadb")));
|
||||
await mariadbStorage.UpgradeStorage();
|
||||
|
||||
if(args.Contains("--migrate-storage") || args.Contains("--convert-db")) {
|
||||
MariaDBStorage mariadb = new(logFactory.CreateLogger("mariadb"), MariaDBStorage.BuildConnectionString(config.ScopeTo("mariadb")));
|
||||
using SQLiteStorage sqlite = new(logFactory.CreateLogger("sqlite"), SQLiteStorage.BuildConnectionString(config.ScopeTo("sqlite"), false));
|
||||
await sqlite.UpgradeStorage();
|
||||
|
||||
using MariaDBConnection mariadb = await mariadbStorage.CreateConnection();
|
||||
long rows = await mariadb.RunQueryValue<long>("SELECT COUNT(*) FROM sqc_events");
|
||||
|
||||
using MySqlCommand export = mariadb.Connection.CreateCommand();
|
||||
export.CommandText = "SELECT event_id, event_type, UNIX_TIMESTAMP(event_created) AS event_created, UNIX_TIMESTAMP(event_deleted) AS event_deleted, event_channel, event_sender, event_sender_name, event_sender_colour, event_sender_rank, event_sender_nick, event_sender_perms, event_data FROM sqc_events";
|
||||
export.CommandTimeout = int.MaxValue;
|
||||
using MySqlDataReader reader = await export.ExecuteReaderAsync();
|
||||
|
||||
using SQLiteCommand import = sqlite.Connection.Connection.CreateCommand();
|
||||
import.CommandText = "INSERT OR IGNORE INTO messages (msg_id, msg_type, msg_created, msg_deleted, msg_channel, msg_sender, msg_sender_name, msg_sender_colour, msg_sender_rank, msg_sender_nick, msg_sender_perms, msg_data) VALUES (@id, @type, @created, @deleted, @channel, @sender, @sender_name, @sender_colour, @sender_rank, @sender_nick, @sender_perms, @data)";
|
||||
|
||||
long completed = 0;
|
||||
DateTimeOffset lastReport = DateTimeOffset.UtcNow;
|
||||
logger.ZLogInformation($"Beginning conversion of {rows} rows...");
|
||||
while(reader.Read()) {
|
||||
if(cts.IsCancellationRequested) return;
|
||||
|
||||
import.Parameters.Clear();
|
||||
import.Parameters.Add(new SQLiteParameter("id", reader.GetInt64("event_id").ToString()));
|
||||
import.Parameters.Add(new SQLiteParameter("type", reader.GetString("event_type")));
|
||||
import.Parameters.Add(new SQLiteParameter("created", DateTimeOffset.FromUnixTimeSeconds(reader.GetInt32("event_created")).ToString("s") + "Z"));
|
||||
import.Parameters.Add(new SQLiteParameter("deleted", reader.IsDBNull(reader.GetOrdinal("event_deleted")) ? null : DateTimeOffset.FromUnixTimeSeconds(reader.GetInt32("event_deleted")).ToString("s") + "Z"));
|
||||
import.Parameters.Add(new SQLiteParameter("channel", reader.IsDBNull(reader.GetOrdinal("event_channel")) ? null : reader.GetString("event_channel")));
|
||||
import.Parameters.Add(new SQLiteParameter("sender", reader.IsDBNull(reader.GetOrdinal("event_sender")) ? null : reader.GetString("event_sender")));
|
||||
import.Parameters.Add(new SQLiteParameter("sender_name", reader.IsDBNull(reader.GetOrdinal("event_sender_name")) ? string.Empty : reader.GetString("event_sender_name")));
|
||||
ColourInheritable colour = ColourInheritable.FromMisuzu(reader.GetInt32("event_sender_colour"));
|
||||
import.Parameters.Add(new SQLiteParameter("sender_colour", colour.Rgb.HasValue ? colour.Rgb.Value : null));
|
||||
import.Parameters.Add(new SQLiteParameter("sender_rank", reader.IsDBNull(reader.GetOrdinal("event_sender_rank")) ? null : reader.GetInt32("event_sender_rank")));
|
||||
import.Parameters.Add(new SQLiteParameter("sender_nick", reader.IsDBNull(reader.GetOrdinal("event_sender_nick")) ? string.Empty : reader.GetString("event_sender_nick")));
|
||||
import.Parameters.Add(new SQLiteParameter("sender_perms", SQLiteUserPermissionsConverter.To(MariaDBUserPermissionsConverter.From((MariaDBUserPermissions)reader.GetInt32("event_sender_perms")))));
|
||||
import.Parameters.Add(new SQLiteParameter("data", reader.GetString("event_data")));
|
||||
await import.PrepareAsync();
|
||||
await import.ExecuteNonQueryAsync();
|
||||
|
||||
++completed;
|
||||
if(DateTimeOffset.UtcNow - lastReport > TimeSpan.FromMinutes(1)) {
|
||||
lastReport = DateTimeOffset.UtcNow;
|
||||
double completion = (double)completed / rows;
|
||||
logger.ZLogInformation($"{completed} of {rows} converted ({completion:P2})...");
|
||||
}
|
||||
}
|
||||
logger.ZLogInformation($"Converted all {completed} rows!");
|
||||
await new StorageMigrator(logFactory.CreateLogger("migrate"), mariadb, sqlite).Migrate(cts.Token);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
namespace SharpChat.Messages;
|
||||
|
||||
public interface MessageStorage {
|
||||
Task LogMessage(Message msg);
|
||||
Task LogMessage(
|
||||
long id,
|
||||
string type,
|
||||
|
@ -15,6 +16,10 @@ public interface MessageStorage {
|
|||
);
|
||||
Task DeleteMessage(Message msg);
|
||||
Task<Message?> GetMessage(long id);
|
||||
Task<long> CountMessages(
|
||||
string? channelName = null,
|
||||
bool includeDeleted = false
|
||||
);
|
||||
Task<IEnumerable<Message>> GetMessages(
|
||||
string? channelName = null,
|
||||
int? take = 20,
|
||||
|
|
54
SharpChatCommon/StorageMigrator.cs
Normal file
54
SharpChatCommon/StorageMigrator.cs
Normal file
|
@ -0,0 +1,54 @@
|
|||
using Microsoft.Extensions.Logging;
|
||||
using SharpChat.Messages;
|
||||
using ZLogger;
|
||||
|
||||
namespace SharpChat;
|
||||
|
||||
public class StorageMigrator(ILogger logger, Storage source, Storage target) {
|
||||
public async Task Migrate(CancellationToken cancellationToken) {
|
||||
try {
|
||||
logger.ZLogInformation($"Converting from {source.GetType().Name} to {target.GetType().Name}!");
|
||||
logger.ZLogInformation($"Ensuring both source and target are fully upgraded...");
|
||||
if(cancellationToken.IsCancellationRequested) return;
|
||||
|
||||
await source.UpgradeStorage();
|
||||
if(cancellationToken.IsCancellationRequested) return;
|
||||
|
||||
await target.UpgradeStorage();
|
||||
if(cancellationToken.IsCancellationRequested) return;
|
||||
|
||||
logger.ZLogInformation($"Creating message storage instances...");
|
||||
MessageStorage sourceMsgs = source.CreateMessageStorage();
|
||||
MessageStorage targetMsgs = target.CreateMessageStorage();
|
||||
|
||||
if(cancellationToken.IsCancellationRequested) return;
|
||||
long msgCount = await sourceMsgs.CountMessages(includeDeleted: true);
|
||||
if(msgCount < 1) {
|
||||
logger.ZLogInformation($"No messages to migrate, skipping...");
|
||||
} else {
|
||||
logger.ZLogInformation($"Migrating {msgCount} message{(msgCount == 1 ? "" : "s")}...");
|
||||
|
||||
long migrated = 0;
|
||||
DateTimeOffset lastReport = DateTimeOffset.UtcNow;
|
||||
IEnumerable<Message> msgs = await sourceMsgs.GetMessages(take: null, includeDeleted: true);
|
||||
foreach(Message msg in msgs) {
|
||||
if(cancellationToken.IsCancellationRequested) break;
|
||||
|
||||
await targetMsgs.LogMessage(msg);
|
||||
|
||||
++migrated;
|
||||
if(DateTimeOffset.UtcNow - lastReport > TimeSpan.FromMinutes(1)) {
|
||||
lastReport = DateTimeOffset.UtcNow;
|
||||
double completion = (double)migrated / msgCount;
|
||||
logger.ZLogInformation($"{migrated} of {msgCount} message{(migrated == 1 ? "" : "s")} migrated ({completion:P2})...");
|
||||
}
|
||||
}
|
||||
|
||||
logger.ZLogInformation($"Migrated {migrated} message{(migrated == 1 ? "" : "s")}!");
|
||||
}
|
||||
} catch(Exception ex) {
|
||||
logger.ZLogError($"Error during migration: {ex}");
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue