0、Environment info
This article is based on the Spark environment of Alibaba Cloud MaxCompute. To set up a local Spark environment, see the MaxCompute help-center article 搭建Windows开发环境 (云原生大数据计算服务 MaxCompute - 阿里云帮助中心). Versions: Spark 2.4.5, Maven newer than 3.8.4.
①Configure the pom dependencies (see 2-①)
②Add the runtime jar packages
③Add the configuration items:
odps.project.name
odps.access.id
odps.access.key
odps.end.point
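For a local run these items go into an odps.conf file on the classpath (put it under resources for local tests; see 2-③ and 4-③). A minimal sketch, assuming a plain key=value properties format, with placeholder values you must replace with your own project, credentials and endpoint:

# odps.conf (placeholders -- fill in your own values)
odps.project.name=your_project_name
odps.access.id=your_access_id
odps.access.key=your_access_key
odps.end.point=your_end_point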
1、Data preparation
create TABLE dwd_sl_user_ids(
    user_name  STRING COMMENT 'user'
   ,user_id    STRING COMMENT 'user id'
   ,device_id  STRING COMMENT 'device id'
   ,id_card    STRING COMMENT 'ID card number'
   ,phone      STRING COMMENT 'phone number'
   ,pay_id     STRING COMMENT 'payment account'
   ,ssoid      STRING COMMENT 'APPID'
) PARTITIONED BY (
    ds BIGINT
)
;

INSERT OVERWRITE TABLE dwd_sl_user_ids PARTITION(ds=20230818)
VALUES
 ('大法_官网','1','device_a','130826','185133','zhi1111','U130311')
,('大神_官网','2','device_b','220317','165133','zhi2222',null)
,('耀总_官网','3',null,'310322','133890','zhi3333','U120311')
,('大法_app','1','device_x','130826',null,'zhi1111',null)
,('大神_app','2','device_b','220317','165133',null,null)
,('耀总_app',null,null,null,'133890','zhi333','U120311')
,('大法_小程序',null,'device_x','130826',null,null,'U130311')
,('大神_小程序','2','device_b','220317','165133',null,'U140888')
,('耀总_小程序',null,null,'310322','133890',null,'U120311')
;

The result table:
create TABLE itsl_dev.dwd_patient_oneid_info_df(
    oneid          STRING COMMENT 'generated ONEID'
   ,id             STRING COMMENT 'each type of user id'
   ,id_hashcode    STRING COMMENT 'id_hashcode of each user id'
   ,guid           STRING COMMENT 'aggregated guid'
   ,guid_hashcode  STRING COMMENT 'aggregated guid_hashcode'
) PARTITIONED BY (
    ds BIGINT
);

2、Code preparation
①pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.gwm</groupId>
  <artifactId>graph</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>graph</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>2.3.0</spark.version>
    <java.version>1.8</java.version>
    <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
    <scala.version>2.11.8</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
<!--      <scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
<!--      <scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-graphx_2.11</artifactId>
      <version>${spark.version}</version>
<!--      <scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>com.thoughtworks.paranamer</groupId>
      <artifactId>paranamer</artifactId>
      <version>2.8</version>
<!--      <scope>provided</scope>-->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.5</version>
<!--      <scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>com.aliyun.odps</groupId>
      <artifactId>cupid-sdk</artifactId>
      <version>${cupid.sdk.version}</version>
      <scope>provided</scope>
    </dependency>
<!--    <dependency>-->
<!--      <groupId>com.aliyun.odps</groupId>-->
<!--      <artifactId>hadoop-fs-oss</artifactId>-->
<!--      <version>${cupid.sdk.version}</version>-->
<!--    </dependency>-->
    <dependency>
      <groupId>com.aliyun.odps</groupId>
      <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
      <version>${cupid.sdk.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.73</version>
    </dependency>
    <dependency>
      <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.13</version>
    </dependency>
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.1.1</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.gwm.OdpsGraphx</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

②Code
package com.gwm

import java.math.BigInteger
import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.commons.codec.digest.DigestUtils
import org.apache.spark.SparkConf
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.spark_project.jetty.util.StringUtil

import scala.collection.mutable.ListBuffer

/**
 * @author yangyingchun
 * @date 2023/8/18 10:32
 * @version 1.0
 */
object OneID {
  val sparkConf = new SparkConf().setAppName("OdpsGraph").setMaster("local[1]")
  sparkConf.set("spark.hadoop.odps.access.id", "yours access.id")
  sparkConf.set("spark.hadoop.odps.access.key", "yours access.key")
  sparkConf.set("spark.hadoop.odps.end.point", "yours end.point")
  sparkConf.set("spark.hadoop.odps.project.name", "yours project.name")
  sparkConf.set("spark.sql.catalogImplementation", "hive") // "in-memory"; use "hive" on Spark 2.4.5 and above

  val spark = SparkSession.builder
    .appName("Oneid")
    .master("local[1]")
    .config("spark.sql.broadcastTimeout", 1200L)
    .config("spark.sql.crossJoin.enabled", true)
    .config("odps.exec.dynamic.partition.mode", "nonstrict")
    .config(sparkConf)
    .getOrCreate()

  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {
    val bizdate = args(0)
    val c = Calendar.getInstance
    val format = new SimpleDateFormat("yyyyMMdd")
    c.setTime(format.parse(bizdate))
    c.add(Calendar.DATE, -1)
    val bizlastdate = format.format(c.getTime)
    println(s"date parameters: ${bizdate} ${bizlastdate}")

    // dwd_sl_user_ids holds every kind of user ID -- it is our data source.
    // Read the column list from the table so new ID columns can be added without changing the code.
    val columns = spark.sql(
      s"""
         |select
         |    *
         |from
         |    itsl.dwd_sl_user_ids
         |where
         |    ds=${bizdate}
         |limit
         |    1
         |""".stripMargin).schema.fields.map(f => f.name).filterNot(e => e.equals("ds")).toList
    println("columns: " + columns)

    // fetch the data
    val dataFrame = spark.sql(
      s"""
         |select
         |    ${columns.mkString(",")}
         |from
         |    itsl.dwd_sl_user_ids
         |where
         |    ds=${bizdate}
         |""".stripMargin)

    // prepare the data
    val data = dataFrame.rdd.map(row => {
      val list = new ListBuffer[String]()
      for (column <- columns) {
        val value = row.getAs[String](column)
        list.append(value)
      }
      list.toList
    })

    import spark.implicits._

    // vertex set
    val veritx = data.flatMap(list => {
      for (i <- 0 until columns.length if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i))))
        yield (new BigInteger(DigestUtils.md5Hex(list(i)), 16).longValue, list(i))
    }).distinct

    val veritxDF = veritx.toDF("id_hashcode", "id")
    veritxDF.createOrReplaceTempView("veritx")

    // edge set
    val edges = data.flatMap(list => {
      for (i <- 0 to list.length - 2 if StringUtil.isNotBlank(list(i)) && (!"null".equals(list(i)));
           j <- i + 1 to list.length - 1 if StringUtil.isNotBlank(list(j)) && (!"null".equals(list(j))))
        yield Edge(new BigInteger(DigestUtils.md5Hex(list(i)), 16).longValue,
          new BigInteger(DigestUtils.md5Hex(list(j)), 16).longValue, "")
    }).distinct

    // build the graph from the vertex and edge sets
    val graph = Graph(veritx, edges)

    // connectedComponents assigns every vertex the lowest vertex id of its connected component;
    // ConnectedComponents.run(graph, n) can be used instead to cap the number of iterations
    // var vertices: DataFrame = ConnectedComponents.run(graph, 2).vertices.toDF("id_hashcode", "guid_hashcode")
    val connectedGraph = graph.connectedComponents()

    // connected vertices
    val vertices = connectedGraph.vertices.toDF("id_hashcode", "guid_hashcode")
    vertices.createOrReplaceTempView("to_graph")

    // load yesterday's oneid data (oneid, id, id_hashcode)
    val ye_oneid = spark.sql(
      s"""
         |select
         |    oneid, id, id_hashcode
         |from
         |    itsl.dwd_patient_oneid_info_df
         |where
         |    ds=${bizlastdate}
         |""".stripMargin)
    ye_oneid.createOrReplaceTempView("ye_oneid")

    // join to pick up oneids that already exist; the min() here is the oneid selection rule
    val exists_oneid = spark.sql(
      """
        |select
        |    a.guid_hashcode, min(b.oneid) as oneid
        |from
        |    to_graph a
        |inner join
        |    ye_oneid b
        |on
        |    a.id_hashcode = b.id_hashcode
        |group by
        |    a.guid_hashcode
        |""".stripMargin)
    exists_oneid.createOrReplaceTempView("exists_oneid")

    var result: DataFrame = spark.sql(
      s"""
         |select
         |    nvl(b.oneid, md5(cast(a.guid_hashcode as string))) as oneid,
         |    c.id,
         |    a.id_hashcode,
         |    d.id as guid,
         |    a.guid_hashcode,
         |    ${bizdate} as ds
         |from
         |    to_graph a
         |left join
         |    exists_oneid b
         |on
         |    a.guid_hashcode = b.guid_hashcode
         |left join
         |    veritx c
         |on
         |    a.id_hashcode = c.id_hashcode
         |left join
         |    veritx d
         |on
         |    a.guid_hashcode = d.id_hashcode
         |""".stripMargin)
    // nvl() is the oneid update rule: reuse the oneid if it already exists, otherwise generate a new one
    var resultFrame: DataFrame = result.toDF()
    resultFrame.show()
    resultFrame.write.mode(SaveMode.Append).partitionBy("ds").saveAsTable("dwd_patient_oneid_info_df")
    sc.stop
  }
}

③ For local runs, the configuration must be added under resources (see the odps.conf sketch in step 0). A minimal way to drive a local run is sketched below.
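For a local test, the job takes the business date (yyyyMMdd) as its single argument, which becomes the ds partition it reads. A hypothetical one-liner for driving it directly (you can equally set "20230818" as the program argument in your IDE run configuration):

// args(0) is the bizdate, matching the sample ds=20230818 partition loaded in section 1
OneID.main(Array("20230818"))

3、Troubleshooting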
①Exception in thread "main" java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.hive.HiveSessionStateBuilder
The Hive-related dependency is missing; add:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>${spark.version}</version>
<!--  <scope>provided</scope>-->
</dependency>
In practice, though, this dependency is not needed for ODPS; it is enough to set up the environment as described in step 0.
②Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: itsl.dwd_sl_user_ids; line 5 pos 3;
Complete the environment preparation required in step 0.
③Exception in thread "main" org.apache.spark.sql.AnalysisException: The format of the existing table itsl.dwd_patient_oneid_info_df is OdpsTableProvider. It doesn't match the specified format ParquetFileFormat.;
Fix: ALTER TABLE dwd_patient_oneid_info_df SET FILEFORMAT PARQUET;
Local reads and writes are disabled, so this has to be resolved by running the job online.

4、Packaging and upload
①Remove .master("local[1]")
②Remove the Maven dependencies that were only needed locally (i.e. restore the commented-out <scope>provided</scope> entries)
③odps.conf must not be packaged into the jar: create it as a temporary file outside resources (for local tests it does go under resources). Reference: 用户画像之ID-Mapping_id mapping_大数据00的博客-CSDN博客. A sample package-and-submit command sequence is sketched after this list.
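As a rough sketch of the package-and-submit step (my own example, not from the original post: the jar path, submit script location and the --class value are placeholders/assumptions; the MaxCompute Spark client is described in the help-center article referenced in step 0):

# build the fat jar (jar-with-dependencies, as configured by the assembly plugin)
mvn clean package
# submit it with the MaxCompute (ODPS) Spark client in yarn-cluster mode
./bin/spark-submit --master yarn-cluster \
  --class com.gwm.OneID \
  /path/to/graph-1.0-SNAPSHOT-jar-with-dependencies.jar \
  20230818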
Going online then reports:
org.apache.spark.sql.AnalysisException: Table or view not found: itsl.dwd_sl_user_ids; line 5 pos 3;
The cause is item ③ of this section.

5、Run and results
Results:
oneid id id_hashcode guid guid_hashcode ds
598e7008ffc3c6adeebd4d619e2368f3 耀总_app 8972546956853102969 133890 -9124021106546307510 20230818
598e7008ffc3c6adeebd4d619e2368f3 310322 1464684454693316922 133890 -9124021106546307510 20230818
598e7008ffc3c6adeebd4d619e2368f3 zhi333 6097391781232248718 133890 -9124021106546307510 20230818
598e7008ffc3c6adeebd4d619e2368f3 3 2895972726640982771 133890 -9124021106546307510 20230818
598e7008ffc3c6adeebd4d619e2368f3 耀总_小程序 -6210536828479319643 133890 -9124021106546307510 20230818
598e7008ffc3c6adeebd4d619e2368f3 zhi3333 -2388340305120644671 133890 -9124021106546307510 20230818
598e7008ffc3c6adeebd4d619e2368f3 133890 -9124021106546307510 133890 -9124021106546307510 20230818
598e7008ffc3c6adeebd4d619e2368f3 耀总_官网 -9059665468531982172 133890 -9124021106546307510 20230818
598e7008ffc3c6adeebd4d619e2368f3 U120311 -2948409726589830290 133890 -9124021106546307510 20230818
d39364f7fb05a0729646a766d6d43340 U140888 -8956123177900303496 U140888 -8956123177900303496 20230818
d39364f7fb05a0729646a766d6d43340 大神_官网 7742134357614280661 U140888 -8956123177900303496 20230818
d39364f7fb05a0729646a766d6d43340 220317 4342975012645585979 U140888 -8956123177900303496 20230818
d39364f7fb05a0729646a766d6d43340 device_b 934146606527688393 U140888 -8956123177900303496 20230818
d39364f7fb05a0729646a766d6d43340 165133 -8678359668161914326 U140888 -8956123177900303496 20230818
d39364f7fb05a0729646a766d6d43340 大神_app 3787345307522484927 U140888 -8956123177900303496 20230818
d39364f7fb05a0729646a766d6d43340 大神_小程序 8356079890110865354 U140888 -8956123177900303496 20230818
d39364f7fb05a0729646a766d6d43340 2 8000222017881409068 U140888 -8956123177900303496 20230818
d39364f7fb05a0729646a766d6d43340 zhi2222 8743693657758842828 U140888 -8956123177900303496 20230818
34330e92b91e164549cf750e428ba9cd 130826 -5006751273669536424 大法_app -7101862661925406891 20230818
34330e92b91e164549cf750e428ba9cd device_a -3383445179222035358 大法_app -7101862661925406891 20230818
34330e92b91e164549cf750e428ba9cd 1 994258241967195291 大法_app -7101862661925406891 20230818
34330e92b91e164549cf750e428ba9cd device_x 3848069073815866650 大法_app -7101862661925406891 20230818
34330e92b91e164549cf750e428ba9cd zhi1111 7020506831794259850 大法_app -7101862661925406891 20230818
34330e92b91e164549cf750e428ba9cd 185133 -2272106561927942561 大法_app -7101862661925406891 20230818
34330e92b91e164549cf750e428ba9cd 大法_app -7101862661925406891 大法_app -7101862661925406891 20230818
34330e92b91e164549cf750e428ba9cd U130311 5694117693724929174 大法_app -7101862661925406891 20230818
34330e92b91e164549cf750e428ba9cd 大法_官网 -4291733115832359573 大法_app -7101862661925406891 20230818
34330e92b91e164549cf750e428ba9cd 大法_小程序 -5714002662175910850 大法_app -7101862661925406891 20230818
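To spot-check the merge after the job finishes, a simple aggregation over the result table (my own example, assuming the itsl project used in the code) shows how many distinct ids were folded into each ONEID:

-- count the ids merged under each oneid for the 20230818 partition
SELECT oneid, COUNT(DISTINCT id) AS id_cnt
FROM itsl.dwd_patient_oneid_info_df
WHERE ds = 20230818
GROUP BY oneid;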
6、Thoughts
What if the connected graph is cyclic? For example: A is B's friend, B is C's friend, and C is A's friend.
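For what it's worth, a minimal local sketch (my own, not part of the original post) suggests that connectedComponents needs no special handling for cycles: with edges A-B, B-C and C-A, all three vertices still collapse into one component, so a cyclic friend relationship simply ends up in a single ONEID group.

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

object CycleCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[1]").appName("CycleCheck").getOrCreate()
    val sc = spark.sparkContext
    // three vertices joined in a cycle: A -> B -> C -> A
    val vertices = sc.parallelize(Seq((1L, "A"), (2L, "B"), (3L, "C")))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, ""), Edge(2L, 3L, ""), Edge(3L, 1L, "")))
    // every vertex gets the lowest vertex id of its component, i.e. 1 for all three
    Graph(vertices, edges).connectedComponents().vertices.collect().foreach(println)
    spark.stop()
  }
}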