Scalding hiding NPEs in “operator Each failed executing operation”

Yesterday I was surprised by a failing Scalding task. Everything worked fine locally and all I git was like “job failed, see cluster log”. In the cluster log I saw the following:

2014-10-24 14:38:41,222 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201410101555_2230_m_000005_3: cascading.pipe.OperatorException: [com.twitter.scalding.T…][com.twitter.scalding.RichPipe.each(RichPipe.scala:471)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
at cascading.operation.Identity$2.operate(Identity.java:137)
at cascading.operation.Identity.operate(Identity.java:150)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:130)

Continue reading Scalding hiding NPEs in “operator Each failed executing operation”

Enable output compression in Scalding

I just wanted to enable final output compression in one of my Scalding jobs (because I needed to reorganize a some-TB-data set).

Unfortunately scalding always produced uncompressed files. After some googling, I came across a github issue that adressed exactly this problem. Via some links I got the sample code from this repo which can be used to write compressed TSVs.

Continue reading Enable output compression in Scalding

Scalding Exception: diverging implicit expansion for type com.twitter.algebird.Semigroup[T]

I was just doing a again some scalding jobs and again got an .. interesting exception:

In a groupBy operation, I wanted to sum something up using:

.groupBy('a) {
  _.sum('a -> 'c)
}

And was rewarded with this one:

[error] example.scala:20: diverging implicit expansion for type com.twitter.algebird.Semigroup[T]
[error] starting with method eitherSemigroup in object Semigroup
[error]       _.sum('a -> 'c)
[error]            ^
[error] one error found
[error] (compile:compile) Compilation failed

WTF??

Solution:

Spot the mistake? It’s the missing type hint at sum:

.groupBy('a) {
  _.sum<strong>[Int]</strong>('a -> 'c)  //  <-- [Int]
}

Scalding: unable to compare stream elements in position: 0

I’m currently working quite a bit with Twitter’s Scalding.
Recently I split up a job into sub-jobs and suddenly got an Exception in my join:

Caused by: cascading.CascadingException: unable to compare stream elements in position: 0

If I had remembered the Fields API in detail, I would have thought about this paragraph (it’s about sorting, but the consequence is the same):

Note: When reading from a CSV, the data types are set to String,hence the sorting will be alphabetically, therefore to sort by age, an int, you need to convert it to an integer. For example …

Solution:

Ensure you are joining the correct data types and possibly convert them before. For example:

.map ('myField-> 'myField) {x:Int => x}

Enable MySQL Streaming in Cascading / Scalding

Last week I ran into a an ugly problem of Scalding:
I needed to read a really large table from MySQL to process it in a certain job. In generall this is trivial: just use a JDBC Source, select your columns and that’s it.

Usually we do this by using 1-3 parallel connections to the SQL-server. This time I started running out of memory because scalding didn’t (more precicely: couldn’t) swap/spill to disk. The problem here is the default behaviour of the mysql-connector. The api docs says:

By default, ResultSets are completely retrieved and stored in memory. In most cases this is the most efficient way to operate, and due to the design of the MySQL network protocol is easier to implement. If you are working with ResultSets that have a large number of rows or large values, and can not allocate heap space in your JVM for the memory required, you can tell the driver to stream the results back one row at a time.

So, what does this mean: If you query a 10 GB table, you get all the data and the connector tries to buffer it in memory – which is a bad idea if you just want to process tuple by tuple. You can then split this large query into 10 smaller ones: SELECT ... FROM ... LIMIT 0, x, SELECT ... FROM ... LIMIT x+1, y, … etc. This works – but partitioning a large result this way is not very efficient because starting from the second query, MySQL has to iterate over x rows until it can start gathering and returning results. So you partition the big query into 10 smaller results but you put quite a lot of load to the server. And over all you still have to keep a lot of results in RAM.

Continue reading Enable MySQL Streaming in Cascading / Scalding