Writing Custom Aggregate Functions in SQL Just Like a Java 8 Stream Collector

All SQL databases support the standard aggregate functions COUNT(), SUM(), AVG(), MIN(), MAX().
Some databases support other aggregate functions, like:
EVERY()
STDDEV_POP()
STDDEV_SAMP()
VAR_POP()
VAR_SAMP()
ARRAY_AGG()
STRING_AGG()
But what if you want to roll your own?
Java 8 Stream Collector
When using Java 8 streams, we can easily roll our own aggregate function (i.e. a Collector). Let’s assume we want to find the second highest value in a stream. The highest value can be obtained like this:

System.out.println(
    Stream.of(1, 2, 3, 4)
          .collect(Collectors.maxBy(Integer::compareTo))
);

Yielding:

Optional[4]

Now, what about the second highest value? We can write the following collector:

System.out.println(
    Stream.of(1, 6, 2, 3, 4, 4, 5).parallel()
          .collect(Collector.of(
              () -> new int[] {
                  Integer.MIN_VALUE,
                  Integer.MIN_VALUE
              },
              (a, i) -> {
                  if (a[0] < i) {
                      a[1] = a[0];
                      a[0] = i;
                  }
                  else if (a[1] < i)
                      a[1] = i;
              },
              (a1, a2) -> {
                  if (a2[0] > a1[0]) {
                      a1[1] = a1[0];
                      a1[0] = a2[0];

                      if (a2[1] > a1[1])
                          a1[1] = a2[1];
                  }
                  else if (a2[0] > a1[1])
                      a1[1] = a2[0];

                  return a1;
              },
              a -> a[1]
          ))
);

It doesn’t do anything fancy. It has these 4 functions:
Supplier: A supplier that provides an intermediary int[] of length 2, with both elements initialised to Integer.MIN_VALUE. This array remembers the MAX() value in the stream at position 0 and the SECOND_MAX() value at position 1.
BiConsumer: An accumulator that accumulates new values from the stream into our intermediary data structure.
BinaryOperator: A combiner that merges two intermediary data structures. It is only needed for parallel streams, where each thread builds its own partial result.
Function: The finisher function that extracts the SECOND_MAX() value from the second position in our intermediary array.
The output is now:

5
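
To make the four roles easier to see, here is a minimal sketch that gives each function its own name and then calls the combiner by hand on two partial results, roughly the way a parallel stream might. The class name SecondMaxCollector, the constant names, and the split of the input into two halves are assumptions made for illustration; the logic is the same as in the collector above.

```java
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class SecondMaxCollector {

    // Supplier: a fresh state {max, secondMax}
    static final Supplier<int[]> SUPPLIER =
        () -> new int[] { Integer.MIN_VALUE, Integer.MIN_VALUE };

    // Accumulator: folds one stream element into the state
    static final BiConsumer<int[], Integer> ACCUMULATOR = (a, i) -> {
        if (a[0] < i) {
            a[1] = a[0];
            a[0] = i;
        }
        else if (a[1] < i)
            a[1] = i;
    };

    // Combiner: merges the second partial state into the first
    static final BinaryOperator<int[]> COMBINER = (a1, a2) -> {
        if (a2[0] > a1[0]) {
            a1[1] = a1[0];
            a1[0] = a2[0];

            if (a2[1] > a1[1])
                a1[1] = a2[1];
        }
        else if (a2[0] > a1[1])
            a1[1] = a2[0];

        return a1;
    };

    // Finisher: extracts the second highest value from the state
    static final Function<int[], Integer> FINISHER = a -> a[1];

    static Collector<Integer, int[], Integer> secondMax() {
        return Collector.of(SUPPLIER, ACCUMULATOR, COMBINER, FINISHER);
    }

    public static void main(String[] args) {
        // Simulate two worker threads, each with its own partial state
        int[] left = SUPPLIER.get();
        Stream.of(1, 6, 2).forEach(i -> ACCUMULATOR.accept(left, i));

        int[] right = SUPPLIER.get();
        Stream.of(3, 4, 4, 5).forEach(i -> ACCUMULATOR.accept(right, i));

        // Merge the partial states, then extract the result
        System.out.println(FINISHER.apply(COMBINER.apply(left, right)));

        // The same collector, used through the Stream API
        System.out.println(Stream.of(1, 6, 2, 3, 4, 4, 5).collect(secondMax()));
    }
}
```

Both statements print 5. A real parallel stream may split the input differently, which is why the combiner has to produce the right answer for any split.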

How to do the same thing with SQL?
Many SQL databases offer a very similar way of calculating custom aggregate functions. Here’s how to do the exact same thing with…
Oracle:
With the usual syntactic ceremony…

CREATE TYPE u_second_max AS OBJECT (

  -- Intermediary data structure
  MAX NUMBER,
  SECMAX NUMBER,

  -- Corresponds to the Collector.supplier() function
  STATIC FUNCTION ODCIAggregateInitialize(sctx IN OUT u_second_max) RETURN NUMBER,

  -- Corresponds to the Collector.accumulator() function
  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT u_second_max, value IN NUMBER) RETURN NUMBER,

  -- Corresponds to the Collector.combiner() function
  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT u_second_max, ctx2 IN u_second_max) RETURN NUMBER,

  -- Corresponds to the Collector.finisher() function
  MEMBER FUNCTION ODCIAggregateTerminate(self IN u_second_max, returnValue OUT NUMBER, flags IN NUMBER) RETURN NUMBER
)
/

-- This is our "collector" implementation
CREATE OR REPLACE TYPE BODY u_second_max IS
  STATIC FUNCTION ODCIAggregateInitialize(sctx IN OUT u_second_max)
  RETURN NUMBER IS
  BEGIN
    -- Note: starting at (0, 0) assumes non-negative input values,
    -- which holds for the film_id values used below
    SCTX := U_SECOND_MAX(0, 0);
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT u_second_max, value IN NUMBER) RETURN NUMBER IS
  BEGIN
    IF VALUE > SELF.MAX THEN
      SELF.SECMAX := SELF.MAX;
      SELF.MAX := VALUE;
    ELSIF VALUE > SELF.SECMAX THEN
      SELF.SECMAX := VALUE;
    END IF;
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateTerminate(self IN u_second_max, returnValue OUT NUMBER, flags IN NUMBER) RETURN NUMBER IS
  BEGIN
    RETURNVALUE := SELF.SECMAX;
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT u_second_max, ctx2 IN u_second_max) RETURN NUMBER IS
  BEGIN
    IF CTX2.MAX > SELF.MAX THEN
      SELF.SECMAX := SELF.MAX;
      SELF.MAX := CTX2.MAX;

      IF CTX2.SECMAX > SELF.SECMAX THEN
        SELF.SECMAX := CTX2.SECMAX;
      END IF;
    ELSIF CTX2.MAX > SELF.SECMAX THEN
      SELF.SECMAX := CTX2.MAX;
    END IF;

    RETURN ODCIConst.Success;
  END;
END;
/

-- Finally, we have to give this aggregate function a name
CREATE FUNCTION SECOND_MAX (input NUMBER) RETURN NUMBER
PARALLEL_ENABLE AGGREGATE USING u_second_max;
/

We can now run the above on the Sakila database:

SELECT
  max(film_id),
  second_max(film_id)
FROM film;

To get:

MAX   SECOND_MAX
----  ----------
1000  999

And what’s even better, we can use the aggregate function as a window function for free!

SELECT
  film_id,
  length,
  max(film_id) OVER (PARTITION BY length),
  second_max(film_id) OVER (PARTITION BY length)
FROM film
ORDER BY length, film_id;

The above yields:

FILM_ID  LENGTH  MAX  SECOND_MAX
-------  ------  ---  ----------
15       46      730  505
469      46      730  505
504      46      730  505
505      46      730  505
730      46      730  505
237      47      869  784
247      47      869  784
393      47      869  784
398      47      869  784
407      47      869  784
784      47      869  784
869      47      869  784
2        48      931  866
410      48      931  866
575      48      931  866
630      48      931  866
634      48      931  866
657      48      931  866
670      48      931  866
753      48      931  866
845      48      931  866
866      48      931  866
931      48      931  866

Beautiful, right?
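
For comparison, here is a rough Java sketch of the same per-partition idea. It groups rows by length with Collectors.groupingBy(), applies the second-max collector within each group, and then attaches the group's result to every row, which is approximately what the window function does. The class name SecondMaxPerPartition, the method secondMaxByLength(), and the {film_id, length} array layout are assumptions made for illustration; the sample rows are the first two partitions from the result above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class SecondMaxPerPartition {

    // The second-max collector from the Java section, restated so that
    // this sketch is self-contained
    static Collector<Integer, int[], Integer> secondMax() {
        return Collector.of(
            () -> new int[] { Integer.MIN_VALUE, Integer.MIN_VALUE },
            (a, i) -> {
                if (a[0] < i) { a[1] = a[0]; a[0] = i; }
                else if (a[1] < i) a[1] = i;
            },
            (a1, a2) -> {
                if (a2[0] > a1[0]) {
                    a1[1] = a1[0];
                    a1[0] = a2[0];
                    if (a2[1] > a1[1]) a1[1] = a2[1];
                }
                else if (a2[0] > a1[1]) a1[1] = a2[0];
                return a1;
            },
            a -> a[1]
        );
    }

    // Each row is a hypothetical {film_id, length} pair.
    // Returns length -> second highest film_id within that length.
    static Map<Integer, Integer> secondMaxByLength(List<int[]> films) {
        return films.stream().collect(Collectors.groupingBy(
            (int[] f) -> f[1],
            Collectors.mapping((int[] f) -> f[0], secondMax())
        ));
    }

    public static void main(String[] args) {
        List<int[]> films = Arrays.asList(
            new int[] { 15, 46 }, new int[] { 469, 46 }, new int[] { 504, 46 },
            new int[] { 505, 46 }, new int[] { 730, 46 },
            new int[] { 237, 47 }, new int[] { 247, 47 }, new int[] { 393, 47 },
            new int[] { 398, 47 }, new int[] { 407, 47 }, new int[] { 784, 47 },
            new int[] { 869, 47 }
        );

        Map<Integer, Integer> byLength = secondMaxByLength(films);

        // Like a window function: every row is kept, and the aggregate
        // of its partition is attached to it
        for (int[] f : films)
            System.out.println(f[0] + " " + f[1] + " " + byLength.get(f[1]));
    }
}
```

Unlike the SQL version, this makes two passes (one to aggregate, one to join the result back to the rows), so it should be read as an analogy rather than as an equivalent execution strategy.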
PostgreSQL
PostgreSQL supports a slightly more concise syntax in the CREATE AGGREGATE statement. If we don’t allow for parallelism, we can write this minimal implementation:

CREATE FUNCTION second_max_sfunc (
  state INTEGER[], data INTEGER
) RETURNS INTEGER[] AS
$$
BEGIN
  IF state IS NULL THEN
    RETURN ARRAY[data, NULL];
  ELSE
    RETURN CASE
      WHEN state[1] > data
      THEN CASE
        WHEN state[2] > data
        THEN state
        ELSE ARRAY[state[1], data]
      END
      ELSE ARRAY[data, state[1]]
    END;
  END IF;
END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION second_max_ffunc (
  state INTEGER[]
) RETURNS INTEGER AS
$$
BEGIN
  RETURN state[2];
END;
$$ LANGUAGE plpgsql;

CREATE AGGREGATE second_max (INTEGER) (
  SFUNC = second_max_sfunc,
  STYPE = INTEGER[],
  FINALFUNC = second_max_ffunc
);

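Here, we use the STYPE (the state type, which plays the role of Collector.supplier()), the SFUNC (Collector.accumulator()), and the FINALFUNC (Collector.finisher()) specifications. Because this version does not allow for parallelism, there is no counterpart to Collector.combiner(); PostgreSQL would accept one through the COMBINEFUNC specification.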
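
To make the mapping concrete, here is a hedged Java sketch that mirrors the PostgreSQL definition above rather than the earlier int[] collector. The state is an Integer[] in which null stands in for SQL NULL, the accumulator follows the nested CASE expression, and the combiner simply refuses to run, since the SQL version does not define one. The class name SequentialSecondMax is an assumption for illustration. Also note that a Java accumulator mutates its state in place, whereas the SFUNC returns a new array.

```java
import java.util.stream.Collector;
import java.util.stream.Stream;

public class SequentialSecondMax {

    static Collector<Integer, Integer[], Integer> secondMax() {
        return Collector.of(
            // STYPE = INTEGER[]: {max, secondMax}, both initially "NULL"
            () -> new Integer[] { null, null },

            // SFUNC: same branching as the CASE expression in second_max_sfunc
            (state, data) -> {
                if (state[0] != null && state[0] > data) {
                    // state[1] > data  ->  keep max, maybe replace second max
                    if (!(state[1] != null && state[1] > data))
                        state[1] = data;
                }
                else {
                    // ELSE ARRAY[data, state[1]]  ->  data becomes the new max
                    state[1] = state[0];
                    state[0] = data;
                }
            },

            // No COMBINEFUNC was declared, so merging is not supported here
            (s1, s2) -> {
                throw new UnsupportedOperationException("sequential use only");
            },

            // FINALFUNC: state[2] in SQL (1-based) is state[1] in Java (0-based)
            state -> state[1]
        );
    }

    public static void main(String[] args) {
        // Sequential stream, so the combiner is not expected to be called
        System.out.println(Stream.of(1, 6, 2, 3, 4, 4, 5).collect(secondMax()));

        // With fewer than two values, the result is null, like SQL NULL
        System.out.println(Stream.of(7).collect(secondMax()));
    }
}
```

The first statement prints 5 and the second prints null. Using null instead of Integer.MIN_VALUE avoids reserving a sentinel value, at the cost of a few extra null checks.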
Other databases
Many other databases allow you to define your own aggregate functions. Consult your database’s manual for the details. The building blocks typically map onto those of a Java 8 Collector: an initial state, an accumulation step, an optional merge step, and a final extraction step.

Emily
