Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pinot query options to query #21902

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

naman-patel
Copy link

@naman-patel naman-patel commented May 9, 2024

Description

Currently no Pinot query options can be passed along with query. This means Pinot null handling cant be leveraged. This PR provides a way to provide these query options.

Additional context and related issues

The same PR was also implemented for Presto. Here is the reference to that.

Fixes #21897

Release notes

(x) Release notes are required, with the following suggested text:

# Pinot
* Allow users to configure Pinot query options. ({21897}`issuenumber`)

@@ -44,4 +44,18 @@ public void testConnectionTimeoutParsedProperly()
.build();
assertThat(PinotSessionProperties.getConnectionTimeout(session)).isEqualTo(new Duration(0.25, TimeUnit.MINUTES));
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a new assertion to BasePinotConnectorSmokeTest#testQueryOptions with query_options session property.

Copy link
Member

@ebyhr ebyhr May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can define the variable like below and use it in assertion method.

        Session queryOptions = Session.builder(getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty("pinot", "query_options", "...")
                .build();

Please refer to BasePinotConnectorSmokeTest.testAggregationPushdown.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is connector config. I tried session catalog property. It complains its not a recognized session property.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can use the above code. How did you try?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exacty what you have there. It compiles but since that is not catalog session property, I see a runtime exception.

Session queryOptions = Session.builder(getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty("pinot", "query_options", "skipUpsert=true")
                .build();
        assertThat(query(queryOptions,"SELECT city, \"sum(long_number)\" FROM" +
                         " \"SET skipUpsert = 'true';" +
                         " SET numReplicaGroupsToQuery = '1';" +
                         " SELECT city, SUM(long_number)" +
                         "  FROM my_table" +
                         "  GROUP BY city" +
                         "  HAVING SUM(long_number) > 10000\""))
                .matches("VALUES (VARCHAR 'Los Angeles', DOUBLE '50000.0'), (VARCHAR 'New York', DOUBLE '20000.0')")
                .isFullyPushedDown();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The session value skipUpsert=true should be skipUpsert:true.

@elonazoulay elonazoulay requested a review from xiangfu0 May 10, 2024 04:39
@naman-patel
Copy link
Author

@ebyhr addrressed your feedback except for the last one as I dont see a way to add those session properties in test framework.

Comment on lines +243 to +267
public static Optional<String> getQueryOptionsString(String options)
{
if (isNullOrEmpty(options)) {
return Optional.empty();
}

Map<String, String> queryOptionsMap = ImmutableMap.copyOf(MAP_SPLITTER.split(options));
return getQueryOptions(queryOptionsMap);
}

public static Optional<String> getQueryOptions(Map<String, String> queryOptionsMap)
{
if (queryOptionsMap.isEmpty()) {
return Optional.empty();
}
StringBuilder result = new StringBuilder();
for (Map.Entry<String, String> entry : queryOptionsMap.entrySet()) {
result.append("SET ")
.append(entry.getKey())
.append(" = ")
.append(entry.getValue())
.append(";\n");
}
return Optional.of(result.toString());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use

public Map<String, String> getAdditionalHeaders()
{
return additionalHeaders;
}
@Config("hive.metastore.http.client.additional-headers")
@ConfigDescription("Comma separated key:value pairs to be send to metastore as additional headers")
public ThriftHttpMetastoreConfig setAdditionalHeaders(String httpHeaders)
{
try {
// we allow escaping the delimiters like , and : using back-slash.
// To support that we create a negative lookbehind of , and : which
// are not preceded by a back-slash.
String headersDelim = "(?<!\\\\),";
String kvDelim = "(?<!\\\\):";
Map<String, String> temp = new HashMap<>();
if (httpHeaders != null) {
for (String kv : httpHeaders.split(headersDelim)) {
String key = kv.split(kvDelim, 2)[0].trim();
String val = kv.split(kvDelim, 2)[1].trim();
temp.put(key, val);
}
this.additionalHeaders = ImmutableMap.copyOf(temp);
}
}
catch (IndexOutOfBoundsException e) {
throw new IllegalArgumentException(String.format("Invalid format for 'hive.metastore.http.client.additional-headers'. " +
"Value provided is %s", httpHeaders), e);
}
return this;
}
as PoC on how to deal with a config value containing a map of values.

@ebyhr
Copy link
Member

ebyhr commented May 23, 2024

Could you rebase on master to resolve conflicts?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

Pinot connector has no way to pass query option along with query
3 participants