Last week I ran into an ugly problem with Scalding:
I needed to read a really large table from MySQL to process it in a certain job. In general this is trivial: just use a JDBC source, select your columns and that’s it.
Usually we do this with 1-3 parallel connections to the SQL server. This time I started running out of memory because Scalding didn’t (more precisely: couldn’t) swap/spill to disk. The problem here is the default behaviour of the MySQL connector. The API docs say:
By default, ResultSets are completely retrieved and stored in memory. In most cases this is the most efficient way to operate, and due to the design of the MySQL network protocol is easier to implement. If you are working with ResultSets that have a large number of rows or large values, and can not allocate heap space in your JVM for the memory required, you can tell the driver to stream the results back one row at a time.
So what does this mean? If you query a 10 GB table, you get all the data and the connector tries to buffer it in memory, which is a bad idea if you just want to process tuple by tuple. You can then split this large query into 10 smaller ones: SELECT ... FROM ... LIMIT 0, x, then SELECT ... FROM ... LIMIT x, x, then SELECT ... FROM ... LIMIT 2x, x, etc. This works, but partitioning a large result this way is not very efficient: starting from the second query, MySQL has to iterate over all the skipped rows before it can start gathering and returning results. So you partition the big query into 10 smaller results, but you put quite a lot of load on the server. And overall you still have to keep a lot of results in RAM.
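To put a rough number on that extra load, here is a small sketch that builds the chunked queries and adds up how many rows the server has to step over before it can return anything. The class name, the table and column names and the row counts are all made up for illustration, and the model is deliberately simple: it assumes every skipped row costs one row of work.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetPartitioning {

    // Builds one "LIMIT offset, count" query per chunk (MySQL syntax).
    static List<String> chunkQueries(String baseQuery, long totalRows, long chunkSize) {
        List<String> queries = new ArrayList<>();
        for (long offset = 0; offset < totalRows; offset += chunkSize) {
            queries.add(baseQuery + " LIMIT " + offset + ", " + chunkSize);
        }
        return queries;
    }

    // Sums the offsets of all chunks, i.e. the rows that are read only to be discarded.
    static long skippedRows(long totalRows, long chunkSize) {
        long skipped = 0;
        for (long offset = 0; offset < totalRows; offset += chunkSize) {
            skipped += offset;
        }
        return skipped;
    }

    public static void main(String[] args) {
        long totalRows = 10_000_000L; // hypothetical table size
        long chunkSize = 1_000_000L;  // 10 chunks
        for (String query : chunkQueries("SELECT id, name FROM big_table", totalRows, chunkSize)) {
            System.out.println(query);
        }
        System.out.println("rows skipped in total: " + skippedRows(totalRows, chunkSize));
    }
}
```

With these numbers the ten queries skip 45 million rows between them, so under this simple model the server walks about 4.5 times the size of the table on top of the rows it actually returns.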
The better solution would be to use just one or two connections that stream the rows directly into the cascading/scalding job. The framework can then decide whether it can process the data or if it needs to spill to disk.
The solution seems dead easy: simply turn on streaming! The API docs even show how to do it:
stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, 
      java.sql.ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(Integer.MIN_VALUE);
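For context, a complete read loop around those two calls might look roughly like the sketch below. It uses only java.sql. The class and method names (StreamingRead, streamRows) are invented for illustration, and it assumes the Connection comes from the MySQL connector, since Integer.MIN_VALUE as a fetch size is a convention of that driver rather than of JDBC in general.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class StreamingRead {

    // Reads the query result row by row and returns the number of rows seen.
    static long streamRows(Connection conn, String query) throws SQLException {
        long rows = 0;
        // Forward-only, read-only statement, as in the snippet from the docs.
        try (Statement stmt = conn.createStatement(
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            // Signals the MySQL driver to stream instead of buffering the whole result.
            stmt.setFetchSize(Integer.MIN_VALUE);
            try (ResultSet rs = stmt.executeQuery(query)) {
                while (rs.next()) {
                    // A real job would hand the current row to its processing code here.
                    rows++;
                }
            }
        }
        return rows;
    }
}
```

One thing to keep in mind with this mode: the result has to be read to the end (or closed) before the same connection can be used for another query.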
Looking at the Cascading source, the statement is already created in the right way. But it’s not that easy to get hold of the statement before the query is submitted (at least if you want to avoid dirty hacks!).
So I invested a couple of hours fiddling around with Gradle, Cascading, Scalding and sbt to get it done the right way. As I saw this other issue describing exactly the same problem, I made a pull request, which has been accepted and merged into the 2.5.5 branch. The tests in my local jobs already work like a charm!
If you’re using Scalding, you can easily use the modified MySqlScheme by doing the following steps:
- Clone the official 2.5.5 branch.
- ensure you have gradle 1.x (x >= 8) installed
- build cascading and install it into your local repo:
 gradle install -Dcascading.jdbc.url.mysql="jdbc:mysql://a-host/a-db?user=&password=" -i -x test
 -x test disables the tests (which saves ~10min).
- add your local repo to the source repositories in your project
- add the dependency to your project: cascading cascading-jdbc-mysql 2.5.5. If you want a different version number, just change version.properties accordingly.
To really use the streaming, you have to initialize the MySqlScheme on your own. In Scalding, this can be done for example using this code (Scalding 0.9):
abstract class StreamingJDBCSource extends JDBCSource {
  override val maxConcurrentReads = 1
  override protected def getJDBCScheme = new MySqlScheme(
    classOf[MySqlDBInputFormat[DBWritable]],  // inputFormatClass
    columnNames.toArray,
    null,  // orderBy
    filterCondition.getOrElse(null),
    updateBy.toArray,
    false // replace on Insert
  )
}
That’s it. Your usual JDBC sources were probably extending JDBCSource. Just change them to extend StreamingJDBCSource and you are done. Now your mappers require less RAM and you can cut down the number of parallel connections for each single source.