## Tuesday, November 24, 2015

### Building a Streaming Search Platform

On average, Twitter users worldwide generate about 6,000 tweets per second. Obviously, there is much interest in extracting real-time signal from this rich but noisy stream of data. More generally, there are many open and interesting problems in using high-velocity streaming text sources to track real-time events. In this post, I describe the key components of a platform that allows for near real-time search of a streaming text data source such as the Twitter firehose.

Such a platform can have many applications far beyond monitoring Twitter. For example, a network of speech to text monitors could transcribe radio and television feeds and pass the transcriptions to the platform. When key phrases or features are found in the feeds, the platform could be configured to trigger real-time event management. This application is potentially relevant to financial, marketing, and other domains that depend on real-time information processing.

All code for the platform I describe here can be found in my GitHub project Straw. The code base includes:
• A complete Java-based Storm implementation, including both Lucene-Luwak and Elasticsearch-Percolator implementations of streaming search
• Scripts to automate AWS deployment using boto3
• A local run mode enabling testing on a single machine using dockerized components
• Benchmark utilities
• A demo multiuser web interface where users register queries and receive streaming matches from a simulated Twitter firehose
I completed this project as a Fellow in the Insight Data Engineering Program. The original inspiration for this project came from two excellent blog posts on streaming search:

# Streaming Search

The key data structure for solving a traditional text search problem is an inverted index built from the collection of documents you want to be able to query. In its simplest form, an inverted index is just a map whose keys are the set of all unique terms in the documents. The value associated to a particular term in the map is a list of all the documents which use that term.

After the index has been built, users can submit queries to run against it. For example, consider a query that should return all the documents containing both words in the phrase "llama pajamas". The query engine will split the input phrase into the tokens "llama" and "pajamas", then check the inverted index to get the list of all documents that contain the word "llama" and the list of all documents that contain the word "pajamas". The engine then returns the intersection of these two lists, i.e. the documents present in both.
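As a toy illustration (in Python, with made-up documents), building the index and answering the "llama pajamas" query looks like this:

```python
from collections import defaultdict

docs = {
    "d1": "the llama wears pajamas",
    "d2": "the llama naps",
    "d3": "pajamas for everyone",
}

# Build the inverted index: term -> set of IDs of documents containing it.
index = defaultdict(set)
for doc_id, text in docs.items():
    for term in text.split():
        index[term].add(doc_id)

# Answer the query by intersecting the two posting lists.
matches = index["llama"] & index["pajamas"]
print(matches)  # -> {'d1'}
```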

In the streaming case, documents arrive at a very fast rate (e.g. an average of 6,000 per second in the case of Twitter), and with this kind of velocity and volume it is impractical to build the inverted document index in real-time. Moreover, the goal is not to create a static index of tweets--rather, it is to scan the tweets as they arrive in real-time and determine whether they match a registered query. Here's where we can play a clever trick: instead of building our inverted index from the documents, we can build it from the queries themselves.

As a simple example, suppose a user wants to see all the tweets that contain both the words "llama" and "pajamas". To add this query to the inverted index we would:
1. Create an identifier for the query, say "q1".
2. If "llama" is in the inverted index, add "q1" to the list stored at "llama". Otherwise, initialize "llama" in the index with a list containing "q1".
3. If "pajamas" is in the inverted index, add "q1" to the list stored at "pajamas". Otherwise, initialize "pajamas" in the index with a list containing "q1".
As tweets arrive in the stream, the query engine breaks each tweet into tokens, looks up each token in the inverted index, and returns the queries whose terms all appear among the tweet's tokens.
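The registration steps and matching rule above can be sketched in a few lines of Python (an illustration only, not the Straw code; all names here are hypothetical):

```python
from collections import defaultdict

# Inverted query index: term -> set of query IDs registered under that term.
index = defaultdict(set)
query_terms = {}  # query ID -> the set of terms the query requires

def register_query(query_id, phrase):
    terms = set(phrase.lower().split())
    query_terms[query_id] = terms
    for term in terms:
        index[term].add(query_id)

def match(text):
    tokens = set(text.lower().split())
    # Candidates are queries registered under at least one of the tokens;
    # a query matches when every one of its terms appears in the document.
    candidates = set()
    for t in tokens:
        candidates |= index.get(t, set())
    return {q for q in candidates if query_terms[q] <= tokens}

register_query("q1", "llama pajamas")
print(match("my llama wears pajamas"))    # -> {'q1'}
print(match("my llama wears a sweater"))  # -> set()
```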

Fortunately, there are already several existing tools which can be used to build an inverted query index:
• Elasticsearch Percolator, a standard feature of Elasticsearch that allows us to index queries and "percolate" documents through them
• Luwak for Lucene, a Lucene module that uses aggressive pre-filtering to optimize matching against an inverted query index. The speedup compared to Percolator can be substantial

# Architecture

Now that we've got the basic tools for streaming search (Elasticsearch Percolator or Lucene-Luwak), let's describe the architecture of the platform. The Straw platform is made up of the following components:
• A streaming text source, such as the Twitter firehose, which emits a continuous stream of JSON documents
• An Apache Kafka cluster, which handles ingestion from the text stream
• An Apache Storm cluster, which distributes computation across multiple search engine workers
• A Redis server, which provides a PUBSUB framework to collect and distribute matches to subscribed users
• One or more clients, who submit queries and listen for matches on behalf of users

## Streaming Sources

The Twitter streaming API does not offer access to the firehose without special permission. To see how Straw would perform under firehose-level load, we can instead use the sample endpoint to collect a large corpus of tweets. We can either store these tweets in a file or send them directly to the Kafka cluster's documents topic:

Alternatively, we can load tweets from a file into Kafka with a simple producer script:
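A minimal sketch of such a producer in Python, assuming the kafka-python package, a broker at localhost:9092, and one JSON tweet per line in the input file (all of these are assumptions, not the Straw script itself):

```python
import json

def read_documents(path):
    """Yield one JSON document (as a dict) per non-empty line of the file."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)

def produce(path, topic="documents", brokers="localhost:9092"):
    # Requires the kafka-python package and a reachable broker (assumption).
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=brokers)
    for doc in read_documents(path):
        producer.send(topic, json.dumps(doc).encode("utf-8"))
    producer.flush()
```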

To maintain a high load, we can run multiple instances of this script and restart each instance as soon as it finishes reading the file, using, for example, a supervisor process.

Though the Straw project was designed to handle discrete JSON documents, by changing the internal parsers it could easily be adapted to other formats such as XML. A more interesting challenge is handling continuous stream data, such as audio transcriptions. In this case, several strategies could be tried; for example, we could detect sentence breaks and treat each detected break as a separate document in the stream.

## Kafka

The Kafka cluster has two topics: documents and queries. The producer script above can be used to populate the documents topic. The frontend client populates the queries topic with user subscriptions. In production, I found a 5 node Kafka cluster could easily accommodate Twitter-level volume. For the documents topic, I used a partition factor of 5 and a replication factor of 2. While high availability is very important to accommodate the volume of the stream, document loss may not be a big concern. For the queries topic, I used only 2 partitions with a replication factor of 3: queries are infrequent, so availability may not be important, but query loss is not acceptable. Note that the partition factor should be less than or equal to the number of KafkaSpouts in our Storm topology, since each spout will consume from exactly one partition.

One other important Kafka configuration is in kafka.server.properties:
# The minimum age of a log file to be eligible for deletion
log.retention.hours=1

The Kafka default is 168 hours--far too big since you can easily fill a modestly sized disk under load. As messages should ideally be consumed in real-time, I recommend using the minimum value which is 1 hour. Note, however, that you may still need to ensure that you have a sufficiently large volume for the Kafka log. In production, I gave each Kafka node a 64GB volume with a 1 hour retention.

## Storm

The Storm topology implements KafkaSpouts for the documents and queries topics. In production, I used 5 document spouts and 3 query spouts (consistent with the Kafka partitioning). The bolts in the topology search the document stream and publish any matches to Redis. In production, I allocated a total of 6 workers. Sizing the cluster correctly proved to be somewhat challenging. I highly recommend this post, which explains the key concepts of Storm parallelism. Storm's built-in UI can also be helpful for monitoring and understanding how the cluster is performing.

In the most basic scenario, we assume that the number of queries is small and can fit easily into memory on a single machine. Then scaling to the volume of the stream is quite easy. The idea is to give each bolt a complete copy of the in-memory Lucene-Luwak index (remember that the queries are what's being indexed here). So each time a user registers a new query, we must broadcast it to all of the bolts in the topology to maintain the local query indexes. When a document arrives from the stream, we can then randomly assign it to any bolt, since each bolt has a full copy of the query index. To handle failover, we can also keep a global copy of all the queries, so that if a bolt dies we can replace it with a new one and populate its index from the global store. This Java snippet defines such a topology:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("query-spout", new KafkaSpout(query_spout_config), 3);
builder.setSpout("document-spout", new KafkaSpout(document_spout_config), 5);
builder.setBolt("search-bolt", new LuwakSearchBolt(), 5)
       .allGrouping("query-spout")
       .shuffleGrouping("document-spout");


Since this platform is intended to be multiuser and multitenant, we can easily imagine a situation where the number of queries can't practically fit in memory on a single bolt. In this case, we can add another layer of bolts to the Storm topology:

Here the complete query index is partitioned across a small cluster of bolts. Incoming queries are broadcast to the fan bolts. Each fan bolt will then randomly choose one Lucene worker to index that query. Documents from the stream can be shuffled among the fan bolts. Each fan bolt must then broadcast the document so that each Lucene bolt can check the document against its partition of the index.

If we use Percolators instead of Luwak then each bolt contains an Elasticsearch client. In this case, it is a good idea to collocate the Elasticsearch cluster with the search bolts and to use high replication so as to minimize network overhead. Note that Percolator queries are also stored in-memory, so we still face difficulties as the number of queries becomes large.

## Redis

Redis is most commonly used as an in-memory application cache, but it also has a simple and elegant publish-subscribe framework. Here's an example of PUBSUB using redis-cli:

In a terminal A, listeners subscribe to a topic:
127.0.0.1:6379> SUBSCRIBE "llama-topic"

In a separate terminal B, the publisher publishes to the topic:
127.0.0.1:6379> PUBLISH "llama-topic" "llamas love to wear pajamas"

Back in terminal A, the subscriber receives the message:
1) "message"
2) "llama-topic"
3) "llamas love to wear pajamas"

That's all there is to it. All standard Redis clients expose an API to interact with the PUBSUB framework.
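In application code the same exchange goes through a client library. Here is a hedged sketch of a listener in the style of the redis-py API (the function and callback names are mine, not Straw's):

```python
def listen_for_matches(redis_client, query_ids, on_match):
    """Subscribe to one PUBSUB topic per query ID and hand each
    published document to the on_match callback."""
    pubsub = redis_client.pubsub()
    pubsub.subscribe(*query_ids)
    for message in pubsub.listen():
        # listen() also yields subscribe confirmations; only forward messages.
        if message["type"] == "message":
            on_match(message["channel"], message["data"])
```

With a real redis-py client, listen() blocks indefinitely, so this loop belongs in a background thread, which is exactly how the default Straw web client handles it.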

When a user registers a query in the Straw platform, here's what happens:
1. The client passes the query to the Kafka queries topic.
2. The client computes the MD5 hash of the query which will be the ID for the query.
3. The client subscribes the user to the computed ID in Redis PUBSUB.
4. The Storm cluster receives the query from the Kafka spout and broadcasts it to the Lucene bolts.
5. Each bolt computes the MD5 hash of the query and registers the query with Luwak, using the hash as the query ID.
6. When a bolt receives a document, it uses Luwak to check if the document matches any query in the index. If Luwak finds a match, it will return one or more matching IDs. For each ID returned by Luwak, the bolt will use Redis PUBSUB to publish the original document using the ID as the topic.
7. Subscribed clients will receive documents as they are published to Redis.
Using the hash as the query ID allows two or more users to subscribe to the same query while only needing to actually index a single query.
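Steps 2 and 5 depend on the client and the bolts deriving exactly the same ID from the query text; sketched in Python (the bolts do this in Java), that is just:

```python
import hashlib

def query_id(query):
    """Deterministic query ID: the MD5 hex digest of the query text."""
    return hashlib.md5(query.encode("utf-8")).hexdigest()

# Because the ID is a deterministic hash, two users registering the same
# query end up subscribed to the same Redis topic, and the query itself
# only needs to be indexed once.
assert query_id("llama pajamas") == query_id("llama pajamas")
assert query_id("llama pajamas") != query_id("llama sweaters")
```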

## Clients

A client for Straw has the following duties:
1. Manage users. In particular, it must keep track of which users have subscribed to which queries.
2. Push user queries to Kafka and subscribe to the corresponding query IDs in Redis.
3. Listen for matches to those queries.
The Straw platform comes packaged with a default client, a simple Python Flask webserver. The webserver is sessionized so that users can follow particular queries. The server implements a basic Kafka producer to publish queries to Kafka, and Redis keeps track of the list of subscribed query IDs for each user. The listening is handled by a single background thread that holds a Redis client subscribed to all unique queries across the entire set of active users. When a query ID and document pair arrives, the background thread queries Redis to find which users are subscribed to that query ID. It then copies the document text to a result pool for each subscribed user. The user interface checks the user's pool for updates every half-second so that results stream into the console. Here is a video of the UI in action:

# Benchmarks and Conclusions

One goal of the Straw project was to compare and measure performance of Elasticsearch-Percolators vs. Lucene-Luwak. Measuring this performance is not completely straightforward. I used the following very basic approach to measuring throughput:
• Fill Kafka's documents topic with a very large number of documents
• Add a fixed number of reasonably complex queries to the Kafka query topic
• Start the Storm topology
• Each worker Bolt has a counter and a stopwatch running in a background thread
• Each time a document is passed to Lucene and a response (empty or non-empty) is received, increment the counter
• When the stopwatch reaches 10 seconds, publish the value of the counter to a special Redis topic e.g. "benchmark". Set the counter to 0 and restart the stopwatch
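The counter-and-stopwatch logic from the steps above can be sketched as follows (illustrative names, not the Straw code; the clock function is injected so the window logic is testable):

```python
import time

class ThroughputCounter:
    """Per-bolt counter plus stopwatch for the benchmark procedure."""

    def __init__(self, window_seconds=10.0, clock=time.monotonic):
        self.window = window_seconds
        self.clock = clock
        self.count = 0
        self.start = clock()

    def increment(self):
        """Call once per document whose (empty or non-empty) response arrived."""
        self.count += 1

    def poll(self):
        """If the window has elapsed, return the count and reset; else None.

        The caller publishes the returned value to the Redis "benchmark" topic.
        """
        if self.clock() - self.start >= self.window:
            elapsed_count = self.count
            self.count = 0
            self.start = self.clock()
            return elapsed_count
        return None
```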
By monitoring the benchmark channel in Redis, we can then track the search throughput of the system. Pictured below are density plots for estimated total throughput per second obtained by running this procedure for several hours:

• In both cases, Lucene-Luwak strongly outperforms Elasticsearch-Percolators. However, the Elasticsearch cluster I used was not especially optimized for this experiment. I suspect that a portion of the differential would disappear if more effort was made to localize the Elasticsearch index to the search bolts
• As the number of queries increases we see significant reduction in the throughput. It would be very interesting to see if the fan bolt solution described above would improve this performance
• The variance of throughput is very high, particularly for Luwak
• In the small query case, we are easily accommodating average Twitter-level volume; for the large query case we are close and could likely scale horizontally to obtain a solution
• The queries used here are available in the straw repository. I generated these by computing bigram frequency across a sample of 40M English language tweets and keeping the most frequent bigrams. It would be interesting to evaluate performance with more complex queries
• The documents here are tweets which are limited to 140 characters. It would be interesting to evaluate performance with longer text sources

## Sunday, March 8, 2015

### Sparse Quadratic Programming with Ipoptr

This post is a follow-up to my last post on quadratic programming facilities in R. A commenter pointed me to the ipoptr project, which exposes an R interface to the COIN-OR optimization routine Ipopt. COIN-OR is a suite of optimization utilities implemented in C++ and supported by a back-end of configurable FORTRAN linear system solvers. ipoptr may be a good solution for users wishing to solve sparse nonlinear constrained optimization problems through an R frontend. Some highlights of this solver are:
1. It is a general interior point solver that can handle nonlinear objectives with nonlinear constraints. In particular, no convexity assumptions are required to obtain local solutions.
2. It offers a flexible and lightweight specification of the objective function and a sparse matrix representation for the constraints and other problem data.
In this post, I'll explain how ipoptr can be applied to solve quadratic programs and I'll compare the performance of this solver to other quadratic program solvers (quadprog, ipop) available in R. We'll see that ipoptr is very fast and efficient on large sparse quadratic programs, seemingly an order of magnitude faster than quadprog on the demonstration problem considered in my previous post. Because the Ipopt backend is a bit tricky to install, the last section provides a detailed overview of how I successfully built this solver under Ubuntu Linux.

# The ipoptr interface

Before applying ipoptr to quadratic programming, it may be helpful to present a high-level picture of the ipoptr interface. The following is a summary of the ipoptr interface drawn from the documentation. ipoptr stages and invokes an interior point algorithm from within R to find a local solution of the following constrained optimization problem: \begin{aligned} \underset{x \in \mathbb{R}^n}{\text{Minimize}}: \qquad & f(x) \\ \text{Subject to:} \qquad & g_L \leq g(x) \leq g_U \\ & x_L \leq x \leq x_U \end{aligned} where:
• $f:\mathbb{R}^n \to \mathbb{R}$ the objective function is twice continuously differentiable
• $g:\mathbb{R}^n \to \mathbb{R}^m$ defines the constraint set and is twice continuously differentiable
• $x_U$, and $x_L$ are fixed vectors in $\mathbb{R}^n$
• $g_U$, and $g_L$ are fixed vectors in $\mathbb{R}^m$
Note that the solver does not need to make any assumptions about the convexity of either $f$ or $g$. The solver makes use of the following ingredients to find a solution:
1. An initial guess $x_0$ for the solution
2. The objective function gradient $\nabla f$
3. The Jacobian of the constraint map $J_g$
4. Optionally, the Hessian of the problem Lagrangian in the form: $$H(x, \sigma_f, \vec{\lambda}) = \sigma_f \nabla^2 f(x) + \sum_{i=1}^m \lambda_i \nabla^2 g_i(x).$$
When $n$ is large, the dense matrix representations of $g$, $\nabla f$, and $J_g$ will have a substantial footprint. In application problems, however, the Jacobian and/or the Hessian will often be sparse. For efficiency, ipoptr utilizes a sparse matrix representation for $H$ and $J_g$. This format is defined as follows. Let $A_{\mathrm{values}}$ be the list of all non-zero values in $A$ (read into the list from left to right along the rows of $A$). Then let $A_{\mathrm{mask}}$ be a list of lists, where the list at position $i$ in $A_{\mathrm{mask}}$ provides the column indexes of the non-zero elements in the $i$-th row of $A$. Then $(A_{\mathrm{values}}, A_{\mathrm{mask}})$ is a sparse representation of $A$.
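To make the format concrete, here is a small sketch that builds $(A_{\mathrm{values}}, A_{\mathrm{mask}})$ from a dense matrix. I've written it in Python for illustration (ipoptr itself is R); note the 1-based column indexes to match R's convention:

```python
def sparse_rep(A):
    """Return (values, mask): values are the non-zero entries of A read
    left-to-right along the rows; mask[i] lists the (1-based, as in R)
    column indexes of the non-zero entries in row i."""
    values, mask = [], []
    for row in A:
        mask.append([j + 1 for j, v in enumerate(row) if v != 0])
        values.extend(v for v in row if v != 0)
    return values, mask

A = [[1, 0, 2],
     [0, 0, 3],
     [4, 5, 0]]
print(sparse_rep(A))  # -> ([1, 2, 3, 4, 5], [[1, 3], [3], [1, 2]])
```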

Generally, it could be quite difficult to come up with the explicit representations of the Jacobian and Hessian that ipoptr consumes. Although the Hessian is an optional argument, including it can significantly improve convergence behaviour (try taking the Hessian arguments out in the examples below). For a quadratic program, however, it is simple to compute the Jacobian and Hessian directly. We'll do this in the next section and then provide an R wrapper function that can transform any symmetric positive definite quadratic program in standard matrix form into a quadratic program that can be solved by ipoptr.

# Solving Quadratic Programs with ipoptr

Consider the quadratic program: \begin{aligned} \underset{x \in \mathbb{R}^n}{\text{Minimize}}: \qquad & f(x) = -d^Tx + \frac{1}{2}x^T D x \\ \text{Subject to:} \qquad & g(x) = Ax \leq b \end{aligned} where
• $D$ is an $n \times n$ matrix (which we'll assume is symmetric)
• $A$ is an $m \times n$ matrix.
• $d$ is a fixed vector in $\mathbb{R}^n$ and $b$ is a fixed vector in $\mathbb{R}^m$.
Then using some basic identities of vector calculus and the symmetry of $D$: \begin{aligned} \nabla f(x) &= -d + \frac{1}{2}(D^T + D) x = -d + Dx \\ \nabla^2 f(x) &= D \\ J_g(x) &= A \\ \nabla^2 g_i(x) &= 0 \mbox{ for all } i \\ H(x, \sigma_f, \vec{\lambda}) &= \sigma_f D. \end{aligned} Because $D$ is positive definite, the global minimizer $\tilde{x}$ of $f$ can be computed as: $$\nabla f(\tilde{x}) = 0 \rightarrow \tilde{x} = D^{-1} d.$$ Goldfarb and Idnani, whose QP algorithm is implemented in the quadprog package, use the global minimizer $\tilde{x}$ as the initial point for their interior point primal/dual method. This choice is based on empirical evidence cited by the authors that this selection can substantially reduce the number of iterations required to find a solution. We'll follow suit with ipoptr by setting $x_0 = \tilde{x}$ for our initial guess.
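These identities are easy to sanity-check numerically. The following snippet (in Python, for illustration only) compares the closed-form gradient $-d + Dx$ against central finite differences on a small symmetric example:

```python
# Sanity-check grad f(x) = -d + D x for f(x) = -d^T x + 0.5 x^T D x,
# using a small symmetric D and central finite differences.
D = [[4.0, 1.0], [1.0, 3.0]]
d = [1.0, 2.0]

def f(x):
    quad = sum(x[i] * D[i][j] * x[j] for i in range(2) for j in range(2))
    return -sum(d[i] * x[i] for i in range(2)) + 0.5 * quad

def grad_f(x):  # the closed-form gradient derived above
    return [-d[i] + sum(D[i][j] * x[j] for j in range(2)) for i in range(2)]

x, h = [0.7, -0.3], 1e-6
for i in range(2):
    xp, xm = list(x), list(x)
    xp[i] += h
    xm[i] -= h
    fd = (f(xp) - f(xm)) / (2 * h)  # central difference approximation
    assert abs(fd - grad_f(x)[i]) < 1e-6
```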

The following R function uses these facts to solve a quadratic program in standard form with ipoptr:

# Performance Comparison

In my previous post, I compared the performance of quadprog and kernlab's ipop solvers on the circus tent demonstration problem. For this post, I repeated the same experiment with the ipoptr solver and the results were very impressive. Ipoptr was substantially faster at solving this sparse demonstration problem than quadprog. Here are the solve times I obtained:

   problem.size quadprog.time ipop.time ipoptr.time
1            64         0.002     0.024       0.010
2           256         0.048     0.710       0.034
3           576         0.518     4.759       0.126
4          1024         2.903    23.430       0.414
5          1600        10.815    77.708       0.855
6          2304        31.899   232.355       1.819
7          3136        79.749   544.193       3.318
8          4096       176.743  1521.225       8.297
9          5184       354.839  2543.754       9.039
10         6400       664.804  5053.922      19.551


Here is the experiment code:

It's not so surprising that the ipoptr package is substantially faster in this example, because the matrix Dmat that defines the objective function for the tent problem is very sparse. Quadprog requires dense matrix representations to describe the QP that we wish to solve, while in ipoptr we need only specify the functional form of the quadratic objective function. Here is the plot of Dmat's sparsity structure:

To investigate the performance of ipoptr on dense QPs, I generated some random positive definite QPs of differing sizes and compared solve times against the solver from quadprog and the ipop solver from the kernlab package. In this dense problem case, quadprog appears to have a significant advantage over ipoptr. Still, ipoptr shows solid performance and is considerably faster than the pure R implementation of the interior point solver found in kernlab's ipop.

  problem.size quadprog.time ipop.time ipoptr.time
1          500         0.068     1.942       0.294
2         1500         0.909    35.352       5.374
3         2500         4.598   155.138      23.160
4         3500        13.186   414.762      60.888
5         4500        28.419   868.141     126.278
6         5500        49.608  1570.674     226.914
7         6500        81.756  2574.962     369.469
8         7500       128.451  3937.629     563.236


Here is the code for the randomly generated QP experiment:

In this post, we looked at how the ipoptr interface can be used to solve quadratic programming problems and compared the Ipopt solver to other quadratic program solvers available in R. We found:
• ipoptr is very efficient and well-suited for solving large, sparse quadratic programs, even outperforming the quadprog package solver in this case.
• However, for dense symmetric positive definite QPs, quadprog is much faster than ipoptr.
• Both solvers are substantially faster than the pure R interior point solver ipop from the kernlab package.
• For the experiments in this post, Ipopt was configured to use the MUMPS linear solver. It is possible to configure Ipopt to use other linear solvers and this could have an impact on performance.
• Initializing Ipopt with $x_0$ as the global minimum of the objective function (as done internally in the quadprog package) seems to be somewhat beneficial for reducing runtimes. This effect should be investigated more carefully, however.
• The timings presented above do not include the time required to translate from a QP specified in matrix format (e.g. Dmat, Amat, etc.) to the input format required by ipoptr. In practice, however, this step is rather expensive and users will probably want to generate the ipoptr input data directly instead of forming dense matrix inputs and then converting to the ipoptr format. The specifics of how this should be done will depend on the particular problem which the user is trying to solve.

# Installing ipoptr for Ubuntu Linux

Because of complex licensing issues in the COIN-OR suite and its many dependencies, it's not possible to bundle the backend Ipopt solver and its R interface into a simple package installable directly from R. Thus, installing ipoptr won't be as easy as installing most other R packages. Fortunately, the COIN-OR documentation and the R interface documentation are fairly complete.

After a bit of work, I was able to install everything on a Ubuntu-Linux box using the following steps.
#  Change into directory where you want to put the ipoptr source and build path
cd ~/projects/

# install dependencies, including latest blas and lapack.
sudo apt-get update
sudo apt-get install gcc g++ gfortran subversion patch wget
sudo apt-get install libblas3 liblapack3

# get the project -- note, will need to accept SSL cert
svn co https://projects.coin-or.org/svn/Ipopt/stable/3.12 CoinIpopt

# get and build Mumps linear solver
cd CoinIpopt/ThirdParty/Mumps
./get.Mumps
cd ../..

# build and install Ipopt; it's a good idea to inspect the output of make test
# With this setup, the AMPL test will fail (expected), but all the other tests should pass.
mkdir build
cd build
../configure
make
make test
make install

# Move RInterface Makevars from build to src directory
cp Ipopt/contrib/RInterface/src/Makevars ../Ipopt/contrib/RInterface/src

Now, we can fire up R and install the package from source using:
install.packages("~/projects/CoinIpopt/Ipopt/contrib/RInterface",
repos=NULL, type="source")


## Tuesday, February 10, 2015

### More on Quadratic Programming in R

This post is another tour of quadratic programming algorithms and applications in R. First, we look at the quadratic program that lies at the heart of support vector machine (SVM) classification. Then we'll look at a very different quadratic programming demo problem that models the energy of a circus tent. The key difference between these two problems is that the energy minimization problem has a positive definite system matrix whereas the SVM problem has only a semi-definite one. This distinction has important implications when it comes to choosing a quadratic program solver and we'll do some solver benchmarking to further illustrate this issue.

# QP and SVM

Let's consider the following very simple version of the SVM problem. Suppose we have observed $y_i \in \{-1,1\}$, $X_i \in \mathbb{R}^{m}$ for $n$ (perfectly linearly separable) training cases. We let $y$ denote the $n \times 1$ vector of training labels and $X$ the $n \times m$ matrix of predictor variables. Our task is to find the hyperplane in $\mathbb{R}^m$ which "best separates" our two classes of labels $-1$ and $1$. Visually:
library("e1071")
library("rgl")
train <- iris
train$y <- ifelse(train[,5]=="setosa", 1, -1)
sv <- svm(y~Petal.Length+Petal.Width+Sepal.Length, data=train,
          kernel="linear", scale=FALSE, type="C-classification")
W <- rowSums(sapply(1:length(sv$coefs), function(i) sv$coefs[i]*sv$SV[i,]))
plot3d(train$Petal.Length, train$Petal.Width,
       train$Sepal.Length, col=ifelse(train$y==-1, "red", "blue"),
       size=2, type='s', alpha=.6)