YugaByte DB

The YugaByte Database Blog

Thoughts on open source, cloud native and distributed databases

Building Scalable Cloud Services — An Instant Messaging App

Source: https://stackoverflow.com/questions/47276519/how-should-i-or-should-not-use-cassandra-and-redis-together-to-build-a-scalable

This is the first post in a series about building real-world, distributed cloud services using a transactional cloud database like YugaByte DB.

We are going to look at how to build a scalable chat or messaging application like Facebook Messages. This is close to the hearts of a number of us at YugaByte: we were the team behind the database platform that powers the Facebook Messages app.


Developing A Scalable Chat App

I came across a relevant question on StackOverflow recently, where a user was discussing how to build a messaging or chat app:

How should I or should not use Cassandra and Redis together to build a scalable one on one chat application?

There was some great analysis by the user on which APIs he wanted to use for which portions of the app. Here is the summary:

So we have two parts here. The right part which is just all messages in a chat thread, and the left part that shows chat threads with information from the last message, and your chat partners information such as name and image and so on.

Cassandra is really good at writing, and reading. But not so much at deleting data because of tombstones. And you don’t want to set gc_grace_seconds to 0 because if a node goes down and deletes occur then that deleted row might come back to life when repair is done. So we might end up deleting all data from the node before it enters the cluster.

The analysis here is accurate. Note that YugaByte DB’s Cassandra API does not have this issue with tombstones, since it is designed on top of strongly-consistent replication.

In an attempt to see how YugaByte DB can simplify development and deployment, I tried to model the various portions of the app: the users, the message-list on the right, the thread-list on the left and the thread-snippets, which are needed to display the first few characters of the latest message in each thread. Here is an account of that journey.


Design of the Chat App

Let’s say we are building a one-on-one chat application. That is, a pair of users communicate on one thread (no group threads). It is easy enough to extend this to the general case of multi-user threads.

Here are a couple of high level considerations:

  • In order to make the application really scalable, we are going to de-normalize the data. Each user will have their own private copy of messages.
  • It is desirable to keep all the pieces of data for a single user co-located where possible. This will make batched queries for a single user’s data more efficient.

Let us look at the entities and actions in this app.

Entities

Users: We need a unique user_id for each user, which is associated with the user’s profile (first name, last name, image, etc). These are the users that send and receive messages in the system. The unique id for the user could be an email address, a uuid or simply a number.

Messages: These are the messages that users send to one another. Each message has a unique message_id, a sender_id, a recipient_id, a message_time and a message_body.

Threads: Threads represent a set of messages constituting a conversation between two users. In our 1–1 chat application, we assume that there’s exactly one thread for each pair of users. And all exchanges between that pair of users go to the same thread. The thread id can be composed by using the (user_id, other_user_id) combo.

Actions

Sending/Receiving a message: The message is added to both the (recipient_id, sender_id) thread and the (sender_id, recipient_id) thread of the appropriate users. We also need to update the timestamp of the recipient’s thread so that it shows up on top.
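The fan-out on send can be sketched in a few lines of Python. This is only an in-memory illustration of the de-normalized write, not the actual storage layer: the `messages` and `thread_updated_at` dictionaries and the `send_message` helper are hypothetical names, and the sketch bumps the timestamp on both participants' threads, which is an assumption beyond the recipient-side update described above.

```python
from collections import defaultdict

# Hypothetical in-memory stand-in for the database: one message list per
# (owner_id, other_user_id) thread, plus a last-activity time per thread.
messages = defaultdict(list)
thread_updated_at = {}


def send_message(sender_id, recipient_id, message_time, body):
    """Write one private copy of the message into each participant's thread."""
    message = {"author_id": sender_id, "time": message_time, "body": body}
    for owner_id, other_id in ((sender_id, recipient_id), (recipient_id, sender_id)):
        # Each user gets an independent copy (de-normalized data).
        messages[(owner_id, other_id)].append(dict(message))
        # Bump the thread so that it sorts to the top of the owner's inbox.
        thread_updated_at[(owner_id, other_id)] = message_time


send_message(sender_id=5, recipient_id=1, message_time=100, body="Hi there!")
```

After the call, the thread keyed `(5, 1)` and the thread keyed `(1, 5)` each hold their own copy of the message, so either user can later delete their copy without affecting the other.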

Viewing the inbox: This involves fetching the user’s 10 most recent threads, along with information about which threads are unread. Next, for each of the threads fetched above, we also need to fetch the latest message and display the first 40 or so characters on the thread-list (left part of the app). Finally, we need to fetch the most recent 10 messages for the thread_id being viewed.


Using YugaByte DB to Model the App

Let us model the various portions of this app in three sections — the users, the thread-list and the message-list.

Users


Modeling users is simple. We can use the native YugaByte DB support for either the Redis or Cassandra API. I am going with Redis HASHES mainly because it is easier to model a flexible schema user profile where we need to add fields on the fly.

For example, to create the profile details for a user with the id 1 we would do something as follows:

HMSET user:1:profile firstname John lastname Smith

If we wanted to add another field to this user’s profile, say their profile_pic, that can be done as follows:

HMSET user:1:profile profile_pic "/my-url/user-1-pic.png"

It is easy to fetch the entire profile information for this user as follows:

HGETALL user:1:profile
1) "firstname"
2) "John"
3) "lastname"
4) "Smith"
5) "profile_pic"
6) "/my-url/user-1-pic.png"

Remember that YugaByte DB persists this information as a DB, and pages this in on demand — so you do not need to worry about persistence, data unavailability on failures or implementing the cache population/invalidation in your app.
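To make the hash semantics concrete, here is a minimal Python sketch that mimics the commands above with a plain dictionary. It is an in-memory stand-in, not a client for YugaByte DB or Redis; the `store`, `profile_key`, `hmset` and `hgetall` names are hypothetical helpers introduced for illustration.

```python
# Hypothetical in-memory stand-in for a Redis-compatible store: key -> hash.
store = {}


def profile_key(user_id):
    """Build the key naming scheme used above, e.g. user:1:profile."""
    return f"user:{user_id}:profile"


def hmset(key, fields):
    """Like HMSET: set the given fields and leave all other fields untouched."""
    store.setdefault(key, {}).update(fields)


def hgetall(key):
    """Like HGETALL: return every field of the hash (empty if the key is missing)."""
    return dict(store.get(key, {}))


hmset(profile_key(1), {"firstname": "John", "lastname": "Smith"})
hmset(profile_key(1), {"profile_pic": "/my-url/user-1-pic.png"})
profile = hgetall(profile_key(1))
```

The second `hmset` call adds `profile_pic` without disturbing the existing fields, which is what makes a hash convenient for a profile whose schema grows over time.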

The message-list

The message list is the right portion of the app — this is shown in the figure below.


It needs to deal with the following concerns:

  • When new messages for a thread arrive, they need to be added to the message-list for a given thread. The new messages for a thread are always added in a time-ordered fashion, and these messages are immutable. There are no changes to the content of the messages.
  • The list of messages for a thread typically keeps growing over time, so we need the ability to store a large data set and retrieve recent data with low latency.
  • The common read pattern is to fetch the latest N (say 10 or 20) messages for a given thread.

The Cassandra API is a great choice for modeling the message-list based on the above considerations. As mentioned before, YugaByte’s Cassandra API is designed on top of strongly-consistent replication, and does not have issues with eventual consistency and deletes resurfacing because of retention of tombstones.

Here are some thoughts on how to make the schema efficient for our use case.

  • Firstly, since the access pattern is always by user, we make the user_id the hash (partitioning) portion of the primary key.
  • Secondly, for a given user, messages are always accessed in the context of a sender (or a thread), so the other_user_id should be a part of the primary key. If we made the other_user_id a part of the hash key, a single user’s data would be distributed across multiple nodes. Therefore we are going to make it a clustering column of the primary key in order to keep a user’s entire data on one node. This is a better choice if we ever want to access all of a user’s data.
  • Thirdly, we would be reading messages by their ids, so the message id should also be a part of the primary key. We mostly read the 10 most recent messages for a given user and thread combo, or a specific message given its id. We therefore need a unique id to reference each message and to keep the messages sorted with the most recently received first. It is best to make the message id a TIMEUUID and make it a clustering column in the primary key.

CREATE TABLE chat.user_messages (
    user_id INT,
    other_user_id INT,
    message_id TIMEUUID,
    message_author_id INT,
    message_body TEXT,
    PRIMARY KEY ((user_id), other_user_id, message_id)
) WITH CLUSTERING ORDER BY (other_user_id ASC, message_id DESC);

Let us say we wanted to insert a message into this table when user 5 sends a message to user 1. This can easily be done as follows.

// Insert into the sender's thread.
INSERT INTO chat.user_messages
    (user_id, other_user_id, message_id, message_author_id, message_body)
VALUES
    (5, 1, now(), 5, 'Hi there!');
// Insert into the recipient's thread.
INSERT INTO chat.user_messages
    (user_id, other_user_id, message_id, message_author_id, message_body)
VALUES
    (1, 5, now(), 5, 'Hi there!');
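Note that each now() call produces a different TIMEUUID, so the two copies written above end up with different message ids. If the application wants both copies to share one id (for example, to refer to the same message from either side), one option is to generate the id once on the client and bind it to both inserts. The following Python sketch shows the idea under stated assumptions: `build_message_inserts` is a hypothetical helper, and the `%s` placeholder style is an assumption about the driver in use, so adapt it to whatever client you run.

```python
import uuid

# Parameterized statement; the %s placeholder style is an assumption
# about the client driver and may differ in yours.
INSERT_CQL = (
    "INSERT INTO chat.user_messages "
    "(user_id, other_user_id, message_id, message_author_id, message_body) "
    "VALUES (%s, %s, %s, %s, %s)"
)


def build_message_inserts(sender_id, recipient_id, body):
    """Return (statement, parameters) pairs for the two per-user copies."""
    message_id = uuid.uuid1()  # version-1 (time-based) UUID, generated once
    return [
        # Copy in the sender's partition.
        (INSERT_CQL, (sender_id, recipient_id, message_id, sender_id, body)),
        # Copy in the recipient's partition.
        (INSERT_CQL, (recipient_id, sender_id, message_id, sender_id, body)),
    ]


inserts = build_message_inserts(sender_id=5, recipient_id=1, body="Hi there!")
```

Each pair in `inserts` can then be handed to the driver's execute call. Whether shared ids are worth the extra client-side step depends on whether the app ever needs to correlate the two copies.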

Selecting the 10 most recent messages for user 1 in the above thread with user 5 can be accomplished quite easily as well.

SELECT * FROM chat.user_messages 
WHERE user_id = 1 AND
other_user_id = 5
LIMIT 10;
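The query needs no ORDER BY because rows inside a partition are already stored in clustering order. The Python sketch below imitates that behavior in memory. It is an illustration only: the sample rows are made up, an integer `message_time` stands in for the TIMEUUID message_id, and `clustering_key` and `latest_messages` are hypothetical helper names.

```python
# Hypothetical rows from the partition of user_id = 1, written as
# (other_user_id, message_time, message_body). The integer message_time
# is a stand-in for the TIMEUUID message_id.
partition = [
    (5, 100, "Hi there!"),
    (7, 180, "You there?"),
    (5, 200, "Lunch?"),
    (23, 150, "Hello"),
]


def clustering_key(row):
    """Sort key matching: other_user_id ASC, message_id DESC."""
    other_user_id, message_time, _ = row
    return (other_user_id, -message_time)


def latest_messages(rows, other_user_id, limit):
    """Mimic WHERE other_user_id = ? LIMIT ? within one user's partition."""
    ordered = sorted(rows, key=clustering_key)
    thread = [row for row in ordered if row[0] == other_user_id]
    return thread[:limit]


recent = latest_messages(partition, other_user_id=5, limit=10)
```

Here `recent` lists the thread with user 5 newest first, which is the order the LIMIT clause relies on to return the most recent messages.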

The thread-list

The thread-list is the left portion of the app shown below.


Messages can arrive for any thread at any point in time. This means that incoming messages dynamically change the sort order of the threads. Suppose a user's inbox currently holds 10 threads, [t1, t2 … t10], with t1 being the most recent. If a message arrives for thread t2, we need to remove t2 from position 2 and insert it at position 1 in a transactional way. Apache Cassandra does not support the multi-step (or multi-row) operations with ACID guarantees (in other words, transactions) needed to support this access pattern. We could write the entire list of threads as one record, but that would be inefficient to update because every new message would rewrite the whole list (write amplification).

Here is where using the Redis API of YugaByte as a DB makes things easy. We can maintain the thread list for a user using a Redis SORTED SET. It is easy to update the structure using the ZADD command.

Since this is a one-on-one chat app, we can make the thread id the user id of the other user on the thread. Now, imagine users 5, 23 and 7 sent messages to user 1 and the messages were received at times 100, 150 and 180 (in the real world these would be epoch timestamps; small numbers are used here to keep the example readable). We can easily add these entries into the threads index as follows:

ZADD user:1:threads 100 thread:user:5
ZADD user:1:threads 150 thread:user:23
ZADD user:1:threads 180 thread:user:7

To fetch the 10 most recent threads when user 1 views their inbox, simply do the following (both range indexes are inclusive, so 0 to 9 returns at most 10 entries):

ZREVRANGE user:1:threads 0 9 WITHSCORES
1) "thread:user:7"
2) 180.0
3) "thread:user:23"
4) 150.0
5) "thread:user:5"
6) 100.0

Now let us assume user 5 sent another message to user 1 at time 200 (the latest message). Updating the thread index is just as easy:

ZADD user:1:threads 200 thread:user:5
# Fetch threads to verify that the update worked.
ZREVRANGE user:1:threads 0 9 WITHSCORES
1) "thread:user:5"
2) 200.0
3) "thread:user:7"
4) 180.0
5) "thread:user:23"
6) 150.0

If user 1 deletes an entire thread with user 23, that is accomplished as follows:

ZREM user:1:threads thread:user:23
# Fetch threads to verify that the delete worked.
ZREVRANGE user:1:threads 0 9 WITHSCORES
1) "thread:user:5"
2) 200.0
3) "thread:user:7"
4) 180.0

The thread-snippets

We also need to fetch the message “snippets” for each of these threads. A snippet for a thread is the first 40 characters of the latest message in that thread. To keep the inbox view to a single read, we can store the snippets for the 10 most recent threads in one key-value pair and update it each time we get a new message.

SET user:1:snippets '{ "thread:user:7": "You there?", ... }'
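Computing the new value of that key on each incoming message could look like the Python sketch below. It is one possible approach under stated assumptions: `update_snippets`, `SNIPPET_LENGTH` and `MAX_THREADS` are hypothetical names, and the caller is assumed to pass in the user's thread ids ordered most recent first (for instance, as read from the threads sorted set).

```python
import json

SNIPPET_LENGTH = 40   # characters shown per thread
MAX_THREADS = 10      # only the most recent threads keep a snippet


def update_snippets(snippets_json, thread_id, message_body, recent_thread_ids):
    """Return the new JSON value for the user's snippets key."""
    snippets = json.loads(snippets_json) if snippets_json else {}
    # Store only the first 40 characters of the latest message.
    snippets[thread_id] = message_body[:SNIPPET_LENGTH]
    # Drop snippets for threads that fell out of the most recent 10.
    keep = recent_thread_ids[:MAX_THREADS]
    return json.dumps({t: snippets[t] for t in keep if t in snippets})


old_value = '{"thread:user:7": "You there?"}'
new_value = update_snippets(
    old_value,
    "thread:user:5",
    "Hi there! " * 10,
    ["thread:user:5", "thread:user:7"],
)
```

The resulting string is what would be written back with SET. Note that this is a read-modify-write on one key, so concurrent messages for the same user would need to be serialized by the application or by a transactional update.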

Putting it all together

What we have shown here is the power of a polyglot, multi-model database. In the case of YugaByte DB, it is even simpler to get going because it is wire-compatible with popular, existing APIs like Redis and Cassandra.

Stay tuned for a sample implementation of the messaging app described above. In the meantime, feel free to show your love by giving us a star on GitHub.

Karthik Ranganathan

Founder & CTO