Revert "to support more spark commands" #4945

Merged · 1 commit · May 19, 2022
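For context: the commit being reverted taught the lineage listener to follow cached plans, descending from InMemoryRelation nodes in the logical plan into their physical cachedPlan() and then into nested InMemoryTableScanExec scans, and it added Iceberg/Hive-oriented dataset mappings (WriteToDataSourceV2, DataSourceV2Relation, DataSourceV2ScanExec, FileSourceScanExec) in DatasetExtractor. The sketch below is a minimal, hedged illustration of that traversal pattern, assuming the Spark 2.x classes referenced in the diff; the class and method names here (CachedPlanTraversalSketch, collectCachedSources) are illustrative only and are not part of the DataHub codebase.

```java
import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.columnar.InMemoryRelation;
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec;

import scala.runtime.AbstractPartialFunction;

public class CachedPlanTraversalSketch {

  // Walks a logical plan; for every cached node (InMemoryRelation) it descends into
  // the physical cachedPlan(), then keeps following nested InMemoryTableScanExec nodes
  // so sources hidden behind .cache()/.persist() are still discovered.
  static void collectCachedSources(LogicalPlan root) {
    Deque<SparkPlan> pending = new ArrayDeque<>();

    root.collect(new AbstractPartialFunction<LogicalPlan, Void>() {
      @Override
      public Void apply(LogicalPlan node) {
        if (node instanceof InMemoryRelation) {
          // cachedPlan() is the physical plan that produced the cached data
          pending.push(((InMemoryRelation) node).cachedPlan());
        }
        return null;
      }

      @Override
      public boolean isDefinedAt(LogicalPlan node) {
        return true;
      }
    });

    while (!pending.isEmpty()) {
      SparkPlan cached = pending.pop();
      cached.collect(new AbstractPartialFunction<SparkPlan, Void>() {
        @Override
        public Void apply(SparkPlan sp) {
          // ... extract a dataset / lineage source from sp here ...
          if (sp instanceof InMemoryTableScanExec) {
            // an in-memory scan points at another cached relation; queue its plan too
            pending.push(((InMemoryTableScanExec) sp).relation().cachedPlan());
          }
          return null;
        }

        @Override
        public boolean isDefinedAt(SparkPlan sp) {
          return true;
        }
      });
    }
  }
}
```

The diff that follows removes exactly this machinery, restoring the listener to logical-plan-only traversal.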
Changes from all commits
@@ -10,7 +10,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -29,9 +28,6 @@
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.columnar.InMemoryRelation;
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;

@@ -105,9 +101,6 @@ public void run() {

DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(), outputDS.get());
Collection<QueryPlan<?>> allInners = new ArrayList<>();
Collection<QueryPlan<?>> allInmemoryRelationSparkPlan = new ArrayList<>();
Collection<QueryPlan<?>> allInmemoryRelationInnersSparkPlan = new ArrayList<>();
Stack<QueryPlan<?>> allInmemoryRelationTableScanPlan = new Stack<>();

plan.collect(new AbstractPartialFunction<LogicalPlan, Void>() {

@@ -117,12 +110,6 @@ public Void apply(LogicalPlan plan) {
Optional<? extends SparkDataset> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
inputDS.ifPresent(x -> lineage.addSource(x));
allInners.addAll(JavaConversions.asJavaCollection(plan.innerChildren()));

//deal with sparkPlans in complex logical plan
if (plan instanceof InMemoryRelation) {
InMemoryRelation cmd = (InMemoryRelation) plan;
allInmemoryRelationSparkPlan.add(cmd.cachedPlan());
}
return null;
}

@@ -147,12 +134,6 @@ public Void apply(LogicalPlan plan) {
inputDS.ifPresent(
x -> log.debug("source added for " + ctx.appName() + "/" + sqlStart.executionId() + ": " + x));
inputDS.ifPresent(x -> lineage.addSource(x));

//deal with sparkPlans in complex logical plan
if (plan instanceof InMemoryRelation) {
InMemoryRelation cmd = (InMemoryRelation) plan;
allInmemoryRelationSparkPlan.add(cmd.cachedPlan());
}
return null;
}

@@ -163,107 +144,6 @@ public boolean isDefinedAt(LogicalPlan x) {
});
}

for (QueryPlan<?> qpInmemoryRelation : allInmemoryRelationSparkPlan) {
if (!(qpInmemoryRelation instanceof SparkPlan)) {
continue;
}
SparkPlan sparkPlan = (SparkPlan) qpInmemoryRelation;
sparkPlan.collect(new AbstractPartialFunction<SparkPlan, Void>() {

@Override
public Void apply(SparkPlan sp) {
Optional<? extends SparkDataset> inputDSSp = DatasetExtractor.asDataset(sp, ctx, false);
inputDSSp.ifPresent(x -> lineage.addSource(x));
allInmemoryRelationInnersSparkPlan.addAll(JavaConversions.asJavaCollection(sp.innerChildren()));

if (sp instanceof InMemoryTableScanExec) {
InMemoryTableScanExec cmd = (InMemoryTableScanExec) sp;
allInmemoryRelationTableScanPlan.push(cmd);
}
return null;
}
@Override
public boolean isDefinedAt(SparkPlan x) {
return true;
}
});
}

for (QueryPlan<?> qpInmemoryRelationInners : allInmemoryRelationInnersSparkPlan) {
if (!(qpInmemoryRelationInners instanceof SparkPlan)) {
continue;
}
SparkPlan sparkPlan = (SparkPlan) qpInmemoryRelationInners;
sparkPlan.collect(new AbstractPartialFunction<SparkPlan, Void>() {

@Override
public Void apply(SparkPlan sp) {
Optional<? extends SparkDataset> inputDSSp = DatasetExtractor.asDataset(sp, ctx, false);
inputDSSp.ifPresent(x -> lineage.addSource(x));

if (sp instanceof InMemoryTableScanExec) {
InMemoryTableScanExec cmd = (InMemoryTableScanExec) sp;
allInmemoryRelationTableScanPlan.push(cmd);
}
return null;
}
@Override
public boolean isDefinedAt(SparkPlan x) {
return true;
}
});
}

while (!allInmemoryRelationTableScanPlan.isEmpty()) {
QueryPlan<?> qpInmemoryRelationTableScan = allInmemoryRelationTableScanPlan.pop();
InMemoryTableScanExec imPlan = (InMemoryTableScanExec) qpInmemoryRelationTableScan;
Collection<QueryPlan<?>> allInnerPhysicalPlan = new ArrayList<>();
imPlan.relation().cachedPlan().collect(new AbstractPartialFunction<SparkPlan, Void>() {

@Override
public Void apply(SparkPlan sp) {
Optional<? extends SparkDataset> inputDSSp = DatasetExtractor.asDataset(sp, ctx, false);
inputDSSp.ifPresent(x -> lineage.addSource(x));
allInnerPhysicalPlan.addAll(JavaConversions.asJavaCollection(sp.innerChildren()));

if (sp instanceof InMemoryTableScanExec) {
InMemoryTableScanExec cmd = (InMemoryTableScanExec) sp;
allInmemoryRelationTableScanPlan.push(cmd);
}
return null;
}
@Override
public boolean isDefinedAt(SparkPlan x) {
return true;
}
});

for (QueryPlan<?> qpInner : allInnerPhysicalPlan) {
if (!(qpInner instanceof SparkPlan)) {
continue;
}
SparkPlan sparkPlan = (SparkPlan) qpInner;
sparkPlan.collect(new AbstractPartialFunction<SparkPlan, Void>() {

@Override
public Void apply(SparkPlan sp) {
Optional<? extends SparkDataset> inputDSSp = DatasetExtractor.asDataset(sp, ctx, false);
inputDSSp.ifPresent(x -> lineage.addSource(x));

if (sp instanceof InMemoryTableScanExec) {
InMemoryTableScanExec cmd = (InMemoryTableScanExec) sp;
allInmemoryRelationTableScanPlan.push(cmd);
}
return null;
}
@Override
public boolean isDefinedAt(SparkPlan x) {
return true;
}
});
}
}

SQLQueryExecStartEvent evt =
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), lineage);
DatasetExtractor.java
@@ -20,18 +20,13 @@
import org.apache.spark.SparkContext;
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.FileSourceScanExec;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand;
import org.apache.spark.sql.execution.datasources.HadoopFsRelation;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import org.apache.spark.sql.execution.datasources.LogicalRelation;
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec;
import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2;
import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand;
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable;
import org.apache.spark.sql.sources.BaseRelation;
@@ -48,12 +43,10 @@
public class DatasetExtractor {

private static final Map<Class<? extends LogicalPlan>, PlanToDataset> PLAN_TO_DATASET = new HashMap<>();
private static final Map<Class<? extends SparkPlan>, SparkPlanToDataset> SPARKPLAN_TO_DATASET = new HashMap<>();
private static final Map<Class<? extends BaseRelation>, RelationToDataset> REL_TO_DATASET = new HashMap<>();
private static final Set<Class<? extends LogicalPlan>> OUTPUT_CMD = ImmutableSet.of(
InsertIntoHadoopFsRelationCommand.class, SaveIntoDataSourceCommand.class,
CreateDataSourceTableAsSelectCommand.class, CreateHiveTableAsSelectCommand.class, InsertIntoHiveTable.class,
WriteToDataSourceV2.class);
CreateDataSourceTableAsSelectCommand.class, CreateHiveTableAsSelectCommand.class, InsertIntoHiveTable.class);
private static final String DATASET_ENV_KEY = "metadata.dataset.env";
private static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
// TODO InsertIntoHiveDirCommand, InsertIntoDataSourceDirCommand
@@ -65,10 +58,6 @@ private static interface PlanToDataset {
Optional<? extends SparkDataset> fromPlanNode(LogicalPlan plan, SparkContext ctx, Config datahubConfig);
}

private static interface SparkPlanToDataset {
Optional<? extends SparkDataset> fromSparkPlanNode(SparkPlan plan, SparkContext ctx, Config datahubConfig);
}

private static interface RelationToDataset {
Optional<? extends SparkDataset> fromRelation(BaseRelation rel, SparkContext ctx, Config datahubConfig);
}
@@ -123,45 +112,6 @@ private static interface RelationToDataset {
return Optional.of(new CatalogTableDataset(cmd.tableMeta(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
});

PLAN_TO_DATASET.put(WriteToDataSourceV2.class, (p, ctx, datahubConfig) -> {
WriteToDataSourceV2 cmd = (WriteToDataSourceV2) p;
if (!cmd.writer().toString().contains("IcebergWrite")) {
return Optional.empty();
} else {
String[] names = cmd.writer().toString().split(",")[0].split("\\.");
String tableName = names[names.length - 1];
return Optional.of(new CatalogTableDataset("iceberg", tableName, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
}
});

PLAN_TO_DATASET.put(DataSourceV2Relation.class, (p, ctx, datahubConfig) -> {
DataSourceV2Relation cmd = (DataSourceV2Relation) p;
if (!cmd.source().toString().contains("IcebergSource") && !cmd.source().toString().contains("iceberg")) {
return Optional.empty();
} else {
String[] names = cmd.options().get("path").get().split("\\.");
String tableName = names[names.length - 1];
return Optional.of(new CatalogTableDataset("iceberg", tableName, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
}
});

SPARKPLAN_TO_DATASET.put(DataSourceV2ScanExec.class, (sp, ctx, datahubConfig) -> {
DataSourceV2ScanExec cmd = (DataSourceV2ScanExec) sp;
if (!sp.toString().contains("iceberg")) {
return Optional.empty();
} else {
String[] names = cmd.options().get("path").get().split("\\.");
String tableName = names[names.length - 1];
return Optional.of(new CatalogTableDataset("iceberg", tableName, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
}
});

SPARKPLAN_TO_DATASET.put(FileSourceScanExec.class, (sp, ctx, datahubConfig) -> {
FileSourceScanExec cmd = (FileSourceScanExec) sp;
String tableName = cmd.tableIdentifier().get().table();
return Optional.of(new CatalogTableDataset("hive", tableName, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
});

REL_TO_DATASET.put(HadoopFsRelation.class, (r, ctx, datahubConfig) -> {
List<Path> res = JavaConversions.asJavaCollection(((HadoopFsRelation) r).location().rootPaths()).stream()
.map(p -> getDirectoryPath(p, ctx.hadoopConfiguration())).distinct().collect(Collectors.toList());
@@ -192,15 +142,6 @@ static Optional<? extends SparkDataset> asDataset(LogicalPlan logicalPlan, Spark
return PLAN_TO_DATASET.get(logicalPlan.getClass()).fromPlanNode(logicalPlan, ctx, datahubconfig);
}

static Optional<? extends SparkDataset> asDataset(SparkPlan sparkPlan, SparkContext ctx, boolean outputNode) {

if (!SPARKPLAN_TO_DATASET.containsKey(sparkPlan.getClass())) {
return Optional.empty();
}
Config datahubconfig = LineageUtils.parseSparkConfig();
return SPARKPLAN_TO_DATASET.get(sparkPlan.getClass()).fromSparkPlanNode(sparkPlan, ctx, datahubconfig);
}

private static Path getDirectoryPath(Path p, Configuration hadoopConf) {
try {
if (p.getFileSystem(hadoopConf).getFileStatus(p).isFile()) {
CatalogTableDataset.java
@@ -17,7 +17,4 @@ public CatalogTableDataset(String dsName, String platformInstance, FabricType fa
super("hive", platformInstance, dsName, fabricType);
}

public CatalogTableDataset(String dbName, String dsName, String platformInstance, FabricType fabricType) {
super(dbName, platformInstance, dsName, fabricType);
}
}