Iceberg: The Execution Flow of MERGE INTO in COW (Copy-on-Write) Mode

The MERGE INTO command:

```sql
MERGE INTO target_table t
USING source_table s
ON s.id = t.id                                -- the JOIN condition
WHEN MATCHED AND s.opType = 'delete' THEN     -- each WHEN clause is the match condition used to tag the current row
  DELETE
WHEN MATCHED AND s.opType = 'update' THEN
  UPDATE SET id = s.id, name = s.name
WHEN NOT MATCHED AND s.opType = 'insert' THEN
  INSERT (key, value) VALUES (s.key, s.value)
```
The statement above is a MERGE INTO command. When the Spark Analyzer parses it and recognizes it as MERGE INTO, the SparkTable instance resolved for target_table is wrapped into a RowLevelOperationTable instance, which is bound to a SparkCopyOnWriteOperation instance and implements the methods for creating a ScanBuilder and a WriteBuilder.
ScanBuilder and WriteBuilder are interfaces defined by Spark for building the reader (Scan) and the writer (BatchWrite), respectively.
On top of the external catalog and the read/write interfaces provided by Spark 3.x, Iceberg implements reading and writing data in the Iceberg table format.
Below we follow SparkCopyOnWriteOperation to trace how Spark is used to write data out in the Iceberg table format.
Iceberg's row-level updates currently support three statements: UPDATE, DELETE, and MERGE INTO.
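As a quick illustration, all three commands can be issued through the SQL API. The sketch below assumes an existing SparkSession and the hypothetical target_table / source_table used in this article (the columns id, name, and opType are illustrative only).

```java
import org.apache.spark.sql.SparkSession;

// Minimal sketch of the three row-level commands on an Iceberg table.
// Table and column names are the hypothetical ones used in this article.
public final class RowLevelCommands {

  static void runExamples(SparkSession spark) {
    spark.sql("DELETE FROM target_table WHERE id = 1");

    spark.sql("UPDATE target_table SET name = 'renamed' WHERE id = 2");

    spark.sql(
        "MERGE INTO target_table t USING source_table s ON s.id = t.id "
            + "WHEN MATCHED AND s.opType = 'delete' THEN DELETE "
            + "WHEN MATCHED AND s.opType = 'update' THEN UPDATE SET id = s.id, name = s.name "
            + "WHEN NOT MATCHED AND s.opType = 'insert' THEN INSERT (id, name) VALUES (s.id, s.name)");
  }
}
```

The rest of the article traces what happens inside the engine when such a MERGE INTO statement runs in copy-on-write mode.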
Prerequisites: the SparkTable definition

```java
public class SparkTable
    implements org.apache.spark.sql.connector.catalog.Table, // Spark's Table interface
        SupportsRead,
        SupportsWrite,
        SupportsDelete,                // supports deletes
        SupportsRowLevelOperations,    // supports row-level updates
        SupportsMetadataColumns {

  private final Table icebergTable;
  private final Long snapshotId;
  private final boolean refreshEagerly;
  private final Set<TableCapability> capabilities;
  private String branch;
  private StructType lazyTableSchema = null;
  private SparkSession lazySpark = null;

  public SparkTable(Table icebergTable, Long snapshotId, boolean refreshEagerly) {
    this.icebergTable = icebergTable;
    this.snapshotId = snapshotId;
    this.refreshEagerly = refreshEagerly;

    boolean acceptAnySchema =
        PropertyUtil.propertyAsBoolean(
            icebergTable.properties(),
            TableProperties.SPARK_WRITE_ACCEPT_ANY_SCHEMA,
            TableProperties.SPARK_WRITE_ACCEPT_ANY_SCHEMA_DEFAULT);
    this.capabilities = acceptAnySchema ? CAPABILITIES_WITH_ACCEPT_ANY_SCHEMA : CAPABILITIES;
  }

  /**
   * The table supports reads, so this method returns a ScanBuilder instance.
   */
  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) {
      // skip planning the job and fetch already staged file scan tasks
      // If this option is set, the Iceberg ScanTasks produced by a scan are cached in the
      // process-local ScanTaskSetManager, so that a later scan over the same file set (or the
      // same set of file-scan tasks) can reuse them instead of re-planning.
      return new SparkFilesScanBuilder(sparkSession(), icebergTable, options);
    }

    if (options.containsKey(SparkReadOptions.SCAN_TASK_SET_ID)) {
      // same purpose as above
      return new SparkStagedScanBuilder(sparkSession(), icebergTable, options);
    }

    if (refreshEagerly) {
      icebergTable.refresh();
    }

    // A SparkTable can be created either from a branch or from a snapshot id.
    // If it is created from a snapshot id, the branch owning that snapshot must be resolved
    // explicitly.
    CaseInsensitiveStringMap scanOptions =
        branch != null ? options : addSnapshotId(options, snapshotId);
    return new SparkScanBuilder(
        sparkSession(), icebergTable, branch, snapshotSchema(), scanOptions);
  }

  /**
   * The table supports writes, so this method returns a WriteBuilder instance.
   */
  @Override
  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
    Preconditions.checkArgument(
        snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);

    return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info);
  }

  // ...
}
```

Spark Analyzer: resolving the table

Suppose we have the following configuration, which defines a new user-defined catalog named iceberg. spark.sql.catalog.iceberg specifies the implementation class of this catalog, org.apache.iceberg.spark.SparkCatalog; its type is hive (meaning the Iceberg side will use HiveCatalog to resolve databases and tables); and the metastore address is thrift://metastore-host:port.
```properties
spark.sql.catalog.iceberg = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type = hive
spark.sql.catalog.iceberg.uri = thrift://metastore-host:port
```
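The same properties can also be supplied programmatically when the SparkSession is built. This is a minimal sketch: the application name and the metastore port (9083, the conventional Hive metastore port) are placeholders chosen for illustration, not values taken from the configuration above.

```java
import org.apache.spark.sql.SparkSession;

// Minimal sketch: registering the Iceberg catalog on the SparkSession builder.
// The app name and the metastore host/port are illustrative placeholders.
public final class CatalogSetup {

  public static SparkSession createSession() {
    return SparkSession.builder()
        .appName("iceberg-merge-into-demo")
        .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.iceberg.type", "hive")
        .config("spark.sql.catalog.iceberg.uri", "thrift://metastore-host:9083")
        .getOrCreate();
  }
}
```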
When we execute SELECT * FROM iceberg.test_tbl, SQL analysis resolves the catalog name and the table name through the process below and creates the corresponding Catalog and Table instances, i.e. the Iceberg implementation classes SparkCatalog and SparkTable.
```scala
/**
 * If the current plan node is a still-unresolved table/view or an INSERT INTO statement, this
 * rule checks whether the referenced table name is a SQL-level temporary view or a session-level
 * global temp view, and if so returns a new SubqueryAlias instance.
 */
object ResolveTempViews extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case u @ UnresolvedRelation(ident) =>
      lookupTempView(ident).getOrElse(u)
    case i @ InsertIntoStatement(UnresolvedRelation(ident), _, _, _, _) =>
      lookupTempView(ident)
        .map(view => i.copy(table = view))
        .getOrElse(i)
    case u @ UnresolvedTable(ident) =>
      lookupTempView(ident).foreach { _ =>
        u.failAnalysis(s"${ident.quoted} is a temp view not table.")
      }
      u
    case u @ UnresolvedTableOrView(ident) =>
      lookupTempView(ident).map(_ => ResolvedView(ident.asIdentifier)).getOrElse(u)
  }

  def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
    // Permanent View can't refer to temp views, no need to lookup at all.
    if (isResolvingView) return None

    identifier match {
      case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
      case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2)
      case _ => None
    }
  }
}

/**
 * Resolve table relations with concrete relations from v2 catalog.
 *
 * [[ResolveRelations]] still resolves v1 tables.
 */
object ResolveTables extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
    case u: UnresolvedRelation =>
      lookupV2Relation(u.multipartIdentifier)
        .map { rel =>
          val ident = rel.identifier.get
          SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
        }.getOrElse(u)

    case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
      // NonSessionCatalogAndIdentifier.unapply tries to resolve the catalog name and builds the
      // Catalog instance via Spark's CatalogManager.catalog(name). Here that is the SparkCatalog
      // implementation defined in Iceberg; the utility method then loads the table and creates
      // the SparkTable instance.
      CatalogV2Util.loadTable(catalog, ident)
        .map(ResolvedTable(catalog.asTableCatalog, ident, _))
        .getOrElse(u)

    case u @ UnresolvedTableOrView(NonSessionCatalogAndIdentifier(catalog, ident)) =>
      CatalogV2Util.loadTable(catalog, ident)
        .map(ResolvedTable(catalog.asTableCatalog, ident, _))
        .getOrElse(u)

    case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
      lookupV2Relation(u.multipartIdentifier)
        .map(v2Relation => i.copy(table = v2Relation))
        .getOrElse(i)

    case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
      CatalogV2Util.loadRelation(u.catalog, u.tableName)
        .map(rel => alter.copy(table = rel))
        .getOrElse(alter)

    case u: UnresolvedV2Relation =>
      CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
  }

  /**
   * Performs the lookup of DataSourceV2 Tables from v2 catalog.
   */
  private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
    expandRelationName(identifier) match {
      case NonSessionCatalogAndIdentifier(catalog, ident) =>
        CatalogV2Util.loadTable(catalog, ident) match {
          case Some(table) =>
            Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
          case None => None
        }
      case _ => None
    }
}
```

SparkCatalog routes table loading to HiveCatalog

Because SparkCatalog implements TableCatalog, it provides the method `Table loadTable(Identifier ident) throws NoSuchTableException`. During SQL analysis, Spark calls this method to produce the user-defined Table instance.
SparkCatalog is positioned as the bridge between Spark and Iceberg: the net effect is that resolving a catalog and creating a table is routed to an Iceberg Catalog implementation, such as HiveCatalog.
```java
/**
 * Implements the following Spark interface:
 *   public interface TableCatalog extends CatalogPlugin
 * so that during SQL analysis, when the `ResolveTables` rule is applied, a Table can be created
 * from a Catalog plus an Identifier.
 */
public class SparkCatalog extends BaseCatalog {

  /**
   * Because it inherits the CatalogPlugin interface, it must override initialize(...) to
   * initialize the SparkCatalog instance.
   */
  @Override
  public final void initialize(String name, CaseInsensitiveStringMap options) {
    this.cacheEnabled =
        PropertyUtil.propertyAsBoolean(
            options, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);

    long cacheExpirationIntervalMs =
        PropertyUtil.propertyAsLong(
            options,
            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);

    // An expiration interval of 0ms effectively disables caching.
    // Do not wrap with CachingCatalog.
    if (cacheExpirationIntervalMs == 0) {
      this.cacheEnabled = false;
    }

    // Create the Iceberg-side catalog instance. The supported types are:
    //   public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
    //   public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
    //   public static final String ICEBERG_CATALOG_TYPE_REST = "rest";
    //   public static final String ICEBERG_CATALOG_HADOOP = "org.apache.iceberg.hadoop.HadoopCatalog";
    //   public static final String ICEBERG_CATALOG_HIVE = "org.apache.iceberg.hive.HiveCatalog";
    //   public static final String ICEBERG_CATALOG_REST = "org.apache.iceberg.rest.RESTCatalog";
    // By default a HiveCatalog is created, so a later loadTable(...) call that produces a
    // SparkTable goes through HiveCatalog::loadTable(name).
    Catalog catalog = buildIcebergCatalog(name, options);

    this.catalogName = name;
    SparkSession sparkSession = SparkSession.active();
    this.useTimestampsWithoutZone =
        SparkUtil.useTimestampWithoutZoneInNewTables(sparkSession.conf());
    this.tables =
        new HadoopTables(SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name));
    this.icebergCatalog =
        cacheEnabled ? CachingCatalog.wrap(catalog, cacheExpirationIntervalMs) : catalog;

    // The default namespace can be specified via an option; it defaults to "default".
    if (catalog instanceof SupportsNamespaces) {
      this.asNamespaceCatalog = (SupportsNamespaces) catalog;
      if (options.containsKey("default-namespace")) {
        this.defaultNamespace =
            Splitter.on('.').splitToList(options.get("default-namespace")).toArray(new String[0]);
      }
    }

    EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "spark");
    EnvironmentContext.put(
        EnvironmentContext.ENGINE_VERSION, sparkSession.sparkContext().version());
    EnvironmentContext.put(CatalogProperties.APP_ID, sparkSession.sparkContext().applicationId());
  }

  @Override
  public Table loadTable(Identifier ident) throws NoSuchTableException {
    // Create a SparkTable instance from the identifier.
    try {
      return load(ident);
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      throw new NoSuchTableException(ident);
    }
  }

  @Override
  public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
    Table table = loadTable(ident);
    // ...
  }

  @Override
  public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
    Table table = loadTable(ident);
    // ...
  }
}
```

SparkCatalog::buildIcebergCatalog

```java
public static Catalog buildIcebergCatalog(String name, Map<String, String> options, Object conf) {
  String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
  if (catalogImpl == null) {
    String catalogType =
        PropertyUtil.propertyAsString(options, ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
    switch (catalogType.toLowerCase(Locale.ENGLISH)) {
      case ICEBERG_CATALOG_TYPE_HIVE:
        catalogImpl = ICEBERG_CATALOG_HIVE;
        break;
      case ICEBERG_CATALOG_TYPE_HADOOP:
        catalogImpl = ICEBERG_CATALOG_HADOOP;
        break;
      case ICEBERG_CATALOG_TYPE_REST:
        catalogImpl = ICEBERG_CATALOG_REST;
        break;
      default:
        throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
    }
  } else {
    String catalogType = options.get(ICEBERG_CATALOG_TYPE);
    Preconditions.checkArgument(
        catalogType == null,
        "Cannot create catalog %s, both type and catalog-impl are set: type=%s, catalog-impl=%s",
        name,
        catalogType,
        catalogImpl);
  }

  return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
}
```

CatalogUtil::loadCatalog

Taking Hive as the example, this shows how a custom Catalog is loaded.
```java
public static Catalog loadCatalog(
    String impl, String catalogName, Map<String, String> properties, Object hadoopConf) {
  // impl = ICEBERG_CATALOG_HIVE
  // catalogName = hive
  // properties = spark.sql.catalog.[catalogName].x
  // "properties" holds the options configured for this catalog name, parsed from Spark's SQLConf.
  Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog, impl class name is null");
  DynConstructors.Ctor<Catalog> ctor;
  try {
    // Use reflection to look up the no-arg constructor of the impl class and build an instance.
    ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked();
  } catch (NoSuchMethodException e) {
    throw new IllegalArgumentException(
        String.format("Cannot initialize Catalog implementation %s: %s", impl, e.getMessage()), e);
  }

  Catalog catalog;
  try {
    catalog = ctor.newInstance();
  } catch (ClassCastException e) {
    throw new IllegalArgumentException(
        String.format("Cannot initialize Catalog, %s does not implement Catalog.", impl), e);
  }

  configureHadoopConf(catalog, hadoopConf);
  // The properties drive the initialization of the catalog instance.
  catalog.initialize(catalogName, properties);
  return catalog;
}
```

HiveCatalog::initialize

```java
@Override
public void initialize(String inputName, Map<String, String> properties) {
  this.catalogProperties = ImmutableMap.copyOf(properties);
  this.name = inputName;
  if (conf == null) {
    LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
    this.conf = new Configuration();
  }

  // Resolve the configured Hive metastore address.
  if (properties.containsKey(CatalogProperties.URI)) {
    this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get(CatalogProperties.URI));
  }

  // Resolve the configured warehouse directory of the metastore.
  if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
    this.conf.set(
        HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
        LocationUtil.stripTrailingSlash(properties.get(CatalogProperties.WAREHOUSE_LOCATION)));
  }

  this.listAllTables =
      Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));

  // Resolve the configured file IO implementation class. If none is specified, the default
  // HadoopFileIO is used; otherwise the user-defined implementation class is loaded.
  String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
  this.fileIO =
      fileIOImpl == null
          ? new HadoopFileIO(conf)
          : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

  // Initialize the metastore client pool; a HiveClientPool is used by default.
  this.clients = new CachedClientPool(conf, properties);
}
```

HiveCatalog loads the SparkTable

When the SparkCatalog is created, the configuration spark.sql.catalog.iceberg.type tells us that the table we want to create is of type hive on the Iceberg side, so the table has to be loaded through HiveCatalog.
HiveCatalog::loadTable

```java
@Override
public Table loadTable(TableIdentifier identifier) {
  Table result;
  if (isValidIdentifier(identifier)) {
    // For HiveCatalog every identifier is valid, so the call below produces the matching
    // TableOperations instance. In the Iceberg world there is no table per se; the table concept
    // is an abstraction over TableMetadata, so every Iceberg Table must be bound to a
    // TableOperations instance that reads the TableMetadata. Here that is a HiveTableOperations.
    TableOperations ops = newTableOps(identifier);
    if (ops.current() == null) {
      // the identifier may be valid for both tables and metadata tables
      if (isValidMetadataIdentifier(identifier)) {
        result = loadMetadataTable(identifier);
      } else {
        throw new NoSuchTableException("Table does not exist: %s", identifier);
      }
    } else {
      result = new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
    }
  } else if (isValidMetadataIdentifier(identifier)) {
    result = loadMetadataTable(identifier);
  } else {
    throw new NoSuchTableException("Invalid table identifier: %s", identifier);
  }

  LOG.info("Table loaded by catalog: {}", result);
  return result;
}
```

Definition, construction, and execution of the Scan

When MERGE INTO is rewritten, a DataSourceV2Relation logical plan node is generated for the target table in order to read the relevant data from it, so the COW-mode Scan instance is built while filter push-down optimization runs. Scan is the data-reading interface defined by Spark.
```scala
object V2ScanRelationPushDown extends Rule[LogicalPlan] {
  import DataSourceV2Implicits._

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case ScanOperation(project, filters, relation: DataSourceV2Relation) =>
      // Here relation.table is a RowLevelOperationTable instance bound to a
      // SparkCopyOnWriteOperation, so the call below ultimately lands in
      // SparkCopyOnWriteOperation::newScanBuilder.
      val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options)

      val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output)
      val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) =
        normalizedFilters.partition(SubqueryExpression.hasSubquery)

      // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
      // `postScanFilters` need to be evaluated after the scan.
      // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
      val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters(
        scanBuilder, normalizedFiltersWithoutSubquery)
      val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery
      val normalizedProjects = DataSourceStrategy
        .normalizeExprs(project, relation.output)
        .asInstanceOf[Seq[NamedExpression]]

      // Column pruning; scanBuilder.build() is called here, producing the Scan instance that
      // reads the concrete data source.
      val (scan, output) = PushDownUtils.pruneColumns(
        scanBuilder, relation, normalizedProjects, postScanFilters)
      logInfo(
        s"""
           |Pushing operators to ${relation.name}
           |Pushed Filters: ${pushedFilters.mkString(", ")}
           |Post-Scan Filters: ${postScanFilters.mkString(",")}
           |Output: ${output.mkString(", ")}
         """.stripMargin)

      val wrappedScan = scan match {
        case v1: V1Scan =>
          val translated = filters.flatMap(DataSourceStrategy.translateFilter(_, true))
          V1ScanWrapper(v1, translated, pushedFilters)
        case _ => scan
      }

      // Build the logical plan that reads the data.
      val scanRelation = DataSourceV2ScanRelation(relation.table, wrappedScan, output)

      val projectionOverSchema = ProjectionOverSchema(output.toStructType)
      val projectionFunc = (expr: Expression) => expr transformDown {
        case projectionOverSchema(newExpr) => newExpr
      }

      val filterCondition = postScanFilters.reduceLeftOption(And)
      val newFilterCondition = filterCondition.map(projectionFunc)
      val withFilter = newFilterCondition.map(Filter(_, scanRelation)).getOrElse(scanRelation)

      val withProjection = if (withFilter.output != project) {
        val newProjects = normalizedProjects
          .map(projectionFunc)
          .asInstanceOf[Seq[NamedExpression]]
        Project(newProjects, withFilter)
      } else {
        withFilter
      }

      // Return the final logical plan.
      withProjection
  }
}
```

The Scan implemented in Iceberg

As shown above, when Spark performs filter push-down it calls Table::newScanBuilder to build a concrete data-scan description (Scan); that call eventually reaches the following method in Iceberg:
```java
class SparkCopyOnWriteOperation implements RowLevelOperation {

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    if (lazyScanBuilder == null) {
      lazyScanBuilder =
          new SparkScanBuilder(spark, table, branch, options) {
            @Override
            public Scan build() {
              // Build the COW-mode Scan instance.
              Scan scan = super.buildCopyOnWriteScan();
              SparkCopyOnWriteOperation.this.configuredScan = scan;
              return scan;
            }
          };
    }

    return lazyScanBuilder;
  }
}
```

The full definition of buildCopyOnWriteScan (declared in SparkScanBuilder) is as follows:
```java
public Scan buildCopyOnWriteScan() {
  // The table field is a BaseTable instance: once control flows from Spark into Iceberg, only
  // classes defined by Iceberg are used. Find the latest Snapshot of the table on the configured
  // branch.
  Snapshot snapshot = SnapshotUtil.latestSnapshot(table, readConf.branch());

  if (snapshot == null) {
    return new SparkCopyOnWriteScan(
        spark, table, readConf, schemaWithMetadataColumns(), filterExpressions);
  }

  Schema expectedSchema = schemaWithMetadataColumns();

  // A Snapshot exists, which means there is data, so a Scan instance is needed.
  //   default BatchScan newBatchScan() {
  //     return new BatchScanAdapter(newScan());
  //   }
  // Because table is a BaseTable, newScan() produces a DataTableScan instance, and BatchScan is
  // an adapter around it.
  BatchScan scan =
      table
          .newBatchScan()
          .useSnapshot(snapshot.snapshotId())
          .ignoreResiduals()
          .caseSensitive(caseSensitive)
          .filter(filterExpression())
          .project(expectedSchema);

  scan = configureSplitPlanning(scan);

  // Return an instance that implements Spark's Scan interface.
  return new SparkCopyOnWriteScan(
      spark, table, scan, snapshot, readConf, expectedSchema, filterExpressions);
}
```

SparkCopyOnWriteScan produces the Spark Batch

SparkCopyOnWriteScan implements Spark's Scan interface, and a Scan is only a logical description of the read, much like a logical plan. Its Scan::toBatch method is what creates a directly executable object.
```java
class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask> {

  @Override
  public Batch toBatch() {
    // Return a Batch instance that Spark can drive, responsible for splitting the data to read
    // into batches. Note the taskGroups() call when the SparkBatch is constructed: it goes
    // through the Iceberg API and collects all data this Scan needs to read.
    return new SparkBatch(
        sparkContext, table, readConf, groupingKeyType(), taskGroups(), expectedSchema, hashCode());
  }
}
```

SparkCopyOnWriteScan::taskGroups is based on SnapshotScan::planFiles

```java
public abstract class SnapshotScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {

  @Override
  public CloseableIterable<T> planFiles() {
    // Get the Snapshot to read.
    Snapshot snapshot = snapshot();
    if (snapshot == null) {
      LOG.info("Scanning empty table {}", table());
      return CloseableIterable.empty();
    }

    LOG.info(
        "Scanning table {} snapshot {} created at {} with filter {}",
        table(),
        snapshot.snapshotId(),
        DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
        ExpressionUtil.toSanitizedString(filter()));

    Listeners.notifyAll(new ScanEvent(table().name(), snapshot.snapshotId(), filter(), schema()));
    List<Integer> projectedFieldIds = Lists.newArrayList(TypeUtil.getProjectedIds(schema()));
    List<String> projectedFieldNames =
        projectedFieldIds.stream().map(schema()::findColumnName).collect(Collectors.toList());

    Timer.Timed planningDuration = scanMetrics().totalPlanningDuration().start();

    return CloseableIterable.whenComplete(
        doPlanFiles(), // doPlanFiles() uses the Iceberg API to find all data and delete files to read
        () -> {
          planningDuration.stop();
          Map<String, String> metadata = Maps.newHashMap(context().options());
          metadata.putAll(EnvironmentContext.get());
          ScanReport scanReport =
              ImmutableScanReport.builder()
                  .schemaId(schema().schemaId())
                  .projectedFieldIds(projectedFieldIds)
                  .projectedFieldNames(projectedFieldNames)
                  .tableName(table().name())
                  .snapshotId(snapshot.snapshotId())
                  .filter(ExpressionUtil.sanitize(filter()))
                  .scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics()))
                  .metadata(metadata)
                  .build();
          context().metricsReporter().report(scanReport);
        });
  }
}
```

SparkBatch produces the InputPartitions and the partition readers

SparkBatch implements Spark's Batch interface.
```java
class SparkBatch implements Batch {

  private final JavaSparkContext sparkContext;
  private final Table table;
  private final String branch;
  private final SparkReadConf readConf;
  private final Types.StructType groupingKeyType;
  // Holds all Iceberg-managed data files and delete files collected by
  // SparkCopyOnWriteScan::taskGroups(). The files are grouped by their partition data, and the
  // data files of a single partition may be split across several groups.
  private final List<? extends ScanTaskGroup<?>> taskGroups;
  private final Schema expectedSchema;
  private final boolean caseSensitive;
  private final boolean localityEnabled;
  private final int scanHashCode;

  @Override
  public InputPartition[] planInputPartitions() {
    // Partition the data to read.
    // broadcast the table metadata as input partitions will be sent to executors
    Broadcast<Table> tableBroadcast =
        sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
    String expectedSchemaString = SchemaParser.toJson(expectedSchema);

    // One task group maps to one Spark partition.
    InputPartition[] partitions = new InputPartition[taskGroups.size()];

    Tasks.range(partitions.length)
        .stopOnFailure()
        .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
        .run(
            index ->
                partitions[index] =
                    new SparkInputPartition(
                        groupingKeyType, // the files of one task group share the same grouping key
                        taskGroups.get(index),
                        tableBroadcast,
                        branch,
                        expectedSchemaString,
                        caseSensitive,
                        localityEnabled));

    return partitions;
  }

  @Override
  public PartitionReaderFactory createReaderFactory() {
    // Create the factory that produces readers; both columnar and row-based reads are supported.
    if (useParquetBatchReads()) {
      int batchSize = readConf.parquetBatchSize();
      return new SparkColumnarReaderFactory(batchSize);

    } else if (useOrcBatchReads()) {
      int batchSize = readConf.orcBatchSize();
      return new SparkColumnarReaderFactory(batchSize);

    } else {
      return new SparkRowReaderFactory();
    }
  }
}
```

Building the physical plan for the read from SparkBatch

The read/write interfaces mentioned earlier are all defined by DataSource V2, so the logical plan for the read (DataSourceV2ScanRelation) is first converted into the physical plan BatchScanExec.
```scala
case class BatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan) extends DataSourceV2ScanExecBase {

  // scan corresponds to Iceberg's SparkCopyOnWriteScan, so batch is a SparkBatch instance.
  @transient lazy val batch = scan.toBatch

  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
  override def equals(other: Any): Boolean = other match {
    case other: BatchScanExec => this.batch == other.batch
    case _ => false
  }

  override def hashCode(): Int = batch.hashCode()

  // Call SparkBatch::planInputPartitions to produce the partition information.
  @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions()

  // Call SparkBatch::createReaderFactory to produce the reader factory.
  override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()

  override lazy val inputRDD: RDD[InternalRow] = {
    // At execution time the physical plan is turned into an RDD, which carries all the
    // partitions and the reader factory.
    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
  }

  override def doCanonicalize(): BatchScanExec = {
    this.copy(output = output.map(QueryPlan.normalizeExpressions(_, output)))
  }
}
```

DataSourceRDD instantiates the reader and reads the data during compute

The key method here is compute: in Spark every partition corresponds to a task, and that task ultimately calls compute to trigger the computation on the current partition.
```scala
// One Spark partition per InputPartition; supports both row-based and columnar scans.
class DataSourceRDD(
    sc: SparkContext,
    @transient private val inputPartitions: Seq[InputPartition],
    partitionReaderFactory: PartitionReaderFactory,
    columnarReads: Boolean)
  extends RDD[InternalRow](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    inputPartitions.zipWithIndex.map {
      case (inputPartition, index) => new DataSourceRDDPartition(index, inputPartition)
    }.toArray
  }

  private def castPartition(split: Partition): DataSourceRDDPartition = split match {
    case p: DataSourceRDDPartition => p
    case _ => throw new SparkException(s"[BUG] Not a DataSourceRDDPartition: $split")
  }

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    // A partition corresponds to one Iceberg task group, and the data files of one task group
    // share the same partition data.
    val inputPartition = castPartition(split).inputPartition
    val (iter, reader) = if (columnarReads) {
      // Columnar read: batchReader is actually a BatchDataReader instance.
      val batchReader = partitionReaderFactory.createColumnarReader(inputPartition)
      val iter = new MetricsBatchIterator(new PartitionIterator[ColumnarBatch](batchReader))
      (iter, batchReader)
    } else {
      // Row-based read.
      val rowReader = partitionReaderFactory.createReader(inputPartition)
      val iter = new MetricsRowIterator(new PartitionIterator[InternalRow](rowReader))
      (iter, rowReader)
    }
    context.addTaskCompletionListener[Unit](_ => reader.close())
    // TODO: SPARK-25083 remove the type erasure hack in data source scan
    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    castPartition(split).inputPartition.preferredLocations()
  }
}
```

BatchDataReader reads the data

BatchDataReader implements Spark's PartitionReader<ColumnarBatch> interface.
```java
class BatchDataReader extends BaseBatchReader<FileScanTask>
    implements PartitionReader<ColumnarBatch> {

  // Returns an iterator over the data of a FileScanTask, covering its data and delete files.
  @Override
  protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
    String filePath = task.file().path().toString();
    LOG.debug("Opening data file {}", filePath);

    // update the current file for Spark's filename() function
    InputFileBlockHolder.set(filePath, task.start(), task.length());

    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());

    InputFile inputFile = getInputFile(filePath);
    Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");

    // Create a SparkDeleteFilter, which collects the equality delete files and position delete
    // files and builds an index of the deleted records, so that while iterating over a data file
    // every record can be checked against the index to decide whether it is still live.
    SparkDeleteFilter deleteFilter =
        task.deletes().isEmpty()
            ? null
            : new SparkDeleteFilter(filePath, task.deletes(), counter());

    // newBatchIterable() creates a file reader matching the input file's format, e.g. a
    // VectorizedParquetReader for Parquet.
    return newBatchIterable(
            inputFile,
            task.file().format(),
            task.start(),
            task.length(),
            task.residual(),
            idToConstant,
            deleteFilter)
        .iterator();
  }
}
```

Converting the data read into Spark's ColumnarBatch

Whether the file is Parquet or ORC, at the lowest level ColumnarBatchReader is responsible for the actual reading and filtering of the data.
```java
public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
  private final boolean hasIsDeletedColumn;
  private DeleteFilter<InternalRow> deletes = null;
  private long rowStartPosInBatch = 0;

  public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
    super(readers);
    this.hasIsDeletedColumn =
        readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);
  }

  @Override
  public void setRowGroupInfo(
      PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
    super.setRowGroupInfo(pageStore, metaData, rowPosition);
    this.rowStartPosInBatch = rowPosition;
  }

  public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
    this.deletes = deleteFilter;
  }

  @Override
  public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
    if (reuse == null) {
      closeVectors();
    }

    // The inner class ColumnBatchLoader does the actual reading and result conversion.
    ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
    rowStartPosInBatch += numRowsToRead;
    return columnarBatch;
  }

  private class ColumnBatchLoader {
    private final int numRowsToRead;
    // the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null
    // when there is no deletes
    private int[] rowIdMapping;
    // the array to indicate if a row is deleted or not, it is null when there is no "_deleted"
    // metadata column
    private boolean[] isDeleted;

    /**
     * Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how
     * we delete 2 rows in a batch with 8 rows in total.
     *
     *   [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
     *   [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
     *   Position delete 2, 6
     *   [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
     *   [F,F,T,F,F,F,T,F] -- After applying position deletes
     *
     * @param deletedRowPositions a set of deleted row positions
     * @return the mapping array and the new num of rows in a batch, null if no row is deleted
     */
    Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {
      if (deletedRowPositions == null) {
        return null;
      }

      int[] posDelRowIdMapping = new int[numRowsToRead];
      int originalRowId = 0;
      int currentRowId = 0;
      while (originalRowId < numRowsToRead) {
        if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
          posDelRowIdMapping[currentRowId] = originalRowId;
          currentRowId++;
        } else {
          if (hasIsDeletedColumn) {
            isDeleted[originalRowId] = true;
          }

          deletes.incrementDeleteCount();
        }
        originalRowId++;
      }

      if (currentRowId == numRowsToRead) {
        // there is no delete in this batch
        return null;
      } else {
        return Pair.of(posDelRowIdMapping, currentRowId);
      }
    }

    int[] initEqDeleteRowIdMapping() {
      int[] eqDeleteRowIdMapping = null;
      if (hasEqDeletes()) {
        eqDeleteRowIdMapping = new int[numRowsToRead];
        for (int i = 0; i < numRowsToRead; i++) {
          eqDeleteRowIdMapping[i] = i;
        }
      }

      return eqDeleteRowIdMapping;
    }

    /**
     * Filter out the equality deleted rows. Here is an example,
     *
     *   [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
     *   [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
     *   Position delete 2, 6
     *   [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
     *   [F,F,T,F,F,F,T,F] -- After applying position deletes
     *   Equality delete 1 <= x <= 3
     *   [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
     *   [F,T,T,T,F,F,T,F] -- After applying equality deletes
     *
     * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete
     */
    void applyEqDelete(ColumnarBatch columnarBatch) {
      Iterator<InternalRow> it = columnarBatch.rowIterator();
      int rowId = 0;
      int currentRowId = 0;
      while (it.hasNext()) {
        InternalRow row = it.next();
        if (deletes.eqDeletedRowFilter().test(row)) {
          // the row is NOT deleted
          // skip deleted rows by pointing to the next undeleted row Id
          rowIdMapping[currentRowId] = rowIdMapping[rowId];
          currentRowId++;
        } else {
          if (hasIsDeletedColumn) {
            isDeleted[rowIdMapping[rowId]] = true;
          }

          deletes.incrementDeleteCount();
        }

        rowId++;
      }

      columnarBatch.setNumRows(currentRowId);
    }
  }
}
```

The execution of the Write

As the previous sections show, data files and delete files are collected together while the Scan is being built, so when a reader instance reads the data files of each task group, the DeleteFilter is applied at the same time to filter out the records that have been deleted.
This process is effectively a Merge-On-Read.
The Write side of MERGE INTO was analyzed in an earlier article of mine. The general idea is that the rows scanned from target_table (with deleted records already filtered out) are joined with the rows of source_table, producing a result set in which every row carries a change marker (INSERT / UPDATE / DELETE). When the data is written out to files, each row's marker determines the write behavior, and in the end only data files are produced, which keeps the data files clean; a conceptual sketch of the per-row decision follows below.
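To make the row-tagging idea concrete, here is a purely conceptual sketch. The enum and class below are illustrative only and are not Iceberg or Spark types; they just restate the per-row decision described above.

```java
// Conceptual sketch only: these types do not exist in Iceberg/Spark. They illustrate how a
// per-row change marker decides what the COW writer does with a joined row.
enum RowChange { INSERT, UPDATE, DELETE, CARRY_OVER }

final class CopyOnWriteRowAction {

  // Rows marked DELETE are simply not written into the rewritten data files; INSERT/UPDATE rows
  // are written with their new values, and CARRY_OVER rows (untouched rows that happen to live
  // in an affected file) are copied over unchanged.
  static boolean shouldWrite(RowChange change) {
    return change != RowChange.DELETE;
  }
}
```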