Why (and How) I Replaced Amazon SQS with MongoDB
What is Amazon SQS?
Amazon SQS (Simple Queue Service) is a reliable message queuing service hosted in the Amazon cloud. This service is ideal for sending messages between servers that need to acknowledge that processing has been completed. When a message is popped from the queue, it is not deleted, but marked with the client who has made the request. The client is then responsible for telling SQS to delete the message from the queue. If the client does not delete a message it has popped within a certain time frame, the client loses ownership of the message and it is made available for other clients.
How am I using it?
One of the systems I am using SQS for is a distributed email delivery service (using SMTP). Since there is no asynchronous SMTP client for Java (that I know of), I am using JavaMail to deliver messages. Sending messages with JavaMail is pretty slow and can take a number of seconds per message, with a thread being consumed for each message sent. In order to send a large number of messages in parallel, I decided to queue up the outgoing messages and spin up many instances of the SMTP application. This approach is dead simple and scales wonderfully without needing to implement an asynchronous SMTP client of my own.
So what’s wrong with Amazon SQS?
The main problem with using SQS in the above scenario is that I can’t push an entire email message onto the SQS queue since each SQS message is limited to 8K of data. To get around this, I store the message in MongoDB and then queue the message ID on SQS. Each client then needs to pull a message from the queue and then look up the email in Mongo. There’s nothing really wrong with this approach, but it can be done better, faster, easier, and cheaper. Amazon charges me a fraction of a cent for every operation I perform on SQS. This doesn’t seem like much, but if I have 10 SMTP applications polling SQS 4 times a second all day every day regardless of whether there are new messages to send, this can add up. Plus, I have diagnostic applications watching the queue size to see if I need to spin up more instances or take down instances. Even if this adds up to $10/day, that’s still $3,650/year just to send out email. That’s too much for a startup with no financial backing!
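As a rough sanity check on the polling volume described above, the arithmetic can be sketched in a few lines of JavaScript. The function names are made up for illustration, and the per-request price is left as a parameter rather than a quoted Amazon rate; the $10/day figure is the hypothetical one from the paragraph above.

```javascript
// Back-of-the-envelope polling arithmetic for the scenario in the text.
// All names are illustrative; pricePerRequest is an assumed input,
// not an actual Amazon price.
function pollsPerDay(clients, pollsPerSecond) {
  return clients * pollsPerSecond * 60 * 60 * 24;
}

function dailyPollingCost(clients, pollsPerSecond, pricePerRequest) {
  return pollsPerDay(clients, pollsPerSecond) * pricePerRequest;
}

function yearlyCost(dailyCost) {
  return dailyCost * 365;
}

// 10 SMTP applications polling 4 times a second, all day
var requests = pollsPerDay(10, 4);   // 3456000 requests per day
// The "even if this adds up to $10/day" case
var perYear = yearlyCost(10);        // 3650 dollars per year
```

The point is less the exact bill than the shape of it: the request count is driven entirely by the polling rate and the number of clients, not by how many emails are actually sent.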
The approach
I have been using MongoDB for a while now and am enamored with what it can do. I know that it can store lots of schema-less data in documents of up to 4MB each and can store larger files through the use of GridFS. I know that it’s lightning fast (almost memcached speed) for indexed lookups and can handle thousands of operations per second without even pushing the CPU over 10%. I know that I’m already paying for the CPU and hard drive space on Amazon EC2 and thoroughly enjoy minimizing my monthly, weekly, and even daily costs. Blah. Blah. Blah. I want to implement this in Mongo!
With the introduction of server-side javascript and the findAndModify command, using MongoDB for a queue that can be accessed by any client language (of which there are a ton!) is just easy. Below is the code that I am using on my own projects.
The Code
sqs.js
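function sqsQueueExists(name) {
  // A queue "exists" once its collection holds at least one document
  return db.queue[name].count() != 0;
};

function sqsQueueMessageCount(name) {
  // Count messages that are not deleted and not currently claimed
  return db.queue[name].count({
    alive: true,
    expires: {$lt: new Date}
  });
};

function sqsDeleteQueue(name) {
  db.queue[name].drop();
};
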
function sqsListQueues(prefix) {
  var regex;
  if (prefix)
    // Backslashes are doubled so the dots stay escaped inside the string
    regex = new RegExp('^[^.]+\\.queue\\.' + prefix + '[^$]*$');
  else
    regex = /^[^.]+\.queue\.[^$]+$/;
  return db.system.namespaces.find({
    name: regex
  }).map(function (x) {
    // Strip the "<db name>.queue." prefix from the namespace
    return x.name.substring(x.name.indexOf('.') + 7);
  });
};

function sqsPushMessage(queue, message) {
  var _push = function(queue, message) {
    // New messages start out alive, already expired and with a dummy owner
    db.queue[queue].save({
      alive: true,
      expires: new Date(0),
      owner: new ObjectId('000000000000000000000000'),
      body: message
    });
  };
  if (message instanceof Array) {
    message.forEach(function(m) {
      _push(queue, m);
    });
  } else {
    _push(queue, message);
  }
};

function sqsPopMessage(queue, owner, count) {
  var now = new Date;
  // 10 second expiration, change this to what you want
  var expires = new Date(now.getTime() + 10000);
  if (!count) {
    count = 1;
  }
  var result = [];
  for (var i = 0; i < count; ++i) {
    // Atomically claim one live message whose previous claim has expired
    var item = db.queue[queue].findAndModify({
      query: {
        alive: true,
        expires: {$lt: now}
      },
      update: {
        $set: {
          expires: expires,
          owner: owner
        }
      },
      new: true
    });
    // Nothing left to claim: depending on the shell version this is {} or null
    if (!item || friendlyEqual({}, item))
      break;
    result.push(item);
  }
  return result;
};

function sqsDeleteMessage(queue, owner, item_ids) {
  if (item_ids instanceof ObjectId)
    item_ids = [item_ids];
  // Only the current owner can delete, and only before the claim expires
  db.queue[queue].update({
    alive: true,
    expires: {$gte: new Date},
    owner: owner,
    _id: {$in: item_ids}
  }, {
    $set: {
      alive: false
    }
  },
  false,  // upsert
  true);  // multi
};
If you copy the code into a sqs.js file, you can then run the script below to create the stored procedures on the database of your choice. The alternative is to write the code in the MongoDB driver of your choice.
bash
$ for function in sqsQueueExists sqsQueueMessageCount sqsDeleteQueue sqsListQueues sqsPushMessage \
    sqsPopMessage sqsDeleteMessage; do
    echo "db.system.js.save({_id: '$function', value: $function})" |
    mongo [db name] --quiet --shell sqs.js
  done
MongoDB Shell
> use [db name];
> load('sqs.js');
> [sqsQueueExists, sqsQueueMessageCount, sqsDeleteQueue, sqsListQueues, sqsPushMessage,
sqsPopMessage, sqsDeleteMessage].forEach(function (x) {
var name = x.toString().match(/^function\s(\w+)/)[1];
db.system.js.save({_id: name, value: x});
});
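To see how the alive, expires and owner fields in sqs.js interact, here is a small in-memory model of the same claim-and-delete logic. It is only a sketch and does not talk to MongoDB: makeQueue, its method names and the explicit now arguments are hypothetical, added so the timing can be followed step by step.

```javascript
// In-memory model of the lease logic used by sqsPushMessage, sqsPopMessage
// and sqsDeleteMessage. No MongoDB involved; makeQueue and the explicit
// `now` arguments (milliseconds) are hypothetical, for illustration only.
function makeQueue(leaseMs) {
  var items = [];
  var nextId = 1;
  return {
    push: function (body) {
      // Same initial state as sqsPushMessage: alive, already expired, no owner
      items.push({_id: nextId++, alive: true, expires: 0, owner: null, body: body});
    },
    pop: function (owner, now) {
      // Claim a live message whose previous lease has run out
      for (var i = 0; i < items.length; i++) {
        var item = items[i];
        if (item.alive && item.expires < now) {
          item.expires = now + leaseMs;
          item.owner = owner;
          return item;
        }
      }
      return null;
    },
    remove: function (owner, id, now) {
      // Only the current owner may delete, and only while its lease is valid
      for (var i = 0; i < items.length; i++) {
        var item = items[i];
        if (item._id === id && item.alive && item.owner === owner && item.expires >= now) {
          item.alive = false;
          return true;
        }
      }
      return false;
    }
  };
}

var q = makeQueue(10000);                 // 10 second lease, as in sqsPopMessage
q.push('hello');
var first = q.pop('worker-a', 1000);      // worker-a claims the message until t=11000
var blocked = q.pop('worker-b', 5000);    // lease still held, so nothing is returned
var second = q.pop('worker-b', 12000);    // lease has run out, worker-b takes over
var stale = q.remove('worker-a', first._id, 12500);  // worker-a no longer owns it
var done = q.remove('worker-b', second._id, 12500);  // the current owner deletes it
```

The real functions get the same behavior from a single findAndModify per message, which is what makes the claim safe when several clients pop at the same time.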
There are pieces of the SQS API that I have left out of this implementation for simplicity. These are based around the options provided, like changing the visibility timeout of a queue or individual message. This would be pretty trivial to add if necessary. However, if you are not using these features, leaving them out of the code will only make the code faster. To add options to a queue, just add another collection called queue that holds all queue names (in the _id field) and the applicable options. Then just do a query on the queue collection when the options are needed (the _id field is automatically indexed, so this will be fast).
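The options lookup described above might look something like the following. This is a sketch, not part of the original sqs.js: the function name, the visibilityTimeout field and passing db in as an argument are all assumptions made for illustration.

```javascript
// Sketch: read a per-queue visibility timeout (in milliseconds) from the
// `queue` options collection, keyed by queue name in _id.
// The function name and the visibilityTimeout field are assumptions.
function sqsVisibilityTimeout(db, name, defaultMs) {
  var options = db.queue.findOne({_id: name});
  if (options && typeof options.visibilityTimeout === 'number') {
    return options.visibilityTimeout;
  }
  // No options document for this queue: fall back to the default
  return defaultMs;
}
```

Inside sqsPopMessage, the hard-coded 10000 could then become sqsVisibilityTimeout(db, queue, 10000), with the options stored by something like db.queue.save({_id: 'email', visibilityTimeout: 30000}).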
I hope this helps you out!
Pretty cool!
A few notes:
1) db.queue[name].foo is the same as db['queue.'+name].foo
2) rather than bash looping, you could call load('sqs.js') from inside the shell and use a JS loop.
This was one of the primary use cases we had in mind for findAndModify, so I’m glad to see it getting put to use.
Mathias Stearn
25 May 10 at 4:38 pm
Thanks Mathias! I made the changes you mentioned.
Matt Insler
25 May 10 at 5:13 pm
Very cool – I plan to build something very similar to what you have here, so your code will be very valuable to me.
Bryan Migliorisi
8 Jun 10 at 4:44 pm
Hmm… I guess you forgot the “reliable” in your Queuing system.. pull the plug and you’ve lost with mongodb.
Jesper
Jesper
18 Jun 10 at 8:22 am
Well, to make this truly reliable, we can use the replication features in Mongo. There is something known as “safe mode” in most drivers where you can specify that write operations only return after data has been written to N servers (http://www.mongodb.org/display/DOCS/Replication#Replication-ReplicationAcknowledgementviagetlasterror). Granted there are systems that are more fault-tolerant via journaling, however they also tend to be slower. The choice is up to you which letter of the CAP theorem you’re willing to give up.
Matt Insler
21 Jun 10 at 6:11 pm
You mentioned the alternative of writing client code. While this server-side JS is elegant, it seems like it would complicate deployment, version control, debugging, etc., processes that a developer already understands.
It seems like the main advantage to server-side code is that different clients could use the logic, maybe a ruby maintenance script in addition to Java backend or some such. Are there any other advantages to using server-side logic? Have you found it integrates into your workflow well enough to outweigh the hassle of deployment, debugging, and SCM?
Thanks for the post.
Nick
11 Jul 10 at 11:11 pm
It still isn’t big enough for some e-mail messages, but note that as of the 30 June update to SQS, you can set MaximumMessageSize of a queue up to 64KB.
Nic Wolff
12 Jul 10 at 11:04 pm
That’s the point of using GridFS. You can store very large files (I’m not sure exactly how large, but definitely gigs) in MongoDB. That handles emails just fine and simply crushes 64KB. Plus, with Amazon services, you’d need to pay for S3 and SQS in order to transfer larger documents in a queue. For my email system, I just put a file into GridFS and then pass the id through the queue.
Matt Insler
12 Jul 10 at 11:11 pm
Very helpful, thank you.
When deleting a message, I noticed that alive is set to false but the message remains in the DB (maybe I’m missing something). Is there a reason it isn’t just removed altogether? Do you run a command later to just remove all entries where alive=false?
Thanks!
Chris
7 Sep 10 at 3:31 pm
Does this mean you have to call the pop method and it returns if no items are queued, so all your servers will spin in a busy loop?
I think with SQS you can avoid busy spinning as well as the cost associated with it (as well as the latency of sleeping in between) with a blocking poll. And I would love to see how you do that with mongodb.
Besides: do you have builtin support for duplicate reduction? (when expired messages are deleted).
Gruss
Bernd
Bernd
5 Nov 10 at 4:09 am
Yes, this is a busy wait. At the time I was dealing with this problem, you needed to busy-wait on SQS, which cost money to do, which was one of the reasons I went to my own Mongo instance. I believe that SNS (Simple Notification Service) can help solve that issue now, but will still cost money.
In Mongo, this busy-wait can be avoided if the queue collection is created as a capped collection and then the client uses a tailable cursor to query the queue. Just remember, though, that capped collections aren’t created with an index and have certain restrictions on them.
Duplicates can be handled however you wish. To my knowledge though, since this is just a distributed message queue, duplicate reduction is not something you would usually expect. You can definitely add filters on the pop method in your client to get the latest message where some condition is met. This would be a simple way to do a pub/sub architecture. I’d have an array of the topic (so topic /hello/world would be ['hello','world']) in each message and then I could query /hello/world with {'topic.0': 'hello', 'topic.1': 'world', topic: {$size: 2}} or /hello/* with {'topic.0': 'hello'}. Just an idea. If the array is indexed, this should be a really fast operation.
Matt Insler
5 Nov 10 at 11:31 am
The 8k limitation is a bummer.
I use OnlineMQ.com instead, where the limitation is 64k. Moreover, they have a nice GUI to define queues.
Hans Ereltzer
1 Jan 11 at 3:05 pm
@Matt Insler 12 Jul 10 at 11:11 pm
Emails cannot exceed 20MB, so is there a reason you’re talking gigs?
Abdel
12 May 11 at 10:04 am
Gigs aren’t needed per email, but definitely for the mailbox. I think the document size is going to be raised to 32MB in a newer version, but in older versions you need to split large emails (usually with embedded attachments) across multiple objects through GridFS.
Matt Insler
12 May 11 at 10:18 am
Interesting concept, thanks for sharing.
Tamas Kalman
10 Jun 11 at 7:11 pm