Building a Semantic Book Search: Scale an Embedding Pipeline with Apache Spark and AWS EMR Serverless | by Eva Revear | Jan, 2024
In a previous post I did a little PoC to see if I could use OpenAI’s CLIP model to build a semantic book search. It worked surprisingly well, in my opinion, but I couldn’t help wondering if it would be better with more data. The previous version used only about 3.5k books, but there are millions in the Open Library data set, and I thought it was worthwhile to try adding more options to the search space.
However, the full dataset is about 40GB, and trying to handle that much data on my little laptop, or even in a Colab notebook, was a bit much, so I had to figure out a pipeline that could manage filtering and embedding a larger data set.
TL;DR: Did it improve the search? I think it did! We 15x’ed the data, which gives the search much more to work with. It’s not perfect, but I thought the results were fairly interesting, although I haven’t done a formal accuracy measure.
This was one example I couldn’t get to work no matter how I phrased it in the last iteration, but works fairly well in the version with more data.
If you’re curious you can try it out in Colab!
Overall, it was an interesting technical journey, with a lot of roadblocks and learning opportunities along the way. The tech stack still includes the OpenAI CLIP model, but this time I leverage Apache Spark and AWS EMR to run the embedding pipeline.
This seemed like a good opportunity to use Spark, as it allows us to parallelize the embedding computation.
I decided to run the pipeline in EMR Serverless, which is a fairly new AWS offering that provides a serverless environment for EMR and manages scaling resources automatically. I felt it would work well for this use case — as opposed to spinning up an EMR on EC2 cluster — because this is a fairly ad-hoc project, I’m paranoid about cluster costs, and initially I was unsure about what resources the job would require. EMR Serverless makes it pretty easy to experiment with job parameters.
Below is the full process I went through to get everything up and running. I imagine there are better ways to manage certain steps; this is just what ended up working for me, so if you have thoughts or opinions, please do share!
Building an embedding pipeline job with Spark
The initial step was writing the Spark job(s). The full pipeline is broken out into two stages. The first takes in the initial data set and filters for recent fiction (within the last 10 years). This resulted in about 250k books, around 70k of which had cover images available to download and embed in the second stage.
First we pull out the relevant columns from the raw data file.
Then do some general data transformation on data types, and filter out everything but English fiction with more than 100 pages.
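As a rough illustration of this first stage, here is a minimal sketch of the parsing and filtering logic in plain Python. It assumes the Open Library editions dump layout (tab-separated lines with the JSON record in the last column) and field names such as languages, subjects, number_of_pages, publish_date, and covers. The function names and the output keys are hypothetical, not taken from the original job.

```python
import json
import re


def parse_edition(line):
    """Extract the columns of interest from one line of the dump."""
    # Assumed layout: type, key, revision, last_modified, JSON record.
    record = json.loads(line.rstrip("\n").split("\t")[-1])
    # publish_date is free-form text, so just look for a four-digit year.
    year_match = re.search(r"\b(1[5-9]\d{2}|20\d{2})\b", record.get("publish_date") or "")
    covers = record.get("covers") or []
    return {
        "title": record.get("title"),
        "languages": [lang["key"] for lang in record.get("languages") or []],
        "subjects": record.get("subjects") or [],
        "pages": record.get("number_of_pages"),
        "year": int(year_match.group(1)) if year_match else None,
        "cover_id": covers[0] if covers else None,
    }


def mentions_fiction(subject):
    """True for subjects like 'Fiction, general' but not 'Nonfiction'."""
    text = subject.lower()
    return "fiction" in text and "nonfiction" not in text and "non-fiction" not in text


def is_recent_english_fiction(book, current_year=2024, max_age=10, min_pages=100):
    """Keep English fiction with more than min_pages pages from the last max_age years."""
    is_english = "/languages/eng" in book["languages"]
    is_fiction = any(mentions_fiction(subject) for subject in book["subjects"])
    long_enough = book["pages"] is not None and book["pages"] > min_pages
    recent = book["year"] is not None and current_year - book["year"] <= max_age
    return is_english and is_fiction and long_enough and recent
```

In a Spark job, functions like these could be applied with something along the lines of spark.sparkContext.textFile(path).map(parse_edition).filter(is_recent_english_fiction), or the same conditions could be rewritten as DataFrame column expressions.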
The second stage grabs the first stage’s output dataset, and runs the images through the CLIP model, downloaded from Hugging Face. The important step here is turning the various functions that we need to apply to the data into Spark UDFs. The main one of interest is get_image_embedding, which takes in the image and returns the embedding.
We register it as a UDF:
And call that UDF on the dataset:
Setting up the vector database
As a last, optional step in the code, we can set up a vector database, in this case Milvus, to load and query from. Note that I did not do this as part of the cloud job for this project; instead I pickled my embeddings so I could use them without having to keep a cluster up and running indefinitely. However, it is fairly simple to set up Milvus and load a Spark DataFrame into a collection.
First, create a collection with an index on the image embedding column that the database can use for the search.
Then we can access the collection in the Spark script, and load the embeddings into it from the final DataFrame.
Finally, we can simply embed the search text with the same method used in the UDF above, and hit the database with the embeddings. The database does the heavy lifting of figuring out the best matches.
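The three Milvus steps above could be sketched roughly as follows with the pymilvus client. The collection name, field names, index type, metric, and host/port are all assumptions for illustration, as is collecting the rows to the driver before inserting (reasonable for tens of thousands of 512-dimensional vectors, less so for much larger sets). The query embedding is assumed to come from CLIP's text encoder.

```python
# Assumed index and search settings; tune the index type and metric to taste.
INDEX_PARAMS = {"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}}
SEARCH_PARAMS = {"metric_type": "L2", "params": {"nprobe": 10}}


def rows_to_columns(rows):
    """Turn (title, embedding) rows into the column lists that insert() expects."""
    titles = [title or "" for title, _ in rows]
    embeddings = [list(embedding) for _, embedding in rows]
    return [titles, embeddings]


def create_collection(name="books", dim=512, host="localhost", port="19530"):
    """Create a collection with an index on the image embedding field."""
    from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, connections

    connections.connect(alias="default", host=host, port=port)
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=1024),
        FieldSchema(name="image_embedding", dtype=DataType.FLOAT_VECTOR, dim=dim),
    ]
    collection = Collection(name=name, schema=CollectionSchema(fields))
    collection.create_index(field_name="image_embedding", index_params=INDEX_PARAMS)
    return collection


def load_embeddings(collection, df, batch_size=1000):
    """Collect the final DataFrame to the driver and insert it in batches."""
    rows = [
        (row["title"], row["image_embedding"])
        for row in df.select("title", "image_embedding").collect()
    ]
    for start in range(0, len(rows), batch_size):
        collection.insert(rows_to_columns(rows[start:start + batch_size]))
    collection.flush()


def search_books(collection, query_embedding, limit=5):
    """Return (title, distance) pairs for the closest cover embeddings."""
    collection.load()
    results = collection.search(
        data=[query_embedding],
        anns_field="image_embedding",
        param=SEARCH_PARAMS,
        limit=limit,
        output_fields=["title"],
    )
    return [(hit.entity.get("title"), hit.distance) for hit in results[0]]
```

With the L2 metric, smaller distances mean closer matches, so the first result is the best candidate.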
Setting up the pipeline in AWS
Prerequisites
Now there’s a bit of setup to go through in order to run these jobs on EMR Serverless.
As prerequisites we need:
- An S3 bucket for job scripts, inputs and outputs, and other artifacts that the job needs
- An IAM role with Read, List, and Write permissions for S3, as well as Read and Write for Glue.
- A trust policy that allows the EMR jobs to access other AWS services.
There are great descriptions of the roles and permissions policies, as well as a general outline of how to get up and running with EMR Serverless in the AWS docs here: Getting started with Amazon EMR Serverless
Next we have to set up an EMR Studio: Create an EMR Studio
Accessing the web via an Internet Gateway
Another bit of setup that’s specific to this particular job is that we have to allow the job to reach out to the Internet, which the EMR application is not able to do by default. As we saw in the script, the job needs to access both the images to embed, as well as Hugging Face to download the model configs and weights.
Note: There are likely more efficient ways to handle the model than downloading it to each worker (broadcasting it, storing it somewhere locally in the system, etc), but in this case, for a single run through the data, this is sufficient.
Anyway, allowing the machine the Spark job is running on to reach out to the Internet requires a VPC with private subnets that have NAT gateways. All of this setup starts with accessing the AWS VPC interface -> Create VPC -> selecting VPC and more -> selecting the option for at least one NAT gateway -> clicking Create VPC.
The VPC takes a few minutes to set up. Once that is done we also need to create a security group in the security group interface, and attach the VPC we just created.
Creating the EMR Serverless application
Now for the EMR Serverless application that will submit the job! Creating and launching an EMR Studio should open a UI that offers a few options, including creating an application. In the create application UI, select Use Custom settings -> Network settings. Here is where the VPC, the two private subnets, and the security group come into play.
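The same network settings can also be supplied programmatically rather than through the UI. A hedged sketch using the boto3 emr-serverless client is shown here. The application name, release label, region, and the subnet and security group IDs are placeholders, and the helper functions are hypothetical.

```python
def build_application_request(name, release_label, subnet_ids, security_group_ids):
    """Assemble the arguments for creating a Spark application inside a VPC."""
    return {
        "name": name,
        "releaseLabel": release_label,
        "type": "SPARK",
        "networkConfiguration": {
            # The private subnets (with NAT gateways) and the security group.
            "subnetIds": list(subnet_ids),
            "securityGroupIds": list(security_group_ids),
        },
    }


def create_application(request, region_name="us-east-1"):
    """Create the application and return its ID (requires AWS credentials)."""
    import boto3

    client = boto3.client("emr-serverless", region_name=region_name)
    return client.create_application(**request)["applicationId"]


# Placeholder values; substitute the IDs of the VPC resources created earlier.
request = build_application_request(
    name="book-embeddings",
    release_label="emr-6.15.0",
    subnet_ids=["subnet-aaaa1111", "subnet-bbbb2222"],
    security_group_ids=["sg-cccc3333"],
)
```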
Building a virtual environment
Finally, the environment doesn’t come with many libraries, so in order to add additional Python dependencies we can either use native Python or create and package a virtual environment: Using Python libraries with EMR Serverless.
I went the second route, and the easiest way to do this is with Docker, as it allows us to build the virtual environment within the Amazon Linux distribution that’s running the EMR jobs (doing it in any other distribution or OS can become incredibly messy).
Another warning: be careful to pick the version of EMR that corresponds to the version of Python that you are using, and choose package versions accordingly as well.
The Docker process outputs the zipped up virtual environment as pyspark_dependencies.tar.gz, which then goes into the S3 bucket along with the job scripts.
We can then send this packaged environment along with the rest of the Spark job configurations.
Nice! We have the job script, the environmental dependencies, gateways, and an EMR application, so we get to submit the job! Not so fast! Now comes the real fun: Spark tuning.
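As a sketch of how the packaged environment might be passed along, the job can be submitted with the boto3 emr-serverless client, pointing spark.archives at the tarball and the Python environment variables at the unpacked interpreter. The bucket path, application ID, role ARN, and helper names are placeholders and assumptions, not the original configuration.

```python
def build_spark_submit_parameters(archive_uri, extra_conf=None):
    """Build the --conf string that tells Spark to use the packaged virtualenv."""
    conf = {
        # Unpack the archive on every node under ./environment
        "spark.archives": f"{archive_uri}#environment",
        "spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON": "./environment/bin/python",
        "spark.emr-serverless.driverEnv.PYSPARK_PYTHON": "./environment/bin/python",
        "spark.executorEnv.PYSPARK_PYTHON": "./environment/bin/python",
    }
    conf.update(extra_conf or {})
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


def submit_job(application_id, role_arn, script_uri, spark_params):
    """Start the job run and return its ID (requires AWS credentials)."""
    import boto3

    client = boto3.client("emr-serverless")
    response = client.start_job_run(
        applicationId=application_id,
        executionRoleArn=role_arn,
        jobDriver={
            "sparkSubmit": {
                "entryPoint": script_uri,
                "sparkSubmitParameters": spark_params,
            }
        },
    )
    return response["jobRunId"]


# Placeholder bucket and key; use wherever the tarball was uploaded.
params = build_spark_submit_parameters(
    "s3://my-bucket/artifacts/pyspark_dependencies.tar.gz"
)
```

Any additional Spark properties can be passed through extra_conf so that they end up in the same parameter string.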
As previously mentioned, EMR Serverless scales automatically to handle our workload, which typically would be great, but I found (obvious in hindsight) that it was unhelpful for this particular use case.
A few tens of thousands of records is not at all “big data”; Spark wants terabytes of data to work through, and I was essentially just sending a few thousand image URLs (not even the images themselves). Left to its own devices, EMR Serverless will send the job to one node to work through on a single thread, completely defeating the purpose of parallelization.
Additionally, while embedding jobs take in a relatively small amount of data, they expand it significantly, as the embeddings are quite large (512-dimensional vectors in the case of CLIP). Even if you leave that one node to churn away for a few days, it’ll run out of memory long before it finishes working through the full set of data.
In order to get it to run, I experimented with a few Spark properties so that I could use large machines in the cluster, but split the data into very small partitions so that each core would have just a bit to work through and output:
- spark.executor.memory: Amount of memory to use per executor process
- spark.sql.files.maxPartitionBytes: The maximum number of bytes to pack into a single partition when reading files.
- spark.executor.cores: The number of cores to use on each executor.
You’ll have to tweak these depending on the particular nature of your data, and embedding still isn’t a speedy process, but it was able to work through my data.
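To see why the partition size matters so much here, a back-of-the-envelope sketch helps. With Spark's default spark.sql.files.maxPartitionBytes of 128MB, a few megabytes of URLs fit comfortably into a single partition. The input size, target partition count, and executor settings below are illustrative assumptions, not the values used for this project, and the estimate ignores other factors Spark considers, such as spark.sql.files.openCostInBytes and the default parallelism.

```python
import math

# Spark's default for spark.sql.files.maxPartitionBytes (128MB).
DEFAULT_MAX_PARTITION_BYTES = 128 * 1024 * 1024


def estimate_partitions(input_bytes, max_partition_bytes=DEFAULT_MAX_PARTITION_BYTES):
    """Rough number of read partitions for a splittable input of a given size."""
    return max(1, math.ceil(input_bytes / max_partition_bytes))


def max_partition_bytes_for(input_bytes, target_partitions):
    """Partition size that would split the input into about target_partitions."""
    return max(1, math.ceil(input_bytes / target_partitions))


# Illustrative input: about 70k URLs at roughly 100 bytes each.
input_bytes = 70_000 * 100

default_partitions = estimate_partitions(input_bytes)
tuned_bytes = max_partition_bytes_for(input_bytes, target_partitions=200)
tuned_partitions = estimate_partitions(input_bytes, tuned_bytes)

# Example settings only; the right values depend on the data and the cluster.
tuning_conf = {
    "spark.executor.cores": "4",
    "spark.executor.memory": "16g",
    "spark.sql.files.maxPartitionBytes": str(tuned_bytes),
}

print(default_partitions)  # 1
print(tuned_bytes, tuned_partitions)  # 35000 200
```

Under these assumptions the default leaves everything in one partition, while a partition size of about 35KB spreads the same input over roughly 200 small tasks that the executor cores can work through in parallel.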
Conclusion
As with my previous post, the results certainly aren’t perfect, and are by no means a replacement for solid book recommendations from other humans! That being said, there were some spot-on answers to a number of my searches, which I thought was pretty cool.
If you want to play around with the app yourself, it’s in Colab, and the full code for the pipeline is on GitHub!