scala - Spark joining DataFrames and aggregation


This is a somewhat contrived example, but it captures what I am trying to do using Spark/Scala.

Pet types:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val pets = Array(Row(1, "cat"), Row(2, "dog"))
val petsRDD = sc.parallelize(pets)
val petSchema = StructType(Array(
  StructField("id", IntegerType),
  StructField("type", StringType)))
val petsDF = sqlContext.createDataFrame(petsRDD, petSchema)

Pet names:

val petNames = Array(
  Row(1, 1, "tigger", "m"),
  Row(2, 1, "winston", "m"),
  Row(3, 1, "snowball", "f"),
  Row(4, 2, "spot", "m"),
  Row(5, 2, "barf", "m"),
  Row(6, 2, "snoopy", "f"))
val petNamesRDD = sc.parallelize(petNames)
val petNamesSchema = StructType(Array(
  StructField("id", IntegerType),
  StructField("pet_id", IntegerType),
  StructField("name", StringType),
  StructField("gender", StringType)))
val petNamesDF = sqlContext.createDataFrame(petNamesRDD, petNamesSchema)
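
As an aside, the same DataFrames can be built more concisely from tuples; a minimal sketch, assuming the sqlContext implicits are in scope:

import sqlContext.implicits._

// Equivalent shorthand: toDF infers the schema from the tuple element types.
val petsDF2 = Seq((1, "cat"), (2, "dog")).toDF("id", "type")
val petNamesDF2 = Seq(
  (1, 1, "tigger", "m"), (2, 1, "winston", "m"), (3, 1, "snowball", "f"),
  (4, 2, "spot", "m"), (5, 2, "barf", "m"), (6, 2, "snoopy", "f")
).toDF("id", "pet_id", "name", "gender")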

From here I can join the DataFrames ...

val joined = petsDF.join(petNamesDF, petsDF("id") === petNamesDF("pet_id"), "leftouter")

Results:

+---+----+---+------+--------+------+
| id|type| id|pet_id|    name|gender|
+---+----+---+------+--------+------+
|  1| cat|  1|     1|  tigger|     m|
|  1| cat|  2|     1| winston|     m|
|  1| cat|  3|     1|snowball|     f|
|  2| dog|  4|     2|    spot|     m|
|  2| dog|  5|     2|    barf|     m|
|  2| dog|  6|     2|  snoopy|     f|
+---+----+---+------+--------+------+

I want to flatten the results so they look like the following, so I can map over them for more processing:

((1,"cat"),(1,"tigger","m"),(2,"winston","m"),(3,"snowball","f")) ((2,"dog"),(1,"spot","m"),(2,"barf","m"),(3,"snoopy","f")) 

I started looking at UserDefinedAggregateFunction, but could not get it to work. I did not try very hard, and it does not seem like a good fit anyway.

I also looked at using map to transform each petsDF row into a (pet, list of pet names) pair, but nested DataFrames are not allowed.
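
From what I can tell, a truly nested DataFrame is not allowed, but an array-of-structs column is. A minimal sketch using collect_list and struct, assuming Spark 2.x (on older versions collect_list may require a HiveContext and may not accept struct columns):

import org.apache.spark.sql.functions.{collect_list, struct}

// Group on the pet columns and collect the matching name rows into a single
// array-of-structs column, avoiding a custom UDAF entirely.
val nested = joined
  .groupBy(petsDF("id"), petsDF("type"))
  .agg(collect_list(struct(petNamesDF("id"), petNamesDF("name"), petNamesDF("gender"))).as("names"))

nested.show(false)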

I am hoping there is something built into Spark that I am missing, or that someone has an idea that would work. I am new to Spark/Scala.

Thanks

