# Evaluation of time series forecasting using Spark windowing

This article is originally published at https://hameddaily.blogspot.com/

Mean Directional Accuracy (MDA) is a popular evaluation metric in economics applications, where the economist is often interested only in the directional movement of the variable of interest. As an example from macroeconomics, a monetary authority that wants to know the direction of inflation will raise interest rates if inflation is predicted to rise and lower them if it is predicted to drop. Another example can be found in financial planning, where the user wants to know whether demand has an increasing or a decreasing trend. Formally, MDA is defined as:

\[\mathrm{MDA} = \frac{1}{N} \sum_t \mathbf{1}_{\operatorname{sign}(A_t - A_{t-1}) \,=\, \operatorname{sign}(F_t - F_{t-1})}\]

where \(A_t\) is the actual value and \(F_t\) the forecast value at time \(t\).
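
Before moving to Spark, the formula can be sketched in plain Python (a minimal illustration; the function name `mda` and the toy series are my own, not from the original post). It counts the fraction of transitions whose forecast direction matches the actual direction:

```python
def mda(actual, forecast):
    """Fraction of periods where the forecast moves in the same
    direction as the actual series (computed over N-1 transitions)."""
    def sign(x):
        return (x > 0) - (x < 0)

    hits = [
        sign(a1 - a0) == sign(f1 - f0)
        for a0, a1, f0, f1 in zip(actual, actual[1:], forecast, forecast[1:])
    ]
    return sum(hits) / len(hits)

# toy series: the direction matches in 3 of the 4 transitions
actual = [4.47, 4.73, 4.83, 4.98, 4.79]
forecast = [5.01, 4.40, 4.98, 5.04, 4.85]
print(mda(actual, forecast))  # → 0.75
```

Note this sketch averages over the \(N-1\) transitions; the Spark pipeline below instead produces one indicator per row, including the first row of each partition.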

### Toy example

*Note that the data is not so big that one needs Spark for this calculation; it is just a toy example to show how to use a Spark window for such a problem.* By loading the data into a Spark dataframe we get the following structure, where each row shows the GDP and the GDP prediction for a given year and country.

```
df.filter(df.country=="USA").show(5)

+-------+--------+-----------------+----+
|country|     gdp|        predicted|year|
+-------+--------+-----------------+----+
|    USA|4.470303|5.012966057409855|1950|
|    USA|4.734335|4.404831278549317|1951|
|    USA|4.826502|4.978599656728077|1952|
|    USA|4.981746|5.035932340179457|1953|
|    USA| 4.79081|4.853806067158911|1954|
+-------+--------+-----------------+----+
```

##### Step 1: window configuration

The window is partitioned by `country` and ordered by `year`, since we would like to compare each year's GDP to the previous year's:

```python
from pyspark.sql import Window

windowSpec = Window.partitionBy('country')\
                   .orderBy('year')
```

We then restrict `windowSpec` to a frame covering the previous row and the current row as follows:

```python
mywindow = windowSpec.rowsBetween(-1, 0)
```
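
To see what this frame produces, the partition/order/frame logic can be emulated in plain Python (a sketch with made-up rows, not Spark itself): within each `country` partition ordered by `year`, the frame `rowsBetween(-1, 0)` spans the previous row and the current row, so `first(...)` over it returns the previous row's value, or the current row's own value for the first row of a partition.

```python
# emulate Window.partitionBy('country').orderBy('year').rowsBetween(-1, 0)
# rows are (country, year, gdp) tuples; the values are made up for illustration
rows = [("USA", 1951, 4.73), ("USA", 1950, 4.47), ("CANADA", 1950, 3.10)]

def first_over_window(rows):
    """For each row, the 'first' value of the frame [previous row, current row]
    within its country partition, ordered by year."""
    by_country = {}
    for country, year, gdp in sorted(rows):  # group by country, order by year
        by_country.setdefault(country, []).append((year, gdp))
    out = {}
    for country, series in by_country.items():
        for i, (year, gdp) in enumerate(series):
            prev_gdp = series[i - 1][1] if i > 0 else gdp  # start of the frame
            out[(country, year)] = prev_gdp
    return out

print(first_over_window(rows)[("USA", 1951)])  # previous year's gdp → 4.47
```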

##### Step 2: add last year column to each row

```python
import pyspark.sql.functions as func

df_w = df.withColumn("gdp_last_year", func.first(df.gdp).over(mywindow))\
         .withColumn("predicted_last_year", func.first(df.predicted).over(mywindow))
```

```
+-------+--------+-------------+----+-------------+-------------------+
|country|     gdp|    predicted|year|gdp_last_year|predicted_last_year|
+-------+--------+-------------+----+-------------+-------------------+
|    USA|4.470303|5.01296605740|1950|     4.470303|  5.012966057409855|
|    USA|4.734335|4.40483127854|1951|     4.470303|  5.012966057409855|
|    USA|4.826502|4.97859965672|1952|     4.734335|  4.404831278549317|
|    USA|4.981746|5.03593234017|1953|     4.826502|  4.978599656728077|
|    USA| 4.79081|4.85380606715|1954|     4.981746|  5.035932340179457|
+-------+--------+-------------+----+-------------+-------------------+
```

##### Step 3: find the sign of GDP and GDP prediction for each year

Since last year's values are now part of the `df_w` data frame, this operation can be done per row. We just need to define a Spark User Defined Function (UDF) and apply it to all the rows:

```python
from pyspark.sql import types

def f(x):
    if x > 0:
        return 1.0
    else:
        return -1.0

sign_udf = func.UserDefinedFunction(lambda x: f(x), types.DoubleType())

df_w = df_w.withColumn("sign_gdp", sign_udf(df_w.gdp - df_w.gdp_last_year))\
           .withColumn("sign_predicted", sign_udf(df_w.predicted - df_w.predicted_last_year))
```
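
One subtlety of this sign UDF: for the first row of each partition the difference is exactly zero (the row is compared to itself), and `f(0)` returns `-1.0` for both columns, so that row always counts as a directional match. A quick pure-Python check of this behavior (toy numbers from the USA table above, not run on Spark):

```python
def f(x):
    if x > 0:
        return 1.0
    else:
        return -1.0

# first row of a partition: gdp - gdp_last_year == 0, so both signs are -1.0
print(f(0.0))  # → -1.0

# the remaining rows reflect real year-over-year moves
gdp = [4.470303, 4.734335, 4.826502]
diffs = [b - a for a, b in zip(gdp, gdp[1:])]
print([f(d) for d in diffs])  # → [1.0, 1.0]
```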

The resulting `df_w` dataframe is as follows:

```
df_w.filter(df_w.country=="USA").show(5)

+-------+--------+-----------------+----+-------------+-------------------+--------+--------------+
|country|     gdp|        predicted|year|gdp_last_year|predicted_last_year|sign_gdp|sign_predicted|
+-------+--------+-----------------+----+-------------+-------------------+--------+--------------+
|    USA|4.470303|5.012966057409855|1950|     4.470303|  5.012966057409855|    -1.0|          -1.0|
|    USA|4.734335|4.404831278549317|1951|     4.470303|  5.012966057409855|     1.0|          -1.0|
|    USA|4.826502|4.978599656728077|1952|     4.734335|  4.404831278549317|     1.0|           1.0|
|    USA|4.981746|5.035932340179457|1953|     4.826502|  4.978599656728077|     1.0|           1.0|
|    USA| 4.79081|4.853806067158911|1954|     4.981746|  5.035932340179457|    -1.0|          -1.0|
+-------+--------+-----------------+----+-------------+-------------------+--------+--------------+
```

##### Step 4: calculate the MDA

```python
def indicator_function(x):
    if x:
        return 1.0
    else:
        return 0.0

# apply the indicator function to each directional prediction
mda_udf = func.UserDefinedFunction(lambda x: indicator_function(x), types.DoubleType())
df_w = df_w.withColumn("MDA", mda_udf(df_w.sign_gdp == df_w.sign_predicted))

# MDA calculation
mda_result = df_w.groupBy('country').mean()
mda_result = mda_result.withColumnRenamed("avg(MDA)", "MDA")\
                       .select("country", "MDA")
```
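
The aggregation step is just a per-country mean of the 0/1 indicators, which can be replicated in plain Python to check the arithmetic (toy per-row indicator values, not the full data set; `mda_by_country` is my own helper name):

```python
# toy (country, indicator) pairs; the indicator is 1.0 when
# sign_gdp == sign_predicted for that row, else 0.0
rows = [
    ("USA", 1.0), ("USA", 0.0), ("USA", 1.0), ("USA", 1.0), ("USA", 1.0),
    ("CANADA", 1.0), ("CANADA", 0.0),
]

def mda_by_country(rows):
    """Equivalent of groupBy('country').mean() on the MDA column."""
    totals = {}
    for country, hit in rows:
        s, n = totals.get(country, (0.0, 0))
        totals[country] = (s + hit, n + 1)
    return {c: s / n for c, (s, n) in totals.items()}

print(mda_by_country(rows))  # → {'USA': 0.8, 'CANADA': 0.5}
```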

```
+-------+------------------+
|country|               MDA|
+-------+------------------+
| GREECE|0.7058823529411765|
|     UK|0.6470588235294118|
| CANADA|0.6470588235294118|
|AUSTRIA|0.7352941176470589|
|    USA|0.7058823529411765|
|  ITALY|0.7352941176470589|
|GERMANY|0.7647058823529411|
| SWEDEN|0.7352941176470589|
| FRANCE|0.7941176470588235|
+-------+------------------+
```

## References

- [1] https://en.wikipedia.org/wiki/Mean_Directional_Accuracy_(MDA)
- [2] https://stat.duke.edu/~mw/ts_data_sets.html
- [3] https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
