A while ago, I needed a chat server for my game. I only had rudimentary requirements. After some brief research, I decided to use XMPP as the protocol and MongooseIM as the backing server. Over time, I came to the conclusion that this setup is far from ideal: the REST API isn’t easy to use, and the server configuration is error-prone.
I don’t need the whole XMPP protocol. I only need some non-persistent chat rooms so that members of the same party can communicate with each other. The chat rooms should communicate with the clients over WebSocket. The server should not store any messages; it should only forward them to the right people.
I invested significant effort in configuring MongooseIM at runtime.
There was a lot of code for dynamically creating rooms and delivering messages.
So I decided to come up with a more basic version that covers just my requirements.
I already had a lot of experience with Kotlin’s coroutines and was sure they would fit my needs.
I also wanted to try the relatively new Flow construct for hot flows.
My backend server uses Spring Boot. Room orchestration happens through a REST API.
The basic lifecycle for a room looks like this:
Room Management
So the first thing we need to do is create a room. A room is a rather basic construct. It only needs an ID, a name, and a message container.
data class Room(val id: UUID,
                val name: String,
                val messages: MutableSharedFlow<ChatMessage>)
We use a MutableSharedFlow to distribute chat messages.
In simple terms, it is a thread-safe hot stream: every message emitted into it is delivered to all collectors that are subscribed at that moment.
The blandly named service RoomService manages these rooms.
class RoomService(
    private val userRoomHost: UserRoomHost,
) {
    private val rooms = ConcurrentHashMap<UUID, Room>()

    fun createRoom(id: UUID, name: String) {
        val messageFlow = MutableSharedFlow<ChatMessage>()
        rooms[id] = Room(id, name, messageFlow)
    }
}
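A ConcurrentHashMap holds them in memory.
The MutableSharedFlow is created with its defaults, so it has no replay cache and zero buffer capacity: emit suspends until every current subscriber has received the message. We might need to increase the buffer later if this causes transmission hiccups.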
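If the zero-capacity default ever becomes a problem, the flow constructor accepts a buffer size and an overflow policy. The following is only a sketch: the capacity of 64, the drop-oldest policy, and the function name bufferedMessageFlow are assumptions for illustration, and ChatMessage is a stand-in for the real class.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical stand-in for the article's ChatMessage type.
data class ChatMessage(val username: String, val text: String)

// Assumption: 64 buffered messages per room absorb short stalls of a slow client.
fun bufferedMessageFlow(): MutableSharedFlow<ChatMessage> =
    MutableSharedFlow<ChatMessage>(
        replay = 0,               // late joiners still do not see old messages
        extraBufferCapacity = 64, // emitters can run ahead of slow collectors
        // Never suspend the sender; a collector that falls too far behind
        // loses the oldest buffered messages instead.
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )
```

The trade-off is that a slow subscriber can now miss messages, whereas the default policy (BufferOverflow.SUSPEND) slows the sender down instead.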
Next, we need to create a service that can keep track of which user is in which rooms.
class UserRoomHost {
    private val playerRooms = ConcurrentHashMap<UUID, Set<UUID>>() // Store all room assignments
    private val roomMutex = Mutex() // Use a mutex for room assignment for safety reasons

    fun assignPlayerToRoom(roomId: UUID, userId: UUID) {
        if (playerRooms[userId]?.contains(roomId) == true) {
            return
        }
        runBlocking {
            roomMutex.withLock {
                val existingRooms = playerRooms[userId] ?: emptySet()
                playerRooms[userId] = existingRooms + setOf(roomId)
            }
        }
    }
}
Although the ConcurrentHashMap makes individual reads and writes thread-safe, the read-then-write sequence in this method is not atomic, so I added a Mutex. That way we don’t lose any updates if we call this method concurrently. You will see a lot of ConcurrentHashMaps in this code because I’m a die-hard fan.
UserRoomHost also contains additional methods to remove users from rooms and query present users.
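The Mutex and runBlocking could probably be avoided altogether, because ConcurrentHashMap can perform a read-modify-write atomically per key through compute. Below is a minimal sketch of that alternative. The class name, the field name roomsByPlayer, and removePlayerFromRoom are hypothetical and only guess at what the remaining methods might look like.

```kotlin
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Sketch of a variant without Mutex: the map itself makes each update atomic.
class LockFreeUserRoomHost {
    private val roomsByPlayer = ConcurrentHashMap<UUID, Set<UUID>>()

    fun assignPlayerToRoom(roomId: UUID, userId: UUID) {
        // compute() runs the lambda atomically for this key, so two
        // concurrent assignments for the same user cannot overwrite each other.
        roomsByPlayer.compute(userId) { _, rooms -> (rooms ?: emptySet()) + roomId }
    }

    fun removePlayerFromRoom(roomId: UUID, userId: UUID) {
        // Returning null from the lambda removes the map entry entirely.
        roomsByPlayer.computeIfPresent(userId) { _, rooms ->
            (rooms - roomId).takeIf { it.isNotEmpty() }
        }
    }

    fun playerRooms(userId: UUID): Set<UUID> = roomsByPlayer[userId] ?: emptySet()
}
```

Because the stored sets are immutable, readers always see a consistent snapshot of a user's rooms.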
Message transmission
As soon as a player connects via WebSocket, the server creates a session.
This session spawns a UserChatHandler that polls the UserRoomHost periodically (once per second in the code below).
It dynamically joins new rooms and unsubscribes from old ones.
Furthermore, it forwards incoming room messages to the WebSocket and vice versa.
// Some parts are omitted for readability; you can find all sources at the end of the article
class UserChatHandler(
    private val userRoomHost: UserRoomHost,
    private val roomService: RoomService,
    // The coroutineScope for this user, it is created with the session
    private val scope: CoroutineScope,
) {
    private val joinedRooms = mutableMapOf<UUID, UserInRoom>()

    // WebSocket sessions don't support concurrent sends, so we need to make sure we don't do that
    private val sendMutex = Mutex()

    init {
        scope.launch {
            roomWatcher()
        }
    }

    private suspend fun roomWatcher() {
        while (session.isOpen) {
            updatePlayerRooms()
            delay(1000)
        }
    }

    private suspend fun updatePlayerRooms() {
        userRoomHost.playerRooms(userId).let {
            roomService.roomsForIds(it)
        }.let {
            updateRooms(it)
        }
    }

    private suspend fun updateRooms(newRooms: Set<Room>) {
        // This method calls joinRoom() accordingly
        joinNewRooms(newRooms)
        // This method calls leaveRoom() accordingly
        leaveMissingRooms(newRooms)
    }

    private suspend fun joinRoom(room: Room) {
        session.sendJoin(room, sendMutex)
        // We need to keep track of the job, so we can cancel it when the user leaves the room
        val receiveJob = scope.launch {
            room.messages.collect {
                // Forward all messages to the WebSocket
                session.sendMessage(room, sendMutex, it)
            }
        }
        joinedRooms[room.id] = UserInRoom(room, receiveJob)
    }

    private suspend fun leaveRoom(room: Room) {
        // Cancel receiving job
        joinedRooms[room.id]?.receiveJob?.cancel()
        session.sendLeave(room, sendMutex)
        joinedRooms.remove(room.id)
    }
}
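The session helpers sendJoin, sendMessage, and sendLeave are among the omitted parts. To illustrate the idea behind sendMutex, here is a simplified sketch rather than the exact helper from the full source. TextSink and sendSerialized are hypothetical names; TextSink stands in for the real WebSocket session.

```kotlin
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical minimal abstraction over "something we can write text to".
fun interface TextSink {
    fun send(text: String)
}

// Holds the mutex for the duration of the write, so coroutines collecting
// from different rooms never write to the same session at the same time.
suspend fun TextSink.sendSerialized(sendMutex: Mutex, text: String) {
    sendMutex.withLock {
        send(text)
    }
}
```

Every room a user has joined gets its own collecting coroutine, which is why a single mutex per session is shared between all of them.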
To send a message to a room, we can now add another method to the handler:
suspend fun sendMessage(roomId: UUID, message: String) {
    joinedRooms[roomId]?.room?.sendMessage(ChatMessage(username, message))
}
We’re just using a convenience method from our room class:
suspend fun sendMessage(chatMessage: ChatMessage) {
    messages.emit(chatMessage)
}
It emits a message to the MutableSharedFlow.
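To see the broadcast behavior in isolation, here is a small self-contained sketch. The function broadcastOnce and the two subscriber names are made up for illustration; the flow is created with the same defaults as in createRoom.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first

// Emits one message and returns what each of two subscribers received.
suspend fun broadcastOnce(message: String): List<String> = coroutineScope {
    val messages = MutableSharedFlow<String>()

    // Each subscriber waits for the first message that arrives after it subscribed.
    val subscribers = listOf("alice", "bob").map { user ->
        async { "$user got: ${messages.first()}" }
    }

    // Wait until both collectors are subscribed before emitting.
    messages.subscriptionCount.first { it == 2 }
    messages.emit(message)

    subscribers.awaitAll()
}
```

Calling runBlocking { broadcastOnce("hello") } should return [alice got: hello, bob got: hello]. Waiting on subscriptionCount matters here: without a replay cache, a message emitted before a collector has subscribed never reaches that collector.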
The whole component view looks like this:
There is some simplification in this, but you get the idea.
Overall, the MutableSharedFlow makes it easy to create a solution for my problem.
The resulting code is minimal, and there is even less of it than in the previous MongooseIM solution.
I didn’t test this code for performance. There might be a lot of optimization potential.
The whole code is available here: https://gist.github.com/klg71/c672e06d590382121cb2c5448009eb5b