[SPARK-27658][SQL] Add FunctionCatalog API #24559

Closed
org/apache/spark/sql/connector/catalog/FunctionCatalog.java
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;

/**
* Catalog methods for working with Functions.
*/
public interface FunctionCatalog extends CatalogPlugin {

/**
* List the functions in a namespace from the catalog.
* <p>
* If there are no functions in the namespace, implementations should return an empty array.
*
* @param namespace a multi-part namespace
* @return an array of Identifiers for functions
* @throws NoSuchNamespaceException If the namespace does not exist (optional).
*/
Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException;

/**
* Load a function by {@link Identifier identifier} from the catalog.
*
* @param ident a function identifier
* @return an unbound function instance
* @throws NoSuchFunctionException If the function doesn't exist
*/
UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException;

}
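
For reference, a minimal sketch of how a connector might implement this interface. The class name InMemoryFunctionCatalog, the register helper, and the backing map are invented for this example; only the FunctionCatalog, CatalogPlugin, Identifier, and UnboundFunction APIs it uses come from Spark.

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Illustrative only: an in-memory catalog that keeps functions in a map keyed by Identifier.
public class InMemoryFunctionCatalog implements FunctionCatalog {
  private final Map<Identifier, UnboundFunction> functions = new ConcurrentHashMap<>();
  private String catalogName;

  // CatalogPlugin methods
  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    this.catalogName = name;
  }

  @Override
  public String name() {
    return catalogName;
  }

  // illustrative helper used to register functions with this catalog
  public void register(Identifier ident, UnboundFunction function) {
    functions.put(ident, function);
  }

  @Override
  public Identifier[] listFunctions(String[] namespace) {
    // return only the functions whose identifier namespace matches the requested one
    return functions.keySet().stream()
        .filter(ident -> Arrays.equals(ident.namespace(), namespace))
        .toArray(Identifier[]::new);
  }

  @Override
  public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
    UnboundFunction function = functions.get(ident);
    if (function == null) {
      // uses the pre-existing (db, func) constructor of NoSuchFunctionException
      throw new NoSuchFunctionException(String.join(".", ident.namespace()), ident.name());
    }
    return function;
  }
}

A catalog like this would be registered the same way as other CatalogPlugin implementations, via a spark.sql.catalog.<name> configuration entry pointing at the implementation class.
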
org/apache/spark/sql/connector/catalog/functions/AggregateFunction.java
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.functions;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;

import java.io.Serializable;

/**
* Interface for a function that produces a result value by aggregating over multiple input rows.
* <p>
* For each input row, Spark will call an update method that corresponds to the
* {@link #inputTypes() input data types}. The expected JVM argument types must be the types used by
* Spark's InternalRow API. If no direct method is found or when not using codegen, Spark will call
* update with {@link InternalRow}.
* <p>
* The JVM type of result values produced by this function must be the type used by Spark's
* InternalRow API for the {@link DataType SQL data type} returned by {@link #resultType()}.
* <p>
* All implementations must support partial aggregation by implementing merge so that Spark can
* partially aggregate and shuffle intermediate results, instead of shuffling all rows for an
* aggregate. This reduces the impact of data skew and the amount of data shuffled to produce the
* result.
* <p>
* Intermediate aggregation state must be {@link Serializable} so that state produced by parallel
* tasks can be serialized, shuffled, and then merged to produce a final result.
*
* @param <S> the JVM type for the aggregation's intermediate state; must be {@link Serializable}
* @param <R> the JVM type of result values
*/
public interface AggregateFunction<S extends Serializable, R> extends BoundFunction {

/**
* Initialize state for an aggregation.
* <p>
* This method is called one or more times for every group of values to initialize intermediate
* aggregation state. More than one intermediate aggregation state variable may be used when the
* aggregation is run in parallel tasks.
* <p>
* Implementations that return null must support null state passed into all other methods.
*
* @return a state instance or null
*/
S newAggregationState();

/**
* Update the aggregation state with a new row.
* <p>
* This is called for each row in a group to update an intermediate aggregation state.
*
* @param state intermediate aggregation state
* @param input an input row
* @return updated aggregation state
*/
default S update(S state, InternalRow input) {
throw new UnsupportedOperationException("Cannot find a compatible AggregateFunction#update");
}

/**
* Merge two partial aggregation states.
* <p>
* This is called to merge intermediate aggregation states that were produced by parallel tasks.
*
* @param leftState intermediate aggregation state
* @param rightState intermediate aggregation state
* @return combined aggregation state
*/
S merge(S leftState, S rightState);

/**
* Produce the aggregation result based on intermediate state.
*
* @param state intermediate aggregation state
* @return a result value
*/
R produceResult(S state);

}
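
To make the contract above concrete (possibly-null initial state, per-row update, merge of partial states), here is a sketch of a sum over a single LONG column. The class name LongSum is made up for this example; only the AggregateFunction, BoundFunction, and InternalRow APIs it implements come from this change.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.AggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

// Illustrative only: sums one LONG column, using a boxed Long as the intermediate state.
public class LongSum implements AggregateFunction<Long, Long> {

  @Override
  public Long newAggregationState() {
    // start from null so that an empty group produces a null sum;
    // because this returns null, every other method must tolerate null state
    return null;
  }

  @Override
  public Long update(Long state, InternalRow input) {
    if (input.isNullAt(0)) {
      return state; // ignore null inputs
    }
    long value = input.getLong(0); // LONG maps to the JVM long type in the InternalRow API
    return state == null ? value : state + value;
  }

  @Override
  public Long merge(Long leftState, Long rightState) {
    if (leftState == null) return rightState;
    if (rightState == null) return leftState;
    return leftState + rightState;
  }

  @Override
  public Long produceResult(Long state) {
    return state;
  }

  @Override
  public DataType[] inputTypes() {
    return new DataType[] { DataTypes.LongType };
  }

  @Override
  public DataType resultType() {
    return DataTypes.LongType;
  }

  @Override
  public String name() {
    return "long_sum";
  }
}
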
org/apache/spark/sql/connector/catalog/functions/BoundFunction.java
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.functions;

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.StructType;

import java.util.UUID;

/**
* Represents a function that is bound to an input type.
*/
public interface BoundFunction extends Function {

/**
* Returns the required {@link DataType data types} of the input values to this function.
* <p>
* If the types returned differ from the types passed to {@link UnboundFunction#bind(StructType)},
* Spark will cast input values to the required data types. This allows implementations to
* delegate input value casting to Spark.
*
* @return an array of input value data types
*/
DataType[] inputTypes();

/**
* Returns the {@link DataType data type} of values produced by this function.
* <p>
* For example, a "plus" function may return {@link IntegerType} when it is bound to arguments
* that are also {@link IntegerType}.
*
* @return a data type for values produced by this function
*/
DataType resultType();

/**
* Returns whether the values produced by this function may be null.
* <p>
* For example, a "plus" function may return false when it is bound to arguments that are always
* non-null, but true when either argument may be null.
*
* @return true if values produced by this function may be null, false otherwise
*/
default boolean isResultNullable() {
return true;
}

/**
* Returns whether this function result is deterministic.
* <p>
* By default, functions are assumed to be deterministic. Functions that are not deterministic
* should override this method so that Spark can ensure the function runs only once for a given
* input.
*
* @return true if this function is deterministic, false otherwise
*/
default boolean isDeterministic() {
return true;
}

/**
* Returns the canonical name of this function, used to determine if functions are equivalent.
* <p>
* The canonical name is used to determine whether two functions are the same when loaded by
* different catalogs. For example, the same catalog implementation may be used by two
* environments, "prod" and "test". Functions produced by the catalogs may be equivalent, but
* loaded using different names, like "test.func_name" and "prod.func_name".
* <p>
* Names returned by this function should be unique and unlikely to conflict with similar
* functions in other catalogs. For example, many catalogs may define a "bucket" function with a
* different implementation. Adding context, like "com.mycompany.bucket(string)", is recommended
* to avoid unintentional collisions.
*
* @return a canonical name for this function
*/
default String canonicalName() {
// by default, use a random UUID so a function is never equivalent to another, even itself.
// this method is not required so that generated implementations (or careless ones) are not
// added and forgotten. for example, returning "" as a place-holder could cause unnecessary
// bugs if not replaced before release.
return UUID.randomUUID().toString();
}
}
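
To illustrate the metadata methods above (declared input types for casting, result nullability, and a qualified canonicalName), here is a sketch of a bound "bucket" function, loosely following the com.mycompany.bucket(string) example from the javadoc. StringBucket and its hashing scheme are invented for this sketch; it implements the ScalarFunction sub-interface introduced later in this diff.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

// Illustrative only: assigns a bucket number to a single STRING argument.
public class StringBucket implements ScalarFunction<Integer> {
  private final int numBuckets;

  public StringBucket(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Override
  public DataType[] inputTypes() {
    // Spark casts the bound argument to STRING if the bound type differs,
    // so this function does not need to handle other input types itself
    return new DataType[] { DataTypes.StringType };
  }

  @Override
  public DataType resultType() {
    return DataTypes.IntegerType;
  }

  @Override
  public boolean isResultNullable() {
    // assumes the unbound function rejected nullable input columns when binding
    return false;
  }

  @Override
  public String canonicalName() {
    // qualified with a company prefix and the input type, as recommended above,
    // to avoid colliding with other catalogs' "bucket" functions
    return "com.mycompany.bucket(string)";
  }

  @Override
  public String name() {
    return "bucket";
  }

  @Override
  public Integer produceResult(InternalRow input) {
    // STRING values arrive as UTF8String through the InternalRow API
    int hash = input.getUTF8String(0).hashCode();
    return Math.floorMod(hash, numBuckets);
  }
}
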
org/apache/spark/sql/connector/catalog/functions/Function.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.functions;

import java.io.Serializable;

/**
* Base class for user-defined functions.
*/
public interface Function extends Serializable {

/**
* A name to identify this function. Implementations should provide a meaningful name, like the
* database and function name from the catalog.
*/
String name();

Review comment (Member): Shall we mention that function implementations may be case sensitive or case insensitive?

Reply (Contributor, author): Not here because this is the name reported by the function for Spark to use in messages. Case sensitivity is a concern when loading. There, we can't change whether the catalog is case sensitive or not, so I'm not sure it would provide much value. If you think it does, then please follow up with a PR that adds some wording for the TableCatalog and FunctionCatalog interfaces.

}
org/apache/spark/sql/connector/catalog/functions/ScalarFunction.java
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.functions;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;

/**
* Interface for a function that produces a result value for each input row.
* <p>
* For each input row, Spark will call a produceResult method that corresponds to the
* {@link #inputTypes() input data types}. The expected JVM argument types must be the types used by
* Spark's InternalRow API. If no direct method is found or when not using codegen, Spark will call
* {@link #produceResult(InternalRow)}.
* <p>
* The JVM type of result values produced by this function must be the type used by Spark's
* InternalRow API for the {@link DataType SQL data type} returned by {@link #resultType()}.
*
* @param <R> the JVM type of result values
*/
public interface ScalarFunction<R> extends BoundFunction {

/**
* Applies the function to an input row to produce a value.
*
* @param input an input row
* @return a result value
*/
default R produceResult(InternalRow input) {
throw new UnsupportedOperationException(
"Cannot find a compatible ScalarFunction#produceResult");
}

}
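
A sketch of the evaluation side: a "length" function matching the example mentioned in the UnboundFunction javadoc below. StringLength is illustrative; it relies on STRING arguments arriving as UTF8String through the InternalRow API.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

// Illustrative only: returns the character length of a single STRING argument.
public class StringLength implements ScalarFunction<Integer> {

  @Override
  public Integer produceResult(InternalRow input) {
    // STRING maps to UTF8String in the InternalRow API; numChars counts code points
    return input.getUTF8String(0).numChars();
  }

  @Override
  public DataType[] inputTypes() {
    return new DataType[] { DataTypes.StringType };
  }

  @Override
  public DataType resultType() {
    return DataTypes.IntegerType;
  }

  @Override
  public String name() {
    return "length";
  }
}
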
org/apache/spark/sql/connector/catalog/functions/UnboundFunction.java
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.functions;

import org.apache.spark.sql.types.StructType;

/**
* Represents a user-defined function that is not bound to input types.
*/
public interface UnboundFunction extends Function {

/**
* Bind this function to an input type.
* <p>
* If the input type is not supported, implementations must throw
* {@link UnsupportedOperationException}.
* <p>
* For example, a "length" function that only supports a single string argument should throw
* UnsupportedOperationException if the struct has more than one field or if that field is not a
* string, and it may optionally throw if the field is nullable.
*
* @param inputType a struct type for inputs that will be passed to the bound function
* @return a function that can process rows with the given input type
* @throws UnsupportedOperationException If the function cannot be applied to the input type
*/
BoundFunction bind(StructType inputType);

/**
* Returns Function documentation.
*
* @return this function's documentation
*/
String description();

Review comment (Contributor): Minor nit -- the method is called "description", should we say that this returns a description of the function (as opposed to "documentation")?

Reply (Contributor, author): I think they're synonymous in this context.

}
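
And the unbound side of the same "length" example, showing the validation described in the bind() javadoc: accept exactly one string argument, otherwise throw UnsupportedOperationException. UnboundStringLength and its error message are illustrative.

import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructType;

// Illustrative only: binds the StringLength function sketched above.
public class UnboundStringLength implements UnboundFunction {

  @Override
  public BoundFunction bind(StructType inputType) {
    // accept exactly one STRING argument, as described in the bind() javadoc
    if (inputType.fields().length != 1 ||
        !(inputType.fields()[0].dataType() instanceof StringType)) {
      throw new UnsupportedOperationException("length requires a single string argument");
    }
    return new StringLength();
  }

  @Override
  public String description() {
    return "length(string): returns the number of characters in the input string";
  }

  @Override
  public String name() {
    return "length";
  }
}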