Elasticsearch - Aggregate only newest document
I have an Elasticsearch index that holds documents of user state history. The data looks like this:
{ "session_id": "yunus", "state_name": "start", "entry_time": "2016-11-09 15:27:03" },
{ "session_id": "yunus", "state_name": "end", "entry_time": "2016-11-09 16:30:00" },
{ "session_id": "can", "state_name": "start", "entry_time": "2016-11-09 12:01:00" },
{ "session_id": "rick", "state_name": "start", "entry_time": "2016-11-09 09:00:00" },
{ "session_id": "rick", "state_name": "end", "entry_time": "2016-11-10 10:00:00" }
I want to aggregate by state name in a date histogram, counting only the last state of each session at that time. The result could be:
2016-11-08  start = 0  end = 0
2016-11-09  start = 2  end = 1
2016-11-10  start = 1  end = 2
My actual plan is to generate a grouped bar chart on a timeline that shows how the states change over time.
I tried several things with aggregation pipelines and top hits, but couldn't make progress.
Any help is appreciated.
For anyone interested, I solved this with Spark. I used elastic-spark to read from Elasticsearch and write back to Elasticsearch.
Here I read from ES as an RDD:
val allData = sc.esRDD(s"states_${id}/log", query)
Then I first group by session id and compare dates to find the latest state of each session:
val latestStates = allData
  .groupBy(k => k._2.get("session_id").get)
  // within each session, keep the document with the larger timestamp
  .map(k => k._2.reduceLeft((d1, d2) =>
    if (d1._2.get("timestamp").get.asInstanceOf[Long] > d2._2.get("timestamp").get.asInstanceOf[Long]) d1 else d2
  ))
  // drop the document id, keep only the field map
  .map(_._2)
Once I have the latest state of each session, I filter out the exit states and count by value:
val stateSummary = latestStates
  // skip documents without a state_id and those whose state_id is -1
  .filter(s => s.isDefinedAt("state_id") && s("state_id").asInstanceOf[Long] != -1)
  .map(s => (s("state_id"), s("state_name")))
  .countByValue()
  .map(d => Map("state_id" -> d._1._1.asInstanceOf[Long], "state_name" -> d._1._2.asInstanceOf[String], "count" -> d._2))
  .toList
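To check the logic of these two steps without a cluster, here is a minimal sketch on plain Scala collections. The field names follow the Spark code above; the object name `LatestStateCount`, the sample documents, and the reading of `state_id == -1` as the exit state are assumptions made for illustration only.

```scala
object LatestStateCount {
  type Doc = Map[String, Any]

  // Hypothetical documents: field names follow the Spark code, values are made up.
  val docs: Seq[Doc] = Seq(
    Map("session_id" -> "a", "state_id" -> 1L, "state_name" -> "start", "timestamp" -> 100L),
    Map("session_id" -> "a", "state_id" -> 2L, "state_name" -> "menu", "timestamp" -> 200L),
    Map("session_id" -> "b", "state_id" -> 1L, "state_name" -> "start", "timestamp" -> 150L),
    Map("session_id" -> "c", "state_id" -> 1L, "state_name" -> "start", "timestamp" -> 120L),
    Map("session_id" -> "c", "state_id" -> -1L, "state_name" -> "exit", "timestamp" -> 300L)
  )

  // Step 1: group by session id and keep the document with the largest timestamp.
  def latestPerSession(ds: Seq[Doc]): Seq[Doc] =
    ds.groupBy(d => d("session_id"))
      .values
      .map(group => group.maxBy(d => d("timestamp").asInstanceOf[Long]))
      .toSeq

  // Step 2: drop documents with a missing state_id or state_id == -1
  // (assumed here to mark an exited session), then count per (state_id, state_name).
  def summarize(latest: Seq[Doc]): Map[(Long, String), Int] =
    latest
      .filter(d => d.get("state_id").exists(id => id.asInstanceOf[Long] != -1L))
      .groupBy(d => (d("state_id").asInstanceOf[Long], d("state_name").asInstanceOf[String]))
      .map { case (key, group) => key -> group.size }

  def main(args: Array[String]): Unit =
    summarize(latestPerSession(docs)).foreach(println)
}
```

With these sample documents, session "c" ends in the assumed exit state and is dropped, so only sessions "a" and "b" are counted.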
Now I have the current number of sessions in each state ("current" is configurable and can be set to a specific time). The only thing left is to write it to Elasticsearch:
sc.makeRDD(Seq(finalElasticDoc)).saveToEs(s"states_${id}/analytic_daily")
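Since "current" can be set to a specific time, repeating the same latest-state count with the cut-off moved to the end of each day should give the per-day histogram asked for in the question. The following is a minimal sketch of that idea on plain Scala collections, using the five sample documents from the question. The names `LatestStateHistogram`, `Entry`, `countsOn` and `histogram` are hypothetical, and treating "the state at a date" as "the newest entry before the next day starts" is an assumption.

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

object LatestStateHistogram {
  // One state-change document, mirroring the fields of the sample data.
  final case class Entry(sessionId: String, stateName: String, entryTime: LocalDateTime)

  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  def entry(sessionId: String, stateName: String, entryTime: String): Entry =
    Entry(sessionId, stateName, LocalDateTime.parse(entryTime, fmt))

  // The five sample documents from the question.
  val sample: Seq[Entry] = Seq(
    entry("yunus", "start", "2016-11-09 15:27:03"),
    entry("yunus", "end", "2016-11-09 16:30:00"),
    entry("can", "start", "2016-11-09 12:01:00"),
    entry("rick", "start", "2016-11-09 09:00:00"),
    entry("rick", "end", "2016-11-10 10:00:00")
  )

  // For one day: keep entries made before the next day starts (assumed cut-off),
  // take the newest entry per session, then count sessions per state name.
  def countsOn(day: LocalDate, entries: Seq[Entry]): Map[String, Int] = {
    val cutoff = day.plusDays(1).atStartOfDay()
    entries
      .filter(e => e.entryTime.isBefore(cutoff))
      .groupBy(e => e.sessionId)
      .values
      .map(group => group.reduceLeft((a, b) => if (a.entryTime.isAfter(b.entryTime)) a else b))
      .groupBy(e => e.stateName)
      .map { case (name, group) => name -> group.size }
  }

  // One (day, start count, end count) row per requested day.
  def histogram(days: Seq[LocalDate], entries: Seq[Entry]): Seq[(LocalDate, Int, Int)] =
    days.map { day =>
      val counts = countsOn(day, entries)
      (day, counts.getOrElse("start", 0), counts.getOrElse("end", 0))
    }

  def main(args: Array[String]): Unit = {
    val days = Seq(LocalDate.of(2016, 11, 8), LocalDate.of(2016, 11, 9), LocalDate.of(2016, 11, 10))
    histogram(days, sample).foreach { case (day, start, end) =>
      println(s"$day start = $start end = $end")
    }
  }
}
```

For the three days in the question this yields the same start and end counts as the expected result listed there. In Spark, the same cut-off could be applied as a filter on the RDD before the group-by step, once per day.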