Distributing the Singular Value Decomposition with Spark

[Post originally appeared on the Databricks Developer Blog]


The Singular Value Decomposition (SVD) is one of the cornerstones of linear algebra and has widespread application in many real-world modeling situations. Problems such as recommender systems, linear systems, least squares, and many others can be solved using the SVD. It is frequently used in statistics where it is related to principal component analysis (PCA) and to correspondence analysis, and in signal processing and pattern recognition. Another usage is latent semantic indexing in natural language processing.

Decades ago, before the rise of distributed computing, computer scientists developed the single-core ARPACK package for computing the eigenvalue decomposition of a matrix. Since then, the package has matured and is in widespread use by the numerical linear algebra community, in tools such as SciPy, GNU Octave, and MATLAB.

An important feature of ARPACK is its ability to allow for arbitrary matrix storage formats. This is possible because it doesn’t operate on the matrices directly, but instead acts on the matrix via prespecified operations, such as matrix-vector multiplies. When a matrix operation is required, ARPACK gives control to the calling program with a request for a matrix-vector multiply. The calling program must then perform the multiply and return the result to ARPACK.

By using the distributed-computing power of Spark, we can distribute the matrix-vector multiplies, and thus exploit both the years of numerical computing expertise that have gone into building ARPACK and the years of distributed-computing expertise that have gone into Spark.
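
To make this concrete, here is a minimal sketch, in Scala, of the kind of distributed matrix-vector multiply Spark can perform each time ARPACK requests one. It is an illustration rather than MLlib's actual code: the rows are assumed to be stored as dense arrays, and the product shown is the Gramian product (A^T A)v, since singular vectors can be obtained from eigenvectors of the Gramian, which is essentially how MLlib uses ARPACK. Each row r_i contributes (r_i . v) r_i, and the partial sums are combined across the cluster.

import org.apache.spark.rdd.RDD

// Sketch of a distributed multiply by the Gramian A^T A. The matrix A is an
// RDD of rows (dense arrays of length n); the vector v is small and is
// broadcast to every executor. Each row r_i contributes (r_i . v) * r_i, and
// the per-partition partial sums are added together before the result is
// handed back to ARPACK.
def multiplyGramianBy(rows: RDD[Array[Double]], v: Array[Double]): Array[Double] = {
  val n = v.length
  val bv = rows.context.broadcast(v)
  rows.aggregate(new Array[Double](n))(
    seqOp = (acc, row) => {
      var dot = 0.0
      var j = 0
      while (j < n) { dot += row(j) * bv.value(j); j += 1 }  // r_i . v
      j = 0
      while (j < n) { acc(j) += dot * row(j); j += 1 }       // acc += (r_i . v) * r_i
      acc
    },
    combOp = (a, b) => {
      var j = 0
      while (j < n) { a(j) += b(j); j += 1 }
      a
    }
  )
}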

Since ARPACK is written in Fortran77, it cannot immediately be used on the Java Virtual Machine. However, through the netlib-java and breeze interfaces, we can use ARPACK on the JVM. This also means that low-level hardware optimizations can be exploited for any local linear algebraic operations. As with all linear algebraic operations within MLlib, we use hardware acceleration whenever possible.

We are building out the linear algebra capabilities of MLlib. Spark 1.0 includes support for tall-and-skinny SVD and PCA, and as of Spark 1.1 we will have support for SVD and PCA via ARPACK.

Example Runtime

A very popular matrix in the recommender systems community is the Netflix Prize Matrix. The matrix has 17,770 rows, 480,189 columns, and 100,480,507 non-zeros. Below we report results on several larger matrices (up to 16x larger) we experimented with at Twitter.

Using the Spark implementation of SVD with ARPACK, with 68 executors of 8GB memory each and computing the top 5 singular vectors, we can factorize these larger matrices, distributed in RAM across a cluster, in a matter of seconds. Wall-clock times are reported below.

Matrix size            Number of nonzeros    Time per iteration (s)    Total time (s)
23,000,000 x 38,000    51,000,000            0.2                       10
63,000,000 x 49,000    440,000,000           1                         50
94,000,000 x 4,000     1,600,000,000         0.5                       50

Apart from being fast, SVD is also easy to run. Here is a code snippet showing how to run it on sparse data loaded from a text file.
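
A minimal sketch of such a snippet, assuming the Spark 1.1 MLlib API (RowMatrix.computeSVD), input stored in LIBSVM format, and a hypothetical file path; sc is the usual SparkContext:

import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.util.MLUtils

// Load sparse rows from a LIBSVM-formatted text file (hypothetical path) and
// keep only the feature vectors.
val rows = MLUtils.loadLibSVMFile(sc, "hdfs:///path/to/matrix.txt").map(_.features)

// Wrap the RDD of rows as a distributed row matrix.
val mat = new RowMatrix(rows)

// Compute the top 5 singular values and the corresponding singular vectors.
val svd = mat.computeSVD(5, computeU = true)

val U = svd.U  // left singular vectors, a distributed RowMatrix
val s = svd.s  // singular values, a local dense vector
val V = svd.V  // right singular vectors, a local dense matrix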

Using Twitter to measure earthquake impact in almost real time

[Post originally went up at the Twitter Engineering Blog]

At Twitter, we know that Tweets can sometimes travel as fast as an earthquake. We were curious to know just how accurate such a correlation might be, so we collaborated with Stanford researchers to model how Tweets can help create more accurate ShakeMaps, which provide near-real-time maps of ground motion and shaking intensity following significant earthquakes.

These maps are used by federal, state and local organizations, both public and private, for post-earthquake response and recovery, public and scientific information, as well as for preparedness exercises and disaster planning.

Currently, ShakeMaps produced by the U.S. Geological Survey represent the state of the art in rapid shaking-intensity estimation. When an earthquake happens, a ShakeMap is typically produced in a matter of minutes using a combination of recordings, a simple ground motion prediction equation, and geological site correction factors. As time progresses, the ShakeMap is continually updated as new information becomes available, including “did you feel it?” data — qualitative first-hand accounts of the earthquake collected via online surveys.

To help improve the accuracy of ShakeMaps, we used all geo-tagged Tweets around the world containing the keyword “earthquake” or “tsunami” in several languages that were posted in the first 10 minutes following Japanese earthquakes of magnitude 6 or greater from 2011 to 2012. We found that the model with the lowest error was based on a combination of earthquake and Tweet-based features, such as local site conditions, source-to-site distance, and the number of Tweets within a certain radius. Ground shaking intensity estimates from our model are comparable with historical recordings and with conventional estimates provided, for example, by USGS ShakeMaps.

Figure 1. For the Tohoku (c0001xgp) earthquake: (a) number of geo-tagged Tweets containing an earthquake keyword per minute after the event, (b) distance between each Tweet and the epicenter as a function of time, and (c) map showing the number of Tweets, where the star represents the epicenter location.

Figure 1 shows Tweet activity following a significant earthquake. From these Tweets, we generated eight Tweet-based features, each considered within varying radii of a recording station: count of Tweets, average number of tokens, average number of characters, average index of the first mention of an earthquake keyword, average number of appearances of earthquake keywords, average number of exclamation points, average number of dashes, and average number of appearances of a selected Japanese keyword. With these features, we trained several models, including a simple linear regression, elastic net regression, and k-Nearest-Neighbors regression. For a more thorough description, you can read this paper, which we’ll present in July at the National Conference on Earthquake Engineering.

Reference:
Rapid Estimate of Ground Shaking Intensity by Combining Simple Earthquake Characteristics with Tweets
Mahalia Miller, Lynne Burks, Reza Zadeh
Tenth U.S. National Conference on Earthquake Engineering

Efficient similarity algorithm now in Spark, thanks to Twitter

[This post originally appeared on the Databricks Engineering Blog]

Our friends at Twitter have contributed to MLlib, and this post uses material from Twitter’s description of its open-source contribution, with permission. The associated pull request is slated for release in Spark 1.2.


Introduction

We are often interested in finding users, hashtags and ads that are very similar to one another, so they may be recommended and shown to users and advertisers. To do this, we must consider many pairs of items, and evaluate how “similar” they are to one another.

We call this the “all-pairs similarity” problem, sometimes known as a “similarity join.” We have developed a new efficient algorithm to solve the similarity join called “Dimension Independent Matrix Square using MapReduce,” or DIMSUM for short, which made one of Twitter’s most expensive batch computations 40% more efficient.

To state the problem we’re trying to solve more formally: given a dataset of sparse vectors, the all-pairs similarity problem is to find all pairs of vectors whose similarity, under a similarity function such as cosine similarity, exceeds a given similarity score threshold.

Not all pairs of items are similar to one another, yet a naive algorithm will spend computational effort considering even those pairs that are not very similar. The brute-force approach of considering all pairs of items quickly breaks down, since its computational effort scales quadratically.

For example, for a million vectors, it is not feasible to check all of the roughly one trillion pairs to see whether they are above the similarity threshold. Fortunately, there are clever sampling techniques that focus the computational effort on only those pairs that are likely to be highly similar, and we developed the DIMSUM sampling scheme to do exactly that, making the problem feasible.

Intuition

The main insight that allows the gains in efficiency is to sample columns that have many non-zeros with lower probability, and columns that have few non-zeros with higher probability. This sampling scheme can be shown to estimate cosine similarities accurately, with provable guarantees: columns with many non-zeros have more chances to be included in the sample, and can therefore be sampled with lower probability without hurting the estimate.

An in-depth description of the algorithm can be found in the Twitter Engineering blog post, reproduced below.

Experiments

We run DIMSUM on a production-scale ads dataset. Upon replacing the traditional cosine similarity computation in late June, we observed a 40% improvement in several performance measures.

Usage from Spark

The algorithm is available in MLlib as a method on RowMatrix, which makes it easy to access and use:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Arguments for the input file and the similarity threshold
val filename = args(0)
val threshold = args(1).toDouble

// Load and parse the data file: one row per line, space-separated values.
val rows = sc.textFile(filename).map { line =>
  val values = line.split(' ').map(_.toDouble)
  Vectors.dense(values)
}
val mat = new RowMatrix(rows)

// Compute similar columns perfectly, with brute force.
val simsPerfect = mat.columnSimilarities()

// Compute similar columns approximately, using DIMSUM sampling with the given threshold.
val simsEstimate = mat.columnSimilarities(threshold)

The snippet above is an example invocation of DIMSUM. This functionality will be available as of Spark 1.2.

Additional information can be found in the GigaOM article covering the DIMSUM algorithm.

All-pairs similarity via DIMSUM

[Originally posted on the Twitter Engineering Blog]

We are often interested in finding users, hashtags and ads that are very similar to one another, so they may be recommended and shown to users and advertisers. To do this, we must consider many pairs of items, and evaluate how “similar” they are to one another.

We call this the “all-pairs similarity” problem, sometimes known as a “similarity join.” We have developed a new efficient algorithm to solve the similarity join called “Dimension Independent Matrix Square using MapReduce,” or DIMSUM for short, which made one of our most expensive computations 40% more efficient.

Introduction

To state the problem we’re trying to solve more formally: given a dataset of sparse vectors, the all-pairs similarity problem is to find all pairs of vectors whose similarity, under a similarity function such as cosine similarity, exceeds a given similarity score threshold.

Not all pairs of items are similar to one another, yet a naive algorithm will spend computational effort considering even those pairs that are not very similar. The brute-force approach of considering all pairs of items quickly breaks down, since its computational effort scales quadratically.

For example, for a million vectors, it is not feasible to check all of the roughly one trillion pairs to see whether they are above the similarity threshold. Fortunately, there are clever sampling techniques that focus the computational effort on only those pairs that are likely to be highly similar, and we developed the DIMSUM sampling scheme to do exactly that, making the problem feasible.

In November 2012, we reported on the DISCO algorithm for solving the similarity join problem with MapReduce. More recently, we have started using a new version called DIMSUMv2, and the purpose of this blog post is to report experiments with the new algorithm and our contributions of it to two open-source projects: we have contributed DIMSUMv2 to both Spark and Scalding.

The algorithm

First, let’s lay down some notation: we’re looking for all pairs of similar columns in an m x n matrix whose entries are denoted a_ij, with the i’th row denoted r_i and the j’th column denoted c_j. There is an oversampling parameter, γ, that should be set to 4 log(n)/s to get provably correct results (with high probability), where s is the similarity threshold.

The algorithm is stated as a Map and a Reduce, with proofs of correctness and efficiency in published papers [1] [2]. The reducer is simply the summation reducer. The mapper is more interesting, and is the heart of the scheme. As an exercise, you should try to see why, in expectation, the map-reduce below outputs cosine similarities.
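
The map and reduce steps appear as a figure in the original post. As a rough reconstruction, here is a Scala sketch of a DIMSUMv2-style mapper and reducer; the row representation, the precomputed column norms colNorms, and the helper names are assumptions for illustration rather than the production code.

import scala.util.Random

// Mapper for one row r_i, given as (columnIndex, value) pairs. colNorms(j) is
// ||c_j|| and sg = sqrt(gamma). Each pair of columns co-occurring in the row
// is kept with probability min(1, sg/||c_j||) * min(1, sg/||c_k||), and its
// contribution is rescaled so that, summed over all rows, the expected value
// for the key (j, k) is the cosine similarity of columns j and k.
def dimsumMap(row: Seq[(Int, Double)], colNorms: Array[Double],
              gamma: Double, rng: Random): Seq[((Int, Int), Double)] = {
  val sg = math.sqrt(gamma)
  for {
    (j, aij) <- row
    (k, aik) <- row
    if j < k
    if rng.nextDouble() < math.min(1.0, sg / colNorms(j))
    if rng.nextDouble() < math.min(1.0, sg / colNorms(k))
  } yield {
    val scale = math.min(sg, colNorms(j)) * math.min(sg, colNorms(k))
    ((j, k), aij * aik / scale)
  }
}

// The reducer simply sums the values emitted for each column pair.
def dimsumReduce(values: Seq[Double]): Double = values.sum

When a column norm exceeds sg, the probability of keeping a term cancels against the rescaling by sg; when it is below sg, the term is always kept and simply divided by the norm. In both cases the expectation works out to the cosine similarity.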

The mapper above is more computationally efficient than the one presented in [1], in that it tosses fewer coins. Nonetheless, its proof of correctness is the same as that of Theorem 1 in [1]. It is also more general than the algorithm presented in [2], since it can handle real-valued vectors rather than only {0,1}-valued vectors. Lastly, this version of DIMSUM is suited to handling rows that may be skewed and have many nonzeros.

Experiments

We run DIMSUM daily on a production-scale ads dataset. Upon replacing the traditional cosine similarity computation in late June, we observed a 40% improvement in several performance measures.

Open source code

We have contributed an implementation of DIMSUM to two open source projects: Scalding and Spark.

Scalding github pull-request: https://github.com/twitter/scalding/pull/833
Spark github pull-request: https://github.com/apache/spark/pull/336

Collaborators

Thanks to Kevin Lin, Oscar Boykin, Ashish Goel and Gunnar Carlsson.

References

[1] Bosagh-Zadeh, Reza and Carlsson, Gunnar (2013), Dimension Independent Matrix Square using MapReduce, arXiv:1304.1467

[2] Bosagh-Zadeh, Reza and Goel, Ashish (2012), Dimension Independent Similarity Computation, arXiv:1206.2082

Dimension Independent Similarity Computation (DISCO)

[This blog post originally went up on the Twitter Engineering Blog]

MapReduce is a programming model for processing large data sets, typically used for distributed computing on clusters of commodity computers. With a large amount of processing power at hand, it is very tempting to solve problems by brute force. However, we often combine clever sampling techniques with the power of MapReduce to extend its utility.

Consider the problem of finding all pairwise similarities between D indicator (0/1-valued) vectors, each of dimension N. In particular, we focus on cosine similarities between all pairs of D vectors in R^N. Further assume that each dimension is L-sparse, meaning each dimension has at most L non-zeros across all points. For example, typical values for computing similarities between all pairs of a subset of Twitter users might be:

D = 10M
N = 1B
L = 1000

Since the dimensions are sparse, it is natural to store the points dimension by dimension. To compute cosine similarities, we can easily feed each dimension t into MapReduce using the following Mapper and Reducer combination:
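
The mapper and reducer appear as a figure in the original post. As a simplified sketch of the idea (the Scala helper names and the representation of a dimension as the list of points occurring in it are assumptions for illustration):

// Naive mapper for one dimension t: emit every unordered pair of points that
// co-occur in t, each with a count of 1.
def naiveMap(pointsInDimension: Seq[String]): Seq[((String, String), Int)] =
  for {
    w1 <- pointsInDimension
    w2 <- pointsInDimension
    if w1 < w2
  } yield ((w1, w2), 1)

// Reducer: summing the counts for a key (w1, w2) yields #(w1, w2), the dot
// product of w1 and w2.
def naiveReduce(counts: Seq[Int]): Int = counts.sum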

Here, #(w) counts the number of dimensions in which point w occurs, and #(w1, w2) counts the number of dimensions in which w1 and w2 co-occur, i.e., the dot product of w1 and w2. The steps above compute all dot products, which are then scaled by the cosine normalization factor 1/sqrt(#(w1) #(w2)).

There are two main complexity measures for MapReduce: “shuffle size” and “reduce-key complexity”, defined shortly (Goel and Munagala 2012). It can easily be shown that the above mappers will output on the order of O(NL^2) emissions; for the example parameters we gave (N = 1B, L = 1000), that is on the order of 10^15 emissions, which is infeasible. The number of emissions in the map phase is called the “shuffle size”, since that data needs to be shuffled around the network to reach the correct reducer.

Furthermore, the maximum number of items reduced to a single key is at most #(w1, w2), which can be as large as N. Thus the “reduce-key complexity” for the above scheme is N.

We can drastically reduce the shuffle size and reduce-key complexity by some clever sampling:

Notation: p and ε are oversampling parameters.
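
The sampled mapper likewise appears only as a figure in the original post. The exact probabilities and scaling are given in the paper; the following is a simplified, importance-sampling-style reconstruction of the idea (not necessarily the paper's exact constants): each co-occurring pair is kept with probability inversely proportional to sqrt(#(w1) #(w2)), and each kept emission is rescaled so that its expectation is that pair's contribution to the cosine similarity.

import scala.util.Random

// DISCO-style sampled mapper for one dimension t. occ(w) = #(w) is assumed
// precomputed, and p is the oversampling parameter: larger p means more
// emissions and lower variance. Summing the emitted values for a key (w1, w2)
// over all dimensions gives, in expectation, cos(w1, w2).
def discoMap(pointsInDimension: Seq[String], occ: Map[String, Int],
             p: Double, rng: Random): Seq[((String, String), Double)] =
  for {
    w1 <- pointsInDimension
    w2 <- pointsInDimension
    if w1 < w2
    prob = math.min(1.0, p / math.sqrt(occ(w1).toDouble * occ(w2)))
    if rng.nextDouble() < prob
  } yield {
    // Each kept emission is worth 1 / (prob * sqrt(#(w1) #(w2))), so its
    // expectation is 1 / sqrt(#(w1) #(w2)), one co-occurrence's share of the
    // cosine similarity.
    ((w1, w2), 1.0 / (prob * math.sqrt(occ(w1).toDouble * occ(w2))))
  }

As before, the reducer is plain summation over the values sharing a key.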

In this case, the outputs of the reducers are random variables whose expectations are the cosine similarities. Two proofs are needed to justify the effectiveness of this scheme: first, that the expectations are indeed correct and concentrate around their means with high probability, and second, that the shuffle size is greatly reduced.

We prove both of these claims in (Bosagh-Zadeh and Goel 2012). In particular, in addition to correctness, we prove that the shuffle size of the above scheme is only O(DL log(D)/ε), with no dependence on the “dimension” N, hence the name.

This means as long as you have enough mappers to read your data, you can use the DISCO sampling scheme to make the shuffle size tractable. Furthermore, each reduce key gets at most O(log(D)/ε) values, thus making the reduce-key complexity tractable too.

Within Twitter, we use the DISCO sampling scheme to compute similar users. We have also used the scheme to find highly similar pairs of words, by taking each dimension to be the indicator vector that signals in which Tweets the word appears. We further empirically verify the claims and observe large reductions in shuffle size, with details in the paper.

Finally, this sampling scheme can be used to implement many other similarity measures. For Jaccard Similarity, we improve the implementation of the well-known MinHash (http://en.wikipedia.org/wiki/MinHash) scheme on Map-Reduce.

Posted by
Reza Zadeh (@Reza_Zadeh) and Ashish Goel (@ashishgoel) – Personalization & Recommendation Systems Group and Revenue Group

Bosagh-Zadeh, Reza and Goel, Ashish (2012), Dimension Independent Similarity Computation, arXiv:1206.2082

Goel, Ashish and Munagala, Kamesh (2012), Complexity Measures for Map-Reduce, and Comparison to Parallel Computing, http://www.stanford.edu/~ashishg/papers/mapreducecomplexity.pdf