Spark 2.3.2 Scala: pattern search

All I have is N (the number of rows in the pattern) and two datasets with this schema:

root
 |-- ts_begin: long (nullable = true)
 |-- btype: integer (nullable = false)
 |-- disp: double (nullable = true)
 |-- log_co: double (nullable = true)

One of them is large (the history data) and the other has size N (generally no more than 10 rows).

Example of data (history):

+----------+-----+-------+-------+
|  ts_begin|btype|   disp| log_co|
+----------+-----+-------+-------+
|1535536647|    1|3.44E-4| 4.4E-4|
|1535536947|    1|1.16E-4| 4.0E-4|
|1535537250|   -1|1.03E-4|-2.0E-4|
|1535537550|    1|1.15E-4| 1.4E-4|
|1535537847|    1| 1.5E-4| 1.7E-4|
|1535538148|   -1|1.27E-4|-3.8E-4|
|1535538447|    1| 8.6E-5| 8.0E-5|
|1535538747|   -1|1.13E-4|-1.6E-4|
|1535539047|   -1| 6.6E-5|-4.0E-5|
|1535539347|   -1|1.75E-4|-2.5E-4|
|1535539649|   -1|1.76E-4|-5.2E-4|
+----------+-----+-------+-------+

Example of pattern:

+----------+-----+-------+-------+
|  ts_begin|btype|   disp| log_co|
+----------+-----+-------+-------+
|1635536601|    1|3.44E-4| 4.4E-4|
|1635536902|    1|1.16E-4| 4.0E-4|
|1635537203|   -1|1.03E-4|-2.0E-4|
+----------+-----+-------+-------+

I want to run a pattern search with conditions like this:

  pattern.row.btype == hist.row.btype and
  pattern.row.disp between hist.row.disp*0.8 and hist.row.disp*1.2

for all rows of the pattern.

Both datasets are sorted by ts_begin ascending.

The ts_begin values themselves are not compared.
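
Restated as plain Scala (rowMatches is just an illustrative name), the per-row condition is:

    // true when one history row and one pattern row "match"
    def rowMatches(hBtype: Int, hDisp: Double, pBtype: Int, pDisp: Double): Boolean =
      pBtype == hBtype && pDisp >= hDisp * 0.8 && pDisp <= hDisp * 1.2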

As a result, I want to see something like this:

+----------+-----+-------+-------+--------+
|  ts_begin|btype|   disp| log_co|comp_res|
+----------+-----+-------+-------+--------+
|1535536647|    1|3.44E-4| 4.4E-4|       0|
|1535536947|    1|1.16E-4| 4.0E-4|       0|
|1535537250|   -1|1.03E-4|-2.0E-4|       0|
|1535537550|    1|1.15E-4| 1.4E-4|       1|
|1535537847|    1| 1.5E-4| 1.7E-4|       0|
|1535538148|   -1|1.27E-4|-3.8E-4|       0|
|1535538447|    1| 8.6E-5| 8.0E-5|       0|
|1535538747|   -1|1.13E-4|-1.6E-4|       1|
|1535539047|   -1| 6.6E-5|-4.0E-5|       0|
|1535539347|   -1|1.75E-4|-2.5E-4|       0|
|1535539649|   -1|1.76E-4|-5.2E-4|       0|
+----------+-----+-------+-------+--------+

A little more explanation: a 1 in the comp_res field means that the sequence of rows

    |1535537550|    1|1.15E-4| 1.4E-4|
    |1535537847|    1| 1.5E-4| 1.7E-4|
    |1535538148|   -1|1.27E-4|-3.8E-4|

"is equal" to pattern.

I was trying to solve it using

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window

    val lb = listBars
      // global position, ordered by time
      .withColumn("rn", row_number() over Window.orderBy(col("ts_begin").asc))
      // bucket id: rows 1..nBuckets -> bucket 0, next nBuckets rows -> bucket 1, ...
      .withColumn("rn", floor((col("rn") - 1) / nBuckets))
      // position of each row inside its bucket: 1..nBuckets
      .withColumn("rnk", row_number() over Window.partitionBy("rn").orderBy(col("ts_begin").asc))
      .groupBy("rn")
      .pivot("rnk", 1 to nBuckets)
      .agg(
        sum("ts_begin").alias("ts_begin"),
        sum("btype").alias("btype"),
        sum("disp").alias("disp"),
        sum("log_co").alias("log_co")
      )

and then a UDF taking two Rows (one captured from the pattern, one as input), something like:

    import org.apache.spark.sql.Row

    def udf_comp(p: Row) = udf( (r: Row) => {
      // p: pattern row, r: history row packed as a struct
      p.getAs[Int]("btype") == r.getAs[Int]("btype") &&
      p.getAs[Double]("disp") >= r.getAs[Double]("disp") * 0.8 &&
      p.getAs[Double]("disp") <= r.getAs[Double]("disp") * 1.2
    })
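Applied to the pivoted frame, the wiring could look roughly like this (untested; the 1_btype-style column names assume Spark's {value}_{alias} pivot naming, and the aliases inside struct keep the field names that getAs expects):

    // hypothetical application of udf_comp over the pivoted buckets
    val patRows = pattern.orderBy("ts_begin").collect()
    val checked = (1 to nBuckets).foldLeft(lb) { (df, i) =>
      df.withColumn(s"m_$i",
        udf_comp(patRows(i - 1))(struct(
          col(s"${i}_btype").as("btype"),
          col(s"${i}_disp").as("disp"))))
    }
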
It looks like a more elegant solution without pivoting should exist (these fixed buckets also can't catch a match that straddles a bucket boundary).

I hope it's possible to use a window function over the whole dataset, ordered by ts_begin ascending, with a frame spanning the current row and the next few rows:

  val windowSpec = Window
    .orderBy(col("ts_begin").asc)
    .rowsBetween(Window.currentRow, nBuckets-1)

and a UDAF that receives and accumulates all rows of that frame and, in its evaluate step, compares the resulting equally-sized block with the pattern DataFrame.
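
Something like this is what I have in mind, even without a full UDAF: collect the tiny pattern to the driver, gather each frame with collect_list, and compare in a plain UDF. A minimal sketch (untested; matchesPattern is a made-up name, hist/pattern stand for the two DataFrames, and it assumes the pattern fits in driver memory):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window

    // the pattern is tiny, so materialize its (btype, disp) pairs on the driver
    val pat: Array[(Int, Double)] = pattern
      .orderBy("ts_begin")
      .select("btype", "disp")
      .collect()
      .map(r => (r.getInt(0), r.getDouble(1)))

    // same frame as above, sized by the actual pattern length
    val windowSpec = Window
      .orderBy(col("ts_begin").asc)
      .rowsBetween(Window.currentRow, pat.length - 1)

    // 1 when the frame starting at the current row matches the pattern row by row
    val matchesPattern = udf { rows: Seq[Row] =>
      val ok = rows.length == pat.length &&
        rows.zip(pat).forall { case (r, (pBtype, pDisp)) =>
          r.getInt(0) == pBtype &&
          pDisp >= r.getDouble(1) * 0.8 && pDisp <= r.getDouble(1) * 1.2
        }
      if (ok) 1 else 0
    }

    val result = hist
      .withColumn("w", collect_list(struct(col("btype"), col("disp"))) over windowSpec)
      .withColumn("comp_res", matchesPattern(col("w")))
      .drop("w")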

Maybe there is also a solution based on a join with the pattern.
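
A rough sketch of that idea (again untested): number both sides, join every pattern row against every history row that satisfies the per-row condition, and keep the start positions where all N pattern rows found their aligned history row:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window

    val h = hist.withColumn("rn", row_number() over Window.orderBy(col("ts_begin").asc))
    val p = pattern.withColumn("pn", row_number() over Window.orderBy(col("ts_begin").asc))

    val joined = h.as("h").join(p.as("p"),
      col("h.btype") === col("p.btype") &&
      col("p.disp").between(col("h.disp") * 0.8, col("h.disp") * 1.2))

    // pattern row pn matching history row rn implies candidate start rn - pn + 1;
    // a start seen N times means every pattern row matched its aligned history row
    val hits = joined
      .select((col("h.rn") - col("p.pn") + lit(1)).as("start"))
      .groupBy("start")
      .count()
      .filter(col("count") === lit(N))

Joining hits back to the history on rn === start would then give the comp_res flag per row, and since the pattern has at most ~10 rows, the join inflates the history by at most that factor.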

Can you suggest something? Thanks.