
Design Review: column level lineage feature #1731

Closed
liangjun-jiang opened this issue Jul 13, 2020 · 22 comments

@liangjun-jiang
Contributor

Is your feature request related to a problem? Please describe.
Yes. Column-level lineage support has been requested several times in the past.

Describe the solution you'd like
This issue is meant to provide a design document describing how to build this feature.

Describe alternatives you've considered
n/a

Additional context
While DataHub currently supports table-level lineage as a dataset aspect, there is a strong need for column-level lineage.
A sample illustration of column-level lineage:
column-level-lineage

If we look at the right part of this screenshot, we notice that:

  1. table INSERT-SELECT-1 came from tables orders and customers
  2. the oid, cid, ottl, and sid columns of INSERT-SELECT-1 came from columns of the orders table
  3. the cl and cem columns of INSERT-SELECT-1 came from columns of the customers table
  4. there are more tables on the right: small_orders, medium_orders, large_orders, and special_orders are derived from INSERT-SELECT-1

Below INSERT-SELECT-1, there is another lineage representation following a similar fashion.

Now looking at the left part of this screenshot, we see how the SQL statement is used to generate the target table, and how the columns in the target table are derived from the source tables.

In this design review, I think we need to address two important issues:

  1. How should we modify the Dataset's Upstream.pdl to support column-level lineage? For reference, the current Upstream.pdl looks like this (code comments removed for brevity):
import com.linkedin.common.DatasetUrn

record Upstream {
  auditStamp: AuditStamp
  dataset: DatasetUrn
  type: DatasetLineageType
}
  2. How could we provide a sample script (Python-like) that end users could use to parse their SQL statements easily and ingest MCE messages so DataHub could pick them up? (A toy illustration follows below.)
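To make the second point concrete, here is a hedged, toy Python sketch. It is not a real SQL parser: it only handles a single flat INSERT ... SELECT without expressions or joins, the table and column names in the sample statement are hypothetical, and the MCE emission step is omitted.

import re

def toy_column_mapping(sql: str) -> dict:
    """Toy extraction of target-column -> source-column pairs from a simple
    'INSERT INTO <target> (c1, c2) SELECT a, b FROM <source>' statement.
    Real SQL (expressions, joins, aliases) needs a proper parser."""
    pattern = (r"INSERT\s+(?:OVERWRITE\s+TABLE|INTO)\s+(\S+)\s*\(([^)]*)\)\s*"
               r"SELECT\s+(.*?)\s+FROM\s+(\S+)")
    m = re.search(pattern, sql, re.IGNORECASE | re.DOTALL)
    if not m:
        return {}
    target, target_cols, select_list, source = m.groups()
    targets = [c.strip() for c in target_cols.split(",")]
    sources = [c.strip() for c in select_list.split(",")]
    return {f"{target}.{t}": [f"{source}.{s}"] for t, s in zip(targets, sources)}

# Hypothetical statement, for illustration only:
print(toy_column_mapping(
    "INSERT INTO sales_summary (region, total) SELECT region, amount FROM orders"))
# {'sales_summary.region': ['orders.region'], 'sales_summary.total': ['orders.amount']}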

To be continued

@liangjun-jiang liangjun-jiang added the feature-request Request for a new feature to be added label Jul 13, 2020
@hshahoss
Contributor

Thanks for opening this feature request. One quick comment for the first issue you mentioned - We do not necessarily need to modify the existing Upstream aspect. We could create a separate aspect to capture fine grained lineage.

@liangjun-jiang
Contributor Author

Thanks for opening this feature request. One quick comment for the first issue you mentioned - We do not necessarily need to modify the existing Upstream aspect. We could create a separate aspect to capture fine grained lineage.

Agreed. There are still quite some areas I have not thought through. Need you guys' input for sure.

@liangjun-jiang
Contributor Author

[Continued]
@clojurians-org has used Uber's QueryParser to do the following work and helped me understand how column-level lineage should be generated from a SQL statement. I think it's worth bringing up now to help us model the column-level lineage aspect.

Consider a simplified insert-select SQL statement:

INSERT OVERWRITE TABLE TMP.TFVDM1 (cpc, larluo)
SELECT CPC, LARLUO FROM ODS.FVS t
WHERE t.HDATASRC1 = 'A'

Interpreting this SQL statement, we understand that:

  1. there are two databases: ODS and TMP
  2. table TFVDM1 of database TMP is derived from FVS of ODS (aliased as t)
  3. three columns of t appear: HDATASRC1, CPC, and LARLUO
  4. CPC and LARLUO are turned into TFVDM1's columns.

This analysis will help us understand the following QueryParser results.
After running QueryParser's analysis (an initial script has been developed), it generates a JSON representation of column-level lineage in two parts:

  1. TFVDM1 is derived from FVS, subject to a constraint on FVS's column HDATASRC1:

Note: to represent column-level lineage, it might not be necessary to show which query constraint was used. It might not be shown in the graph representation, because hdatasrc1 doesn't appear in the target tfvdm1 table. However, it is nice to have this information.

[
    {
        "Left": {
            "fqtnTableName": "tfvdm1",
            "fqtnDatabaseName": "tmp",
            "fqtnSchemaName": "tfvdm1"
        }
    },
    {
        "columns": [
            {
                "fqcnSchemaName": "fvs",
                "fqcnDatabaseName": "ods",
                "fqcnTableName": "fvs",
                "fqcnColumnName": "hdatasrc1"
            }
        ],
        "tables": [
            {
                "fqtnTableName": "fvs",
                "fqtnDatabaseName": "ods",
                "fqtnSchemaName": "fvs"
            }
        ]
    }
]
  2. Columns in the target tfvdm1 table in relation to columns in the source fvs table

From the following, we understand that the cpc column of the target tfvdm1 table comes from the cpc and hdatasrc1 columns of the source fvs table.

[
    {
        "Right": {
            "fqcnSchemaName": "tfvdm1",
            "fqcnDatabaseName": "tmp",
            "fqcnTableName": "tfvdm1",
            "fqcnColumnName": "cpc"
        }
    },
    {
        "columns": [
            {
                "fqcnSchemaName": "fvs",
                "fqcnDatabaseName": "ods",
                "fqcnTableName": "fvs",
                "fqcnColumnName": "cpc"
            },
            {
                "fqcnSchemaName": "fvs",
                "fqcnDatabaseName": "ods",
                "fqcnTableName": "fvs",
                "fqcnColumnName": "hdatasrc1"
            }
        ],
        "tables": []
    }
]

and, in a similar fashion:

[
    {
        "Right": {
            "fqcnSchemaName": "tfvdm1",
            "fqcnDatabaseName": "tmp",
            "fqcnTableName": "tfvdm1",
            "fqcnColumnName": "larluo"
        }
    },
    {
        "columns": [
            {
                "fqcnSchemaName": "fvs",
                "fqcnDatabaseName": "ods",
                "fqcnTableName": "fvs",
                "fqcnColumnName": "hdatasrc1"
            },
            {
                "fqcnSchemaName": "fvs",
                "fqcnDatabaseName": "ods",
                "fqcnTableName": "fvs",
                "fqcnColumnName": "larluo"
            }
        ],
        "tables": []
    }
]
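As a small aside, here is a hedged Python sketch that assumes the two-part [target, source-columns] structure shown in the results above and folds such QueryParser output into a plain target-column -> source-columns dictionary. It is only an illustration of how the parser output could be consumed, not part of QueryParser itself.

import json

def fold_column_lineage(results: list) -> dict:
    """Collapse QueryParser-style [{'Right': target}, {'columns': [...], 'tables': [...]}]
    pairs (as shown above) into {target_column: [source_columns]}."""
    mapping = {}
    for entry in results:
        target, sources = entry  # each result is a two-element list: target, then its sources
        t = target["Right"]
        key = f"{t['fqcnDatabaseName']}.{t['fqcnTableName']}.{t['fqcnColumnName']}"
        mapping[key] = [
            f"{c['fqcnDatabaseName']}.{c['fqcnTableName']}.{c['fqcnColumnName']}"
            for c in sources["columns"]
        ]
    return mapping

# Using the first column-level result shown above:
results = [
    [{"Right": {"fqcnSchemaName": "tfvdm1", "fqcnDatabaseName": "tmp",
                "fqcnTableName": "tfvdm1", "fqcnColumnName": "cpc"}},
     {"columns": [{"fqcnSchemaName": "fvs", "fqcnDatabaseName": "ods",
                   "fqcnTableName": "fvs", "fqcnColumnName": "cpc"},
                  {"fqcnSchemaName": "fvs", "fqcnDatabaseName": "ods",
                   "fqcnTableName": "fvs", "fqcnColumnName": "hdatasrc1"}],
      "tables": []}],
]
print(json.dumps(fold_column_lineage(results), indent=2))
# {"tmp.tfvdm1.cpc": ["ods.fvs.cpc", "ods.fvs.hdatasrc1"]}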

In the next part, I will present my thoughts about modeling column-level lineage in DataHub.

[To be continued]

@liangjun-jiang
Contributor Author

liangjun-jiang commented Jul 17, 2020

Dataset has a SchemaMetadata aspect. In SchemaMetadata, the fields property is where column information is stored.

1. Establish a relationship between a dataset and its fields

First, we need to establish a relationship between a dataset and its fields. Let's call it hasField for now.

In Neo4j, the expected graph representation would look like:

Disclaimer: I am not a Neo4j expert and don't actually know much about it.

column-lineage-diagram

To add this new relationship, a few files would be changed. We can use the Onboarding to GMA Graph - Adding a new relationship type guide as the reference for the implementation details.

2. Establish a relationship between a field of a source dataset and a field of the target dataset.

Let's call it derivedBy for now.
In Neo4j, the expected graph representation would look like:

column-lineage-diagram-FULL

There is a typo in this graph; we actually mean derivedBy.
In this diagram, the bottom is the source dataset with its fields, and the top is the target dataset with its fields. The new relationship derivedBy illustrates how each column in the target is generated.
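Since the diagrams may not render here, the following is a minimal Python sketch of the edges these two relationships would add to the graph. The dataset names, field names, and simplified URNs are made up for illustration; real DataHub dataset URNs are longer.

# Hypothetical datasets and their fields (simplified URNs, illustration only).
datasets = {
    "urn:li:dataset:source_ds": ["col_a", "col_b"],
    "urn:li:dataset:target_ds": ["col_x", "col_y"],
}

# hasField edges: dataset -> each of its fields.
# A field node is keyed by "<dataset urn>:<field path>" (the ad-hoc field URN used later).
has_field_edges = [
    (ds_urn, f"{ds_urn}:{field}")
    for ds_urn, fields in datasets.items()
    for field in fields
]

# derivedBy edges: source-dataset field -> the target-dataset field built from it
# (matching the source/destination assignment in the builder code further below).
derived_by_edges = [
    ("urn:li:dataset:source_ds:col_a", "urn:li:dataset:target_ds:col_x"),
    ("urn:li:dataset:source_ds:col_b", "urn:li:dataset:target_ds:col_y"),
]

for src, dst in has_field_edges:
    print(f"{src} -[hasField]-> {dst}")
for src, dst in derived_by_edges:
    print(f"{src} -[derivedBy]-> {dst}")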

3. Modify Upstream.pdl to have a constraint

As discussed earlier, we also want to understand what query constraint has been used to derive a target dataset from multiple source datasets. We will add this constraint information to Upstream.pdl. After the modification, it will look like:

record Upstream {
  auditStamp: AuditStamp
  dataset: DatasetUrn
  constraint: array[SchemaField] // newly added
  type: DatasetLineageType
}


This information will not be shown in the UI or in Neo4j.

4. Modify DownstreamOfBuilderFromUpstreamLineage.java

final List<DownstreamOf> downstreamEdges = upstreamLineage.getUpstreams().stream()
        .map(upstream -> new DownstreamOf()
            .setSource(urn)
            .setDestination(upstream.getDataset())
            .setType(upstream.getType())
        )
        .collect(Collectors.toList());

    return Collections.singletonList(
        new GraphBuilder.RelationshipUpdates(downstreamEdges, REMOVE_ALL_EDGES_FROM_SOURCE)
    );

This will need to be modified to take into account the new relationships, hasField and derivedBy. The original table-level lineage should also be preserved.

5. UI Change

TBD

@liangjun-jiang
Contributor Author

Here are a few more notes:

  1. BaseRelationship.pdl only supports Urn for source and destination right now. It won't work if we plan to use SchemaField as the source or destination. It probably doesn't make sense to lift SchemaField to a first-class entity, so derivedBy probably should still inherit from BaseRelationship.

  2. The current code base has a Contains relationship, but it seems unused. I assume it could be used for the hasField relationship defined earlier. If we do so, SchemaField would have to be treated and modeled as a first-class entity.

  3. Those are all the thoughts I have. Feel free to leave your comments. Thanks!

@jplaisted
Contributor

In no particular order, some thoughts / questions.

  • I like the idea of fields being a separate entity, with lineage between them, and an edge to the dataset they belong to.
  • I'm wondering if we should punt on constraint for now. We could always try adding it later. If we want to add that level of information, the transformation (if any) may be as important? I'm not sure. Talking with SpotHero at least, they care more about the lineage itself than any information on the lineage, at least initially (just to track what depends on what and what could break by changing a field). @liangjun-jiang, are constraints a must-have or just a nice-to-have? A hyperlink on DataHub to the source SQL / Job / etc may be good enough for now.
  • If we want to tackle constraints now, do they need to be at a field level instead of a dataset level (as proposed above)?
  • We're also building models for jobs (@hshahoss). Again, maybe we can punt on this part, but might be nice to have the association somewhere.
  • We have models for schemas (at least internally). There's kind of some overlap here. Worth thinking about?
  • Any concerns around nested / repeated fields?
  • What about lineage that isn't from SQL? What if I have a spark scala / java job, for example?

So some alternate, specific proposals / brainstorms:

// Entity for a column; minimal amount of information should be the dataset's URN + column name?
record DatasetColumn {
  name: string;
  dataset: DatasetUrn;
}
// Aspect of DatasetColumn
record DatasetColumnInfo {
  // Do we need this? I left this as a place holder, not sure what we'd put in here. We could try putting the type info,
  // for example, if that is something we think we need.
}
// Aspect of DatasetColumn
record UpstreamDatasetColumn {
  upstreamColumnUrn: DatasetColumnUrn;
  // TODO can add more info on this edge, e.g. constraints, transformations, maybe the job
  // (or that could be a separate edge?)
}

@jplaisted
Contributor

jplaisted commented Jul 23, 2020

I made #column-level-lineage in slack (this is an open channel), if we want a more casual place to discuss. Important updates / proposals should still go here.

@liangjun-jiang
Contributor Author

Let me add more thoughts on this, based on working on the implementation over the past 48 hours.

  1. The more I work on it, the more I think we should make schema a first-class entity, which is also on the roadmap. Dataset is a fairly broad concept, while schema is more concrete since it actually has columns or fields.
  2. In my implementation, I didn't lift SchemaField up to a first-class entity (like Dataset or CorpUser). This creates a problem when writing the relationship to Neo4j with the current code base, because the current Neo4jGraphWriter only accepts first-class entities. That leaves two options: 1) modify Neo4jGraphWriter.java, or 2) make SchemaField a first-class entity. The problem I see with making SchemaField a first-class entity is that there would be too many of them.
  3. Constraint doesn't have to be there right now; it is more of a nice-to-have. I have not seen people really care whether it exists. Since QueryParser is able to extract that information, I thought it made sense to complete the lineage information we can provide.
  4. In the context of UpstreamLineage, say Table A -> Table B with WHERE A.col1 = 123, I think A.col1 should be part of the definition of Upstream as we have it now. It makes sense to me to have array[SchemaField] for constraint in Upstream. That being said, constraint is field-level.
  5. I have also heard about lineage from Spark jobs, though I have not seen a concrete example. My understanding is that a Spark job mostly does computation: it connects to data sources (for example Hive) to fetch data, does some computation/transformation in memory, and writes results back to Hive or another data store. As long as we can get the DDL the Spark jobs are using (which normally comes from a Hive hook), we can always parse the DDL and extract the table-level/column-level lineage.
    However, if the Spark job is not writing data back to a database, QueryParser won't work, and we won't be able to extract the table/column lineage information in the way I am proposing.

@jplaisted
Contributor

To the above:

  1. Yeah, I'll have to take a closer look at what we have for schemas. The models are open source today, seems like some other features aren't. Here it looks like schemas are first class entities and have fields within them; that may make the graph harder to build without fields as entities, I'm not sure. @keremsahin1 's our graph / Neo4J expert :)
  2. See above
  3. Ok, I think we can punt on this to help get a useful minimal design out first. We can always add it later!
  4. Makes sense. My concern would be if you are building some fields from Table A and some from Table B into Table C. Most of the time you probably have a common constraint, but it could start to be field based. In the table wide constraint, you could just copy that to all the fields, if we store it at the field level.
  5. Spark was just an example. Point is, you can really write any kind of program to read from a DB, do some filtering and/or transformations, and then write to another DB. This is basically what these SQL create table queries are doing. But it doesn't have to be SQL. Point is here, while SQL is a nice example to focus on, it isn't the only possibility. Things may need to be more generic. I'm not sure it impacts the current design / proposal for models. What it probably means is we'll need a nice library people can use in Spark / other jobs to help emit events like this. Not just utilities to parse SQL. Parsing other languages (Spark can be used in Java and Scala) probably isn't feasible :) Instrumenting that code / adding libraries may be a better approach. In either case let's reduce the scope of this conversation to just modeling.

So our current focus is how should we model fields? Are they top level entities? Can we leverage the schema models we have today?

@liangjun-jiang
Contributor Author

For point 5, I don't really know what the most common use cases are. From my data engineering background, I think working with database DDL is more common and most practical to automate.
Without making the dataset's fields a top-level entity, I managed to represent a graph relationship (hasField) with the current code base with minimal code changes, plus a little bit of hacking. Here is how it looks:

Screen Shot 2020-07-23 at 4 14 36 PM

@mars-lan mars-lan added rfc See https://github.com/linkedin/datahub/blob/master/docs/rfc.md for more details and removed feature-request Request for a new feature to be added labels Jul 29, 2020
@liangjun-jiang
Contributor Author

liangjun-jiang commented Aug 4, 2020

Based on my thought process, I think I have gotten this feature working, as illustrated below. Will update with more details later.

Screen Shot 2020-08-03 at 9 13 51 PM

@nagarjunakanamarlapudi
Contributor

Thanks @liangjun-jiang for your thoughts and continued follow-ups on this. Internally, we have been putting some thought into this as well.

We are thinking of representing field-level lineage as a separate aspect called DatasetFieldLevelUpstreamLineage.

Aspect Models

record DatasetFieldLevelUpstreamLineage {
 /**
  * Upstream to downstream field level lineage mappings
  */
 fieldMappings: array[FieldMapping]
}
/**
* Representation of how source fields are mapped to the destination fields.
*/
record FieldMapping {
 /**
  * Source entity from which the fine grained lineage is derived
  */
 sourceUrn: typeref SourceUrn = union[DatasetUrn]
 /**
  * Path spec type of the source fields
  */
 sourceFieldPathType: FieldPathSpecType
 /**
  * Source field paths
  */
 sourceFieldPaths: array[string]
 /**
  * Pathspec type of the destination fields
  */
 destinationFieldPathType: FieldPathSpecType
 /**
  * Destination field path
  */
 destinationFieldPath: string
 /**
  * A UDF mentioning how the source got transformed to destination. UDF also annotates how some metadata can be carried over from source fields to destination fields. BlackBox UDF doesn't know how the fields got transformed. A custom implementation of UDF can tell how the source fields got transformed into destination. It can also mention if any metadata of the source fields can get copied/derived to the destination field. 
  */
transformationUDF: optional string = "com.linkedin.finegrainedmetadata.transformation.BlackBox"
}
/**
* Types of pathspecs
*/
enum FieldPathSpecType {
 TMS, PEGASUS, JSON
}

One other implementation of UDF can be com.linkedin.finegrainedmetadata.transformation.Identity.

  • This UDF works only if there is a single source field.
  • This UDF says it is a pure copy of fields from source to destination.
  • Any metadata attached to the source field can be carried forward to the destination fields.

As mentioned, the above is just a hypothetical sketch of the UDF. We can come up with a more concrete interface for the UDF.

What does this enable us to model

  • Ability to specify more than one source field from which the destination field is derived.
  • Ability to specify fine-grained lineage from more than one source entity (for instance, Dataset and RestResource).
  • Ability to specify the transformation UDF, which can elaborate on the following (the exact syntax of such a UDF is yet to be thought through):
    • How source fields got transformed to destination fields
    • Any metadata of the source fields that can be carried to destination fields.

How to convert the above metadata into graph representation

  1. Every field of a dataset will be a node in the graph. (Let us call it DatasetField. The unique identifier of this node will be a combination of DatasetUrn + FieldPath.)
  2. There will be a relation from the Dataset node to its DatasetField nodes in the graph (Relation = HasField).
  3. A relation (DATASET_FIELD_DERIVED_FROM) will be established from source fields to destination fields.
  4. The above relation will have a property transformationUDF. The value of this property can be the UDF's FQCN.

(I can expand this further on how the graph models might look like)
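For concreteness, here is a hedged Python sketch of what one FieldMapping entry for the earlier tfvdm1.cpc example might look like under this proposal. The dataset URN, path-spec choice, and field values are illustrative assumptions, not an official schema.

field_mapping_example = {
    # Illustrative source dataset URN; the real platform/name would come from ingestion.
    "sourceUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,ods.fvs,PROD)",
    "sourceFieldPathType": "TMS",
    "sourceFieldPaths": ["cpc", "hdatasrc1"],
    "destinationFieldPathType": "TMS",
    "destinationFieldPath": "cpc",
    # BlackBox: we only know these source fields feed the destination field,
    # not how they were transformed.
    "transformationUDF": "com.linkedin.finegrainedmetadata.transformation.BlackBox",
}
print(field_mapping_example)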

Please let us know what you think of this.

@liangjun-jiang
Contributor Author

@nagarjunakanamarlapudi this is great. It definitely has more thought behind it than what I have implemented.
In the big picture, I think what you have written is similar to what I have done. I am trying to use the existing models (Dataset, SchemaMetadata, and SchemaField) as much as possible, and I am also using the existing Neo4jGraphWriterDao as much as possible, since I am not that familiar with Neo4j and the current implementation.

That being said, I will present my simplest implementation and reasoning in the next few comments.

@liangjun-jiang
Contributor Author

liangjun-jiang commented Aug 4, 2020

As mentioned earlier, I have proposed two new relationships:

  1. HasField - the relationship between a dataset and its schemaFields.
@pairings = [{
     "destination" : "com.linkedin.schema.Urn",
     "source" : "com.linkedin.common.urn.DatasetUrn"
 }]
record HasField includes BaseRelationship {

}

2. DerivedBy - the relationship between a dataset's SchemaField and its parent dataset's SchemaField.

@pairings = [{
     "destination" : "com.linkedin.common.urn.Urn",
     "source" : "com.linkedin.common.urn.Urn"
 }]
record DerivedBy includes BaseRelationship {

}

Because SchemaField is not a first-class entity in DataHub, the current Neo4jGraphWriterDao only supports first-class entities, enforcing a validation rule implemented by RelationshipValidator.

In other words, the validator checks the source and destination of a relationship pairing for:

  1. whether it is a Urn
  2. whether it is a registered Urn (first class entity)

I did the following two hacks:

  1. removed the relationship validator
  2. created a SchemaField URN on the fly by combining the URN of the dataset it belongs to with its field path (more details later)

@liangjun-jiang
Contributor Author

liangjun-jiang commented Aug 4, 2020

The next step is to create a new aspect for Dataset, named FineGrainUpstreamLineage.pdl. For simplicity, it has one property:

record FineGrainUpstreamLineage {
 upstreams: array[FineGrainUpstream]
}

I also created a FineGrainUpstream model; it looks like:

record FineGrainUpstream {
  auditStamp: AuditStamp
  dataset: DatasetUrn
  fields: array[FineGrainFieldUpstream]
}

The FineGrainFieldUpstream model is also new; it looks like this:

record FineGrainFieldUpstream {
 sourceField: string
 targetField: string
}

To compare with the existing table-level lineage models:

FineGrainUpstreamLineage --> UpStreamLineage
FineGrainUpstream --> UpStream
FineGrainFieldUpstream (new) --> n/a

@liangjun-jiang
Contributor Author

The next step is to implement the graph builders in metadata-builders.

  1. We want to build the graph edges between a dataset and its fields. We name the builder HasFieldBuilderFromSchemaMetadata.

SchemaMetadata is an existing model

public class HasFieldBuilderFromSchemaMetadata  extends BaseRelationshipBuilder<SchemaMetadata> {

  public HasFieldBuilderFromSchemaMetadata() {
    super(SchemaMetadata.class);
  }

  @Nonnull
  @Override
  public <URN extends Urn> List<GraphBuilder.RelationshipUpdates> buildRelationships(@Nonnull URN urn,
      @Nonnull SchemaMetadata schemaMetadata) {
    if (schemaMetadata.getFields() == null) {
      return Collections.emptyList();
    }
    List<HasField> list = new ArrayList<>();
    for (SchemaField field : schemaMetadata.getFields()) {
      try {
        // Compose an ad-hoc field URN as "<dataset urn>:<field path>".
        list.add(new HasField().setSource(urn).setDestination(new Urn(urn.toString() + ":" + field.getFieldPath())));
      } catch (URISyntaxException e) {
        // Skip fields whose composed URN cannot be parsed.
      }
    }
    return Collections.singletonList(new GraphBuilder.RelationshipUpdates(
        list,
        REMOVE_ALL_EDGES_FROM_SOURCE)
    );
  }
}

As you can see here, we create a URN based on the dataset URN and the SchemaField's field path.

  2. We want to build the graph edges between a dataset's field and the field derived from it in another dataset.
public class DerivedByBuilderFromFineGrainUpstreamLineage extends BaseRelationshipBuilder<FineGrainUpstreamLineage>  {
  public DerivedByBuilderFromFineGrainUpstreamLineage() {
    super(FineGrainUpstreamLineage.class);
  }

  @Nonnull
  @Override
  public List<GraphBuilder.RelationshipUpdates> buildRelationships(@Nonnull Urn urn, @Nonnull FineGrainUpstreamLineage upstreamLineage) {
    if (upstreamLineage.getUpstreams().isEmpty()) {
      return Collections.emptyList();
    }

    List<DerivedBy> list = new ArrayList<>();

    for(FineGrainUpstream stream: upstreamLineage.getUpstreams()) {
      for(FineGrainFieldUpstream fieldUpStream: stream.getFields()) {
        try {
          list.add(new DerivedBy()
              .setSource(Urn.createFromString(stream.getDataset().toString() + ":" + fieldUpStream.getSourceField()))
              .setDestination(Urn.createFromString(urn.toString() + ":" + fieldUpStream.getTargetField())));
        } catch (URISyntaxException e) {
          // Skip edges whose composed URNs cannot be parsed.
        }
      }
    }

    return Collections.singletonList(new GraphBuilder.RelationshipUpdates(list, REMOVE_ALL_EDGES_FROM_SOURCE));
  }

}

Pay attention to how the source URN is set in .setSource(Urn.createFromString(stream.getDataset().toString() + ":" + fieldUpStream.getSourceField()))
and how the destination URN is set in .setDestination(Urn.createFromString(urn.toString() + ":" + fieldUpStream.getTargetField())).

BaseRelationshipBuilder dictates that there is always a urn pointing to the dataset that carries this FineGrainUpstreamLineage aspect, and that urn is the destination of the upstream lineage.
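To make the URN composition concrete, here is a small Python sketch of the strings these builders would produce for one field-level edge; the dataset URNs are taken from the sample requests further down in this thread.

# Field "URNs" are composed on the fly as <dataset urn> + ":" + <field path>, mirroring
# HasFieldBuilderFromSchemaMetadata and DerivedByBuilderFromFineGrainUpstreamLineage.
upstream_dataset = "urn:li:dataset:(urn:li:dataPlatform:hasketl,p_foo_13,PROD)"
target_dataset = "urn:li:dataset:(urn:li:dataPlatform:druid,c_bar_13,PROD)"

source_field_urn = f"{upstream_dataset}:foo131"   # source of the DerivedBy edge
target_field_urn = f"{target_dataset}:bar131"     # destination of the DerivedBy edge

print(source_field_urn)
# urn:li:dataset:(urn:li:dataPlatform:hasketl,p_foo_13,PROD):foo131
print(target_field_urn)
# urn:li:dataset:(urn:li:dataPlatform:druid,c_bar_13,PROD):bar131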

As the final step, we register these two relationship builders with DatasetGraphBuilder:

Collections.unmodifiableSet(new HashSet<BaseRelationshipBuilder>() {
        {
          add(new DownstreamOfBuilderFromUpstreamLineage());
          add(new HasFieldBuilderFromSchemaMetadata());
          add(new OwnedByBuilderFromOwnership());
          add(new DerivedByBuilderFromFineGrainUpstreamLineage());
        }
      });

@liangjun-jiang
Contributor Author

This implementation supports the following features:

  1. relationship between a dataset and its fields
  2. relationship between a dataset and another dataset (already available)
  3. relationship between a dataset field and the one or more fields it is derived from

In this Neo4j graph presentation,

  1. dataset c_bar_13 has fields of bar131, bar132, bar133 and col134;
  2. p_foo_13 dataset has fields of foo131, foo132, foo133, and foo134
  3. dataset p_2_foo_13 has fields of 2_foo131, 2_foo132, 2_foo133, and 2_foo134
  4. dataset c_bar_13 is downstream of p_foo_13 and p_2_foo_13
  5. bar131 of c_bar_13 is derived by foo131 and 2_foo131
  6. bar132 of c_bar_13 is derived by foo132 and 2_foo132

Screen Shot 2020-08-04 at 11 42 54 AM

@liangjun-jiang
Contributor Author

Here are sample JSON requests to create datasets p_foo_13, p_2_foo_13, and c_bar_13.

For p_foo_13 and p_2_foo_13, there is nothing new; I put them later. For c_bar_13, we introduce the new FineGrainUpstreamLineage aspect.

  1. Dataset request JSON for c_bar_13
{
    "snapshot": {
        "aspects": [
            {
                "com.linkedin.common.Ownership": {
                    "owners": [
                        {
                            "owner": "urn:li:corpuser:kzhang13",
                            "type": "DATAOWNER"
                        }
                    ],
                    "lastModified": {
                        "time": 0,
                        "actor": "urn:li:corpuser:kzhang13"
                    }
                }
            },
            {
                "com.linkedin.dataset.UpstreamLineage": {
                    "upstreams": [
                        {
                            "auditStamp": {
                                "time": 0,
                                "actor": "urn:li:corpuser:kzhang13"
                            },
                            "dataset": "urn:li:dataset:(urn:li:dataPlatform:hasketl,p_foo_13,PROD)",
                            "type": "TRANSFORMED"
                        },
                        {
                            "auditStamp": {
                                "time": 0,
                                "actor": "urn:li:corpuser:kzhang13"
                            },
                            "dataset": "urn:li:dataset:(urn:li:dataPlatform:hasketl,p_2_foo_13,PROD)",
                            "type": "TRANSFORMED"
                        }
                    ]
                }                  
            },
            {
                "com.linkedin.dataset.FineGrainUpstreamLineage": {
                    "upstreams": [
                        {
                            "auditStamp": {
                                "time": 0,
                                "actor": "urn:li:corpuser:kzhang13"
                            },
                            "dataset": "urn:li:dataset:(urn:li:dataPlatform:hasketl,p_foo_13,PROD)",
                            "fields": [
                                {
                                    "sourceField": "foo131",
                                    "targetField" : "bar131"
                                },
                                {
                                    "sourceField": "foo132",
                                    "targetField" : "bar132"
                                }
                            ]
                        },
                        {
                            "auditStamp": {
                                "time": 0,
                                "actor": "urn:li:corpuser:kzhang13"
                            },
                            "dataset": "urn:li:dataset:(urn:li:dataPlatform:hasketl,p_2_foo_13,PROD)",
                            "fields": [
                                {
                                    "sourceField": "2_foo131",
                                    "targetField" : "bar131"
                                },
                                {
                                    "sourceField": "2_foo132",
                                    "targetField" : "bar132"
                                }
                            ]
                        }
                    ]
                }                  
            },
            {
                "com.linkedin.common.InstitutionalMemory": {
                    "elements": [
                        {
                            "url": "https://www.linkedin.com",
                            "description": "Sample doc",
                            "createStamp": {
                                "time": 0,
                                "actor": "urn:li:corpuser:kzhang13"
                            }
                        }
                    ]
                }
            },
            {
                "com.linkedin.schema.SchemaMetadata": {
                    "schemaName": "hbaseEvent",
                    "platform": "urn:li:dataPlatform:hbase",
                    "version": 0,
                    "created": {
                        "time": 0,
                        "actor": "urn:li:corpuser:kzhang13"
                    },
                    "lastModified": {
                        "time": 0,
                        "actor": "urn:li:corpuser:kzhang13"
                    },
                    "hash": "",
                    "platformSchema": {
                        "com.linkedin.schema.KafkaSchema": {
                            "documentSchema": "{\"type\":\"record\",\"name\":\"MetadataChangeEvent\",\"namespace\":\"com.linkedin.mxe\",\"doc\":\"Kafka event for proposing a metadata change for an entity.\",\"fields\":[{\"name\":\"auditHeader\",\"type\":{\"type\":\"record\",\"name\":\"KafkaAuditHeader\",\"namespace\":\"com.linkedin.avro2pegasus.events\",\"doc\":\"Header\"}}]}"
                        }
                    },
                    "fields": [
                        {
                            "fieldPath": "bar131",
                            "description": "Bar",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        },
                        {
                            "fieldPath": "bar132",
                            "description": "Bar",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        },
                        {
                            "fieldPath": "bar133",
                            "description": "Bar",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        },
                        {
                            "fieldPath": "col134",
                            "description": "Bar",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "urn": "urn:li:dataset:(urn:li:dataPlatform:druid,c_bar_13,PROD)"
    }
}

  2. Dataset request JSON to create p_foo_13
{
    "snapshot": {
        "aspects": [
            {
                "com.linkedin.common.Ownership": {
                    "owners": [
                        {
                            "owner": "urn:li:corpuser:kzhang13",
                            "type": "DATAOWNER"
                        }
                    ],
                    "lastModified": {
                        "time": 0,
                        "actor": "urn:li:corpuser:kzhang13"
                    }
                }
            },
            {
                "com.linkedin.schema.SchemaMetadata": {
                    "schemaName": "hbaseEvent",
                    "platform": "urn:li:dataPlatform:hbase",
                    "version": 0,
                    "created": {
                        "time": 0,
                        "actor": "urn:li:corpuser:kzhang13"
                    },
                    "lastModified": {
                        "time": 0,
                        "actor": "urn:li:corpuser:kzhang13"
                    },
                    "hash": "",
                    "platformSchema": {
                        "com.linkedin.schema.KafkaSchema": {
                            "documentSchema": "{\"type\":\"record\",\"name\":\"MetadataChangeEvent\",\"namespace\":\"com.linkedin.mxe\",\"doc\":\"Kafka event for proposing a metadata change for an entity.\",\"fields\":[{\"name\":\"auditHeader\",\"type\":{\"type\":\"record\",\"name\":\"KafkaAuditHeader\",\"namespace\":\"com.linkedin.avro2pegasus.events\",\"doc\":\"Header\"}}]}"
                        }
                    },
                    "fields": [
                        {
                            "fieldPath": "foo131",
                            "description": "foo",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        },
                        {
                            "fieldPath": "foo132",
                            "description": "foo",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        },
                        {
                            "fieldPath": "foo133",
                            "description": "foo",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        },
                        {
                            "fieldPath": "foo134",
                            "description": "foo",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "urn": "urn:li:dataset:(urn:li:dataPlatform:hasketl,p_foo_13,PROD)"
    }
}



  3. Dataset request JSON to create p_2_foo_13
{
    "snapshot": {
        "aspects": [
            {
                "com.linkedin.common.Ownership": {
                    "owners": [
                        {
                            "owner": "urn:li:corpuser:kzhang13",
                            "type": "DATAOWNER"
                        }
                    ],
                    "lastModified": {
                        "time": 0,
                        "actor": "urn:li:corpuser:kzhang13"
                    }
                }
            },
            {
                "com.linkedin.schema.SchemaMetadata": {
                    "schemaName": "hbaseEvent",
                    "platform": "urn:li:dataPlatform:hbase",
                    "version": 0,
                    "created": {
                        "time": 0,
                        "actor": "urn:li:corpuser:kzhang13"
                    },
                    "lastModified": {
                        "time": 0,
                        "actor": "urn:li:corpuser:kzhang13"
                    },
                    "hash": "",
                    "platformSchema": {
                        "com.linkedin.schema.KafkaSchema": {
                            "documentSchema": "{\"type\":\"record\",\"name\":\"MetadataChangeEvent\",\"namespace\":\"com.linkedin.mxe\",\"doc\":\"Kafka event for proposing a metadata change for an entity.\",\"fields\":[{\"name\":\"auditHeader\",\"type\":{\"type\":\"record\",\"name\":\"KafkaAuditHeader\",\"namespace\":\"com.linkedin.avro2pegasus.events\",\"doc\":\"Header\"}}]}"
                        }
                    },
                    "fields": [
                        {
                            "fieldPath": "2_foo131",
                            "description": "foo",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        },
                        {
                            "fieldPath": "2_foo132",
                            "description": "foo",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        },
                        {
                            "fieldPath": "2_foo133",
                            "description": "foo",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        },
                        {
                            "fieldPath": "2_foo134",
                            "description": "foo",
                            "nativeDataType": "string",
                            "type": {
                                "type": {
                                    "com.linkedin.schema.StringType": {}
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "urn": "urn:li:dataset:(urn:li:dataPlatform:hasketl,p_2_foo_13,PROD)"
    }
}

@jplaisted
Contributor

So I think this issue has a lot of really great ideas in it, but it is starting to get a little large and hard to follow. Jumping right from here to a large PR isn't that easy either :)

Can we maybe try the full RFC process here? i.e. a design doc? That should be easier to follow than this issue (the latest state of the RFC PR is the current proposal, no need to read a large back and forth discussion if you want to jump right in), and we can review that, and then after that is ok'd we can start code reviews.

I would also strongly suggest multiple PRs; try to make them smaller. A good example is the first PR should probably be models only, no code changes. Then you can start adding code support.

Let me know what you think, thanks!

@liangjun-jiang
Contributor Author

Absolutely, @jplaisted. This issue was created before the RFC process was adopted. Happy to convert this into an RFC for future reference.
Also happy to split it into multiple PRs. I don't really expect this PR to get merged as-is, since I did some hacks.

@mars-lan
Contributor

Assuming we can close this in favor of #1841. Feel free to reopen otherwise.

@anigos

anigos commented Feb 15, 2024

Suppose I have a SQL statement in Snowflake like this:

INSERT INTO EMPLOYEE
(NAME, ADDRESS, ZIP)
SELECT F_NAME+L_NAME AS NAME,
SUBSTRING(FULL_ADDRESS,1,100),
ZIP
FROM
SOURCE_DATA;

How can I get lineage in datahub like

F_NAME+L_NAME ----> NAME
SUBSTRING(FULL_ADDRESS,1,100) ----> ADDRESS
ZIP ----> ZIP
