Scala - Add column index to dataframe based on another column (user in this case)
I have the dataframe given below; the last column represents the number of times the user has searched for the (location, stay) combination:
| hanks   | rotterdam | airbnb7 | 1 |
| sanders | rotterdam | airbnb2 | 1 |
| hanks   | rotterdam | airbnb2 | 3 |
| hanks   | tokyo     | airbnb8 | 2 |
| larry   | hanoi     |         | 2 |
| mango   | seoul     | airbnb5 | 1 |
| larry   | hanoi     | airbnb1 | 2 |
which I want to transform as follows:
| hanks   | rotterdam | airbnb7 | 1 | 1 |
| sanders | rotterdam | airbnb2 | 1 | 1 |
| hanks   | rotterdam | airbnb2 | 3 | 2 |
| hanks   | tokyo     | airbnb8 | 2 | 3 |
| larry   | hanoi     |         | 2 | 0 |
| mango   | seoul     | airbnb5 | 1 | 1 |
| larry   | hanoi     | airbnb1 | 2 | 1 |
Notice that column 5 represents the index of each unique combination of options (location + stay) that the user selected. E.g., for hanks:
| hanks | rotterdam | airbnb7 | 1 | 1 |
| hanks | rotterdam | airbnb2 | 3 | 2 |
| hanks | tokyo     | airbnb8 | 2 | 3 |
I tried using groupBy/agg, implementing a UDF and calling it in the agg function as follows:
```scala
val df2 = df1.groupBy("user", "clickedDestination", "clickedAirbnb")
  .agg(indexUserDetailsUDF(col("clickedAirbnb")).as("clickedAirbnbIndex"))
```
and the UDF is as follows:
```scala
var cnt = 0
val airbnbClickIndex: String => String = (airbnb: String) => {
  if (airbnb == "") "null"          // return "null" when airbnb is empty
  else { cnt += 1; cnt.toString }   // otherwise return the incremented counter
}
val indexUserDetailsUDF = udf(airbnbClickIndex)
```
But it is not working. Any input is appreciated. Thanks.
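As an aside on why the counter cannot work: a Spark UDF runs in parallel on the executors, and each task operates on its own deserialized copy of the closure, so a mutable `var cnt` never forms one global sequence. A minimal sketch of the effect, assuming the `df1` from above:

```scala
import org.apache.spark.sql.functions.{col, udf}

// Sketch only: `cnt` is captured in the UDF's closure. Each task gets its
// own copy, so the emitted values restart per partition and depend on
// scheduling; they are not a global, ordered counter.
var cnt = 0
val counterUdf = udf { (_: String) => { cnt += 1; cnt } }

// df1.repartition(4)
//    .withColumn("idx", counterUdf(col("clickedAirbnb")))
//    .show()  // "idx" repeats across partitions instead of 1, 2, 3, ...
```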
Update 1: With Daniel's suggestion of dense_rank, I get the following for one user:
| meera | amsterdam | airbnb12 | 1 |  1 |
| meera | amsterdam | airbnb2  | 1 |  2 |
| meera | amsterdam | airbnb7  | 1 |  3 |
| meera | amsterdam | airbnb8  | 1 |  4 |
| meera | bangalore |          | 1 |  5 |
| meera | bangalore | airbnb11 | 1 |  6 |
| meera | bangalore | airbnb8  | 1 |  7 |
| meera | hanoi     | airbnb1  | 2 |  8 |
| meera | hanoi     | airbnb2  | 1 |  9 |
| meera | hanoi     | airbnb7  | 1 | 10 |
| meera | mumbai    |          | 1 | 11 |
| meera | oslo      |          | 2 | 12 |
| meera | oslo      | airbnb8  | 1 | 13 |
| meera | paris     |          | 1 | 14 |
| meera | paris     | airbnb11 | 1 | 15 |
| meera | paris     | airbnb6  | 1 | 16 |
| meera | paris     | airbnb7  | 1 | 17 |
| meera | paris     | airbnb8  | 2 | 18 |
| meera | rotterdam | airbnb2  | 1 | 19 |
I assumed dense_rank would push records with empty field values (in this case, records where the 3rd field is empty) to the end. Is that correct?
If I got it right, you want a windowed rank. Try the following:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy("user").orderBy("user", "clickedDestination", "clickedAirbnb")
val result = df.withColumn("clickedAirbnbIndex", dense_rank().over(window))
```
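For reference, here is that suggestion run end to end against the sample rows from the question. The first three column names come from the question's code; `searchCount` for the fourth column is an assumed name, since the question never names it:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("rank-demo").getOrCreate()
import spark.implicits._

// Sample rows from the question; "searchCount" is an assumed name for column 4.
val df = Seq(
  ("hanks",   "rotterdam", "airbnb7", 1),
  ("sanders", "rotterdam", "airbnb2", 1),
  ("hanks",   "rotterdam", "airbnb2", 3),
  ("hanks",   "tokyo",     "airbnb8", 2),
  ("larry",   "hanoi",     "",        2),
  ("mango",   "seoul",     "airbnb5", 1),
  ("larry",   "hanoi",     "airbnb1", 2)
).toDF("user", "clickedDestination", "clickedAirbnb", "searchCount")

val window = Window.partitionBy("user").orderBy("user", "clickedDestination", "clickedAirbnb")

// hanks gets airbnb2 -> 1, airbnb7 -> 2, airbnb8 -> 3 (ranked in sort order)
df.withColumn("clickedAirbnbIndex", dense_rank().over(window)).show()
```

Two caveats. First, in an ascending sort, empty strings order before non-empty ones (and nulls order first by default; `asc_nulls_last` on the column flips that), which answers the question in Update 1: the blank-stay row is ranked first within its group, not last. Second, dense_rank numbers combinations in the window's sort order, not in order of first appearance, so hanks gets airbnb2 before airbnb7, unlike the desired output; recovering order of appearance would need an explicit ordering column such as a timestamp, which the sample data lacks.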
If needed, you can find a read-up on window functions in Spark here.
Also, the functions package API documentation may be useful.
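One more detail from the desired output in the question: the row with an empty stay should get index 0, whereas dense_rank counts the empty value like any other rank (and, since it sorts first, it shifts every later index up by one). Below is one sketch that reproduces the empty-row handling by splitting the frame; it assumes missing stays are empty strings rather than nulls, and the non-empty indexes still follow sort order rather than order of appearance:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Ordering by "user" inside a partition on "user" is redundant, so it is dropped here.
val w = Window.partitionBy("user").orderBy("clickedDestination", "clickedAirbnb")

// Rank only the rows that actually have a stay selected...
val ranked = df.filter(col("clickedAirbnb") =!= "")
  .withColumn("clickedAirbnbIndex", dense_rank().over(w))

// ...and give the empty-stay rows a fixed index of 0, as in the expected output.
val blanks = df.filter(col("clickedAirbnb") === "")
  .withColumn("clickedAirbnbIndex", lit(0))

val result = ranked.unionByName(blanks)
```

Splitting and re-joining with unionByName keeps the logic explicit; a single-pass variant with when(...) would need care, because the ranks of the remaining rows must not count the blank ones.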