Skip to content

Commit

Permalink
[#24789] Spot fix fullvalue wrapping for SDF. (#24826)
Browse files Browse the repository at this point in the history
* [#24789] Spot fix fullvalue wrapping for SDF.

* Fix indenting on diagrams

* Reinstate comment
  • Loading branch information
lostluck committed Dec 29, 2022
1 parent 3f28d55 commit 0bba439
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions sdks/go/pkg/beam/core/runtime/exec/sdf.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, d
// Input Diagram:
//
// *FullValue {
// Elm: *FullValue (original input)
// Elm2: *FullValue {
// Elm: Restriction
// Elm2: Watermark estimator state
// }
// Windows
// Timestamps
// Elm: *FullValue (original input)
// Elm2: *FullValue {
// Elm: Restriction
// Elm2: Watermark estimator state
// }
// Windows
// Timestamps
// }
//
// ProcessElement splits the given restriction into one or more restrictions and
Expand All @@ -175,22 +175,25 @@ func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, d
//
// Output Diagram:
//
// *FullValue {
// Elm: *FullValue {
// Elm: *FullValue (original input)
// Elm2: *FullValue {
// Elm: Restriction
// Elm2: Watermark estimator state
// }
// }
// Elm2: float64 (size)
// Windows
// Timestamps
// }
// *FullValue {
// Elm: *FullValue {
// Elm: *FullValue (original input)
// Elm2: *FullValue {
// Elm: Restriction
// Elm2: Watermark estimator state
// }
// }
// Elm2: float64 (size)
// Windows
// Timestamps
// }
func (n *SplitAndSizeRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
rest := elm.Elm2.(*FullValue).Elm
ws := elm.Elm2.(*FullValue).Elm2
mainElm := elm.Elm.(*FullValue)

// If receiving directly from a datasource,
// the element may not be wrapped in a *FullValue
mainElm := convertIfNeeded(elm.Elm, &FullValue{})

splitRests := n.splitInv.Invoke(mainElm, rest)

Expand Down

0 comments on commit 0bba439

Please sign in to comment.