Note: the struct returned by this UDAF exposes sub-attributes that can be
selected directly in SQL, for example:

spark.sql("select createCrowdHllc(uuid,tmp_id,'crowdid_appid').uuiduv   from h5     ").show(10)

package cn.analysys.udf.crowd

import cn.analysys.batch.userprocess.HbaseInit
import cn.analysys.meta.MetaMapInfo
import cn.analysys.udf.utils.CommonUtils
import cn.analysys.udf.utils.CommonUtils.HbasePutArrayData
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable

/*
* input:  (uuid, imei, crowdid) per row
* output: struct(uuiduv: Long, imeiuv: Long) — estimated distinct counts
* */

/**
 * Spark UDAF that computes approximate distinct counts (HyperLogLog) of the
 * uuid and imei columns per group, persists the serialized HLL sketches to
 * HBase under the given crowd id, and returns the two count estimates as a
 * struct(uuiduv, imeiuv).
 *
 * NOTE(review): `evaluate` has a side effect (two HBase puts) — Spark may call
 * it more than once for the same group; presumably the puts are idempotent
 * (same rowkey/qualifier) — confirm against CommonUtils.putData semantics.
 */
class CreateCrowdHllc extends UserDefinedAggregateFunction {

  /** Input columns: uuid, imei ("imeisi") and the crowd id of the row. */
  override def inputSchema: StructType = StructType(
    StructField("uuid", StringType, true) ::
    StructField("imeisi", StringType, true) ::
    StructField("crowdid", StringType, true) :: Nil)

  /** Aggregation buffer: two serialized HLL sketches plus the crowd id. */
  override def bufferSchema: StructType = StructType(
    StructField("uuidByes", ArrayType(ByteType), true) ::
    StructField("imeiByes", ArrayType(ByteType), true) ::
    StructField("crowdid", StringType, true) :: Nil)

  /** Result type: estimated distinct counts for uuid and imei. */
  override def dataType: DataType = StructType(
    StructField("uuiduv", LongType, true) ::
    StructField("imeiuv", LongType, true) :: Nil)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // BUG FIX: the buffer holds raw bytes (ArrayType(ByteType)).
    // `Array.empty[ByteType]` built an array of Spark DataType objects,
    // not an empty Array[Byte].
    buffer(0) = Array.empty[Byte]
    buffer(1) = Array.empty[Byte]
    buffer(2) = ""
  }

  override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
    // Deserialize the sketches, fold in the new row, re-serialize.
    // Using an HLLC per group avoids data skew / large memory usage when a
    // single app is very large.
    val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
    val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
    val uuid = inputrow.getAs[String](0)
    val imei = inputrow.getAs[String](1)
    // Both input columns are declared nullable — skip nulls instead of NPE-ing.
    if (uuid != null) hllcUuid.add(uuid)
    if (imei != null) hllcImei.add(imei)
    buffer(0) = CommonUtils.getByteFromHllc(hllcUuid)
    buffer(1) = CommonUtils.getByteFromHllc(hllcImei)
    buffer(2) = inputrow.getAs[String](2)
  }

  override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
    val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
    val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
    val hllcUuid2 = CommonUtils.getHllcFromByte(buffer2.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
    val hllcImei2 = CommonUtils.getHllcFromByte(buffer2.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
    hllcUuid.merge(hllcUuid2)
    hllcImei.merge(hllcImei2)
    buffer(0) = CommonUtils.getByteFromHllc(hllcUuid)
    buffer(1) = CommonUtils.getByteFromHllc(hllcImei)
    // Only take buffer2's crowd id when it is set: a buffer from an empty
    // partition still carries the initial "" and would clobber a real id.
    val crowdId2 = buffer2.getAs[String](2)
    if (crowdId2 != null && crowdId2.nonEmpty) buffer(2) = crowdId2
  }

  override def evaluate(buffer: Row): Any = {
    val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
    val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
    val crowd_id = buffer.getAs[String](2)
    println(s"uuid uv:${hllcUuid.getCountEstimate} ; ")
    println(s"imei uv:${hllcImei.getCountEstimate} ; ")
    // Persist the serialized sketches to HBase under the crowd id rowkey,
    // one qualifier per sketch.
    CommonUtils.putData(new HbasePutArrayData(HbaseInit.CROWD_INFO, crowd_id,
      Map(MetaMapInfo.QUALIFIER_CROWD_UUID -> CommonUtils.getByteFromHllc(hllcUuid))))
    CommonUtils.putData(new HbasePutArrayData(HbaseInit.CROWD_INFO, crowd_id,
      Map(MetaMapInfo.QUALIFIER_CROWD_IMEI -> CommonUtils.getByteFromHllc(hllcImei))))
    // Tuple is converted by Spark to the declared struct(uuiduv, imeiuv).
    (hllcUuid.getCountEstimate, hllcImei.getCountEstimate)
  }
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics