Spark Connector Reader 原理与实践是怎样的
本篇文章为大家展示了Spark Connector Reader 原理与实践是怎样的,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
下面主要讲述如何利用 Spark Connector 进行 Nebula Graph 数据的读取。
Spark Connector 简介
Spark Connector 是一个 Spark 的数据连接器,可以通过该连接器进行外部数据系统的读写操作,Spark Connector 包含两部分,分别是 Reader 和 Writer,而本文侧重介绍 Spark Connector Reader,Writer 部分将在下篇和大家详聊。
Spark Connector Reader 原理
Spark Connector Reader 是将 Nebula Graph 作为 Spark 的扩展数据源,从 Nebula Graph 中将数据读成 DataFrame,再进行后续的 map、reduce 等操作。
Spark SQL 允许用户自定义数据源,支持对外部数据源进行扩展。通过 Spark SQL 读取的数据格式是以命名列方式组织的分布式数据集 DataFrame,Spark SQL 本身也提供了众多 API 方便用户对 DataFrame 进行计算和转换,能对多种数据源使用 DataFrame 接口。
Spark 调用外部数据源包的是 org.apache.spark.sql
,首先了解下 Spark SQL 提供的的扩展数据源相关的接口。
Basic Interfaces
BaseRelation:表示具有已知 Schema 的元组集合。所有继承 BaseRelation 的子类都必须生成 StructType 格式的 Schema。换句话说,BaseRelation 定义了从数据源中读取的数据在 Spark SQL 的 DataFrame 中存储的数据格式的。
RelationProvider:获取参数列表,根据给定的参数返回一个新的 BaseRelation。
DataSourceRegister:注册数据源的简写,在使用数据源时不用写数据源的全限定类名,而只需要写自定义的 shortName 即可。
Providers
RelationProvider:从指定数据源中生成自定义的 relation。
createRelation()
会基于给定的 Params 参数生成新的 relation。SchemaRelationProvider:可以基于给定的 Params 参数和给定的 Schema 信息生成新的 Relation。
RDD
RDD[InternalRow]: 从数据源中 Scan 出来后需要构造成 RDD[Row]
要实现自定义 Spark 外部数据源,需要根据数据源自定义上述部分方法。
在 Nebula Graph 的 Spark Connector 中,我们实现了将 Nebula Graph 作为 Spark SQL 的外部数据源,通过 sparkSession.read
形式进行数据的读取。该功能实现的类图展示如下:
定义数据源 NebulaRelatioProvider,继承 RelationProvider 进行 relation 自定义,继承 DataSourceRegister 进行外部数据源的注册。
定义 NebulaRelation 定义 Nebula Graph 的数据 Schema 和数据转换方法。在
getSchema()
方法中连接 Nebula Graph 的 Meta 服务获取配置的返回字段对应的 Schema 信息。定义 NebulaRDD 进行 Nebula Graph 数据的读取。
compute()
方法中定义如何读取 Nebula Graph 数据,主要涉及到进行 Nebula Graph 数据 Scan、将读到的 Nebula Graph Row 数据转换为 Spark 的 InternalRow 数据,以 InternalRow 组成 RDD 的一行,其中每一个 InternalRow 表示 Nebula Graph 中的一行数据,最终通过分区迭代的形式将 Nebula Graph 所有数据读出组装成最终的 DataFrame 结果数据。
Spark Connector Reader 实践
Spark Connector 的 Reader 功能提供了一个接口供用户编程进行数据读取。一次读取一个点/边类型的数据,读取结果为 DataFrame。
下面开始实践,拉取 GitHub 上 Spark Connector 代码:
git clone -b v1.0 git@github.com:vesoft-inc/nebula-java.git cd nebula-java/tools/nebula-spark mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true
将编译打成的包 copy 到本地 Maven 库。
应用示例如下:
在 mvn 项目的 pom 文件中加入
nebula-spark
依赖
<dependency> <groupId>com.vesoft</groupId> <artifactId>nebula-spark</artifactId> <version>1.1.0</version> </dependency>
在 Spark 程序中读取 Nebula Graph 数据:
// 读取 Nebula Graph 点数据 val vertexDataset: Dataset[Row] = spark.read .nebula("127.0.0.1:45500", "spaceName", "100") .loadVerticesToDF("tag", "field1,field2") vertexDataset.show() // 读取 Nebula Graph 边数据 val edgeDataset: Dataset[Row] = spark.read .nebula("127.0.0.1:45500", "spaceName", "100") .loadEdgesToDF("edge", "*") edgeDataset.show()
配置说明:
nebula(address: String, space: String, partitionNum: String)
address:可以配置多个地址,以英文逗号分割,如“ip1:45500,ip2:45500” space: Nebula Graph 的 graphSpace partitionNum: 设定spark读取Nebula时的partition数,尽量使用创建 Space 时指定的 Nebula Graph 中的 partitionNum,可确保一个Spark的partition读取Nebula Graph一个part的数据。
loadVertices(tag: String, fields: String)
tag:Nebula Graph 中点的 Tag fields:该 Tag 中的字段,,多字段名以英文逗号分隔。表示只读取 fields 中的字段,* 表示读取全部字段
loadEdges(edge: String, fields: String)
edge:Nebula Graph 中边的 Edge fields:该 Edge 中的字段,多字段名以英文逗号分隔。表示只读取 fields 中的字段,* 表示读取全部字段
上述内容就是Spark Connector Reader 原理与实践是怎样的,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注蜗牛博客行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:niceseo99@gmail.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
评论