Scala - Spark joining DataFrames and aggregation
This is a sort of contrived example, but it captures what I am trying to do using Spark/Scala.
Pet types:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

val pets = Array(Row(1, "cat"), Row(2, "dog"))
val petsRDD = sc.parallelize(pets)
val petsSchema = StructType(Array(
  StructField("id", IntegerType),
  StructField("type", StringType)))
val petsDF = sqlContext.createDataFrame(petsRDD, petsSchema)
Pet names:
val petNames = Array(
  Row(1, 1, "tigger", "m"),
  Row(2, 1, "winston", "m"),
  Row(3, 1, "snowball", "f"),
  Row(4, 2, "spot", "m"),
  Row(5, 2, "barf", "m"),
  Row(6, 2, "snoopy", "f"))
val petNamesRDD = sc.parallelize(petNames)
val petNamesSchema = StructType(Array(
  StructField("id", IntegerType),
  StructField("pet_id", IntegerType),
  StructField("name", StringType),
  StructField("gender", StringType)))
val petNamesDF = sqlContext.createDataFrame(petNamesRDD, petNamesSchema)
From here I can join the DataFrames ...
val join = petsDF.join(petNamesDF, petsDF("id") === petNamesDF("pet_id"), "leftouter")
Results:
+---+----+---+------+--------+------+
| id|type| id|pet_id|    name|gender|
+---+----+---+------+--------+------+
|  1| cat|  1|     1|  tigger|     m|
|  1| cat|  2|     1| winston|     m|
|  1| cat|  3|     1|snowball|     f|
|  2| dog|  4|     2|    spot|     m|
|  2| dog|  5|     2|    barf|     m|
|  2| dog|  6|     2|  snoopy|     f|
+---+----+---+------+--------+------+
I want to flatten the results so they look like the following, so that I can map over them for more processing:
((1,"cat"),(1,"tigger","m"),(2,"winston","m"),(3,"snowball","f")) ((2,"dog"),(1,"spot","m"),(2,"barf","m"),(3,"snoopy","f"))
I started looking at UserDefinedAggregateFunctions, but I could not make them work. I did not try very hard, but they do not seem like a good fit here.
I also looked at using map to transform each petsDF row into a row holding a list of petNames, but nested DataFrames are not allowed.
I am hoping I am missing something built into Spark, or that someone has an idea that will work. I am new to Spark/Scala.
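For what it's worth, later Spark versions do have something built in for this: collect_list can gather a struct of columns per group. A sketch, assuming Spark 2.1+ (the sqlContext API used above is Spark 1.x, where collect_list over a struct is not available):

import org.apache.spark.sql.functions.{collect_list, struct}

// Group the joined rows and collect the (name, gender) pairs
// into a single array column per pet.
val nested = petsDF
  .join(petNamesDF, petsDF("id") === petNamesDF("pet_id"), "leftouter")
  .groupBy(petsDF("id"), petsDF("type"))
  .agg(collect_list(struct("name", "gender")).as("names"))

nested.show(false)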
Thanks