
docs: KLIP-28 introduce CREATE OR REPLACE #5611

Merged (4 commits) on Jun 23, 2020

Conversation


@agavra agavra commented Jun 11, 2020

@agavra agavra marked this pull request as ready for review June 11, 2020 22:55
@agavra agavra requested a review from a team as a code owner June 11, 2020 22:55
@big-andy-coates (Contributor):

Could you add the .md file extension please?

@big-andy-coates (Contributor) left a comment:

A few questions...


- Any _transparent_ upgrade will be supported
- Any _data selection_ upgrade will be supported
- _Schema evolution_ upgrades will be supported on simple and stateful queries, but it will be
Contributor:

Does this include removing columns from the output schema, as well as adding columns?

What about changing the type of a column?

Removing and changing the type of existing columns has implications for downstream queries. The simplest solution would be to not allow either of these in this first pass, but not sure of the intent of the KLIP from this statement. Can we make sure the doc is clearer on this please?

IMHO it's crucial that we ensure dumping the full database schema and running it on a fresh instance would succeed and result in all the same output schemas and data.

Contributor:

For example,

Let's say we have:

CREATE STREAM S1 AS 
   SELECT ID, NAME, COST FROM BLAH;

CREATE STREAM S2 AS
   SELECT ID, NAME FROM S1;

But then we run:

-- drops the `NAME` column from the projection:
CREATE OR REPLACE S1 AS
   SELECT ID, COST FROM BLAH;

Now S2 references a column in S1 that doesn't exist in the DB schema! While this would work with formats that can support evolution and optional fields etc... it leaves the DB schema inconsistent, i.e. we lose referential integrity of the db schema.

Contributor:

Personally, I'd go with not allowing rename / removing initially.

Later we can build up system tables that track what columns are actually used in downstream queries and only allows you to remove / rename columns that aren't in use.

Contributor:

Would be interesting to see what a traditional DB does if you try to rename/remove a column that's used in a materialized view?


@big-andy-coates I feel like this should follow the schema compatibility rules defined in schema registry, shouldn't it?
If schema registry isn't configured, maybe fall back to FORWARD, then one can only remove nullable fields, but I don't have a strong opinion on that

Contributor Author:

@PeterLinder originally I was thinking along the same lines as you, but I think Andy has me convinced. While removing fields would be acceptable from a data schema evolution perspective, the table schema evolution would be broken by removing fields that are used in downstream queries.

Contributor Author:

Added:

Note that _Schema Evolution_ compatibility is defined by the limitations of the serialization
format that is used with the added restrictions against removing fields and changing types to ensure
referential integrity of ksqlDB tables. This way, downstream query output schemas will not be affected
by upstream schema evolution.

Contributor:

Great! Though note it's not just removing columns, but also renaming columns!

Contributor Author:

From my perspective, there isn't a difference between removing and renaming (renaming = removing and adding). Since I'm checking that all columns from the previous schema exist in the new one, a renamed column would fail that check anyway!
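The check described in this thread can be sketched in a few lines. This is a hypothetical illustration, not the actual ksqlDB implementation; schemas are modeled as simple name-to-type mappings, and the rule is that every column in the previous schema must still exist in the new schema with the same name and type, so both removals and renames are rejected while additions pass.

```python
def is_compatible(old_schema, new_schema):
    """Return True if every column of old_schema survives unchanged in new_schema.

    Both arguments map column name -> type name. Renaming a column looks
    identical to removing it (the old name is absent), so it fails too.
    """
    return all(
        name in new_schema and new_schema[name] == col_type
        for name, col_type in old_schema.items()
    )


old = {"ID": "INT", "NAME": "STRING", "COST": "DOUBLE"}

# Adding a column is fine:
assert is_compatible(old, dict(old, REGION="STRING"))
# Dropping NAME breaks downstream queries, so it is rejected:
assert not is_compatible(old, {"ID": "INT", "COST": "DOUBLE"})
# Renaming NAME -> FULL_NAME fails the same check:
assert not is_compatible(old, {"ID": "INT", "FULL_NAME": "STRING", "COST": "DOUBLE"})
```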


For step 4, we need to be able to generate queryIDs differently for `CREATE OR REPLACE` statements
than for others (i.e. it shouldn't just be the offset of the command, but rather the same queryID
as the original query.) One simple way to implement this is to allow the queryID to be specified
Contributor:

One simple way to implement this is to allow the queryID to be specified in the command topic.

FYI, the query plan already includes the query id.

@rmoff (Member) left a comment:

LGTM 👍

- Compute: recomputing the complete history for a query may not be feasible

Kafka Streams provides more granular mechanisms (e.g. restarting queries with different behaviors
but identical consumer groups), but these methods burden users with extra complexity and lack guardrails.
Member:

Instead of

but these methods burden users with extra complexity and lack guardrails

how about

but to directly expose these methods to users through ksqlDB would burden them with extra complexity and lack guardrails

Or, perhaps I'm misunderstanding the intention here?

Contributor Author:

I think both statements are true - the intention in this KLIP is also to call out that there are improvements that need to be made in Kafka Streams to ensure that an upgrade is valid. Some of that burden will be on KSQL in the short term, but the intention is to address some of these concerns inside Kafka Streams as well

| | Topology | These upgrades are invisible to the user, but change the topology, such as the number of sub-topologies or the ordering of operations (e.g. filter push down) |
| | Scaling | Scaling upgrades change the physical properties of the query in order to enable better performance characteristics. |
| | Unsupported | Unsupported upgrades are ones that will semantically change the query in an unsupported way. There are no plans to implement these migrations. |
| Environment | Backfill | Backfill requires the output data to be accurate not just from a point in time, but from the earliest point of retained history |
Member:

Which Environment would be for a Simple query that didn't require backfill? For example, a query with a predicate that needed to be changed but only future-processed records needed to reflect it (an example would be changing the alert threshold for a monitor)

Contributor Author:

I haven't specified this, but the intention is that none of the environments need to apply (so in your example it would be simply a "Simple" query). I will clarify the KLIP

@big-andy-coates (Contributor):

Another thought... the KLIP does not directly address / reference / mention CREATE OR REPLACE on CREATE TABLE / CREATE STREAM statements, i.e. CT/CS rather than CTAS/CSAS. Is this intentional? Or, given these are the easier ones to tackle, not intentional?


agavra commented Jun 19, 2020

the KLIP does not directly address / reference / mention CREATE OR REPLACE on CREATE TABLE / CREATE STREAM statements

I'll update the KLIP to add a section on those, though it is the same as the DDL part of C*AS statements.

@vpapavas (Member) left a comment:

Great job @agavra ! LGTM. I am only confused by what INSERT INTO has to do with this proposal

@vcrfxia (Contributor) left a comment:

LGTM (with a few questions inline)!

| | Schema Evolution | Schema evolving query upgrades change the output type of the data |
| | Source Modifying | These upgrades change the source data, whether by means of modifying a JOIN or swapping out a source |
| | Topology | These upgrades are invisible to the user, but change the topology, such as the number of sub-topologies or the ordering of operations (e.g. filter push down) |
| | Scaling | Scaling upgrades change the physical properties of the query in order to enable better performance characteristics. |
Contributor:

What's an example of this? Changing the number of streams threads, and other streams properties?

What about changing parameters such as the number of output partitions -- and other properties from the "WITH" clause, such as output format? Will that be supported?

Contributor Author:

What's an example of this?

I don't have a good example available in the language today, but things like scaling out partitions or stream threads would be such "scaling" operations. We could imagine changing the number of stream threads being one such thing (e.g. CREATE OR REPLACE ... WITH(num_threads=1000)).

The scope of what this KLIP will implement doesn't really address any scaling changes (such as changing number of output partitions) but that could be some advanced upgrades that we want to support in the future. I felt it was important to include this in the table though for completeness

Contributor:

When a CREATE OR REPLACE statement is issued, does the new query use the query properties (e.g., number of streams threads) set at the time of the CREATE OR REPLACE statement or does it maintain properties from the original query (if present)? I assume the latter but think this is worth calling out explicitly. If this is the case, then changing the number of streams threads is already supported under this proposal.

Contributor Author:

It would indeed use the most recent one - so I suppose it is already supported 😂 I can update the KLIP if you'd like, though perhaps just seeing this discussion is sufficient documentation

Contributor:

Are we planning to do any validation on the new query properties, in terms of checking for compatibility? For example, I'm not entirely clear on what happens if a Streams app is restarted with a different value for replication.factor (assuming the internal topics are already created with the old value) but I suspect there are combinations of values where this might fail?

Contributor Author:

Are we planning to do any validation on the new query properties, in terms of checking for compatibility?

The intention for compatibility was to set up a whitelist of items that can change; if anything else changes, we consider the upgrade incompatible. Honestly, I hadn't considered the query properties until now, but they definitely seem like something important to put into the compatibility calculus.

As for your specific question, I think upping the replication factor is supported but reducing it is not (though I might be wrong on that one).
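The whitelist approach described above can be sketched as follows. This is a hypothetical illustration only: the property names are illustrative (not the real ksqlDB allow-list), and the rule is simply that any property differing between the old and new query must appear on the whitelist, otherwise the upgrade is flagged as incompatible.

```python
# Illustrative allow-list; the real set of mutable properties is undecided in the KLIP.
MUTABLE_PROPERTIES = {"num.stream.threads", "cache.max.bytes.buffering"}


def incompatible_changes(old_props, new_props):
    """Return the set of changed property names that are NOT whitelisted."""
    changed = {
        key
        for key in old_props.keys() | new_props.keys()
        if old_props.get(key) != new_props.get(key)
    }
    return changed - MUTABLE_PROPERTIES


# Changing a whitelisted property is allowed:
assert incompatible_changes({"num.stream.threads": 2}, {"num.stream.threads": 8}) == set()
# Changing anything else (e.g. replication.factor) is rejected:
assert incompatible_changes({"replication.factor": 3}, {"replication.factor": 1}) == {"replication.factor"}
```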

Contributor:

Sounds good. Just wanted to check we had a plan :)

Contributor Author:

I appreciate the thorough review 😄

| | Source Modifying | These upgrades change the source data, whether by means of modifying a JOIN or swapping out a source |
| | Topology | These upgrades are invisible to the user, but change the topology, such as the number of sub-topologies or the ordering of operations (e.g. filter push down) |
| | Scaling | Scaling upgrades change the physical properties of the query in order to enable better performance characteristics. |
| | Unsupported | Unsupported upgrades are ones that will semantically change the query in an unsupported way. There are no plans to implement these migrations. |
Contributor:

"No plans to implement these migrations" at this time, or ever?

Contributor Author:

Ever - these are things like changing a stream to table, or a table to a stream. Things that just don't make sense in our mental model.

- _Topology changes_ will not be supported

Note that _Schema Evolution_ compatibility is defined by the limitations of the serialization
format that is used with the added restrictions against removing fields and changing types to ensure
Contributor:

Does this mean the only thing that's supported is adding fields? What about reordering fields? Reordering fields would be breaking in the case of DELIMITED.

Contributor Author:

see my note below - that would be a limitation of the serialization format. Reordering would be OK in JSON for example, but not in delimited (and I don't think in AVRO or PROTOBUF either). But essentially the only "useful" query upgrade for schema evolution is adding fields - which I suspect is a vast majority of the use cases for schema evolution.

Contributor:

We've even got to be careful when reordering JSON columns. Once we support multiple key columns, changing the order of key columns would break the binary compatibility of keys.

IMHO, best just to not allow column re-ordering. I can't see it being a much-requested thing anyway.

Contributor Author:

Fair, the ethos of this project is to be extra restrictive and lift restrictions later so we may as well be extra cautious up front.


Note that _Schema Evolution_ compatibility is defined by the limitations of the serialization
format that is used with the added restrictions against removing fields and changing types to ensure
referential integrity of ksqlDB tables. This way, downstream query output schemas will not be affected
Contributor:

Is this true? Wouldn't adding a field break downstream queries in the case of DELIMITED?

Contributor Author:

Yeah - that's why I mention "defined by the limitations of the serialization format that is used" and then there's added restrictions against removing fields and changing types, even if the format supports that. Changing a type, for example, would be supported by delimited and JSON - but I plan on explicitly restricting that.

Insert into can eventually be replaced with `UNION` as proposed in [KLIP-17](https://github.com/confluentinc/ksql/pull/4125),
but that must happen in lock-step with this proposal. At first, we will support `CREATE OR REPLACE`
and `INSERT INTO`. Then, we will add support for `UNION`, allowing us to model consecutive `INSERT INTO`
statements as replacing unions with larger unions (essentially adding an extra source to the union).
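The quoted paragraph can be sketched concretely. This is a hypothetical illustration: the UNION syntax comes from the KLIP-17 proposal and is not implemented, so the exact form may differ.

```sql
-- Today (KLIP-28 alone): two statements write into b.
CREATE STREAM b AS SELECT col1 FROM a;
INSERT INTO b SELECT col1 FROM c;

-- With KLIP-17, the same pair could be modeled as a single union query:
CREATE STREAM b AS
  SELECT col1 FROM a UNION ALL SELECT col1 FROM c;

-- A later INSERT INTO then becomes a CREATE OR REPLACE with a wider union,
-- i.e. the same query with one extra source added:
CREATE OR REPLACE STREAM b AS
  SELECT col1 FROM a UNION ALL SELECT col1 FROM c UNION ALL SELECT col1 FROM d;
```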
Contributor:

Cool!


agavra commented Jun 23, 2020

I am only confused by what INSERT INTO has to do with this proposal

Thanks for the review @vpapavas - INSERT INTO is relevant for two reasons. Let's imagine I have the following:

CREATE STREAM a (col1 INT, col2 INT) WITH ...;
CREATE STREAM c (col1 INT, col2 INT) WITH ...;

CREATE STREAM b AS SELECT col1 FROM a;
INSERT INTO b SELECT col1 AS col1 FROM c;

CREATE OR REPLACE b AS SELECT col1, col2 FROM b;

For an INSERT INTO to work, the output schema needs to exactly match that of the sink. Specifically INSERT INTO b SELECT col1 AS col1 FROM c would work when the schema for b is just col1 INT, but if it changes to col1 INT, col2 INT the original INSERT INTO would need to be updated to also give a value for col2. We don't have a way to do that with a single CREATE OR REPLACE statement and that would complicate things dramatically - so I decided for this KLIP to just not support CREATE OR REPLACE if there's an INSERT INTO writing into the source.

The second thing, is that we're planning on removing INSERT INTO and replacing it with UNION. CREATE OR REPLACE would make this more powerful because you could add new UNIONs on the fly (see KLIP-17).

I hope that clears things up!

Co-authored-by: Victoria Xia <victoria.f.xia281@gmail.com>

agavra commented Jun 23, 2020

Merging with three +1s, we can always make changes as implementation comes up or afterwards if people want to still get in on the review!

@agavra agavra merged commit 19f277d into confluentinc:master Jun 23, 2020
@agavra agavra deleted the klip-28 branch June 23, 2020 17:42
@big-andy-coates (Contributor) left a comment:

LGTM

- _Topology changes_ will not be supported

Note that _Schema Evolution_ compatibility is defined by the limitations of the serialization
format that is used with the added restrictions against removing fields and changing types to ensure
Contributor:

Not just removing fields, but also renaming fields.

Contributor Author:

Copying from above:

From my perspective, there isn't a difference between removing and renaming (renaming = removing and adding). Since I'm checking that all columns from the previous schema exist in the new one, a renamed column would fail that check anyway!

The reason I'm pedantic about this is that figuring out whether a field is "renamed" is actually pretty darn tough!
