- 浏览: 120117 次
- 性别:
- 来自: 杭州
文章分类
最新评论
udaf 返回的 子属性
spark.sql("select createCrowdHllc(uuid,tmp_id,'crowdid_appid').uuiduv from h5 ").show(10)
package cn.analysys.udf.crowd
import cn.analysys.batch.userprocess.HbaseInit
import cn.analysys.meta.MetaMapInfo
import cn.analysys.udf.utils.CommonUtils
import cn.analysys.udf.utils.CommonUtils.HbasePutArrayData
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
/*
* input: uuid iterater
* output: Bytes[]
* */
class CreateCrowdHllc extends UserDefinedAggregateFunction {
override def inputSchema: StructType = StructType(
StructField("uuid", StringType, true) ::
StructField("imeisi", StringType, true) ::
StructField("crowdid", StringType, true) :: Nil)
def bufferSchema: StructType = StructType(
StructField("uuidByes", ArrayType(ByteType), true) ::
StructField("imeiByes", ArrayType(ByteType), true) ::
StructField("crowdid", StringType, true) :: Nil)
override def dataType: DataType = StructType(
StructField("uuiduv", LongType, true)
::StructField("imeiuv", LongType, true)
:: Nil) //ArrayType(LongType)
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Array.empty[ByteType]
buffer(1) = Array.empty[ByteType]
buffer(2) = ""
}
override def update(buffer: MutableAggregationBuffer, inputrow: Row): Unit = {
//如果有特别大的 app,有这个函数,避免数据倾斜,大内存占用的问题。
val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
hllcUuid.add(inputrow.getAs[String](0))
hllcImei.add(inputrow.getAs[String](1))
buffer(0) = CommonUtils.getByteFromHllc(hllcUuid)
buffer(1) = CommonUtils.getByteFromHllc(hllcImei)
buffer(2) = inputrow.getAs[String](2)
}
override def merge(buffer: MutableAggregationBuffer, buffer2: Row): Unit = {
val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
val hllcUuid2 = CommonUtils.getHllcFromByte(buffer2.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
val hllcImei2 = CommonUtils.getHllcFromByte(buffer2.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
hllcUuid.merge(hllcUuid2)
hllcImei.merge(hllcImei2)
buffer(0) = CommonUtils.getByteFromHllc(hllcUuid)
buffer(1) = CommonUtils.getByteFromHllc(hllcImei)
buffer(2) = buffer2.getAs[String](2)
}
override def evaluate(buffer: Row): Any = {
val hllcUuid = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](0).toArray[Byte])
val hllcImei = CommonUtils.getHllcFromByte(buffer.getAs[mutable.WrappedArray[Byte]](1).toArray[Byte])
val crowd_id = buffer.getAs[String](2)
println(s"uuid uv:${hllcUuid.getCountEstimate} ; ")
println(s"imei uv:${hllcImei.getCountEstimate} ; ")
// put byte[] to hbase
CommonUtils.putData(new HbasePutArrayData(HbaseInit.CROWD_INFO, crowd_id,
Map(MetaMapInfo.QUALIFIER_CROWD_UUID -> CommonUtils.getByteFromHllc(hllcUuid))))
CommonUtils.putData(new HbasePutArrayData(HbaseInit.CROWD_INFO, crowd_id,
Map(MetaMapInfo.QUALIFIER_CROWD_IMEI -> CommonUtils.getByteFromHllc(hllcImei))))
(hllcUuid.getCountEstimate,hllcImei.getCountEstimate)
}
}
发表评论
-
Spark SQL运行 过程 抄的别人的,记录 学习
2018-05-13 23:07 1007抄的别人的,觉得写的特别好 val FILESOURCE ... -
thriftserver log4j.properties 生效
2018-04-09 11:46 420/home/isuhadoop/spark2/sbin/sta ... -
spark datasource
2018-03-16 16:36 638DataFrameWriter format val c ... -
如何 map 端 Join。
2018-03-04 19:31 594Hive 中 修改表的 rawDataSize = 1 1 ... -
spark thrift server 修改
2018-03-04 12:58 556org.apache.spark.sql.hive.thrif ... -
hive hbase thriftserver run
2018-03-03 15:13 372正确方法 : 0\ 拷贝对应目录到 spark2 jars ... -
scala package
2018-01-25 09:48 488#scala 打包 mvn clean scala:com ... -
SPARK SERVER
2018-01-23 22:15 497sbin/start-thriftserver.sh --dr ... -
driver class
2018-01-21 22:11 497sbin/start-thriftserver.sh -- ... -
spark thrift server 调试
2017-10-20 15:50 832spark-hive-thriftserver 本地调试 ... -
spark SQL conf
2017-10-18 14:36 586org.apache.spark.sql.internal.S ... -
java 死锁 ,内存问题 分析
2017-10-17 10:50 315jstack -l pid /opt/soft/jdk/ ... -
thriftServer proxy
2017-10-16 14:21 900sudo yum install haproxy 257 ... -
hive spark conf
2017-09-26 17:44 1274CREATE TABLE org_userbehavior_a ... -
get day
2017-09-19 08:41 250def timeDayNow() = { var ... -
thriftserver
2017-09-14 19:47 429export SPARK_CONF_DIR=/home/yun ... -
thriftserver dynamicallocation
2017-09-08 14:41 548./sbin/start-thriftserver.sh -- ... -
test code2
2017-09-03 13:45 463package org.test.udf import co ... -
test code
2017-08-24 17:52 260def taskcal(data:Array[(String, ... -
struct streaming SQL udf udaf
2017-08-22 11:50 647spark aggregator class H ...
相关推荐
数据架构师第015节UDAF实战:实现udaf第16节数据说明和重要操作演示.mp4
A custom UDAF to group oncatenates all arguments from different rows into a single string.
hive udaf 实现按位取与或 hive udaf 实现按位取与或 hive udaf 实现按位取与或
1.从HDFS中加载数据到DataFrame中 2.注册UDF函数,函数名为toUpper就是将所有名字变成大写 3.创建临时视图,然后执行注册的函数
个人 Hive UDAF 有一堆 Hive UDAF(用户定义的聚合函数)不在标准 Hive 分布中,因为它们可能会导致大型数据集的 OOM。 要使用它们,您需要加载 jar 文件,然后为每个要使用的函数创建一个临时函数: ADD JAR target...
nighgvvbbnjjkkkkk
title: "使用 [通用] UDAF 计算两个数的线性回归系数"例如select s,regression(x,y) group by s;参考Hive U
这是一些有用的 Hive UDF 和 UDAF 的集合。 提供的功能 UDAF Mode ( de.frosner.hive.udaf.Mode ) - 计算组列的统计模式 从源头构建 git clone https://github.com/FRosner/mustached-hive-udfs.git cd mustached...
如果链接失效,请与我联系!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
自定义 hive udf udaf 有url解析,获取网站主域名,根据ip获取区域码,有rownum,列聚合以及一些业务实现udf。
hive所有函数 包括UDTs、UDAF、UDTF函数和运算符等,中文汉化,翻译并测试
hive-udf-hook UDF开发及发布过程 1 用户编写UDF实现类 ...1 如果hive-site.xml文件没有配置如下属性,则需要添加该属性,如果已经存在则省略此过程 <name>hive.exec.driver.run.hooks <value>com.cha
hive常用函数,包括时间、类型、udf、udaf等等的归纳。
4.tools类:GBase 8a UDF&UDAF使用手册.pdf GBase 8a 全文检索参考手册.pdf 5.user类: GBase 8a SQL参考手册.pdf 注:资源大小限制 所以在用百度云盘提取 GBase 8a 安装手册.pdf GBase 8a 错误手册.pdf ...
java sql笔试题示例 Hive UDF 项目 介绍 该项目只是一个示例,包含多个 (UDF),用于 Apache Spark。 它旨在演示如何在 Scala 或 Java 中构建 Hive UDF 并在 . 为什么要使用 Hive UDF? Hive UDF ...D
本机 Spark-SQL,当前实现仅依赖于 Hive UDAF。 新的实现使用 Spark SQL 聚合。 虽然仍然支持 Hive UDAF。 在运行案例(例如 BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)和 UNBOUDED FOLLOWING 案例中,性能...
Hive常用数据类型介绍,表创建,内外部表、分区分桶表介绍,hive内置函数,UDTF,UDAF函数介绍,hive数据的导入导出以及JDBC配置方法。详细介绍了hive一些函数的使用和应用。
1.8.2 编写UDAF 第13章 HBase 2.1 HBasics 2.1.1 背景 2.2 概念 2.2.1 数据模型的"旋风之旅" 2.2.2 实现 2.3 安装 2.3.1 测试驱动 2.4 客户机 2.4.1 Java 2.4.2 Avro,REST,以及Thrift 2.5 示例 2.5.1 模式 2.5.2 ...
=================适用于Apache Hive的DataSketches Java UDF / UDAF适配器请访问主要的以获取更多信息。 如果您有兴趣对此站点做出贡献,请参阅我们的页面以了解如何与我们联系。Hadoop Hive UDF / UDAF 请参阅Java...
1.8.2 编写UDAF 第13章 HBase 2.1 HBasics 2.1.1 背景 2.2 概念 2.2.1 数据模型的“旋风之旅” 2.2.2 实现 2.3 安装 2.3.1 测试驱动 2.4 客户机 2.4.1 Java 2.4.2 Avro,REST,以及Thrift ...