- Dec 2024
-
book.originit.top book.originit.top
-
Spark与hive的集成分为两种,spark with hive和hive on spark
-
spark cli需要与hive部署在一起,但生产环境hive都是独立部署,因此spark cli的方式一般不使用
-
-
book.originit.top book.originit.top
-
当多个rdd的数据进行交互的时候,rdd前一步的计算会进行shuffle,并shuffle到对应的executor上
-
倾斜分区判定完毕之后,下一步,就是根据 advisoryPartitionSizeInBytes 参数指定的目标尺寸,对大分区进行拆分
将倾斜分区拆分成多个分区
-
Spark SQL 必须要仰仗运行时的执行状态,而 Shuffle 中间文件,则是这些状态的唯一来源。
AQE需要获取数据的统计信息来决定是否将数据进行广播
-
-
book.originit.top book.originit.top
-
Shuffle 在 Map 阶段往往会对数据做排序,而这恰恰正中 SMJ 机制的下怀。对于已经排好序的两张表,SMJ 的复杂度是 O(M + N),这样的执行效率与 HJ 的 O(M) 可以说是不相上下。再者,SMJ 在执行稳定性方面,远胜于 HJ,在内存受限的情况下,SMJ 可以充分利用磁盘来顺利地完成关联计算。因此,考虑到 Shuffle SMJ 的诸多优势,Shuffle HJ 就像是关公后面的周仓,Spark SQL 向来对之视而不见,所以对于 HJ 你大概知道它的作用就行。
由于 Shuffle 会对分区内的数据进行排序,因此 SMJ性能和 HJ 一样,那 Shuffle HJ 就没有优势了
-
在任何情况下,不论数据的体量是大是小、不管内存是否足够,Shuffle Join 在功能上都能够“不辱使命”,成功地完成数据关联的计算。
那如果是非等值条件关联查询呢?
-
与前两者相比,Nested Loop Join 看上去有些多余,嵌套的双层 for 循环带来的计算复杂度最高:O(M * N)。不过,尺有所短寸有所长,执行高效的 HJ 和 SMJ 只能用于等值关联,也就是说关联条件必须是等式,像 salaries(“id”) < employees(“id”) 这样的关联条件,HJ 和 SMJ 是无能为力的
非等值的连表关联还得使用 NLJ
-
-
book.originit.top book.originit.top
-
考虑到 SMJ 对于排序的苛刻要求,后来又有人推出了 HJ 算法。HJ 的设计初衷是以空间换时间,力图将基表扫描的计算复杂度降低至 O(1)。
使用 Hash算法做关联似乎在大数据领域比较好用
-
-
book.originit.top book.originit.top
-
我们知道,使用 Java Object 来存储数据会引入大量额外的存储开销。为此,Tungsten 设计并实现了一种叫做 Unsafe Row 的二进制数据结构。Unsafe Row 本质上是字节数组,它以极其紧凑的格式来存储 DataFrame 的每一条数据记录,大幅削减存储开销,从而提升数据的存储与访问效率。
使用 Java对象会存在大量的存储开销,因此使用本地内存加 byte数组进行存储
-
而使用 DataFrame API 开发的应用,则会先过一遍 Spark SQL,由 Spark SQL 优化过后再交由 Spark Core 去做执行
DataFrame 使用的 Spark SQL 进行解析执行,因此其也存在逻辑计划、物理计划
-
-
book.originit.top book.originit.top
-
数据块的概念与 RDD 数据分区(Partitions)是一致的
通过blockId能够获取需要的数据,无论在内存还是磁盘中
-
LinkedHashMap[BlockId, MemoryEntry]。
spark内存数据存储结构
-
Spark 存储系统负责维护所有暂存在内存与磁盘中的数据,这些数据包括 Shuffle 中间文件、RDD Cache 以及广播变量。
spark存储服务的对象
-
-
book.originit.top book.originit.top
-
广播变量与累加器的区别在于一个是共享读取,一个是共享写入
-
Executor是进行干活的工人,而task是spark计算的最小粒度,工人不断地执行task来完成每一阶段的task然后转向下一阶段,直到所有的阶段执行完毕
-
如果使用广播变量,那么数据只会在driver和executor上都存一份,task在executor上执行,如果使用到广播变量则直接从executor上获取即可
-
如果在任务执行的过程中引用了外部的变量,那么driver需要将变量传输给每一个相关的task,因此传输开销很大
-
-
book.originit.top book.originit.top
-
coalesce 会降低同一个 stage 计算的并行度,导致 cpu 利用率不高,任务执行时间变长。我们目前有一个实现是需要将最终的结果写成单个 avro 文件,前面的转换过程可能是各种各样的,我们在最后阶段加上 repartition(1).write().format('avro').mode('overwrite').save('path')。最近发现有时前面的转换过程中有排序时,使用 repartition(1) 有时写得单文件顺序不对,使用 coalesce(1) 顺序是对的,但 coalesce(1) 有性能问题。目前想到可以 collect 到 d
-
collect 算子有两处性能隐患,一个是拉取数据过程中引入的网络开销,另一个 Driver 的 OOM(内存溢出,Out of Memory)
收集数据会导致Driver的内存占用
-
其中 spark.executor.memory 是绝对值,它指定了 Executor 进程的 JVM Heap 总大小。另外两个配置项,spark.memory.fraction 和 spark.memory.storageFraction 都是比例值,它们指定了划定不同区域的空间占比。spark.memory.fraction 用于标记 Spark 处理分布式数据集的内存总大小,这部分内存包括 Execution Memory 和 Storage Memory 两部分,也就是图中绿色的矩形区域。(M – 300)* (1 – mf)刚好就是 User Memory 的区域大小,也就是图中蓝色区域的部分。spark.memory.storageFraction 则用来进一步区分 Execution Memory 和 Storage Memory 的初始大小。我们之前说过,Reserved Memory 固定为 300MB。(M – 300)* mf * sf 是 Storage Memory 的初始大小,相应地,(M – 300)* mf * (1 – sf)就是 Execution Memory 的初始大小。
内存划分配置,Reserved Memory为300M,总内存量为M,剩余内存M - 300,然后划分为User Memory(1 - mf)和共享内存[Execution Memory+ Storage Memory(mf * sf)]
-
当一个 RDD 在代码中的引用次数大于 1 时,你可以考虑通过给 RDD 加 Cache 来提升作业性能
Cache并不是自动的,而是人手动添加的,将RDD物化到内存中
-
对于 Storage Memory 抢占的 Execution Memory 部分,当分布式任务有计算需要时,Storage Memory 必须立即归还抢占的内存,涉及的缓存数据要么落盘、要么清除;对于 Execution Memory 抢占的 Storage Memory 部分,即便 Storage Memory 有收回内存的需要,也必须要等到分布式任务执行完毕才能释放。
Execution Memory在共享内存中的优先级更高,如果Storage Memory抢占了EM的内存且EM的内存需要则必须归还
-
在 1.6 版本之后,Spark 推出了统一内存管理模式,在这种模式下,Execution Memory 和 Storage Memory 之间可以相互转化
内存共享
-
sortByKey:排序
排序算子与其他聚合算子不同https://blog.csdn.net/raintungli/article/details/73663733
-
aggregateByKey
在Map端聚合后,在Reduce端基于Map端的聚合结果再进行聚合,从而实现在Map端减少传输的数据量,而在Reduce端又能实现目的
-
一个 Map 端聚合函数 f1,以及一个 Reduce 端聚合函数 f2
指定Map、Reduce端的聚合函数
-
从图中你可以看出来,尽管 reduceByKey 也会引入 Shuffle,但相比 groupByKey 以全量原始数据记录的方式消耗磁盘与网络,reduceByKey 在落盘与分发之前,会先在 Shuffle 的 Map 阶段做初步的聚合计算。比如,在数据分区 0 的处理中,在 Map 阶段,reduceByKey 把 Key 同为 Streaming 的两条数据记录聚合为一条,聚合逻辑就是由函数 f 定义的、取两者之间 Value 较大的数据记录,这个过程我们称之为“Map 端聚合”。相应地,数据经由网络分发之后,在 Reduce 阶段完成的计算,我们称之为“Reduce 端聚合”。
Map端聚合能够减少数据的分发数量(原本需要传输原数据,而聚合后则可以将多条数据合并为聚合数据)
-
对于所有 Map Task 生成的中间文件,Reduce Task 需要通过网络从不同节点的硬盘中下载并拉取属于自己的数据内容。不同的 Reduce Task 正是根据 index 文件中的起始索引来确定哪些数据内容是“属于自己的”。Reduce 阶段不同于 Reduce Task 拉取数据的过程,往往也被叫做 Shuffle Read。
shuffle过程并不是Map阶段的节点将数据发送给Reduce阶段的节点,而是Reduce节点主动去Map阶段的节点上拉取数据,根据index文件来获取自己需要的数据
-
(Reduce Task Partition ID,Record Key)
基于这个逐渐可以将相同分区相同key的数据进行合并
-
当 Map 结构被灌满之后,Spark 根据主键对 Map 中的数据记录做排序,然后把所有内容溢出到磁盘中的临时文件
基于溢出机制,在受限内存中处理大量数据,将处理过程中的部分结果进行排序写入临时文件,最终将所有文件进行归并排序得到任务最终的data文件和index文件
-
shuffle过程不需要直接进行排序,而只需要基于key进行hash分区然后分发。而分发完成后,在进行排序聚合是合理的
-
Map 阶段与 Reduce 阶段的计算过程相对清晰明了,二者都是利用 reduce 运算完成局部聚合与全局聚合。在 reduceByKey 的计算过程中,Shuffle 才是关键。
在Shuffle的时候,Map阶段可以进行局部聚合,而局部聚合后可以确定第一轮数据分发的节点,基于hash或者什么算法进行分区后,再一次进行全局聚合,从而将相同key的数据进行聚合
-
在不同的工地上有不同类型的砖块,需要将相同类型的砖块分发给对应的节点,因此需要砖头在集群范围内跨节点、跨进程的数据分发
数据在不同节点上处理之后,需要基于key进行分发
-
SchedulerBackend 与集群内所有 Executors 中的 ExecutorBackend 保持周期性通信,双方通过 LaunchedExecutor、RemoveExecutor、StatusUpdate 等消息来互通有无、变更可用计算资源
SchedulerBackend通过Executor上的agent ExecutorBackend获取机器上的计算资源信息.ExecutorBackend还负责执行代码
-
像上面这种定向到计算节点粒度的本地性倾向,Spark 中的术语叫做 NODE_LOCAL。除了定向到节点,Task 还可以定向到进程(Executor)、机架、任意地址,它们对应的术语分别是 PROCESS_LOCAL、RACK_LOCAL 和 ANY。
本地倾向性,包括机架感知、节点感知、进程感知,进程感知最高
-
以 Actions 算子为起点,从后向前回溯 DAG,以 Shuffle 操作为边界去划分 Stages
构建DAG时,会将shuffle之间的一系列RDD操作划分为一个TaskSets,他们之间是有依赖关系的,但是并不需要进行shuffle
-
DAGScheduler 是任务调度的发起者,DAGScheduler 以 TaskSet 为粒度,向 TaskScheduler 提交任务调度请求
TaskScheduler对任务进行调度和分配资源,资源信息WorkOffer来源于SchedulerBackend,任务信息TaskSets来源于TaskScheduler
-
-
rustwiki.org rustwiki.org
-
这里有一种特殊的生命周期值得讨论:'static,其生命周期能够存活于整个程序期间。
static变量也是
-
-
rustwiki.org rustwiki.org
-
现在换一种方式思考这个关系,父节点应该拥有其子节点:如果父节点被丢弃了,其子节点也应该被丢弃。然而子节点不应该拥有其父节点:如果丢弃子节点,其父节点应该依然存在。这正是弱引用的例子!
如果父节点没人引用了,而子节点还有人引用,这时候可能父节点就会直接销毁掉,看起来挺奇怪的
-
-
rustwiki.org rustwiki.org
-
结合 Rc<T> 和 RefCell<T> 来拥有多个可变数据所有者
一个地方修改每个引用可见
-
在任意给定时刻,只能拥有一个可变引用或任意数量的不可变引用 之一(而不是两者)。
- 要么一个可变引用
- 要么0个可变引用 + 1或多个不可变引用
-
-
rustwiki.org rustwiki.org
-
这里的drop方法是用于提前清理Drop Trait的
-
-
rustwiki.org rustwiki.org
-
那Box是如何销毁掉堆上的对象的呢?
-
所以 Message 值需要的最大空间是存储其最大成员所需的空间大小。
对于枚举其实和union类似,取需要最大空间的一个类型的空间即可
-
使用 Box<T> 指向堆上的数据
Box感觉可以理解为Reference
-
当希望拥有一个值并只关心它的类型是否实现了特定 trait 而不是其具体类型的时候
支持上转型?
-
当有大量数据并希望在确保数据不被拷贝的情况下转移所有权的时候
转移所有权并不会被拷贝,拷贝也不会转移所有权,所以这里是一大堆的数据相互关联,然后通过转移或者拷贝栈上的引用来转移所有权?
-
当有一个在编译时未知大小的类型,而又想要在需要确切大小的上下文中使用这个类型值的时候
由于栈上的数据是需要知道确切大小的,因此如果大小不确定则只能放在堆上,而栈上使用引用
Tags
Annotators
URL
-
-
ddia.qtmuniao.com ddia.qtmuniao.com
-
版本向量
看不懂
-
r 和 w 都常都选择超过半数,如 (n+1)/2
由于r和w都超过半数,因此是能够保证写入和读取是最新的
-
由于 w + r > n 时,总会至少有一个节点(读写子集至少有一个节点的交集)
因为读写的节点比总结点多,读的节点中一定会存在上一次写的节点,但是上一次写的节点能保证写入时这些节点相对那些不写的节点更新吗
-
两个有因果依赖的(先插入,后更新)的语句,在复制到 Leader 2 时,由于速度不同,导致其接收到的数据违反了因果一致性。
由于支持多点写入,那么多个操作之间如果有依赖关系,但是后面的操作写入时写入的还未同步的节点,那么就会出问题
-
多主模型应用场景
实际上应该会在业务上进行妥协,对不同数据中心的用户或者数据进行隔离。或者直接使用一个数据中心也不是不可以,比如国内访问油管发送评论,那应该是在同一个数据中心进行的操作。如果是离线业务,那么就更不需要多主写入了
-
让所有有因果关系的事件路由到一个分区。
分区策略,如基于hash分区或者前缀分区
-
写后读保证的是写后读顺序,单调读保证的是多次读之间的顺序。
由于不同副本的同步进度不同,因此多次读取如果是不同节点,则可能出现不同的结果
-
客户端记下本客户端上次改动时的时间戳,在读从副本时,利用此时间戳来看某个从副本是否已经同步了改时间戳之前内容
记录客户端的上一次修改,从而保证从已经同步该修改的节点上获取数据
-
字段标号不能修改,只能追加。这样旧代码在看到不认识的标号时,省略即可。
向前兼容
-
在更改模式时(比如 alter table),数据库不允许增加既没有默认值、也不允许为空的列
添加列时向后兼容
-
Avro
Avro相对Protobuf和Thrift来讲没有使用字段标号(原本是为了标识数据属于哪个字段),也没有显示指定类型,其会显式写入write schema版本,同时解析时使用read schema读取获取数据类型等信息
-
向后兼容:新加的字段需为 optional。这样在解析旧数据时,才不会出现字段缺失的情况。
数据兼容
-
- Nov 2024
-
ddia.qtmuniao.com ddia.qtmuniao.com
-
列式存储的写入
所以列式存储的底层是使用什么数据结构?看起来像是顺序存储的样子
-
不同副本,不同排序
好主意,不过似乎对于复杂查询实际上帮助不大,毕竟只能按三个列的顺序存储
-
不可能同时对多列进行排序。因为我们需要维护多列间的下标间的对应关系,才可能按行取数据。
这是很疑惑的地方,每一列分开存储,应该可以分开排序才对,同时存储对应的行索引即可,而主键列也同样如此。这样每个列都进行顺序存储,并且行索引顺序且与列值位置映射,是否能实现这样的功能呢?只是说会额外浪费一定的空间,同时对于多列的排序无法实现
-
聚集索引(clustered index)
感觉聚簇索引相对来讲在读取多个数据节点时顺序读效率更高
-
性能优化
LSM-Tree进行范围查询性能很差,需要遍历所有的SSTable
-
图模型和网络模型
网状模型相对图模型有着更严格的约束,而图模型则更宽松可扩展,不过看起来图的查询效率依然很低
-
无论是 BFS、DFS 还是剪枝等实现细节
图的查询一般基于图的遍历操作,只是基于相关元数据进行了剪枝加速
-
数据类型和结构由外部决定,你没办法控制数据的变化。
既然无法确定数据类型,那么能对数据做的操作其实也有限,只能在应用能够通过配置操作、计算指定列数据时能够很好的利用。否则只能存储。
-
关系模型
相对网状模型进一步抽象,使得访问方式统一,同时基于用户需求统一生成执行计划进行数据访问
-
元组(tuples)
奇特的视角
-
在每一层,通过对外暴露简洁的数据模型,我们隔离和分解了现实世界的复杂度。
通过分层简化问题
-
弱 Schema(读时解析)
非关系型数据库比如hive使用
-
强 Schema(写时约束)
一般关系型数据库使用
-
-
book.originit.top book.originit.top
-
周同步,每
test
-
-
time.geekbang.org time.geekbang.org
-
大数据
测试111
-