Asyncify event storage.

This commit is contained in:
flash 2025-04-27 00:31:33 +00:00
parent dd377358e2
commit 0a7e01f154
Signed by: flash
GPG key ID: 2C9C2C574D47FE3E
8 changed files with 63 additions and 60 deletions

View file

@ -27,14 +27,14 @@ public class DeleteMessageClientCommand : ClientCommand {
return; return;
} }
StoredEventInfo? delMsg = ctx.Chat.Events.GetEvent(delSeqId); StoredEventInfo? delMsg = await ctx.Chat.Events.GetEvent(delSeqId);
if(delMsg?.Sender is null || delMsg.Sender.Rank > ctx.User.Rank || (!deleteAnyMessage && delMsg.Sender.UserId != ctx.User.UserId)) { if(delMsg?.Sender is null || delMsg.Sender.Rank > ctx.User.Rank || (!deleteAnyMessage && delMsg.Sender.UserId != ctx.User.UserId)) {
await ctx.Chat.SendTo(ctx.User, new CommandResponseS2CPacket(msgId, LCR.MESSAGE_DELETE_ERROR)); await ctx.Chat.SendTo(ctx.User, new CommandResponseS2CPacket(msgId, LCR.MESSAGE_DELETE_ERROR));
return; return;
} }
ctx.Chat.Events.RemoveEvent(delMsg); await ctx.Chat.Events.RemoveEvent(delMsg);
await ctx.Chat.Send(new ChatMessageDeleteS2CPacket(delMsg.Id)); await ctx.Chat.Send(new ChatMessageDeleteS2CPacket(delMsg.Id));
} }
} }

View file

@ -71,7 +71,7 @@ public class Context {
)); ));
} }
Events.AddEvent( await Events.AddEvent(
mce.MessageId, "msg:add", mce.MessageId, "msg:add",
mce.ChannelName, mce.ChannelName,
mce.SenderId, mce.SenderName, mce.SenderColour, mce.SenderRank, mce.SenderNickName, mce.SenderPerms, mce.SenderId, mce.SenderName, mce.SenderColour, mce.SenderRank, mce.SenderNickName, mce.SenderPerms,
@ -212,7 +212,7 @@ public class Context {
if(!IsInChannel(user, chan)) { if(!IsInChannel(user, chan)) {
long msgId = RandomSnowflake.Next(); long msgId = RandomSnowflake.Next();
await SendTo(chan, new UserConnectS2CPacket(msgId, DateTimeOffset.Now, user.UserId, user.LegacyNameWithStatus, user.Colour, user.Rank, user.Permissions)); await SendTo(chan, new UserConnectS2CPacket(msgId, DateTimeOffset.Now, user.UserId, user.LegacyNameWithStatus, user.Colour, user.Rank, user.Permissions));
Events.AddEvent(msgId, "user:connect", chan.Name, user.UserId, user.UserName, user.Colour, user.Rank, user.NickName, user.Permissions, null, StoredEventFlags.Log); await Events.AddEvent(msgId, "user:connect", chan.Name, user.UserId, user.UserName, user.Colour, user.Rank, user.NickName, user.Permissions, null, StoredEventFlags.Log);
} }
await conn.Send(new AuthSuccessS2CPacket( await conn.Send(new AuthSuccessS2CPacket(
@ -236,7 +236,8 @@ public class Context {
)) ))
)); ));
foreach(StoredEventInfo msg in Events.GetChannelEventLog(chan.Name)) IEnumerable<StoredEventInfo> msgs = await Events.GetChannelEventLog(chan.Name);
foreach(StoredEventInfo msg in msgs)
await conn.Send(new ContextMessageS2CPacket(msg)); await conn.Send(new ContextMessageS2CPacket(msg));
await conn.Send(new ContextChannelsS2CPacket( await conn.Send(new ContextChannelsS2CPacket(
@ -262,7 +263,7 @@ public class Context {
long msgId = RandomSnowflake.Next(); long msgId = RandomSnowflake.Next();
await SendTo(chan, new UserDisconnectS2CPacket(msgId, DateTimeOffset.Now, user.UserId, user.LegacyNameWithStatus, reason)); await SendTo(chan, new UserDisconnectS2CPacket(msgId, DateTimeOffset.Now, user.UserId, user.LegacyNameWithStatus, reason));
Events.AddEvent(msgId, "user:disconnect", chan.Name, user.UserId, user.UserName, user.Colour, user.Rank, user.NickName, user.Permissions, new { reason = (int)reason }, StoredEventFlags.Log); await Events.AddEvent(msgId, "user:disconnect", chan.Name, user.UserId, user.UserName, user.Colour, user.Rank, user.NickName, user.Permissions, new { reason = (int)reason }, StoredEventFlags.Log);
if(chan.IsTemporary && chan.IsOwner(user)) if(chan.IsTemporary && chan.IsOwner(user))
await RemoveChannel(chan); await RemoveChannel(chan);
@ -300,11 +301,11 @@ public class Context {
long leaveId = RandomSnowflake.Next(); long leaveId = RandomSnowflake.Next();
await SendTo(oldChan, new UserChannelLeaveS2CPacket(leaveId, user.UserId)); await SendTo(oldChan, new UserChannelLeaveS2CPacket(leaveId, user.UserId));
Events.AddEvent(leaveId, "chan:leave", oldChan.Name, user.UserId, user.UserName, user.Colour, user.Rank, user.NickName, user.Permissions, null, StoredEventFlags.Log); await Events.AddEvent(leaveId, "chan:leave", oldChan.Name, user.UserId, user.UserName, user.Colour, user.Rank, user.NickName, user.Permissions, null, StoredEventFlags.Log);
long joinId = RandomSnowflake.Next(); long joinId = RandomSnowflake.Next();
await SendTo(chan, new UserChannelJoinS2CPacket(joinId, user.UserId, user.LegacyNameWithStatus, user.Colour, user.Rank, user.Permissions)); await SendTo(chan, new UserChannelJoinS2CPacket(joinId, user.UserId, user.LegacyNameWithStatus, user.Colour, user.Rank, user.Permissions));
Events.AddEvent(joinId, "chan:join", chan.Name, user.UserId, user.LegacyName, user.Colour, user.Rank, user.NickName, user.Permissions, null, StoredEventFlags.Log); await Events.AddEvent(joinId, "chan:join", chan.Name, user.UserId, user.LegacyName, user.Colour, user.Rank, user.NickName, user.Permissions, null, StoredEventFlags.Log);
await SendTo(user, new ContextClearS2CPacket(ContextClearS2CPacket.Mode.MessagesUsers)); await SendTo(user, new ContextClearS2CPacket(ContextClearS2CPacket.Mode.MessagesUsers));
await SendTo(user, new ContextUsersS2CPacket( await SendTo(user, new ContextUsersS2CPacket(
@ -319,7 +320,8 @@ public class Context {
)) ))
)); ));
foreach(StoredEventInfo msg in Events.GetChannelEventLog(chan.Name)) IEnumerable<StoredEventInfo> msgs = await Events.GetChannelEventLog(chan.Name);
foreach(StoredEventInfo msg in msgs)
await SendTo(user, new ContextMessageS2CPacket(msg)); await SendTo(user, new ContextMessageS2CPacket(msg));
await ForceChannel(user, chan); await ForceChannel(user, chan);

View file

@ -1,7 +1,7 @@
namespace SharpChat.EventStorage; namespace SharpChat.EventStorage;
public interface EventStorage { public interface EventStorage {
void AddEvent( Task AddEvent(
long id, long id,
string type, string type,
string channelName, string channelName,
@ -14,7 +14,7 @@ public interface EventStorage {
object? data = null, object? data = null,
StoredEventFlags flags = StoredEventFlags.None StoredEventFlags flags = StoredEventFlags.None
); );
void RemoveEvent(StoredEventInfo evt); Task RemoveEvent(StoredEventInfo evt);
StoredEventInfo? GetEvent(long seqId); Task<StoredEventInfo?> GetEvent(long seqId);
IEnumerable<StoredEventInfo> GetChannelEventLog(string channelName, int amount = 20, int offset = 0); Task<IEnumerable<StoredEventInfo>> GetChannelEventLog(string channelName, int amount = 20, int offset = 0);
} }

View file

@ -7,7 +7,7 @@ namespace SharpChat.EventStorage;
public partial class MariaDBEventStorage(string connString) : EventStorage { public partial class MariaDBEventStorage(string connString) : EventStorage {
private string ConnectionString { get; } = connString ?? throw new ArgumentNullException(nameof(connString)); private string ConnectionString { get; } = connString ?? throw new ArgumentNullException(nameof(connString));
public void AddEvent( public async Task AddEvent(
long id, long id,
string type, string type,
string channelName, string channelName,
@ -20,7 +20,7 @@ public partial class MariaDBEventStorage(string connString) : EventStorage {
object? data = null, object? data = null,
StoredEventFlags flags = StoredEventFlags.None StoredEventFlags flags = StoredEventFlags.None
) { ) {
RunCommand( await RunCommand(
"INSERT INTO `sqc_events` (`event_id`, `event_created`, `event_type`, `event_target`, `event_flags`, `event_data`" "INSERT INTO `sqc_events` (`event_id`, `event_created`, `event_type`, `event_target`, `event_flags`, `event_data`"
+ ", `event_sender`, `event_sender_name`, `event_sender_colour`, `event_sender_rank`, `event_sender_nick`, `event_sender_perms`)" + ", `event_sender`, `event_sender_name`, `event_sender_colour`, `event_sender_rank`, `event_sender_nick`, `event_sender_perms`)"
+ " VALUES (@id, NOW(), @type, @target, @flags, @data" + " VALUES (@id, NOW(), @type, @target, @flags, @data"
@ -39,9 +39,9 @@ public partial class MariaDBEventStorage(string connString) : EventStorage {
); );
} }
public StoredEventInfo? GetEvent(long seqId) { public async Task<StoredEventInfo?> GetEvent(long seqId) {
try { try {
using MySqlDataReader? reader = RunQuery( using MySqlDataReader? reader = await RunQuery(
"SELECT `event_id`, `event_type`, `event_flags`, `event_data`, `event_target`" "SELECT `event_id`, `event_type`, `event_flags`, `event_data`, `event_target`"
+ ", `event_sender`, `event_sender_name`, `event_sender_colour`, `event_sender_rank`, `event_sender_nick`, `event_sender_perms`" + ", `event_sender`, `event_sender_name`, `event_sender_colour`, `event_sender_rank`, `event_sender_nick`, `event_sender_perms`"
+ ", UNIX_TIMESTAMP(`event_created`) AS `event_created`" + ", UNIX_TIMESTAMP(`event_created`) AS `event_created`"
@ -86,11 +86,11 @@ public partial class MariaDBEventStorage(string connString) : EventStorage {
); );
} }
public IEnumerable<StoredEventInfo> GetChannelEventLog(string channelName, int amount = 20, int offset = 0) { public async Task<IEnumerable<StoredEventInfo>> GetChannelEventLog(string channelName, int amount = 20, int offset = 0) {
List<StoredEventInfo> events = []; List<StoredEventInfo> events = [];
try { try {
using MySqlDataReader? reader = RunQuery( using MySqlDataReader? reader = await RunQuery(
"SELECT `event_id`, `event_type`, `event_flags`, `event_data`, `event_target`" "SELECT `event_id`, `event_type`, `event_flags`, `event_data`, `event_target`"
+ ", `event_sender`, `event_sender_name`, `event_sender_colour`, `event_sender_rank`, `event_sender_nick`, `event_sender_perms`" + ", `event_sender`, `event_sender_name`, `event_sender_colour`, `event_sender_rank`, `event_sender_nick`, `event_sender_perms`"
+ ", UNIX_TIMESTAMP(`event_created`) AS `event_created`" + ", UNIX_TIMESTAMP(`event_created`) AS `event_created`"
@ -121,9 +121,8 @@ public partial class MariaDBEventStorage(string connString) : EventStorage {
return events; return events;
} }
public void RemoveEvent(StoredEventInfo evt) { public async Task RemoveEvent(StoredEventInfo evt) {
ArgumentNullException.ThrowIfNull(evt); await RunCommand(
RunCommand(
"UPDATE IGNORE `sqc_events` SET `event_deleted` = NOW() WHERE `event_id` = @id AND `event_deleted` IS NULL", "UPDATE IGNORE `sqc_events` SET `event_deleted` = NOW() WHERE `event_id` = @id AND `event_deleted` IS NULL",
new MySqlParameter("id", evt.Id) new MySqlParameter("id", evt.Id)
); );

View file

@ -29,20 +29,20 @@ public partial class MariaDBEventStorage {
}.ToString(); }.ToString();
} }
private MySqlConnection GetConnection() { private async Task<MySqlConnection> GetConnectionAsync() {
MySqlConnection conn = new(ConnectionString); MySqlConnection conn = new(ConnectionString);
conn.Open(); await conn.OpenAsync();
return conn; return conn;
} }
private int RunCommand(string command, params MySqlParameter[] parameters) { private async Task<int> RunCommand(string command, params MySqlParameter[] parameters) {
try { try {
using MySqlConnection conn = GetConnection(); using MySqlConnection conn = await GetConnectionAsync();
using MySqlCommand cmd = conn.CreateCommand(); using MySqlCommand cmd = conn.CreateCommand();
if(parameters?.Length > 0) if(parameters?.Length > 0)
cmd.Parameters.AddRange(parameters); cmd.Parameters.AddRange(parameters);
cmd.CommandText = command; cmd.CommandText = command;
return cmd.ExecuteNonQuery(); return await cmd.ExecuteNonQueryAsync();
} catch(MySqlException ex) { } catch(MySqlException ex) {
Logger.Write(ex); Logger.Write(ex);
} }
@ -50,14 +50,14 @@ public partial class MariaDBEventStorage {
return 0; return 0;
} }
private MySqlDataReader? RunQuery(string command, params MySqlParameter[] parameters) { private async Task<MySqlDataReader?> RunQuery(string command, params MySqlParameter[] parameters) {
try { try {
MySqlConnection conn = GetConnection(); MySqlConnection conn = await GetConnectionAsync();
MySqlCommand cmd = conn.CreateCommand(); MySqlCommand cmd = conn.CreateCommand();
if(parameters?.Length > 0) if(parameters?.Length > 0)
cmd.Parameters.AddRange(parameters); cmd.Parameters.AddRange(parameters);
cmd.CommandText = command; cmd.CommandText = command;
return cmd.ExecuteReader(System.Data.CommandBehavior.CloseConnection); return await cmd.ExecuteReaderAsync(System.Data.CommandBehavior.CloseConnection);
} catch(MySqlException ex) { } catch(MySqlException ex) {
Logger.Write(ex); Logger.Write(ex);
} }
@ -65,17 +65,17 @@ public partial class MariaDBEventStorage {
return null; return null;
} }
private T RunQueryValue<T>(string command, params MySqlParameter[] parameters) private async Task<T> RunQueryValue<T>(string command, params MySqlParameter[] parameters)
where T : struct { where T : struct {
try { try {
using MySqlConnection conn = GetConnection(); using MySqlConnection conn = await GetConnectionAsync();
using MySqlCommand cmd = conn.CreateCommand(); using MySqlCommand cmd = conn.CreateCommand();
if(parameters?.Length > 0) if(parameters?.Length > 0)
cmd.Parameters.AddRange(parameters); cmd.Parameters.AddRange(parameters);
cmd.CommandText = command; cmd.CommandText = command;
cmd.Prepare(); await cmd.PrepareAsync();
object? raw = cmd.ExecuteScalar(); object? raw = await cmd.ExecuteScalarAsync();
if(raw is T value) if(raw is T value)
return value; return value;
} catch(MySqlException ex) { } catch(MySqlException ex) {

View file

@ -3,23 +3,23 @@
namespace SharpChat.EventStorage; namespace SharpChat.EventStorage;
public partial class MariaDBEventStorage { public partial class MariaDBEventStorage {
private void DoMigration(string name, Action action) { private async Task DoMigration(string name, Func<Task> action) {
bool done = RunQueryValue<long>( bool done = await RunQueryValue<long>(
"SELECT COUNT(*) FROM `sqc_migrations` WHERE `migration_name` = @name", "SELECT COUNT(*) FROM `sqc_migrations` WHERE `migration_name` = @name",
new MySqlParameter("name", name) new MySqlParameter("name", name)
) > 0; ) > 0;
if(!done) { if(!done) {
Logger.Write($"Running migration '{name}'..."); Logger.Write($"Running migration '{name}'...");
action(); await action();
RunCommand( await RunCommand(
"INSERT INTO `sqc_migrations` (`migration_name`) VALUES (@name)", "INSERT INTO `sqc_migrations` (`migration_name`) VALUES (@name)",
new MySqlParameter("name", name) new MySqlParameter("name", name)
); );
} }
} }
public void RunMigrations() { public async Task RunMigrations() {
RunCommand( await RunCommand(
"CREATE TABLE IF NOT EXISTS `sqc_migrations` (" "CREATE TABLE IF NOT EXISTS `sqc_migrations` ("
+ "`migration_name` VARCHAR(255) NOT NULL," + "`migration_name` VARCHAR(255) NOT NULL,"
+ "`migration_completed` TIMESTAMP NOT NULL DEFAULT current_timestamp()," + "`migration_completed` TIMESTAMP NOT NULL DEFAULT current_timestamp(),"
@ -28,36 +28,36 @@ public partial class MariaDBEventStorage {
+ ") COLLATE='utf8mb4_unicode_ci' ENGINE=InnoDB;" + ") COLLATE='utf8mb4_unicode_ci' ENGINE=InnoDB;"
); );
DoMigration("create_events_table", CreateEventsTable); await DoMigration("create_events_table", CreateEventsTable);
DoMigration("allow_null_target", AllowNullTarget); await DoMigration("allow_null_target", AllowNullTarget);
DoMigration("event_data_as_medium_blob", EventDataAsMediumBlob); await DoMigration("event_data_as_medium_blob", EventDataAsMediumBlob);
DoMigration("event_user_and_nick_name_to_1000", EventUserAndNickNameTo1000); await DoMigration("event_user_and_nick_name_to_1000", EventUserAndNickNameTo1000);
} }
private void EventUserAndNickNameTo1000() { private async Task EventUserAndNickNameTo1000() {
RunCommand( await RunCommand(
"ALTER TABLE `sqc_events`" "ALTER TABLE `sqc_events`"
+ " CHANGE COLUMN `event_sender_name` `event_sender_name` VARCHAR(1000) NULL DEFAULT NULL COLLATE 'utf8mb4_unicode_520_ci' AFTER `event_sender`," + " CHANGE COLUMN `event_sender_name` `event_sender_name` VARCHAR(1000) NULL DEFAULT NULL COLLATE 'utf8mb4_unicode_520_ci' AFTER `event_sender`,"
+ " CHANGE COLUMN `event_sender_nick` `event_sender_nick` VARCHAR(1000) NULL DEFAULT NULL COLLATE 'utf8mb4_unicode_520_ci' AFTER `event_sender_rank`;" + " CHANGE COLUMN `event_sender_nick` `event_sender_nick` VARCHAR(1000) NULL DEFAULT NULL COLLATE 'utf8mb4_unicode_520_ci' AFTER `event_sender_rank`;"
); );
} }
private void EventDataAsMediumBlob() { private async Task EventDataAsMediumBlob() {
RunCommand( await RunCommand(
"ALTER TABLE `sqc_events`" "ALTER TABLE `sqc_events`"
+ " CHANGE COLUMN `event_data` `event_data` MEDIUMBLOB NULL DEFAULT NULL AFTER `event_flags`;" + " CHANGE COLUMN `event_data` `event_data` MEDIUMBLOB NULL DEFAULT NULL AFTER `event_flags`;"
); );
} }
private void AllowNullTarget() { private async Task AllowNullTarget() {
RunCommand( await RunCommand(
"ALTER TABLE `sqc_events`" "ALTER TABLE `sqc_events`"
+ " CHANGE COLUMN `event_target` `event_target` VARBINARY(255) NULL AFTER `event_type`;" + " CHANGE COLUMN `event_target` `event_target` VARBINARY(255) NULL AFTER `event_type`;"
); );
} }
private void CreateEventsTable() { private async Task CreateEventsTable() {
RunCommand( await RunCommand(
"CREATE TABLE `sqc_events` (" "CREATE TABLE `sqc_events` ("
+ "`event_id` BIGINT(20) NOT NULL," + "`event_id` BIGINT(20) NOT NULL,"
+ "`event_sender` BIGINT(20) UNSIGNED NULL DEFAULT NULL," + "`event_sender` BIGINT(20) UNSIGNED NULL DEFAULT NULL,"

View file

@ -5,7 +5,7 @@ namespace SharpChat.EventStorage;
public class VirtualEventStorage : EventStorage { public class VirtualEventStorage : EventStorage {
private readonly Dictionary<long, StoredEventInfo> Events = []; private readonly Dictionary<long, StoredEventInfo> Events = [];
public void AddEvent( public Task AddEvent(
long id, long id,
string type, string type,
string channelName, string channelName,
@ -40,18 +40,20 @@ public class VirtualEventStorage : EventStorage {
flags flags
) )
); );
return Task.CompletedTask;
} }
public StoredEventInfo? GetEvent(long seqId) { public Task<StoredEventInfo?> GetEvent(long seqId) {
return Events.TryGetValue(seqId, out StoredEventInfo? evt) ? evt : null; return Task.FromResult(Events.TryGetValue(seqId, out StoredEventInfo? evt) ? evt : null);
} }
public void RemoveEvent(StoredEventInfo evt) { public Task RemoveEvent(StoredEventInfo evt) {
ArgumentNullException.ThrowIfNull(evt);
Events.Remove(evt.Id); Events.Remove(evt.Id);
return Task.CompletedTask;
} }
public IEnumerable<StoredEventInfo> GetChannelEventLog(string channelName, int amount = 20, int offset = 0) { public Task<IEnumerable<StoredEventInfo>> GetChannelEventLog(string channelName, int amount = 20, int offset = 0) {
IEnumerable<StoredEventInfo> subset = Events.Values.Where(ev => ev.ChannelName == channelName); IEnumerable<StoredEventInfo> subset = Events.Values.Where(ev => ev.ChannelName == channelName);
int start = subset.Count() - offset - amount; int start = subset.Count() - offset - amount;
@ -60,6 +62,6 @@ public class VirtualEventStorage : EventStorage {
start = 0; start = 0;
} }
return [.. subset.Skip(start).Take(amount)]; return Task.FromResult(subset.Skip(start).Take(amount).ToArray() as IEnumerable<StoredEventInfo>);
} }
} }

View file

@ -129,7 +129,7 @@ if(string.IsNullOrWhiteSpace(config.SafeReadValue("mariadb:host", string.Empty))
} else { } else {
MariaDBEventStorage mdbes = new(MariaDBEventStorage.BuildConnString(config.ScopeTo("mariadb"))); MariaDBEventStorage mdbes = new(MariaDBEventStorage.BuildConnString(config.ScopeTo("mariadb")));
evtStore = mdbes; evtStore = mdbes;
mdbes.RunMigrations(); await mdbes.RunMigrations();
} }
if(hasCancelled) return; if(hasCancelled) return;