
Merge develop into cascading3 #1776

Open · wants to merge 50 commits into base: cascading3

Conversation

johnynek
Collaborator

Make sure the cascading3 branch is up to date with develop.

cc @rubanm @fwbrasil

johnynek and others added 30 commits September 22, 2017 20:55
* remove a few warnings

* fix/suppress more warnings

* fix some more warnings
* use the latest scala versions

* fix repl for 2.12.3
The `getCounter` method of the `Reporter` returned from `HadoopFlowProcess` was
returning null in some cases for a few jobs that we run in production. (It is
unclear why these jobs were seeing null counters.)

From looking at the Hadoop source code, getCounter does return null in some instances,
in particular the Reporter.NULL implementation unconditionally returns null from
its getCounter implementation. Hadoop does this despite not documenting that null
is a valid return value.

Solution: Null check the return value of `Reporter.getCounter` to workaround the issue.

Fixes #1716
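A minimal sketch of the null check described above, assuming the Hadoop mapred Reporter API (the helper name is illustrative, not the actual Scalding code):

    import org.apache.hadoop.mapred.Reporter

    // Guard against Reporter.getCounter returning null; Reporter.NULL does so
    // unconditionally, even though null is not a documented return value.
    def incrementCounter(reporter: Reporter, group: String, counter: String, amount: Long): Unit = {
      val c = reporter.getCounter(group, counter)
      if (c != null) c.increment(amount) // silently skip when Hadoop hands back null
    }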

* Adds more tests around the macros for trait edge cases and others

* Add a few fixes, remove some warnings and warts

* Use blackbox macros

* Remove unneeded brackets

* Some warnings

* So macros fail under blackbox :(
* Implement Dagon.toLiteral

* reduce stack depth

* rename LitPipe to LiteralPipe

* respond to review comments
Use a null check rather than foreach
* Implement Dagon.toLiteral

* reduce stack depth

* Add generic TypedPipe optimization rules

* fix compilation error, add a few more rules

* fix serialization issue with 2.12

* Add tests of correctness to optimization rules

* add comments, improve some rules

* fix bug with outerjoin

* fix match error
* Implement Dagon.toLiteral

* reduce stack depth

* Add generic TypedPipe optimization rules

* fix compilation error, add a few more rules

* fix serialization issue with 2.12

* Add tests of correctness to optimization rules

* add comments, improve some rules

* checkpoint

* fix bug with outerjoin

* Cut over to the compiler approach

* add a comment

* Use optimization rules to get the tests to pass

* fixes to make the tests pass

* update comment about dagon post 0.2.2

* fix a bug in the filter composition rule, go tests!
* Make TypedPipe Optimizations configurable

* remove the unsafe cache-cleaning that was prematurely added

* add more test coverage
* create new function to extend a DateRange into the past

* unit tests for prepend/extend/embiggen
* Extend TextLine with TypedSink

* Add test for TextLine

* Remove need for implicit

* Add run so scalding-core tests pass

* TextLine requires an offset. Need a separate test name for run and
runhadoop
We have observed NPEs from null counters at at least two companies
using Scalding. We have not root-caused this issue or found
a good fix, but previously chose to ignore all null counters.

Since some people rely on counters, that is not a great plan,
so instead this patch makes ignoring null counters an opt-in behavior.
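A hedged sketch of what the opt-in could look like; the configuration key and helper below are hypothetical illustrations, not the flag this patch actually introduces:

    import org.apache.hadoop.mapred.Reporter

    // Hypothetical key name, for illustration only.
    val IgnoreNullCountersKey = "scalding.counters.ignore-null"

    def incrementOrFail(reporter: Reporter, ignoreNullCounters: Boolean,
                        group: String, counter: String, amount: Long): Unit = {
      val c = reporter.getCounter(group, counter)
      if (c != null) c.increment(amount)
      else if (!ignoreNullCounters)
        sys.error(s"null counter $group/$counter; set $IgnoreNullCountersKey=true to ignore")
    }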
In current versions of Scalding, we have observed that increasing
graph size massively increases planning time. We believe this is due
to Cascading code that is cubic (or worse) in the number of vertices.

This test currently passes for size=64 (with size=128 commented out)
but still takes 18s to plan at size=64, versus <1s for size=32. We ran
this test on the cascading3 branch and observed basically linear
behavior (e.g. size=128 ran in <3s).
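The test itself is in the PR; as a rough illustration of the kind of input, a pipe graph whose node count grows with a size parameter might be built like this (a sketch, not the committed test):

    import com.twitter.scalding.typed.TypedPipe

    // Builds a TypedPipe graph with O(size) map/filter/merge nodes. The real
    // test's graph shape may differ; the point is that planning time is
    // measured as size grows (32, 64, 128, ...).
    def growingGraph(size: Int): TypedPipe[Int] = {
      val base = TypedPipe.from(0 until 100)
      (1 to size).foldLeft(base) { (acc, i) =>
        acc.map(_ + i) ++ base.filter(_ % (i + 1) == 0)
      }
    }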
Make TypedPipe an abstract class for better binary compatibility
@CLAassistant

CLAassistant commented Jan 30, 2018

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
6 out of 10 committers have signed the CLA.

✅ ianoc
✅ johnynek
✅ moulimukherjee
✅ fwbrasil
✅ alexeygorobets
✅ erik-stripe
❌ Tom Dyas
❌ FlavSF
❌ ianoc-stripe
❌ snoble


Tom Dyas seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.

@johnynek
Collaborator Author

I'm going to merge when this goes green since all the PRs have already been merged to develop.

@johnynek
Collaborator Author

johnynek commented Jan 30, 2018

This seems like a real failure of some of the random planning:

[info] - all optimization rules do not increase steps *** FAILED ***
[info]   PlannerException was thrown during property evaluation.
[info]     Message: union of pipelines have 8 fewer elements than parent node: MapReduceHadoopRuleRegistry, missing: [[apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> HashJoin(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[by: _pipe_2227-2226:[{1}:'key'] IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e:[{1}:'key1']], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[FlatMapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [cascading.flow.hadoop.planner.HadoopPlanner.makeTempTap(HadoopPlanner.java:232)] -> TempHfs["SequenceFile[['key', 'value']]"][5988349142/_pipe_2227-2226/], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:'key1', 'value1']], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:'key1', 'value1']]]
[info]     Occurred when passed generated values (
[info]       arg0 = WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(MapValues(CoGroupedPipe(MapGroup(Pair(IdentityReduce(scala.math.Ordering$Int$@6f790d18,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(EmptyTypedPipe,<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),None,List()),IdentityReduce(scala.math.Ordering$Int$@6f790d18,WithDescriptionTypedPipe(CoGroupedPipe(MapGroup(Pair(IdentityReduce(scala.math.Ordering$Int$@6f790d18,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(WithDescriptionTypedPipe(EmptyTypedPipe,tvo3aakgrh9jrzxoyeuqnfawbmjnxhaixoNgomuxeg41zfcpu,false),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),None,List()),IdentityReduce(scala.math.Ordering$Int$@6f790d18,WithDescriptionTypedPipe(WithDescriptionTypedPipe(FlatMapped(WithDescriptionTypedPipe(MergedTypedPipe(WithDescriptionTypedPipe(WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossValue(WithDescriptionTypedPipe(TrappedPipe(EmptyTypedPipe,com.twitter.scalding.source.FixedTypedText(m8x5mxgwljgg4zWaq),com.twitter.scalding.LowPriorityTupleConverters$$anon$3@3b6d7f73),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),LiteralValue(2)),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),EmptyTypedPipe),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),pqbttw,false),rzeykwyetbqpay9k7kmyfqrihXolLbo1gkqhq,false),EmptyTypedPipe),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),None,List()),<function3>),<function2>)),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),None,List()),<function3>),<function2>)),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R$class.map(Gen.scala:237),true),
[info]       arg1 = com.twitter.scalding.typed.OptimizationRules$EmptyIsOftenNoOp$@16b4d0e9.orElse(com.twitter.scalding.typed.OptimizationRules$ComposeMap$@3205734f).orElse(com.twitter.scalding.typed.OptimizationRules$ComposeFilter$@649ffdcc).orElse(com.twitter.scalding.typed.OptimizationRules$EmptyIterableIsEmpty$@401dea94).orElse(com.twitter.scalding.typed.OptimizationRules$RemoveDuplicateForceFork$@709e59e6).orElse(com.twitter.scalding.typed.OptimizationRules$IgnoreNoOpGroup$@6918c3b1).orElse(com.twitter.scalding.typed.OptimizationRules$ComposeFilterMap$@72f222c3).orElse(com.twitter.scalding.typed.OptimizationRules$FilterKeysEarly$@5d943f43)
[info]     )
[info]   org.scalatest.exceptions.GeneratorDrivenPropertyCheckFailedException:
[info]   at org.scalatest.enablers.CheckerAsserting$$anon$2.indicateFailure(CheckerAsserting.scala:223)
[info]   at org.scalatest.enablers.CheckerAsserting$$anon$2.indicateFailure(CheckerAsserting.scala:219)
[info]   at org.scalatest.enablers.UnitCheckerAsserting$CheckerAssertingImpl.check(CheckerAsserting.scala:140)
[info]   at org.scalatest.prop.GeneratorDrivenPropertyChecks$class.forAll(GeneratorDrivenPropertyChecks.scala:1136)
[info]   at com.twitter.scalding.typed.OptimizationRulesTest.forAll(OptimizationRulesTest.scala:198)
[info]   at com.twitter.scalding.typed.OptimizationRulesTest$$anonfun$35.apply(OptimizationRulesTest.scala:258)
[info]   at com.twitter.scalding.typed.OptimizationRulesTest$$anonfun$35.apply(OptimizationRulesTest.scala:24

[info]   Cause: cascading.flow.planner.PlannerException: union of pipelines have 8 fewer elements than parent node: MapReduceHadoopRuleRegistry, missing: [[apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> HashJoin(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[by: _pipe_2227-2226:[{1}:'key'] IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e:[{1}:'key1']], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[FlatMapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [cascading.flow.hadoop.planner.HadoopPlanner.makeTempTap(HadoopPlanner.java:232)] -> TempHfs["SequenceFile[['key', 'value']]"][5988349142/_pipe_2227-2226/], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:'key1', 'value1']], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:0]], [apply() @ org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)] -> Each(_pipe_2227-2226*IterableSource(List())-127212450535c26cb9-6b2b-428c-8409-aa1e5844aa3e)[MapFunction[decl:'key1', 'value1']]]
[info]   at cascading.flow.planner.FlowPlanner.verifyResultInternal(FlowPlanner.java:703)
[info]   at cascading.flow.planner.FlowPlanner.verifyResult(FlowPlanner.java:565)
[info]   at cascading.flow.planner.rule.RuleSetExec.execPlannerFor(RuleSetExec.java:163)
[info]   at cascading.flow.planner.rule.RuleSetExec$3.call(RuleSetExec.java:336)
[info]   at cascading.flow.planner.rule.RuleSetExec$3.call(RuleSetExec.java:328)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)

@johnynek
Collaborator Author

cc @cwensel

@johnynek
Collaborator Author

here is another one: "Message: registry: MapReduceHadoopRuleRegistry, phase: PostPipelines, failed on rule: RemoveMalformedHashJoinPipelineTransformer, see attached source element-graph"

I guess this PR merges the scalacheck code that randomly generates graphs and tries to plan them, and we are seeing it fail.

I think this probably means cascading has some rules that can't plan some of these graphs.

It would be nice to know what the limitations are in cascading so we can transform them in our rules.

@piyushnarang @rubanm any ideas? You both have added some workarounds.

@piyushnarang
Collaborator

@johnynek - do you know what graph it fails on? We had to work through a few job failures in the planning phase because the cascading3 planner didn't like the graph (even though the same graph worked fine on cascading2). This was also one of our concerns about merging the branch at Twitter, as we were afraid of internal users who hadn't tried cascading3 on their jobs running into these issues.

@johnynek
Collaborator Author

@piyushnarang these were randomly generated. The toString is in the message, but it is pretty huge, so I don't want to paste it here. The error looks like it involves a self-hashjoin. I'll try to write a more direct test to trigger it. If that's it, we can fix it with an optimizer rule.

@johnynek
Collaborator Author

johnynek commented Feb 1, 2018

okay, here is another failure in the current state:

arg0 = WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossValue(WithDescriptionTypedPipe(MergedTypedPipe(IterablePipe(List(-2147483648, -642344067)),WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossValue(WithDescriptionTypedPipe(MergedTypedPipe(WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossPipe(WithDescriptionTypedPipe(TrappedPipe(IterablePipe(List(312548152, 458345207)),com.twitter.scalding.source.FixedTypedText(ked),com.twitter.scalding.LowPriorityTupleConverters$$anon$3@6064ee0),org.scalacheck.Gen$R.map(Gen.scala:237),true),IterablePipe(List(-2147483648, 0))),org.scalacheck.Gen$R.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R.map(Gen.scala:237),true),cpq6jceulrzEgHkvjvnEpxngbsenkccrAzZiu2eanNk,false),WithDescriptionTypedPipe(WithDescriptionTypedPipe(Filter(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossPipe(IterablePipe(List(-151163257, -2147483648)),IterablePipe(List(-1992242190, -1163143704))),org.scalacheck.Gen$R.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R.map(Gen.scala:237),true),org.scalacheck.GenArities$$Lambda$441/1771876489@56f9ef3e),org.scalacheck.Gen$R.map(Gen.scala:237),true),hy0zwcjzomxTl7Prkmgcs1Chmsmcxtfyfgfpiothasorzoz0hxiygwznwia,false)),org.scalacheck.Gen$R.map(Gen.scala:237),true),LiteralValue(2)),org.scalacheck.Gen$R.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R.map(Gen.scala:237),true)),org.scalacheck.Gen$R.map(Gen.scala:237),true),LiteralValue(2)),org.scalacheck.Gen$R.map(Gen.scala:237),true),<function1>),org.scalacheck.Gen$R.map(Gen.scala:237),true),
[info]       arg1 = com.twitter.scalding.typed.OptimizationRules$EmptyIterableIsEmpty$@11f2def

with the cascading stack of:

[info]   Cause: cascading.flow.planner.PlannerException: registry: MapReduceHadoopRuleRegistry, phase: PostPipelines, failed on rule: RemoveMalformedHashJoinPipelineTransformer, see attached source element-graph
[info]   at cascading.flow.planner.rule.RuleExec.performTransform(RuleExec.java:422)
[info]   at cascading.flow.planner.rule.RuleExec.performMutation(RuleExec.java:226)
[info]   at cascading.flow.planner.rule.RuleExec.executeRulePhase(RuleExec.java:178)
[info]   at cascading.flow.planner.rule.RuleExec.planPhases(RuleExec.java:125)
[info]   at cascading.flow.planner.rule.RuleExec.exec(RuleExec.java:86)
[info]   at cascading.flow.planner.rule.RuleSetExec.execPlannerFor(RuleSetExec.java:153)
[info]   at cascading.flow.planner.rule.RuleSetExec$3.call(RuleSetExec.java:336)
[info]   at cascading.flow.planner.rule.RuleSetExec$3.call(RuleSetExec.java:328)

@johnynek
Collaborator Author

johnynek commented Feb 1, 2018

@piyushnarang okay, here is a test that is still failing even with the hashjoin checkpoints we have added. Here is the cascading debug output:
https://www.dropbox.com/s/6o0s4uoxmpe9eyp/286EB2.tgz?dl=0

You can see the scala code in the PR. I will try to minimize it down a bit and see if it still fails.

cc @cwensel

@johnynek
Collaborator Author

johnynek commented Feb 1, 2018

Okay, I simplified the failing case and it indeed looks related to merges and hashjoins. We had some code to try to protect against this but it looks like it is not working here.

Here is the cascading log output: https://www.dropbox.com/s/7qyc4a9pxtstwio/E552D2.tgz?dl=0
and here is a rendered dot of the input as cascading sees it:
https://www.dropbox.com/s/iffadh9x7unrg5w/01-BalanceAssembly-init.dot.png?dl=0

as you can see, it is actually not a very complex job that causes the failure:

    import com.twitter.scalding.typed.TypedPipe

    val p1 =
      TypedPipe.from(List(1, 2))
        .cross(TypedPipe.from(List(3, 4)))

    val p2 =
      TypedPipe.from(List(5, 6))
        .cross(TypedPipe.from(List(8, 9)))

    val p3 = p1 ++ p2 // this can be planned
    val p4 = TypedPipe.from(List((8, 1), (10, 2))) ++ p3 // this cannot be planned

We can possibly add complexity to our rule about HashJoins and merges. Not clear what the recipe is though...

@cwensel in a world where people have functions that return pipes, how can you be sure you are not merging HashJoins? Should we be avoiding this, or is this minimal case enough to see about fixing a bug?

@piyushnarang
Collaborator

I tried out a few variants of the failing job, and the planner failure seems to be triggered by both p1 and p2 performing a cross (a hashJoin). If I drop the cross from either p1 or p2, the planner proceeds. A potential workaround is to add a forceToDisk on p3: val p3 = (p1 ++ p2).forceToDisk works. It's not great, as it adds an extra step, but it might do until we can fix the planner? Not sure if it can be as simple as: if both pipes feeding into a merge have a hashJoin, then force to disk.
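Applied to the earlier snippet, that workaround would look roughly like this (a sketch of the suggestion above, not a committed fix):

    import com.twitter.scalding.typed.TypedPipe

    val p1 = TypedPipe.from(List(1, 2)).cross(TypedPipe.from(List(3, 4)))
    val p2 = TypedPipe.from(List(5, 6)).cross(TypedPipe.from(List(8, 9)))

    // Both inputs to the merge come from a cross (hashJoin), so checkpoint the
    // merged pipe before it feeds another merge.
    val p3 = (p1 ++ p2).forceToDisk
    val p4 = TypedPipe.from(List((8, 1), (10, 2))) ++ p3 // reported to plan with the checkpoint

The checkpoint costs an extra write of the merged data, which is the extra step mentioned above.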

@johnynek
Collaborator Author

johnynek commented Feb 1, 2018 via email

@johnynek
Collaborator Author

johnynek commented Feb 1, 2018

I wonder if the rule is: you can have at most one hashJoin input to a merge.

@piyushnarang
Collaborator

piyushnarang commented Feb 1, 2018

It seems like the behavior matches that - I tried a run where p1 didn't have the .cross, which worked, and the same when just p2 didn't have the .cross.
Though the funny thing, as you saw, is: if you drop p4 and just go up to p3, which is the merge of two pipes with hashJoins, that seems to work :-)

I thought flipping the order of the final merge would work, but I realized I had a bug on my end (I was writing p3 instead of p4). It seems that if either side of p4's merge is a pipe that includes two hashJoins, it fails.

@oscar-stripe

filed cwensel/cascading#61 to help track.

@johnynek
Collaborator Author

johnynek commented Feb 2, 2018

looks like Chris has a repro and a partial fix (does not yet work for Tez) cwensel/cascading#61 (comment)

@johnynek
Collaborator Author

johnynek commented Feb 4, 2018

Well, cascading 3.3.0-wip-18 fixes the hashjoin merge cases we saw (and so far passes the randomly generated tests), but it seems that it has a regression related to partitioned schemes.
