
ABP CWG 2020 05 29

NXCALS and pyspark

G. Sterbini with the help and support of the NXCALS team

Introduction

  • We present a few examples of how to use pyspark methods to access NXCALS.

  • pyspark is a ready-to-use package supported by the Apache/SPARK community.

  • pyspark makes big-data analysis possible and could be complementary to pytimber.

Our aim here is to collect examples specific to our community and to share them, in order to optimize (and perhaps merge in the future?) the different approaches.

We use the SWAN/NXCALS environment, but we also tested the pyspark bundle and the pure Python approach (refer to the NXCALS web page).

We are NOT SPARK experts, but these examples (of increasing complexity) may bootstrap the discussion and help you build your own use case.

For our purpose, we favoured the clarity and compactness of the code over the completeness of the physics case.

The logic

Given a time window (two ordered timestamps) and a list of variables, one can follow 4 logical steps/sets of dataframes (df):

  1. Prepare a set of single spark df's (one per variable).

  2. Join this set in a common spark df.

  3. Perform transformations on the common spark df.

  4. Retrieve a reduced spark df, converting it to a pandas df.

The emphasis is on doing all the transformations before step 4, so as to make use of the NXCALS cluster.
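
The four steps can be mimicked locally with plain Python containers, which may help to fix ideas before moving to Spark. This is only a sketch: the timestamps and values below are invented, and dictionaries keyed by timestamp stand in for the spark df's.

```python
from collections import Counter

# Step 1: one "dataframe" per variable (here: dict timestamp -> value).
# All names and values are made up for illustration.
cycle = {100: 'TOF', 200: 'EAST1', 300: 'TOF', 400: 'TOF'}
scan_number = {100: 1, 200: 2, 300: 3, 500: 4}

# Step 2: inner join on the timestamp (keep only the common timestamps).
common = sorted(cycle.keys() & scan_number.keys())
joined = [(t, cycle[t], scan_number[t]) for t in common]

# Step 3: transformation on the joined data (count the scans per cycle).
scans_per_cycle = Counter(c for _, c, _ in joined)

# Step 4: only the reduced result is brought back for inspection.
print(scans_per_cycle.most_common())
```

In the real workflow steps 1 to 3 are executed lazily on the cluster, and only step 4 moves data to the local session.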

To keep in mind:

  • Working with the injectors data is in general easier (acquisition done by cyclestamp: synchronous acquisition due to strongly-coupled/fixed-duration-cycles/PPM-telegram).

  • Later we will show the more general case, e.g., LHC: asynchronous acquisition (weakly-coupled/no-fixed-duration-cycles/no-telegram)

An auxiliary package

We collected some recurrent functions that we are going to use in https://github.com/sterbini/nx2pd (98 sloc).

import matplotlib 
matplotlib.rc('xtick', labelsize=20) 
matplotlib.rc('ytick', labelsize=20)
from matplotlib import pyplot as plt
plt.rcParams.update({'font.size': 22})
from nx2pd import nx2pd as nx #install from https://github.com/sterbini/nx2pd
nx.spark=spark
pd=nx.pd
np=nx.np

Example 1a (no transformation)

Q: When and for which cycle was a specific PS wire scanner used?

In this example we do not perform any transformation (so we are not using the cluster in a smart way).

t1=pd.Timestamp('2018', tz='UTC')
t2=pd.Timestamp('2019', tz='UTC')
list_of_variables=['CPS.LSA:CYCLE','PR.BWS.65.H_ROT:SCAN_NUMBER']
# Step 1: set of single spark df's
pd_df_of_spark_df=nx.importNXCALS(list_of_variables, t1,t2)
# Step 2: inner-joining the pd_df on 'timestamp'
spark_df=nx._join_df_list(pd_df_of_spark_df, on=['timestamp'], how='inner')
# Step 3: transformation on the common spark df
# Nothing :(
# Step 4: going back to pandas df
pd_df=nx._to_pandas(spark_df, timestampConversion=True, sorted=True)
print(pd_df.head());print(f'A total of {len(pd_df)} records retrieved.')
                                 CPS.LSA:CYCLE  PR.BWS.65.H_ROT:SCAN_NUMBER
2018-03-05 14:43:51.900000+00:00    EAST_IRRAD                        46766
2018-03-05 14:44:35.100000+00:00    EAST_IRRAD                        46767
2018-03-07 08:41:18.300000+00:00    EAST_IRRAD                        46768
2018-03-07 08:41:48.300000+00:00    EAST_IRRAD                        46769
2018-03-10 09:50:08.700000+00:00   MTE_50_CORE                        46770
A total of 15829 records retrieved.

Example 1b (with aggregation)

Q: Same question: when and for which cycle was a specific PS wire scanner used?

(now using aggregation)

t1=pd.Timestamp('2018', tz='UTC')
t2=pd.Timestamp('2019', tz='UTC')
list_of_variables=['CPS.LSA:CYCLE','PR.BWS.65.H_ROT:SCAN_NUMBER']
# Step 1: set of single spark df's
pd_df_of_spark_df=nx.importNXCALS(list_of_variables, t1,t2)
# Step 2: inner-joining the pd_df on 'timestamp'
spark_df=nx._join_df_list(pd_df_of_spark_df, on=['timestamp'], how='inner')
# Step 3: aggregating the data
spark_df=spark_df.groupby('CPS@LSA:CYCLE').agg(nx.func.count("PR@BWS@65@H_ROT:SCAN_NUMBER").alias('count'))
# Step 4: going back to pandas df
pd_df=nx._to_pandas(spark_df,sorted=False).sort_values('count',ascending=False).set_index('CPS.LSA:CYCLE');pd_df.index.name=None;
print(pd_df.head(5));print(f'A total of {len(pd_df)} records retrieved.')
                                   count
MD4263_LHC25#48B_BCMS_PS_TFB_2018   2101
MD4404_BCMS                         1967
MD2586_LHC25#12B_BCMS_PS            1684
LHC25#48B_BCMS_PS                   1064
MD4407_LHC25#12B_BCMS                849
A total of 108 records retrieved.

Example 2a (3 years aggregation)

Q: What are the statistics of the production cycles (first injection current) in the PS during Run 2?

You can do that in less than 1 minute with the standard SWAN configuration (as of 2020/05/29)!

t1=pd.Timestamp('2016', tz='UTC'); t2=pd.Timestamp('2019', tz='UTC')
pd_df=nx.importNXCALS(['CPS.TGM:USER','PR.DCAFTINJ_1:INTENSITY'], t1,t2)
df=nx._join_df_list(pd_df, on=['timestamp'], how='inner')
df=df.dropna()\
    .groupby('CPS@TGM:USER').agg(nx.func.count("PR@DCAFTINJ_1:INTENSITY").alias('count'),\
                                    nx.func.sum("PR@DCAFTINJ_1:INTENSITY"),\
                                    nx.func.mean("PR@DCAFTINJ_1:INTENSITY"))
aux=nx._to_pandas(df)
aux=aux.sort_values('CPS.TGM:USER').set_index('CPS.TGM:USER').sort_values('sum(PR.DCAFTINJ_1:INTENSITY)', ascending=False)
aux.index.name=None;print(aux.head());print(f'Total number of cycles: {aux["count"].sum()}')
           count  sum(PR.DCAFTINJ_1:INTENSITY)  avg(PR.DCAFTINJ_1:INTENSITY)
TOF      7785564                  5.312979e+09                    682.414171
SFTPRO1  2972325                  3.270464e+09                   1100.304861
SFTPRO2  2242740                  1.862863e+09                    830.619392
EAST1    3814038                  1.044577e+09                    273.876976
EAST2    3677046                  1.008520e+09                    274.274491
Total number of cycles: 54628119

Example 2b (1 minute aggregation)

CAVEAT: a 1-minute window is clearly faster than 3 years but, as expected, there are overheads.

t1=pd.Timestamp('2018-10-01', tz='UTC'); t2=pd.Timestamp('2018-10-01 00:01', tz='UTC')
pd_df=nx.importNXCALS(['CPS.TGM:USER','PR.DCAFTINJ_1:INTENSITY'], t1,t2)
df=nx._join_df_list(pd_df, on=['timestamp'], how='inner')
df=df.dropna()\
    .groupby('CPS@TGM:USER').agg(nx.func.count("PR@DCAFTINJ_1:INTENSITY").alias('count'),\
                                    nx.func.sum("PR@DCAFTINJ_1:INTENSITY").alias('sum'),\
                                    nx.func.mean("PR@DCAFTINJ_1:INTENSITY").alias('mean'),\
                                    nx.func.stddev("PR@DCAFTINJ_1:INTENSITY").alias('std'))
aux=nx._to_pandas(df)
aux=aux.sort_values('CPS.TGM:USER').set_index('CPS.TGM:USER').sort_values('sum', ascending=False)
aux.index.name=None;print(aux.head());print(f'Total number of cycles: {aux["count"].sum()}')
         count           sum         mean         std
SFTPRO1      8  11461.413086  1432.676636   18.081495
TOF          8   6481.819397   810.227425    9.617653
EAST1        7   2806.542114   400.934588    2.437349
EAST2        4    880.980518   220.245130  208.533554
MD6          2    156.822815    78.411407    5.531555
Total number of cycles: 35
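
To interpret the columns, the same aggregates can be reproduced locally on a handful of values. The sketch below uses invented intensities and user names, not NXCALS data. It relies on the fact that Spark's stddev is the sample standard deviation (an alias of stddev_samp), which corresponds to statistics.stdev in the Python standard library.

```python
import statistics

# Invented intensities per user, for illustration only.
data = {
    'USER_A': [10.0, 12.0, 14.0],
    'USER_B': [5.0, 5.0],
}

summary = {}
for user, values in data.items():
    summary[user] = {
        'count': len(values),
        'sum': sum(values),
        'mean': statistics.mean(values),
        # sample standard deviation (divides by n-1), like Spark's stddev
        'std': statistics.stdev(values),
    }

# Same ordering as in the example: descending sum.
for user, row in sorted(summary.items(), key=lambda kv: kv[1]['sum'], reverse=True):
    print(user, row)
```

A group with a single record has no sample standard deviation: statistics.stdev raises an error in that case, so such groups would need separate handling.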

Example 3 (daily cumulative aggregation)

Q: Plot the integrated daily extracted current on the PS TOF cycle for 2018.

We operate explicitly on the 'timestamp' column before the aggregation, and we use filtering.

t1=pd.Timestamp('2018', tz='UTC')
t2=pd.Timestamp('2019', tz='UTC')
pd_df=nx.importNXCALS(['CPS.TGM:USER','PR.DCBEFEJE_1:INTENSITY'], t1,t2)
df=nx._join_df_list(pd_df, on=['timestamp'], how='inner')
df=df.filter(nx.col('CPS@TGM:USER')=='TOF')
df=df.withColumn('new_timestamp',nx.col('timestamp')/1.0e9)
df = df.withColumn('new_timestamp',nx.func.from_unixtime("new_timestamp", "yyyy-MM-dd HH:mm:ss"))
df_agg = df.groupBy(nx.func.to_date('new_timestamp').alias('Day')).agg(nx.func.sum('PR@DCBEFEJE_1:INTENSITY').alias('value'))
# to pandas
aux=nx._to_pandas(df_agg)
# minimal operation in pandas after aggregation
aux=aux.sort_values('Day').set_index('Day');aux.index.name=None
plt.figure(figsize=(30,8))
plt.plot(aux['value'].cumsum()/1e9,'-b')
plt.title('Cumulative  PR.DCBEFEJE_1:INTENSITY on TOF [1e19]')
plt.xticks(rotation=20);
plt.grid()

[Figure: cumulative PR.DCBEFEJE_1:INTENSITY on TOF during 2018, in units of 1e19]
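
The timestamp manipulation above (nanoseconds to seconds, seconds to calendar day, daily sum, cumulative sum) can be checked locally with the standard library. This is a sketch with invented intensities; only the first two timestamps are taken from the examples of this document. As far as we understand, from_unixtime renders the time in the Spark session time zone, so the day boundaries follow that setting, whereas the sketch fixes UTC explicitly.

```python
from collections import defaultdict
from datetime import datetime, timezone
from itertools import accumulate

def day_of(timestamp_ns):
    """UTC calendar day of a timestamp given in nanoseconds since the epoch."""
    return datetime.fromtimestamp(timestamp_ns // 10**9, tz=timezone.utc).date()

# (timestamp [ns], intensity) pairs; the intensities are invented.
records = [
    (1538352006700000000, 700.0),  # 2018-10-01 00:00:06.7 UTC
    (1538352010300000000, 710.0),  # 2018-10-01 00:00:10.3 UTC
    (1538438400000000000, 690.0),  # 2018-10-02 00:00:00.0 UTC
]

# Daily sum (the groupBy/agg step).
daily = defaultdict(float)
for timestamp_ns, intensity in records:
    daily[day_of(timestamp_ns)] += intensity

# Cumulative sum over the sorted days (the pandas cumsum step).
days = sorted(daily)
cumulative = list(accumulate(daily[d] for d in days))
for d, c in zip(days, cumulative):
    print(d, c)
```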

Example 4 (cyclestamps arithmetic)

Q: What is the transmission efficiency between PSB and PS for TOF?

Cyclestamps arithmetic can be used to "follow" a beam in the injector chain.

HINT: "consecutive" cyclestamps from PS and PSB are 635 ms apart.

t1=pd.Timestamp('2018-10-01 00:00', tz='UTC')
t2=pd.Timestamp('2018-10-01 00:05', tz='UTC')
pd_df=nx.importNXCALS(['CPS.TGM:USER','PR.DCAFTINJ_1:INTENSITY'], t1,t2)
df=nx._join_df_list(pd_df, on=['timestamp'], how='inner')
df_ps=df.filter(nx.col('CPS@TGM:USER')=='TOF').filter(nx.col('PR@DCAFTINJ_1:INTENSITY')>500.)

pd_df=nx.importNXCALS(['PSB.TGM:USER','BR2.BCT.ACC:INTENSITY'], t1,t2)
df=nx._join_df_list(pd_df, on=['timestamp'], how='inner')
df_psb=df.withColumn('timestamp',nx.col('timestamp')+635000000) # cyclestamps arithmetic

out=nx._join_df_list([df_ps,df_psb],how='inner')
out=out.withColumn('efficiency',nx.col('PR@DCAFTINJ_1:INTENSITY')/nx.col('BR2@BCT@ACC:INTENSITY'))
print(nx._to_pandas(out, timestampConversion=False, sorted=True)[['CPS.TGM:USER','PSB.TGM:USER','efficiency']].head())
                    CPS.TGM:USER PSB.TGM:USER  efficiency
1538352006700000000          TOF          TOF    0.980367
1538352010300000000          TOF          TOF    0.977946
1538352013900000000          TOF          TOF    0.980870
1538352028300000000          TOF          TOF    0.979842
1538352035500000000          TOF          TOF    0.981096
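
The cyclestamp arithmetic can be reproduced on a toy data set. In this sketch, dictionaries keyed by the timestamp (in ns) stand in for the two spark df's; the intensities are invented, while the 635 ms offset comes from the hint above.

```python
PSB_TO_PS_NS = 635_000_000  # 635 ms expressed in ns

# timestamp [ns] -> intensity; the values are invented for illustration.
ps = {1538352006700000000: 780.0, 1538352010300000000: 790.0}
psb = {
    1538352006065000000: 800.0,  # 635 ms before the first PS cycle
    1538352009665000000: 810.0,  # 635 ms before the second PS cycle
    1538352012000000000: 805.0,  # no PS cycle 635 ms later: dropped by the join
}

# Shift the PSB cyclestamps so that they coincide with the PS ones.
psb_shifted = {t + PSB_TO_PS_NS: v for t, v in psb.items()}

# Inner join on the (shifted) timestamp, then divide the two columns.
efficiency = {t: ps[t] / psb_shifted[t]
              for t in sorted(ps.keys() & psb_shifted.keys())}
for t, e in efficiency.items():
    print(t, round(e, 6))
```

Because the join requires exact equality of integer timestamps, the offset has to be exact to the nanosecond. An offset that is wrong even slightly gives an empty join rather than a wrong efficiency.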

Example 5 (UDF scalars to scalar)

In the previous example we computed the efficiency by simply dividing two columns. In general, for more complex functions, we can use a User Defined Function (UDF).

# %% UDF scalars to scalar
# This function computes the transmission efficiency between injection and extraction of the PS
def my_efficiency(current_inj,current_eje):
    if current_inj>0:
        aux=current_eje/current_inj
    else:
        aux=0.    # it is important to force the double for NXCALS
    if aux<0:
        return 0. # it is important to force the double for NXCALS
    else:
        return aux
my_udf = nx.func.udf(my_efficiency, nx.DoubleType())

t1=pd.Timestamp('2018', tz='UTC')
t2=pd.Timestamp('2019', tz='UTC')
pd_df=nx.importNXCALS(['CPS.TGM:USER','PR.DCAFTINJ_1:INTENSITY','PR.DCBEFEJE_1:INTENSITY'], t1,t2)
spark_df=nx._join_df_list(pd_df, on=['timestamp'], how='inner')
spark_df=spark_df.withColumn('transmission efficiency',my_udf(nx.col('PR@DCAFTINJ_1:INTENSITY'),nx.col('PR@DCBEFEJE_1:INTENSITY')))

aux=nx._to_pandas(spark_df.dropna()\
    .filter(nx.col("PR@DCAFTINJ_1:INTENSITY")>2)\
    .groupby('CPS@TGM:USER').agg(nx.func.count("transmission efficiency").alias('count'),\
                                    nx.func.mean("transmission efficiency").alias('Out/In PS efficiency')))
aux=aux.sort_values('CPS.TGM:USER').set_index('CPS.TGM:USER');aux.index.name=None;print(aux.head())
         count  Out/In PS efficiency
AD      112549              0.988969
EAST1  1166882              0.990395
EAST2  1145499              0.992629
ION1     53903              0.952219
ION2     29311              0.793609
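
Since the function wrapped by udf is ordinary Python, it can be exercised locally before being sent to the cluster. The sketch below is a variant of the function above, not the one used to produce the table. It additionally returns None when an input is missing, on the assumption that null values can reach the UDF (Spark passes SQL nulls to Python UDFs as None). The name my_efficiency_safe and the test values are ours.

```python
def my_efficiency_safe(current_inj, current_eje):
    """Extraction/injection ratio, clipped at 0; None if an input is missing."""
    if current_inj is None or current_eje is None:
        return None
    if current_inj <= 0:
        return 0.0          # always return a float, to match DoubleType
    return max(current_eje / current_inj, 0.0)

# Local checks on invented values.
print(my_efficiency_safe(700.0, 693.0))   # ratio of two positive intensities
print(my_efficiency_safe(0.0, 5.0))       # no injected beam
print(my_efficiency_safe(700.0, -1.0))    # negative reading clipped to 0
print(my_efficiency_safe(None, 5.0))      # missing input
```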

Example 6 (UDF vector to scalar)

Q: What is the histogram of the Multi-Turn Extraction (MTE) efficiency in 2018?

A natural extension is to compress vector information to scalars.

t1=pd.Timestamp('2018-10-01', tz='UTC')
t2=pd.Timestamp('2018-10-01 00:01', tz='UTC')
df=nx.importNXCALS(['CPS.TGM:USER','PR.SCOPE48.CH01:MTE_SPILL'], t1,t2)
spark_df=nx._join_df_list(df, on=['timestamp'], how='inner')
spark_df=spark_df.filter(nx.col('CPS@TGM:USER')=='SFTPRO1')
plt.figure(figsize=(30,8))
my_row=spark_df.take(2)[0]
plt.plot(my_row['PR@SCOPE48@CH01:MTE_SPILL'],'b')
plt.title(f'A typical MTE SPILL, {pd.Timestamp(my_row["timestamp"]).tz_localize("UTC")}') # it is a heavy signal (>20k samples)
plt.xlabel('t [arb. units]');plt.ylabel('amplitude [arb. units]');plt.grid(True)

[Figure: a typical MTE spill, amplitude vs. time (both in arbitrary units)]

t1=pd.Timestamp('2018-10-01', tz='UTC')
t2=pd.Timestamp('2018-10-01 00:01', tz='UTC')
df=nx.importNXCALS(['CPS.TGM:USER','CPS.LSA:CYCLE','PR.SCOPE48.CH01:MTE_SPILL','PR.SCOPE48.CH01:MTE_EFFICIENCY'], t1,t2)
df=nx._join_df_list(df, on=['timestamp'], how='inner')
df=df.filter(nx.col('CPS@TGM:USER')=='SFTPRO1')
df.limit(5).show()
+-------------------+------------+-------------+-------------------------+------------------------------+
|          timestamp|CPS@TGM:USER|CPS@LSA:CYCLE|PR@SCOPE48@CH01:MTE_SPILL|PR@SCOPE48@CH01:MTE_EFFICIENCY|
+-------------------+------------+-------------+-------------------------+------------------------------+
|1538352047500000000|     SFTPRO1|    MTE_2018_|     [-124.17984259390...|                  0.1969342292|
|1538352046300000000|     SFTPRO1|    MTE_2018_|     [160.132474568082...|                  0.1974465434|
|1538352003100000000|     SFTPRO1|    MTE_2018_|     [-348.93102797674...|                  0.1991091949|
|1538352031900000000|     SFTPRO1|    MTE_2018_|     [-127.18853975281...|                  0.1994075732|
|1538352004300000000|     SFTPRO1|    MTE_2018_|     [241.22672082312,...|                  0.1988264771|
+-------------------+------------+-------------+-------------------------+------------------------------+
def MTE_efficiency(myNewSpill):
    b1_idx=2066-1500
    b2_idx=6267-1500
    b3_idx=10468-1500
    b4_idx=14669-1500
    b5_idx=18871-1500
    b6_idx=23072-1500
    is1=np.mean(myNewSpill[b1_idx:b2_idx])
    is2=np.mean(myNewSpill[b2_idx:b3_idx])
    is3=np.mean(myNewSpill[b3_idx:b4_idx])
    is4=np.mean(myNewSpill[b4_idx:b5_idx])
    core=np.mean(myNewSpill[b5_idx:b6_idx])
    mySum=(is1+is2+is3+is4+core);
    efficiency=np.mean([is1,is2,is3,is4])/mySum
    return float(efficiency) # return a plain Python float for DoubleType
my_udf = nx.func.udf(MTE_efficiency, nx.DoubleType())

new_df=df.withColumn('recomputed',my_udf(nx.col('PR@SCOPE48@CH01:MTE_SPILL')))
new_df=new_df.select(['timestamp','PR@SCOPE48@CH01:MTE_EFFICIENCY','recomputed'])
print(nx._to_pandas(new_df, timestampConversion=True, sorted=True).head())
                                  PR.SCOPE48.CH01:MTE_EFFICIENCY  recomputed
2018-10-01 00:00:03.100000+00:00                        0.199109    0.199109
2018-10-01 00:00:04.300000+00:00                        0.198826    0.198826
2018-10-01 00:00:17.500000+00:00                        0.198757    0.198757
2018-10-01 00:00:18.700000+00:00                        0.201958    0.201958
2018-10-01 00:00:31.900000+00:00                        0.199408    0.199408
# about 5 minutes computation
t1=pd.Timestamp('2018', tz='UTC')
t2=pd.Timestamp('2019', tz='UTC')
df=nx.importNXCALS(['CPS.TGM:USER','PR.SCOPE48.CH01:MTE_SPILL','PR.SCOPE48.CH01:MTE_EFFICIENCY'], t1,t2)
df=nx._join_df_list(df, on=['timestamp'], how='inner')
df=df.filter(nx.col('CPS@TGM:USER')=='SFTPRO1')
new_df=df.withColumn('recomputed_MTE_efficiency',my_udf(nx.col('PR@SCOPE48@CH01:MTE_SPILL')))
new_df=new_df.select(['recomputed_MTE_efficiency'])
aux=nx._to_pandas(new_df)
plt.figure(figsize=(30,8))
aux=aux[(aux['recomputed_MTE_efficiency']>0) & (aux['recomputed_MTE_efficiency']<0.3)]
plt.hist(aux['recomputed_MTE_efficiency'].values,1000);
my_ylimit=plt.ylim()
plt.plot([.2,.2],my_ylimit,'--r')
plt.ylim(my_ylimit)
plt.grid()
plt.title('MTE splitting efficiency in 2018')
plt.ylabel('count')

[Figure: histogram of the recomputed MTE splitting efficiency in 2018, with a dashed line at 0.2]
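
The dashed line at 0.2 is the value that MTE_efficiency returns when the signal is shared equally among the four islands and the core, since the function divides the mean island signal by the sum of the five segment means. Below is a minimal local check on synthetic spills, written in pure Python so that it runs without NumPy or Spark. The function name island_fraction and the test signals are ours; the segment boundaries are those of MTE_efficiency.

```python
def island_fraction(spill, edges):
    """Mean of the first four segment averages divided by the sum of all five."""
    means = [sum(spill[a:b]) / (b - a) for a, b in zip(edges[:-1], edges[1:])]
    islands, core = means[:4], means[4]
    return (sum(islands) / 4) / (sum(islands) + core)

# Same segment boundaries as in MTE_efficiency.
edges = [2066 - 1500, 6267 - 1500, 10468 - 1500,
         14669 - 1500, 18871 - 1500, 23072 - 1500]

# Synthetic spills (invented): a constant signal, and a core twice as intense.
flat = [1.0] * edges[-1]
strong_core = [1.0] * edges[4] + [2.0] * (edges[5] - edges[4])

print(island_fraction(flat, edges))         # equal sharing
print(island_fraction(strong_core, edges))  # core with twice the island signal
```

With equal sharing the result is 1/5; a core that is too populated pushes the value below 0.2.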

Example 7 (subsampling and align scalars)

We now put everything together for the LHC case.

def subSampling(variable,t1,t2,freq='300s'):
    aux=np.array(pd.date_range(t1, t2,freq=freq).astype(np.int64))
    spark_variable=nx._replace_specials(variable)
    pd_df=nx.importNXCALS([variable], t1,t2)
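    def set_window(timestamp):
        # index of the first window edge strictly after the timestamp (0 if none)
        try:
            return int(np.where(aux>timestamp)[0][0])
        except Exception:
            return 0
    def set_window_time(window):
        # centre of the window (integer division keeps the ns precision)
        try:
            return int((aux[window-1]+aux[window])//2)
        except Exception:
            return 0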
    udf_set_window = nx.func.udf(set_window, nx.LongType())
    udf_set_window_time = nx.func.udf(set_window_time, nx.LongType())
    spark_df=nx._join_df_list(pd_df, on=['timestamp'], how='inner')
    spark_df=spark_df.withColumn('window',udf_set_window(nx.col('timestamp')))
    spark_df=spark_df.withColumn('timeWindow',udf_set_window_time(nx.col('window')))
    return spark_df.groupby('window').agg(nx.func.mean(spark_variable).alias(spark_variable),\
        nx.func.count(spark_variable).alias('count_'+spark_variable),\
        nx.func.mean(spark_variable).alias('mean_'+spark_variable),\
        nx.func.stddev(spark_variable).alias('stddev_'+spark_variable),\
        nx.func.min(spark_variable).alias('min_'+spark_variable),\
        nx.func.max(spark_variable).alias('max_'+spark_variable),\
        nx.func.min("timeWindow").alias("timestamp")).drop('window')
t1=pd.Timestamp('2018-10-01', tz='UTC')
t2=pd.Timestamp('2018-10-02', tz='UTC')
my_variable_list=['ALICE:LUMI_TOT_INST','ATLAS:LUMI_TOT_INST','CMS:LUMI_TOT_INST','LHCB:LUMI_TOT_INST']\
    +['LHC.BCTFR.A6R4.B1:BEAM_INTENSITY','LHC.BCTFR.A6R4.B2:BEAM_INTENSITY']\
    +['LHC.BLM.LIFETIME:B1_BEAM_LIFETIME','LHC.BLM.LIFETIME:B2_BEAM_LIFETIME']+['RPMBB.RR17.ROD.A12B1:I_MEAS']
my_spark_variable_list=nx._replace_specials(my_variable_list)
my_list=[subSampling(i,t1,t2) for i in my_variable_list ]
spark_df=nx._join_df_list(my_list, how='outer')
aux=nx._to_pandas(spark_df.select(*my_spark_variable_list,'timestamp'), sorted=True, timestampConversion=True);print(aux[['ALICE:LUMI_TOT_INST','RPMBB.RR17.ROD.A12B1:I_MEAS']].head())
                           ALICE:LUMI_TOT_INST  RPMBB.RR17.ROD.A12B1:I_MEAS
2018-10-01 00:02:30+00:00             3.530905                      -338.99
2018-10-01 00:07:30+00:00             3.541737                      -338.99
2018-10-01 00:12:30+00:00             3.571198                      -338.99
2018-10-01 00:17:30+00:00             3.514797                      -338.99
2018-10-01 00:22:30+00:00             3.455339                      -338.99
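
The windowing logic of subSampling can be checked in isolation. The sketch below reimplements set_window and set_window_time with the standard library (bisect instead of np.where) on a few 300 s window edges. It is an illustration of the logic, not the code run on the cluster.

```python
from bisect import bisect_right

NS = 10**9
t1_ns = 1538352000 * NS                            # 2018-10-01 00:00:00 UTC
edges = [t1_ns + k * 300 * NS for k in range(4)]   # window edges every 300 s

def set_window(timestamp_ns):
    """Index of the first edge strictly greater than the timestamp (0 if none)."""
    idx = bisect_right(edges, timestamp_ns)
    return idx if idx < len(edges) else 0

def set_window_time(window):
    """Centre of the window delimited by edges[window-1] and edges[window]."""
    return (edges[window - 1] + edges[window]) // 2

sample = t1_ns + 10 * NS                           # an acquisition 10 s after t1
w = set_window(sample)
print(w, (set_window_time(w) - t1_ns) // NS)       # window index, centre in s after t1
```

An acquisition taken 10 s after t1 falls in window 1, whose centre is 150 s after t1, consistent with the first row of the table above (00:02:30).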

Example 8 (putting together, subsampling and align vectors)

To cope with bunch-by-bunch analysis we need to subsample vectors.

def subSamplingVector(variable, t1, t2, my_DatetimeIndex):
    aux=np.array(my_DatetimeIndex.astype(np.int64))
    spark_variable=nx._replace_specials(variable)
    pd_df=nx.importNXCALS([variable], t1,t2)
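    def set_window(timestamp):
        # index of the first window edge strictly after the timestamp (0 if none)
        try:
            return int(np.where(aux>timestamp)[0][0])
        except Exception:
            return 0
    def set_window_time(window):
        # centre of the window (integer division keeps the ns precision)
        try:
            return int((aux[window-1]+aux[window])//2)
            #return int(aux[window-1])
        except Exception:
            return 0
    def compute_mean(x):
        # x is the list of vectors collected in one window
        try:
            # element-wise mean over the vectors of the window
            return np.mean(np.array(x, dtype=float), axis=0).tolist()
        except Exception:
            # e.g. vectors of different lengths: fall back to the first one
            return x[0]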
    udf_set_window = nx.func.udf(set_window, nx.LongType())
    udf_set_window_time = nx.func.udf(set_window_time, nx.LongType())
    udf_compute_mean = nx.func.udf(compute_mean, nx.ArrayType(nx.DoubleType()))
    spark_df=nx._join_df_list(pd_df, on=['timestamp'], how='inner')
    spark_df=spark_df.withColumn('window',udf_set_window(nx.col('timestamp')))
    spark_df=spark_df.withColumn('timeWindow',udf_set_window_time(nx.col('window')))
    return spark_df.groupby('window').agg(udf_compute_mean(nx.func.collect_list(spark_variable)).alias(spark_variable),\
                                          nx.func.min("timeWindow").alias("timestamp")).drop('window')
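
For each window, the aggregation collects the list of vectors acquired in that window and reduces it to a single vector. Below is a minimal pure-Python sketch of an element-wise mean over such a list, useful to check the reduction locally. The function name elementwise_mean and the data are ours, not part of nx2pd, and the fallback to the first vector when lengths differ is an assumption about the desired behaviour.

```python
def elementwise_mean(vectors):
    """Element-wise mean of equally long vectors; first vector as a fallback."""
    if not vectors:
        return None                  # nothing was acquired in this window
    if len({len(v) for v in vectors}) != 1:
        return list(vectors[0])      # lengths differ: no meaningful average
    return [sum(column) / len(vectors) for column in zip(*vectors)]

# Three invented acquisitions of a 4-bunch vector falling in the same window.
window_vectors = [
    [1.0, 2.0, 0.0, 4.0],
    [3.0, 2.0, 0.0, 2.0],
    [2.0, 2.0, 0.0, 3.0],
]
print(elementwise_mean(window_vectors))
```

The result is a list of plain Python floats, which is what a UDF declared with ArrayType(DoubleType()) is expected to return.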

And combining the different functions...

t1=pd.Timestamp('2018-10-01', tz='UTC')
t2=pd.Timestamp('2018-10-02', tz='UTC')
my_DatetimeIndex=pd.date_range(t1, t2, freq='300s')
my_variable_list=['LHC.BCTFR.A6R4.B1:BUNCH_INTENSITY','LHC.BCTFR.A6R4.B2:BUNCH_INTENSITY','ATLAS:BUNCH_LUMI_INST','CMS:BUNCH_LUMI_INST',
                  'LHC.BSRT.5R4.B1:BUNCH_EMITTANCE_H',
                  'LHC.BSRT.5R4.B1:BUNCH_EMITTANCE_V',
                  'LHC.BSRT.5L4.B2:BUNCH_EMITTANCE_H',
                  'LHC.BSRT.5L4.B2:BUNCH_EMITTANCE_V']
my_spark_variable_list=nx._replace_specials(my_variable_list)
my_list1=[subSamplingVector(i,t1,t2,my_DatetimeIndex) for i in my_variable_list]

my_variable_list=['ALICE:LUMI_TOT_INST','ATLAS:LUMI_TOT_INST','CMS:LUMI_TOT_INST','LHCB:LUMI_TOT_INST']\
    +['LHC.BCTFR.A6R4.B1:BEAM_INTENSITY','LHC.BCTFR.A6R4.B2:BEAM_INTENSITY']\
    +['LHC.BLM.LIFETIME:B1_BEAM_LIFETIME','LHC.BLM.LIFETIME:B2_BEAM_LIFETIME']+['RPMBB.RR17.ROD.A12B1:I_MEAS']
my_spark_variable_list=nx._replace_specials(my_variable_list)
my_list2=[subSampling(i,t1,t2) for i in my_variable_list ]

my_list3=nx.importNXCALS(['LHC.RUNCONFIG:IP1-XING-V-MURAD',
 'LHC.RUNCONFIG:IP2-XING-V-MURAD',
 'LHC.RUNCONFIG:IP5-XING-H-MURAD',
 'LHC.RUNCONFIG:IP8-XING-H-MURAD']+['HX:BETASTAR_IP1', 'HX:BETASTAR_IP2', 'HX:BETASTAR_IP5', 'HX:BETASTAR_IP8'], t1,t2)

spark_df=nx._join_df_list(my_list1+my_list2+list(my_list3['pyspark df'].values), how='outer')
aux=nx._to_pandas(spark_df, timestampConversion=True, sorted=True); print(aux[['LHC.BSRT.5R4.B1:BUNCH_EMITTANCE_V','LHC.RUNCONFIG:IP2-XING-V-MURAD','ATLAS:LUMI_TOT_INST']].head())
                                                  LHC.BSRT.5R4.B1:BUNCH_EMITTANCE_V  \
2018-10-01 00:02:30+00:00         [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...   
2018-10-01 00:07:30+00:00         [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...   
2018-10-01 00:12:30+00:00         [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...   
2018-10-01 00:14:20.149000+00:00                                               None   
2018-10-01 00:17:30+00:00         [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...

                                  LHC.RUNCONFIG:IP2-XING-V-MURAD  \
2018-10-01 00:02:30+00:00                                    NaN   
2018-10-01 00:07:30+00:00                                    NaN   
2018-10-01 00:12:30+00:00                                    NaN   
2018-10-01 00:14:20.149000+00:00                             NaN   
2018-10-01 00:17:30+00:00                                    NaN

                                  ATLAS:LUMI_TOT_INST  
2018-10-01 00:02:30+00:00                12800.725273  
2018-10-01 00:07:30+00:00                12692.731878  
2018-10-01 00:12:30+00:00                12588.781626  
2018-10-01 00:14:20.149000+00:00                  NaN  
2018-10-01 00:17:30+00:00                12518.875661
# A few checks
plt.figure(figsize=(30,8))
plt.plot(aux.iloc[0]['LHC.BCTFR.A6R4.B1:BUNCH_INTENSITY'],'.-b', label='LHC.BCTFR.A6R4.B1:BUNCH_INTENSITY')
plt.legend(loc='best')

[Figure: LHC.BCTFR.A6R4.B1:BUNCH_INTENSITY vs. bunch index for the first time window]

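# A few checks
plt.figure(figsize=(30,8))
plt.plot(aux['LHC.BCTFR.A6R4.B2:BUNCH_INTENSITY'].dropna().apply(lambda x:x[6]),'.b-', label='B2 bunch intensity, index 6')
plt.plot(aux['LHC.BCTFR.A6R4.B2:BUNCH_INTENSITY'].dropna().apply(lambda x:x[2000]),'.r-', label='B2 bunch intensity, index 2000')
plt.legend(loc='best') # the labels above are needed for the legend to be drawn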
plt.grid()
plt.xticks(rotation=20);

[Figure: LHC.BCTFR.A6R4.B2:BUNCH_INTENSITY vs. time for the vector elements 6 and 2000]

Example 9 (write/read on HDFS)

Writing and reading to/from HDFS allows us to use SPARK also for the postprocessed data (maintaining the same workflow).

# to write
if 0:
    # very important to repartition to limit the number of files (for a single fill we will repartition to 1 file)
    spark_df.repartition(1).write.mode("overwrite").parquet("fill2.parquet")

You can check the HDFS path with:

!hdfs dfs -ls /user/sterbini
Found 6 items
drwx------   - sterbini supergroup          0 2020-05-28 23:20 /user/sterbini/.Trash
drwxr-xr-x   - sterbini supergroup          0 2020-05-29 10:31 /user/sterbini/.sparkStaging
drwxr-xr-x   - sterbini supergroup          0 2020-02-07 14:38 /user/sterbini/bmode
drwxr-xr-x   - sterbini supergroup          0 2020-02-07 14:51 /user/sterbini/bmodes
drwxr-xr-x   - sterbini supergroup          0 2020-05-28 22:40 /user/sterbini/fill1.parquet
drwxr-xr-x   - sterbini supergroup          0 2020-05-28 23:37 /user/sterbini/fill2.parquet
spark_df=spark.read.parquet("fill*.parquet") # Very important: the wildcard reads all the fill*.parquet datasets into a single spark df
aux=nx._to_pandas(spark_df.select('timestamp','CMS:BUNCH_LUMI_INST'),timestampConversion=True, sorted=True)
print(aux.head())
print(aux.tail())
                                                                CMS:BUNCH_LUMI_INST
2018-10-01 00:02:30+00:00         [-0.002677366, -0.002759958, -0.004399077, -0....
2018-10-01 00:07:30+00:00         [-0.002629694, -0.002912038, -0.005541767, -0....
2018-10-01 00:12:30+00:00         [-0.003120326, -0.002868722, -0.003427472, -0....
2018-10-01 00:14:20.149000+00:00                                               None
2018-10-01 00:17:30+00:00         [-0.001818355, -0.003197438, -0.002672729, -0....
                                                                CMS:BUNCH_LUMI_INST
2018-10-02 23:42:30+00:00         [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
2018-10-02 23:47:30+00:00                                                      None
2018-10-02 23:52:30+00:00                                                      None
2018-10-02 23:53:54.144000+00:00                                               None
2018-10-02 23:57:30+00:00                                                      None

Conclusions

  • We showed some examples of the pyspark approach that we intend to use in our workflow. We only scratched the surface.

  • First impression: we think that the NXCALS framework has great potential for big-data analysis.

  • pytimber is still fundamental for typical use (e.g., small data).

Thanks to the NXCALS team for the responsiveness and availability.

Thank you for your attention.