
NXCALS and pyspark

Introduction

During Run 2, the data of the CERN Accelerator Complex were stored in the CERN Accelerator Logging Database (CALS), provided by CO. CALS provided a crucial input for monitoring, understanding and optimizing the machines.

For a few years now, CO has been preparing a new database (NXCALS) that will be more suitable to cope with the ever-increasing demand for storage capacity. For CALS we make massive use of the pytimber package (R. De Maria) and of pandas DFs to extract and post-process the data.

A new version of pytimber (R. De Maria and the NXCALS team) has been released: most of the interfaces used for CALS are now also available for NXCALS. The aim is to have the best possible backward compatibility between CALS and NXCALS from the user's point of view.

In this ipynb we present a few examples of how to use pyspark methods to access NXCALS (therefore we will not consider pytimber in most cases). pyspark opens the possibility of doing big data analysis. For most use cases (at least at the moment) big data analysis is NOT needed, but NXCALS is naturally oriented towards it and pyspark is readily available from the Python/Spark community.

With pyspark one can postprocess (e.g., aggregate, filter, transform) the data on the cluster before retrieving them.

In the following sections we go through these examples.

We still use pytimber to search for the variable names: there is not yet a pure pythonic way to extract them, so we rely on the available Java libraries via pytimber.

The final aim of this ipynb is to collect some thoughts and share them with our community, in order to optimize (and possibly merge in the future) the different approaches.

We are not Spark experts, but we think that this ipynb can bootstrap the discussion.

Info

The full ipynb is available in the GitLab repository of this website.

Some elements about the setup

We use the following stack from SWAN

!which python
/cvmfs/sft.cern.ch/lcg/views/LCG_95apython3_nxcals/x86_64-centos7-gcc7-opt/bin/python

We are using the following configuration

Parameter Setting
spark.executor.instances 20
spark.driver.cores 8
spark.executor.cores 4
spark.driver.memory 12 GB

Note that the driver RAM is 12 GB. A typical query can run with fewer resources; the configuration can probably be optimized to improve resource sharing. This will require interaction with the NXCALS team.
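For reference, here is a minimal sketch of how the parameters in the table above would be passed if one built the Spark session by hand (in SWAN the spark session is already provided and these settings are chosen in the connection configuration menu, so this is only illustrative).

from pyspark.sql import SparkSession

# Minimal sketch: in SWAN the `spark` session already exists; this only shows
# where the configuration parameters of the table above would go.
spark = (SparkSession.builder
         .appName('nxcals-pyspark-example')
         .config('spark.executor.instances', '20')
         .config('spark.driver.cores', '8')
         .config('spark.executor.cores', '4')
         .config('spark.driver.memory', '12g')
         .getOrCreate())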

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# You need a very simple package (available on the GitLab of this website).
# We will put it on GitHub if needed.
# Most of the ipynb can be run w/o this package.
from nx2pd import nx2pd as nx 
nx.spark=spark

First steps

We can retrieve the list of variables with this simple method (it just calls pytimber behind the scenes):

nx.searchVariables(['PR%MTE%','HX:F%N','PSB.LSA%','%P%.TGM:B%D','PR-SIMA-1:CURRENT_%V'])
['CPS.TGM:BEAMID',
 'HX:FILLN',
 'PR-SIMA-1:CURRENT_18V',
 'PR-SIMA-1:CURRENT_5V',
 'PR.SCOPE48.CH01:MTE_EFFICIENCY',
 'PR.SCOPE48.CH01:MTE_SPILL',
 'PSB.LSA:CYCLE',
 'PSB.TGM:BEAMID',
 'SPS.TGM:BEAMID']

It is important to note that variable names can also contain special characters (like '.', ':' and '-').

We strongly suggest using tz-aware timestamps (preferably in UTC: the UTC representation is continuous and monotonic, while CET is not).

Info

NXCALS timestamps are UTC based.

pd.Timestamp('2018-10-28 2:15', tz='UTC')
# pd.Timestamp('2018-10-28 2:15', tz='CET') this timestamp does not exist, therefore you get an error
Timestamp('2018-10-28 02:15:00+0000', tz='UTC')

In the following we are going to consider two possible ways to extract data from NXCALS.

Using pandas DF directly

One can directly recover a pandas DF (DataFrame). This can be used for small data sets (smaller than the RAM of the driver) and it is quite convenient (it probably covers more than 99% of the needs). It is very similar to the pytimber approach, but instead of dictionaries one directly gets a pandas DF.

t1='2018-09-01'        # it will be converted automatically to 'UTC'
t2='2018-09-01 00:01'  # it will be converted automatically to 'UTC'
fromNXCALS=nx.nx2pd(nx.searchVariables(['CPS.LSA:CYCLE', 'CPS.TGM:USER']), t1, t2)
fromNXCALS.head()
CPS.TGM:USER CPS.LSA:CYCLE
2018-09-01 00:00:01.900000+00:00 TOF TOF
2018-09-01 00:00:03.100000+00:00 TOF TOF
2018-09-01 00:00:04.300000+00:00 EAST1 EAST_IRRAD
2018-09-01 00:00:06.700000+00:00 SFTPRO1 MTE_2018_
2018-09-01 00:00:07.900000+00:00 SFTPRO1 MTE_2018_

Using pyspark DF

For large data sets we cannot recover the pandas DF on the driver (because of the RAM limits of the driver, where all pandas DFs are collected). Hence we need to aggregate/reduce the data in the Spark executors and retrieve only the output, once it can fit on the driver. To do that one needs to learn pyspark. We think that the few following examples from "our world" will speed up the learning curve (clearly there is a large pyspark community to draw from).
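As a minimal sketch of this idea (with hypothetical DataFrame and column names, not taken from the examples below): the aggregation runs on the executors, and only a handful of rows reach the driver.

import pyspark.sql.functions as F

# `big_df` is a hypothetical pyspark DataFrame with one row per cycle
summary = (big_df
           .groupBy('beam_user')
           .agg(F.count('*').alias('n_cycles'),
                F.avg('intensity').alias('mean_intensity')))
summary_pdf = summary.toPandas()   # only the aggregated rows are collected on the driver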

From what we understood, the pyspark naming convention imposes (to be compatible with the pyspark syntax) that column names do not contain '.', ':' or '-'. This is not compatible with the CALS variable naming (e.g., 'PR.SCOPE48.CH01:MTE_EFFICIENCY').

As a fix to this problem, we replace '.', ':' and '-' with two, three and four underscores, respectively.

Warning

This replacement is going to be annoying...

Todo

Ask the NXCALS team for suggestions.
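As a quick illustration of the renaming convention (a minimal sketch; the actual helper functions used in this ipynb are defined in the next cell):

name = 'PR.SCOPE48.CH01:MTE_EFFICIENCY'
safe_name = name.replace(':', '___').replace('.', '__').replace('-', '____')
print(safe_name)   # PR__SCOPE48__CH01___MTE_EFFICIENCY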

import io
import time
from cern.nxcals.api.extraction.data.builders import *
import pyspark.sql.functions as func
from pyspark.sql.functions import col
from pyspark.sql.types import *
import numpy as np

def _replace_specials(myString):
    return myString.replace(':','___').replace('.','__').replace('-','____')

def _invert_replace_specials(myString):
    # replace the longest pattern first; clearly the operation is invertible only under some hypotheses
    return myString.replace('____','-').replace('___',':').replace('__','.')

def _importNXCALS(variableName,t1, t2):
    ''' This hidden function takes a variable name (string) and two pd Timestamps; it returns a pyspark DataFrame.'''
    start_time = time.time()
    t1=t1.tz_convert('UTC').tz_localize(None)
    t2=t2.tz_convert('UTC').tz_localize(None)
    ds=(DataQuery.builder(spark).byVariables().system("CMW")
        .startTime(t1.strftime('%Y-%m-%d %H:%M:%S.%f'))
        .endTime(t2.strftime('%Y-%m-%d %H:%M:%S.%f'))
        .variable(variableName).buildDataset())
    selectionStringDict={'int':{'value':'nxcals_value','label':'nxcals_value'}, 'double':{'value':'nxcals_value','label':'nxcals_value'},
                        'float':{'value':'nxcals_value','label':'nxcals_value'}, 'string':{'value':'nxcals_value','label':'nxcals_value'},
                        'bigint':{'value':'nxcals_value','label':'nxcals_value'},
                        'struct<elements:array<int>,dimensions:array<int>>':{'value':'nxcals_value.elements','label':'elements'},
                        'struct<elements:array<double>,dimensions:array<int>>':{'value':'nxcals_value.elements','label':'elements'},
                        'struct<elements:array<float>,dimensions:array<int>>':{'value':'nxcals_value.elements','label':'elements'},}
    selectionStringValue=selectionStringDict[dict(ds.dtypes)['nxcals_value']]['value']
    selectionStringLabel=selectionStringDict[dict(ds.dtypes)['nxcals_value']]['label']
    aux=ds.select('nxcals_timestamp',selectionStringValue)
    return aux.withColumnRenamed('nxcals_timestamp','timestamp').withColumnRenamed(selectionStringLabel,_replace_specials(variableName))

t1=pd.Timestamp('2018-10-01', tz='UTC')
t2=pd.Timestamp('2018-10-01 00:01', tz='UTC')
df=_importNXCALS('CPS.TGM:USER', t1,t2)
df.limit(5).show()

df.filter(col('CPS__TGM___USER') == 'TOF').limit(5).show()
# or 
#df.filter(df.CPS__TGM___USER == 'TOF').limit(5).show()
+-------------------+---------------+
|          timestamp|CPS__TGM___USER|
+-------------------+---------------+
|1538352025900000000|           ZERO|
|1538352006700000000|            TOF|
|1538352042700000000|            TOF|
|1538352003100000000|        SFTPRO1|
|1538352015100000000|          EAST1|
+-------------------+---------------+

+-------------------+---------------+
|          timestamp|CPS__TGM___USER|
+-------------------+---------------+
|1538352006700000000|            TOF|
|1538352042700000000|            TOF|
|1538352010300000000|            TOF|
|1538352035500000000|            TOF|
|1538352013900000000|            TOF|
+-------------------+---------------+

Clearly, one can now convert it into a pandas DF.

df.filter(col('CPS__TGM___USER') == 'TOF').toPandas().head()
timestamp CPS__TGM___USER
0 1538352006700000000 TOF
1 1538352042700000000 TOF
2 1538352010300000000 TOF
3 1538352035500000000 TOF
4 1538352013900000000 TOF

Or, in a slightly more systematic way, we can write a simple helper function.

def _to_pandas(df, sorted=True):
    aux=df.toPandas()
    aux['timestamp']=aux['timestamp'].apply(lambda x: pd.Timestamp(x).tz_localize('UTC'))
    aux=aux.set_index('timestamp'); aux.index.name=None
    if sorted:
        aux=aux.dropna().sort_index()
    myDict={}
    for i in aux.columns:
        myDict[i]=_invert_replace_specials(i)
    aux=aux.rename(columns=myDict)
    return aux

_to_pandas(df).head()
CPS.TGM:USER
2018-10-01 00:00:00.700000+00:00 EAST1
2018-10-01 00:00:03.100000+00:00 SFTPRO1
2018-10-01 00:00:04.300000+00:00 SFTPRO1
2018-10-01 00:00:05.500000+00:00 ZERO
2018-10-01 00:00:06.700000+00:00 TOF

Let us now imagine that we want to retrieve more than one variable and keep only the rows with CPS.TGM:USER == 'TOF'. We can proceed as follows.

def importNXCALS(variableList,t1, t2):
    if isinstance(variableList, str):
        variableList=[variableList]
    results=[]
    for variable in variableList:
        results.append(_importNXCALS(variable, t1,t2))
    df=results[0]
    if len(results)>1:
        for i in results[1:]:
            df=df.join(i, ["timestamp"],how='full')    
    return df

results=importNXCALS(['CPS.TGM:USER', 'PR.DCAFTINJ_1:INTENSITY', 'PR.DCBEFEJE_1:INTENSITY'], t1,t2)

results_filtered=results.filter(results.CPS__TGM___USER=='TOF')

results_postprocessed=results_filtered.withColumn('transmission_efficiency',col('PR__DCBEFEJE_1___INTENSITY')/col('PR__DCAFTINJ_1___INTENSITY'))


# LAST STEP to pandas
_to_pandas(results_postprocessed).head()
CPS.TGM:USER PR.DCAFTINJ_1:INTENSITY PR.DCBEFEJE_1:INTENSITY transmission_efficiency
2018-10-01 00:00:06.700000+00:00 TOF 813.794006 804.384277 0.988437
2018-10-01 00:00:10.300000+00:00 TOF 799.527588 790.117798 0.988231
2018-10-01 00:00:13.900000+00:00 TOF 805.901978 795.885071 0.987571
2018-10-01 00:00:28.300000+00:00 TOF 795.278015 787.689453 0.990458
2018-10-01 00:00:35.500000+00:00 TOF 824.417969 813.490479 0.986745

As you can see, only at the very last step do we convert to a pandas DF.

Let us now apply a function more involved than a simple division, using a UDF (User Defined Function).

# This function computes the transmission efficiency between injection and extraction of the PS
def my_efficiency(current_inj,current_eje):
    if current_inj>0:
        aux=current_eje/current_inj
    else:
        aux=0.    # it is important to return a float, to match the declared DoubleType of the UDF
    if aux<0:
        return 0. # it is important to return a float, to match the declared DoubleType of the UDF
    else:
        return aux

my_udf = func.udf(my_efficiency, DoubleType())

new_df=results.withColumn('transmission_efficiency',my_udf(col('PR__DCAFTINJ_1___INTENSITY'),col('PR__DCBEFEJE_1___INTENSITY')))

_to_pandas(new_df).head()
CPS.TGM:USER PR.DCAFTINJ_1:INTENSITY PR.DCBEFEJE_1:INTENSITY transmission_efficiency
2018-10-01 00:00:00.700000+00:00 EAST1 398.246063 396.424835 0.995427
2018-10-01 00:00:03.100000+00:00 SFTPRO1 1439.088989 1436.357056 0.998102
2018-10-01 00:00:04.300000+00:00 SFTPRO1 1406.913574 1402.663940 0.996979
2018-10-01 00:00:05.500000+00:00 ZERO -0.063178 -0.082807 0.000000
2018-10-01 00:00:06.700000+00:00 TOF 813.794006 804.384277 0.988437

You can observe that UDFs are strongly typed (something to handle with care in Python) and it is important to get the type conversions right (e.g., we return '0.' and not '0' to respect the declared return type of the function).
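A minimal defensive pattern (a sketch with a hypothetical safe_ratio helper, not part of the functions above): cast explicitly to float so that the returned value always matches the declared DoubleType.

def safe_ratio(numerator, denominator):
    # return Python floats so the value matches the declared DoubleType
    if numerator is None or denominator is None or denominator <= 0:
        return 0.
    return max(0., float(numerator / denominator))

safe_ratio_udf = func.udf(safe_ratio, DoubleType())
# e.g.: results.withColumn('transmission_efficiency',
#                          safe_ratio_udf(col('PR__DCBEFEJE_1___INTENSITY'),
#                                         col('PR__DCAFTINJ_1___INTENSITY')))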

Let us now consider an even more general case: a UDF acting on a VECTORNUMERIC type.

Let us recover a reading from a digitizer (PR.SCOPE48.CH01:MTE_SPILL). Each reading has more than 20k samples and one year of data typically contains ~1.5M of these arrays. This is a typical case that CALS could not address in one go.

t1=pd.Timestamp('2018-10-01', tz='UTC')
t2=pd.Timestamp('2018-10-01 00:01', tz='UTC')
df=importNXCALS(['CPS.TGM:USER','PR.SCOPE48.CH01:MTE_SPILL'], t1,t2)
df=df.filter(df.CPS__TGM___USER=='SFTPRO1')

plt.figure(figsize=(20,7))
my_row=df.take(2)[0] # NB: df is not sorted!
plt.plot(my_row['PR__SCOPE48__CH01___MTE_SPILL'],'b')
plt.title(f'A typical MTE SPILL, {pd.Timestamp(my_row["timestamp"]).tz_localize("UTC")}')
plt.xlabel('t [arb. units]')
plt.ylabel('amplitude [arb. units]') # it is a heavy signal (>20k samples)
plt.grid(True)

[Figure: a typical MTE spill signal (>20k samples)]

Here we just consider a short interval (1 minute) to prepare the UDF.

t1=pd.Timestamp('2018-10-01', tz='UTC')
t2=pd.Timestamp('2018-10-01 00:01', tz='UTC')
df=importNXCALS(['CPS.TGM:USER','CPS.LSA:CYCLE','PR.SCOPE48.CH01:MTE_SPILL', 'PR.SCOPE48.CH01:MTE_EFFICIENCY'], t1,t2)
df=df.filter(df.CPS__TGM___USER=='SFTPRO1')
df.limit(5).show()
+-------------------+---------------+----------------+-----------------------------+----------------------------------+
|          timestamp|CPS__TGM___USER|CPS__LSA___CYCLE|PR__SCOPE48__CH01___MTE_SPILL|PR__SCOPE48__CH01___MTE_EFFICIENCY|
+-------------------+---------------+----------------+-----------------------------+----------------------------------+
|1538352003100000000|        SFTPRO1|       MTE_2018_|         [-348.93102797674...|                      0.1991091949|
|1538352047500000000|        SFTPRO1|       MTE_2018_|         [-124.17984259390...|                      0.1969342292|
|1538352031900000000|        SFTPRO1|       MTE_2018_|         [-127.18853975281...|                      0.1994075732|
|1538352046300000000|        SFTPRO1|       MTE_2018_|         [160.132474568082...|                      0.1974465434|
|1538352004300000000|        SFTPRO1|       MTE_2018_|         [241.22672082312,...|                      0.1988264771|
+-------------------+---------------+----------------+-----------------------------+----------------------------------+

And here is the UDF to compute the Multi-Turn Extraction (MTE) efficiency.

def MTE_efficiency(myNewSpill):
    # indices delimiting the five slices of the spill: the four islands and the core
    b1_idx=2066-1500
    b2_idx=6267-1500
    b3_idx=10468-1500
    b4_idx=14669-1500
    b5_idx=18871-1500
    b6_idx=23072-1500
    # average signal in each island and in the core
    is1=np.mean(myNewSpill[b1_idx:b2_idx])
    is2=np.mean(myNewSpill[b2_idx:b3_idx])
    is3=np.mean(myNewSpill[b3_idx:b4_idx])
    is4=np.mean(myNewSpill[b4_idx:b5_idx])
    core=np.mean(myNewSpill[b5_idx:b6_idx])
    mySum=is1+is2+is3+is4+core
    MTE_efficiency=np.mean([is1,is2,is3,is4])/mySum
    return np.float(MTE_efficiency)

my_udf = func.udf(MTE_efficiency, DoubleType())


new_df=df.withColumn('recomputed_MTE_efficiency',my_udf(col('PR__SCOPE48__CH01___MTE_SPILL')))

new_df=new_df.select(['timestamp','PR__SCOPE48__CH01___MTE_EFFICIENCY','recomputed_MTE_efficiency'])

_to_pandas(new_df).head()
PR.SCOPE48.CH01:MTE_EFFICIENCY recomputed_MTE_efficiency
2018-10-01 00:00:03.100000+00:00 0.199109 0.199109
2018-10-01 00:00:04.300000+00:00 0.198826 0.198826
2018-10-01 00:00:17.500000+00:00 0.198757 0.198757
2018-10-01 00:00:18.700000+00:00 0.201958 0.201958
2018-10-01 00:00:31.900000+00:00 0.199408 0.199408

We can also perform more complex operations. In the following we compute an FFT and take the amplitude at an arbitrary frequency (index 500 of the FFT vector). NB: we cast the return value appropriately.

def MTE_FFT(myNewSpill):
    aux=np.abs(np.fft.fft(myNewSpill))[500]
    return np.float(aux)

my_udf = func.udf(MTE_FFT, DoubleType())

new_df=df.withColumn('MTE_fft',my_udf(col('PR__SCOPE48__CH01___MTE_SPILL')))
new_df=new_df.select(['timestamp','MTE_fft'])
_to_pandas(new_df).head()
MTE_fft
2018-10-01 00:00:03.100000+00:00 197133.558863
2018-10-01 00:00:04.300000+00:00 146918.300598
2018-10-01 00:00:17.500000+00:00 187465.831250
2018-10-01 00:00:18.700000+00:00 272500.715750
2018-10-01 00:00:31.900000+00:00 161438.769078

A good balance: pyspark and pandas

Here we show a simple example of big data analysis where one can find a reasonable trade-off between pyspark and pandas. We will analyze one full year in one go (by far out of CALS reach).

t1=pd.Timestamp('2018', tz='UTC')
t2=pd.Timestamp('2019', tz='UTC')
df=importNXCALS(['PR.SCOPE48.CH01:MTE_SPILL'], t1,t2)

#### 

def MTE_efficiency(myNewSpill):
    b1_idx=2066-1500
    b2_idx=6267-1500
    b3_idx=10468-1500
    b4_idx=14669-1500
    b5_idx=18871-1500
    b6_idx=23072-1500
    is1=np.mean(myNewSpill[b1_idx:b2_idx])
    is2=np.mean(myNewSpill[b2_idx:b3_idx])
    is3=np.mean(myNewSpill[b3_idx:b4_idx])
    is4=np.mean(myNewSpill[b4_idx:b5_idx])
    core=np.mean(myNewSpill[b5_idx:b6_idx])
    mySum=(is1+is2+is3+is4+core);
    MTE_efficiency=np.mean([is1,is2,is3,is4])/mySum;
    return np.float(MTE_efficiency)

my_udf = func.udf(MTE_efficiency, DoubleType())

new_df=df.withColumn('recomputed_MTE_efficiency',my_udf(col('PR__SCOPE48__CH01___MTE_SPILL')))

new_df=new_df.select(['timestamp','recomputed_MTE_efficiency'])
pdf=_to_pandas(new_df)

aux=pdf[(pdf['recomputed_MTE_efficiency']>0) & (pdf['recomputed_MTE_efficiency']<0.3)]
plt.hist(aux['recomputed_MTE_efficiency'].values,1000);
plt.grid()

[Figure: histogram of the recomputed MTE efficiency over 2018]

len(pdf)
1815048

Success

More than 1.8M arrays (each containing more than 20k elements) were processed in ~5 min.
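For reference, a minimal sketch of how such a timing can be measured (the time module is already imported above):

start = time.time()
pdf = _to_pandas(new_df)   # triggers the full Spark computation and collects the result
print(f'{len(pdf)} rows retrieved in {(time.time() - start)/60:.1f} min')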

Conclusions

We would like to thank the whole NXCALS team for their support and help. We think that the framework they designed has great potential for big data analysis.

pytimber is still fundamental for typical use. It does not leverage Spark, at least at the moment, but it offers a simple-to-use interface.
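For reference, a minimal sketch of the classic pytimber interface mentioned above (the variable name and time range are only illustrative):

import pytimber

db = pytimber.LoggingDB()
# returns a dictionary {variable_name: (timestamps, values)}
data = db.get('HX:FILLN', '2018-05-01 00:00:00', '2018-05-02 00:00:00')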

Question

Should we consider a better integration between pytimber and pandas?

We investigated the pyspark approach for big data analysis, and we only scratched the surface. Optimizing pyspark queries needs time and expertise and it is quite a long journey: here we showed how to take just the first steps.

Info

Depending on the community input, we can write a small wrapper around the functions we used. We wanted to expose the rationale before wrapping the code, in order to get feedback.