Using EC2 to split up and expedite large processing jobs

by rbrant on January 4, 2011

The problem:
You have a large number of records that need to be modified. You don’t have the processing resources to accomplish this as quickly as you’d like.

The solution:
Break the data into chunked CSV files, each containing a fixed number of records. Spin up a bunch of EC2 instances, each of which processes one file on startup. This lets you run a large number of processes concurrently to get the job done quickly. 100 ‘micro’ instances all running at the same time will cost $10 per hour ($.10/hour each). That’s not bad!
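To make the arithmetic concrete, here is a rough sketch of how the instance count and hourly cost fall out of the chunk size. The method name and the record count are made up for illustration, and the $.10 rate is simply the figure quoted above, so check current pricing before relying on it.

```ruby
# Rough cost estimate for a fan-out job. All inputs are illustrative.
def fan_out_estimate(total_records, records_per_file, hourly_rate_per_instance)
  # One instance per chunk file, so round the file count up.
  instances = (total_records.to_f / records_per_file).ceil
  { :instances => instances, :cost_per_hour => instances * hourly_rate_per_instance }
end

estimate = fan_out_estimate(250_000, 2_500, 0.10)
puts format('%d instances, $%.2f per hour', estimate[:instances], estimate[:cost_per_hour])
```

A smaller chunk size means more instances and a shorter wall-clock time, at a proportionally higher hourly cost.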

Creating the chunked files

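file_num = 1

# in_groups_of comes from ActiveSupport; passing false skips nil padding in the last group
Thing.all.in_groups_of(2500, false) do |thing_group|
  csv_data = FasterCSV.generate do |csv|
    thing_group.each do |thing|
      csv << [thing.attr1, thing.attr2]
    end
  end

  # write each group to its own file before moving on to the next group
  File.open("chunked_things/file#{file_num}.csv", 'w') { |f| f.write(csv_data) }
  file_num += 1
end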
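If you want to try the chunking step outside of Rails, the same idea can be sketched with plain Ruby. This version assumes Ruby 1.9 or later, where FasterCSV became the standard csv library, and uses each_slice in place of in_groups_of. The write_chunks method and the sample rows are hypothetical stand-ins for the Thing records.

```ruby
require 'csv'
require 'fileutils'
require 'tmpdir'

# Write records to numbered CSV files of at most chunk_size rows each.
# `records` is any enumerable of row arrays (a stand-in for Thing rows).
def write_chunks(records, chunk_size, dir)
  FileUtils.mkdir_p(dir)
  paths = []
  records.each_slice(chunk_size).with_index(1) do |group, file_num|
    path = File.join(dir, "file#{file_num}.csv")
    CSV.open(path, 'w') { |csv| group.each { |row| csv << row } }
    paths << path
  end
  paths
end

# Usage: five sample rows in chunks of two produce three files.
Dir.mktmpdir do |dir|
  rows = [%w[a 1], %w[b 2], %w[c 3], %w[d 4], %w[e 5]]
  paths = write_chunks(rows, 2, dir)
  puts paths.map { |path| File.basename(path) }.inspect
end
```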

Configure an EC2 instance to serve as the source instance
Configure an EC2 instance from which you will create an AMI to be used to spawn the other worker instances. The important part is to make sure this instance has a copy of all the chunked files created above. I'll explain why later. I used an AMI from BitNami, this one to be specific (listed toward the bottom of the page). It's an EBS-backed AMI, which makes creating AMIs from running instances easier than with S3-backed ones. This may have changed; I'm not sure.

Use cron to run the script at startup
Create a cron task set to run when the instance is booted. You can use the @reboot shortcut to accomplish this:

@reboot /path/to/ruby /path/to/your/script

The goal here is to have your script run when the server boots. This would be pretty straightforward, except that we need to be sure the EC2 instances don't all process the same file at once, or process the same file twice. Enter SQS.

SQS
SQS basically allows you to create a simple queue that holds messages. For us, the messages are the names of the files we want to process. When the server boots and the script runs, it hits the SQS service and asks for the name of a file. SQS responds with a message off the queue (which is the name of a file). That message is then hidden from other requests for a while (the visibility timeout), so no other instance receives it in the meantime. Once the message is retrieved, you can pop it off the queue. This way each spawned instance gets its own filename.
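The hand-out behaviour can be illustrated locally. The sketch below is only a stand-in: it uses Ruby's built-in thread-safe Queue and threads in place of SQS and EC2 instances, and it does not model visibility timeouts or the network. The simulate_workers name is made up for this example.

```ruby
require 'thread'

# Local stand-in for the SQS queue: Ruby's thread-safe Queue, not SQS itself.
# Each thread plays the role of one worker instance claiming one filename.
def simulate_workers(filenames, worker_count)
  queue = Queue.new
  filenames.each { |name| queue << name }

  claimed = Queue.new
  workers = Array.new(worker_count) do
    Thread.new do
      begin
        # Non-blocking pop raises ThreadError when the queue is empty.
        claimed << queue.pop(true)
      rescue ThreadError
        # Nothing left to claim; this worker finishes without a file.
      end
    end
  end
  workers.each(&:join)

  Array.new(claimed.size) { claimed.pop }
end

claimed = simulate_workers(%w[file1.csv file2.csv file3.csv], 3)
puts claimed.sort.inspect
```

No filename is claimed twice, and starting more workers than files simply leaves the extra workers with nothing to do.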

right_aws is an awesome wrapper for AWS. The code to create the queue looks like this:

sqs   = RightAws::SqsGen2.new(aws_key, aws_secret)
queue = RightAws::SqsGen2::Queue.create(sqs, 'chunk_queue')

Populate the queue with your filenames

queue.push 'your_filename'
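In practice you will want to push every chunked filename, not just one. A minimal sketch, assuming the files live in a single directory and that queue is any object responding to push (the right_aws queue above does; a plain Array works for a dry run). The enqueue_chunk_files name is hypothetical.

```ruby
# Push the basename of every CSV file in `dir` onto `queue`.
# `queue` is anything that responds to #push, such as the right_aws queue.
def enqueue_chunk_files(queue, dir)
  names = Dir.glob(File.join(dir, '*.csv')).map { |path| File.basename(path) }.sort
  names.each { |name| queue.push(name) }
  names
end

# Dry run against a plain Array instead of SQS:
#   dry_run_queue = []
#   enqueue_chunk_files(dry_run_queue, 'chunked_things')
```

Only the basename is pushed, so each worker can resolve the file against wherever its own copy of the chunks lives.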

Processing the file
To get a filename from the queue:

chunked_filename = queue.pop.to_s
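One thing to watch for: if more instances boot than there are files, some of them will find the queue empty. The sketch below assumes pop returns nil in that case (which is my understanding of right_aws, but verify it against the version you use), and wraps the call so the script can bail out cleanly. The next_filename helper is hypothetical.

```ruby
# Returns the next filename, or nil when nothing is left to process.
# Assumes `queue.pop` returns a message object or nil when the queue is empty.
def next_filename(queue)
  message = queue.pop
  return nil if message.nil?

  name = message.to_s.strip
  name.empty? ? nil : name
end

# In the startup script:
#   chunked_filename = next_filename(queue)
#   exit if chunked_filename.nil?
```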

Now we have the name of the file with the CSV data to process. Loop through it, hitting whatever service you need to.
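The loop itself can be as simple as the sketch below. It uses the standard csv library (FasterCSV under Ruby 1.8 offers a very similar interface), and the block stands in for whatever service call you need. The process_csv name and the sample data are made up for illustration.

```ruby
require 'csv'

# Run each CSV row through the given block and collect the results.
def process_csv(csv_text)
  CSV.parse(csv_text).map { |row| yield(row) }
end

sample = "alice,30\nbob,25\n"

# The block is a placeholder for the real web service call.
results = process_csv(sample) { |name, age| [name.upcase, age.to_i + 1] }
puts results.inspect
```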

Push your modified data back to S3
When you are finished processing your file, you'll want to put the results somewhere. S3 is an obvious choice. Again with right_aws, it's pretty easy. This will basically write your data back to a file on S3:

s3     = RightAws::S3.new(aws_key, aws_secret)
bucket = s3.bucket(s3_bucket_name)
key    = RightAws::S3::Key.create(bucket, "#{s3_bucket_key}#{chunked_filename}")
key.put(your_data)

Altogether now, this is some pseudocode (not tested) that addresses the core parts of the process. This is the script that would be executed at startup:

require 'rubygems'
require 'right_aws'
require 'fastercsv'

# aws
aws_key    = 'your key'
aws_secret = 'your secret'

# s3 bucket
s3_bucket_name = 'your_bucket'
s3_bucket_key  = 'name_of_key'

sqs   = RightAws::SqsGen2.new(aws_key, aws_secret)
queue = RightAws::SqsGen2::Queue.create(sqs, 'chunk_queue')

# pop a filename off the queue
chunked_filename = queue.pop.to_s

modified_data = []

# process the filename SQS gave us (a copy of all files is on each instance)
file_to_process = "#{File.expand_path(File.dirname(__FILE__))}/#{chunked_filename}"

FasterCSV.foreach(file_to_process) do |row|
  data_for_api = "#{row[0]}, #{row[1]}, #{row[2]} #{row[3]}"
  # call_your_service is a placeholder: hit your web service or do what you need to here
  results = call_your_service(data_for_api)

  # use api results
  modified_data << [results.value1, results.value2]
end

# generating csv data
csv_data = FasterCSV.generate{ |csv| modified_data.each{ |modified| csv << modified } }

# put the file on S3
s3       = RightAws::S3.new(aws_key, aws_secret)

# grab the bucket
bucket   = s3.bucket(s3_bucket_name)

# create the S3 key where the csv data will be stored
key      = RightAws::S3::Key.create(bucket, "#{s3_bucket_key}#{chunked_filename}")

# write the data to S3
key.put(csv_data)

1 comment:

Jordan March 29, 2011 at 2:00 pm

Thanks for the post on this, I’m setting up my own EC2-based web crawler that uses some of these techniques. I’m stuck at the moment, though, with the cron task. How did you configure it? When I use crontab -e after ssh’ing to my instance, it seems to create cron jobs that always fail when I restart the instance…
Thanks
