FinanceRoutines.jl

Financial data routines for Julia
Log | Files | Refs | README | LICENSE

ImportCRSP.jl (24594B)


      1 # ------------------------------------------------------------------------------------------
      2 # ImportCRSP.jl
      3 
      4 # Collection of functions that import
      5 #  financial data into julia
      6 # ------------------------------------------------------------------------------------------
      7 
      8 
      9 # ------------------------------------------------------------------------------------------
     10 # List of exported functions
     11 # export import_MSF
     12 # export build_MSF
     13 
     14 # list
     15 # ------------------------------------------------------------------------------------------
     16 
     17 
     18 
     19 # ------------------------------------------------------------------------------------------
     20 """
     21     import_MSF(wrds_conn; date_range, variables)
     22     import_MSF(;
     23         date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
     24         variables::String = "", user="", password="")
     25 
     26 Import the CRSP Monthly Stock File (MSF) from CRSP on WRDS PostGre server
     27 
     28 # Arguments
     29 - `wrds_conn::Connection`: An existing Postgres connection to WRDS; creates one if empty
     30 
     31 # Keywords
     32 - `date_range::Tuple{Date, Date}`: A tuple of dates to select data (limits the download size)
     33 - `variables::Vector{String}`: A vector of String of additional variable to include in the download
     34 
     35 # Returns
     36 - `df_msf_final::DataFrame`: DataFrame with msf crsp file
     37 """
     38 function import_MSF(wrds_conn::Connection;
     39     date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
     40     variables::Vector{String} = [""]
     41     )
     42 
     43     date_range = _validate_date_range(date_range)
     44 
     45     # -- GETTING COLUMN NAMES
     46     # download potential columns
     47     msenames_columns = _get_postgres_columns("crsp", "msenames"; wrds_conn=wrds_conn,
     48         prior_columns = vcat(["PERMNO", "NAMEDT", "NAMEENDT", "SHRCD", "EXCHCD", "HEXCD", 
     49                               "NAICS", "HSICCD", "CUSIP"],
     50                             uppercase.(variables))
     51     )
     52     msenames_columns = join(uppercase.(msenames_columns), ", ")
     53 
     54     msf_columns = _get_postgres_columns("crsp", "msf"; wrds_conn=wrds_conn,
     55         prior_columns = vcat(["PERMNO","PERMCO","DATE","PRC","ALTPRC","RET","RETX","SHROUT","CFACPR","CFACSHR"],
     56                              uppercase.(variables))
     57     )
     58     msf_columns = join(uppercase.(msf_columns), ", ")
     59 
     60     # -- GETTING MSF
     61     # set up the query for msf
     62     postgre_query_msf = """
     63         SELECT $msf_columns
     64             FROM crsp.msf
     65             WHERE DATE >= \$1 AND DATE <= \$2
     66     """
     67     res_q_msf = execute(wrds_conn, postgre_query_msf, (date_range[1], date_range[2]))
     68     df_msf = DataFrame(columntable(res_q_msf))
     69     transform!(df_msf,     # clean up the dataframe
     70         names(df_msf, check_integer.(eachcol(df_msf))) .=> (x->convert.(Union{Missing, Int}, x));
     71         renamecols = false);
     72 
     73     # -- GETTING MSENAMES
     74     postgre_query_msenames = """
     75         SELECT $msenames_columns
     76             FROM crsp.msenames
     77     """
     78     res_q_msenames = execute(wrds_conn, postgre_query_msenames)
     79     df_msenames = DataFrame(columntable(res_q_msenames)) ;
     80     transform!(df_msenames,
     81         names(df_msenames, check_integer.(eachcol(df_msenames))) .=> (x->convert.(Union{Missing, Int}, x));
     82         renamecols = false) ;
     83     df_msenames[!, :cusip] .= String15.(df_msenames[!, :cusip]);
     84     df_msenames[ .!ismissing.(df_msenames.naics) , :naics] .= String7.(skipmissing(df_msenames[!, :naics]));
     85     @p df_msenames |> filter!(_.exchcd <= 3 && _.shrcd ∈ (10,11))
     86     
     87 # set up the query for msedelist
     88     postgre_query_msedelist = """
     89         SELECT PERMNO, DLSTDT, DLRET, DLSTCD
     90             FROM crsp.msedelist
     91     """
     92     res_q_msedelist = execute(wrds_conn, postgre_query_msedelist)
     93     df_msedelist = DataFrame(columntable(res_q_msedelist)) ;
     94     transform!(df_msedelist,
     95         names(df_msedelist, check_integer.(eachcol(df_msedelist))) .=> (x->convert.(Union{Missing, Int}, x));
     96         renamecols = false) ;
     97     transform!(df_msedelist, :dlstdt => ByRow(MonthlyDate) => :datem)
     98 
     99 # --- merge all of the datasets together
    100     df_msf_final = FlexiJoins.innerjoin(
    101         (df_msf, df_msenames),
    102         by_key(:permno) & by_pred(:date, ∈, x->x.namedt..x.nameendt)
    103     )
    104     transform!(df_msf_final, :date => ByRow(MonthlyDate) => :datem)
    105 
    106     df_msf_final = leftjoin(df_msf_final, df_msedelist, on = [:permno, :datem])
    107 
    108     var_select = unique(vcat(
    109         :permno, # Security identifier
    110         :date, # Date of the observation
    111         :datem,
    112         :ret, # Return
    113         :retx, # Return excluding dividends
    114         :shrout, # Shares outstanding (in thousands)
    115         :prc,
    116         :altprc, # Last traded price in a month
    117         :exchcd, # Exchange code
    118         :hsiccd, # Industry code
    119         :naics, # Industry code
    120         :dlret, # Delisting return
    121         :dlstcd, # Delisting code
    122         Symbol.(intersect(variables, names(df_msf_final)))
    123     ))
    124 
    125     select!(df_msf_final, var_select)
    126 
    127     sort!(df_msf_final, [:permno, :date]);
    128     # unique(df_msf_final, [:permno, :date])
    129 
    130     return df_msf_final
    131 
    132 end
    133 
    134 # when there are no connections establisheds
    135 function import_MSF(;
    136     date_range::Tuple{Date, Date} = (Date("1900-01-01"), Date("2030-01-01")),
    137     variables::Vector{String} = [""],
    138     user::AbstractString = "", password::AbstractString = "")
    139 
    140     with_wrds_connection(user=user, password=password) do conn
    141         import_MSF(conn; date_range=date_range, variables=variables)
    142     end
    143 end
    144 # ------------------------------------------------------------------------------------------
    145 
    146 
    147 # ------------------------------------------------------------------------------------------
    148 """
    149     build_MSF!(df_msf::DataFrame; save, trim_cols, clean_cols, verbose)
    150 
    151 Clean up the CRSP Monthly Stock File (see `import_MSF`)
    152 
    153 # Arguments
    154 - `df::DataFrame`: A standard dataframe with CRSP MSF data (minimum variables are in `import_MSF`)
    155 
    156 # Keywords
    157 - `save::String`: Save a gzip version of the data on path `\$save/msf.csv.gz`; Default does not save the data.
    158 - `trim_cols::Bool`: Only keep a subset of relevant columns in the final dataset
    159 - `clean_cols::Bool`: Clean up the columns of the dataframe to be of type Float64; Default is `false` and leaves the Decimal type intact
    160 
    161 # Returns
    162 - `df::DataFrame`: DataFrame with crsp MSF file "cleaned"
    163 """
    164 function build_MSF!(
    165     df::AbstractDataFrame;
    166     save::AbstractString = "",
    167     trim_cols::Bool = false,
    168     clean_cols::Bool=false,
    169     verbose::Bool=false
    170     )
    171 
    172     required = ["shrout", "prc", "permno", "datem", "dlstcd", "ret", "dlret"]
    173     missing_cols = setdiff(required, names(df))
    174     !isempty(missing_cols) && throw(ArgumentError("Missing required columns: $(join(missing_cols, ", "))"))
    175 
    176 # Create marketcap:
    177     transform!(df, [:shrout, :prc] => ByRow( (s,p) -> s * abs(p) ) => :mktcap)
    178     # @rtransform!(df, :mktcap = :shrout * :cfacshr * abs(:altprc) / :cfacpr) # in 1000s
    179     # in some instances (spin-offs and other distributions we have cfacpr not equal to cfacshr)
    180     df[ isequal.(df.mktcap, 0), :mktcap] .= missing;
    181 
    182 # Lagged marketcap
    183     sort!(df, [:permno, :datem])
    184     # method 1: lag and then merge back
    185     # df_msf_mktcap_lag = @select(df_msf,
    186     #         :datem = :datem + Month(1), :permno, :l1m_mktcap2 = :mktcap)
    187     # df_msf = leftjoin(df_msf, df_msf_mktcap_lag, on = [:permno, :datem])
    188     # panellag!(df, :permno, :datem, :mktcap, :l1m_mktcap, Month(1))
    189     transform!(groupby(df, :permno), 
    190         [:mktcap, :datem] => ( (v, t) -> tlag(v, t, n=Month(1)) ) => :l1m_mktcap)
    191 
    192 # Adjusted returns (see tidy finance following Bali, Engle, and Murray)
    193     transform!(df, 
    194         AsTable([:ret, :dlstcd, :dlret]) => 
    195         ByRow(r -> ismissing(r.dlstcd) ? r.ret :
    196                      !ismissing(r.dlret) ? r.dlret :
    197                        (r.dlstcd in (500, 520, 580, 584) || (551 <= r.dlstcd <= 574)) ? -0.3 : 
    198                          r.dlstcd == 100 ? r.ret : -1.0
    199             ) => :ret_adj)
    200     # @rtransform! df :ret_adj =
    201     #     ismissing(:dlstcd) ? :ret :
    202     #         !ismissing(:dlret) ? :dlret :
    203     #             (:dlstcd ∈ (500, 520, 580, 584)) || ((:dlstcd >= 551) & (:dlstcd <= 574)) ? -0.3 :
    204     #                 :dlstcd == 100 ? :ret : -1.0
    205 
    206 # select variables and save
    207     if trim_cols
    208         select!(df, :permno, :date, :ret, :mktcap, :l1m_mktcap, :retx, :naics, :hsiccd)
    209     end
    210 
    211     if clean_cols
    212         verbose && (@info ". Converting decimal type columns to Float64.")
    213         for col in names(df)
    214             if eltype(df[!, col]) == Union{Missing,Decimal} || eltype(df[!, col]) <: Union{Missing,AbstractFloat}
    215                 df[!, col] = convert.(Union{Missing,Float64}, df[!, col])
    216             elseif eltype(df[!, col]) == Decimal || eltype(df[!, col]) <: AbstractFloat
    217                 df[!, col] = Float64.(df[!, col])
    218             end
    219         end
    220     end
    221 
    222     if !isempty(save)
    223         !isdir(save) && throw(ArgumentError("save argument referes to a non-existing directory: $save"))
    224         CSV.write(save * "/msf.csv.gz", df, compress=true)
    225     end
    226 
    227     return df
    228 end
    229 
    230 # --
    231 function build_MSF(
    232     df::AbstractDataFrame;
    233     save::AbstractString = "",
    234     trim_cols::Bool = false,
    235     clean_cols::Bool=false,
    236     verbose::Bool=false
    237     )
    238 
    239     df_res = copy(df)
    240     build_MSF!(df_res, save = save, trim_cols = trim_cols, clean_cols = clean_cols, verbose = verbose)
    241     return df_res
    242 end
    243 
    244 # --
    245 function build_MSF(wrds_conn::Connection;
    246     date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    247     save::AbstractString = "",
    248     trim_cols::Bool = false,
    249     clean_cols::Bool=false
    250     )
    251 
    252     df = import_MSF(wrds_conn; date_range=date_range);
    253     build_MSF!(df, save = save, trim_cols = trim_cols, clean_cols = clean_cols)
    254 
    255     return df
    256 end
    257 
    258 # --
    259 function build_MSF(;
    260     date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    261     save::AbstractString = "",
    262     trim_cols::Bool = false,
    263     clean_cols::Bool=false
    264     )
    265 
    266     df = import_MSF(; date_range = date_range);
    267     build_MSF!(df, save = save, trim_cols = trim_cols, clean_cols = clean_cols)
    268 
    269     return df
    270 end
    271 # --------------------------------------------------------------------------------------------------
    272 
    273 
    274 # --------------------------------------------------------------------------------------------------
    275 """
    276     import_MSF_v2(wrds_conn; date_range, variables, logging_level)
    277     import_MSF_v2(;
    278         date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    279         variables::String = "", user="", password="")
    280 
    281 Import the CRSP Monthly Stock File (MSF) from CRSP on WRDS PostGres server from the version 2.0 CIZ files
    282 
    283 # Arguments
    284 - `wrds_conn::Connection`: An existing Postgres connection to WRDS; creates one if empty
    285 
    286 # Keywords
    287 - `date_range::Tuple{Date, Date}`: A tuple of dates to select data (limits the download size)
    288 - `variables::Vector{String}`: A vector of String of additional variable to include in the download
    289 - `logging_level::Symbol`: How to log results
    290 
    291 # Returns
    292 - `df_msf_final::DataFrame`: DataFrame with msf crsp file
    293 """
    294 function import_MSF_v2(wrds_conn::Connection;
    295     date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    296     variables::Vector{String} = [""],
    297     logging_level::Symbol = :debug, # either none, debug, info etc... tbd
    298     )
    299 
    300     date_range = _validate_date_range(date_range)
    301 
    302     # ----------------------------------------------------------------------------------------------
    303     # the easy way
    304     @debug "Getting monthly stock file (CIZ) ... msf_v2"
    305     msf_v2_columns = _get_postgres_columns("crsp", "msf_v2"; wrds_conn=wrds_conn) |> sort
    306     col_select = ["permno", "hdrcusip", "mthcaldt", "mthprc", "mthret", "mthcap", "shrout",
    307         "mthretx", "mthprevcap", "mthprevprc", "permco"]
    308     col_query = @p vcat(col_select, variables) |> 
    309         uppercase.(__) |> filter(!isempty) |> filter(_ ∈ msf_v2_columns)
    310     # note that selecting all variables to download here is a lot slower than with msf_v1 because of the many more variables ...
    311 
    312     postgre_query_msf = """
    313     SELECT $(join(col_query, ", "))
    314         FROM crsp.msf_v2
    315         WHERE MTHCALDT >= \$1 AND MTHCALDT <= \$2
    316           AND SHARETYPE = 'NS' AND SECURITYTYPE = 'EQTY' AND SECURITYSUBTYPE = 'COM'
    317           AND USINCFLG = 'Y' AND ISSUERTYPE IN ('ACOR', 'CORP')
    318           AND PRIMARYEXCH IN ('N', 'A', 'Q') AND CONDITIONALTYPE = 'RW' AND TRADINGSTATUSFLG = 'A'
    319     """
    320     df_msf_v2 = execute(wrds_conn, postgre_query_msf, (date_range[1], date_range[2])) |> DataFrame;
    321 
    322     transform!(df_msf_v2,     # clean up the dataframe
    323         names(df_msf_v2, check_integer.(eachcol(df_msf_v2))) .=> (x->convert.(Union{Missing, Int}, x));
    324         renamecols = false);
    325     # ----------------------------------------------------------------------------------------------
    326 
    327 
    328     #=
    329     # ----------------------------------------------------------------------------------------------
    330     # the hard way
    331     # ------
    332     @debug "Getting monthly stock file (CIZ) ... stkmthsecuritydata"
    333     msf_columns = _get_postgres_columns("crsp", "stkmthsecuritydata"; wrds_conn=wrds_conn) # download potential columns
    334     # msf_columns = _get_postgres_columns("crsp", "msf_v2"; wrds_conn=wrds_conn) # this one is pre-merged!
    335     msf_columns = join(uppercase.(msf_columns), ", ")
    336 
    337     # legacy SIZ to CIZ conversion of shrcd flag (see doc)    
    338     #                   conversion of exchcd flag (see doc)
    339     postgre_query_msf = """
    340         SELECT $msf_columns
    341             FROM crsp.stkmthsecuritydata
    342             WHERE MTHCALDT >= '$(string(date_range[1]))' AND MTHCALDT <= '$(string(date_range[2]))'
    343               AND SHARETYPE = 'NS' AND SECURITYTYPE = 'EQTY' AND SECURITYSUBTYPE = 'COM' 
    344               AND USINCFLG = 'Y' AND ISSUERTYPE IN ('ACOR', 'CORP')
    345               AND PRIMARYEXCH IN ('N', 'A', 'Q') AND CONDITIONALTYPE = 'RW' AND TRADINGSTATUSFLG = 'A'
    346     """
    347     df_msf_v2 = execute(wrds_conn, postgre_query_msf) |> DataFrame;
    348     transform!(df_msf_v2,     # clean up the dataframe
    349         names(df_msf_v2, check_integer.(eachcol(df_msf_v2))) .=> (x->convert.(Union{Missing, Int}, x));
    350         renamecols = false);
    351 
    352     # subset!(df_msf_v2, [:sharetype, :securitytype, :securitysubtype, :usincflg, :issuertype] => 
    353     #                 ByRow( (sh, sec, secsub, usinc, issue) -> 
    354     #                 sh == "NS" && sec == "EQTY" && secsub == "COM" && usinc == "Y" && issue ∈ ["ACOR", "CORP"]) )
    355     # # legacy SIZ to CIZ conversion of exchcd flag (see doc)
    356     # subset!(df_msf_v2, 
    357     #     :primaryexch => ByRow(p -> p ∈ ["N", "A", "Q"]), 
    358     #     :conditionaltype => ByRow(c -> c == "RW"), :tradingstatusflg => ByRow(t -> t == "A") )
    359 
    360     
    361     # -- need to get shrout
    362     # stkshares = _get_postgres_columns("crsp", "stkshares"; wrds_conn=wrds_conn)
    363     postgre_query_stkshares = """
    364     SELECT * FROM crsp.stkshares
    365         WHERE SHRSTARTDT >= '$(string(date_range[1]))' AND SHRENDDT <= '$(string(date_range[2]))'
    366     """
    367     # df_stkshares = execute(wrds_conn, postgre_query_stkshares) |> DataFrame;
    368     df_stkshares = execute(wrds_conn, "SELECT permno, shrstartdt, shrenddt, shrout FROM crsp.stkshares") |> DataFrame;
    369 
    370     # -- no need for delisting returns (already integrated)
    371     @time df_msf_v2 = FlexiJoins.innerjoin(
    372         (disallowmissing(df_msf_v2, :mthcaldt),
    373          disallowmissing(select(df_stkshares, :permno, :shrstartdt, :shrenddt, :shrout), 
    374                          [:permno, :shrstartdt, :shrenddt]) ),
    375         by_key(:permno) & by_pred(:mthcaldt, ∈, x->x.shrstartdt..x.shrenddt) )
    376     # ----------------------------------------------------------------------------------------------
    377     =#
    378 
    379 
    380     # ----------------------------------------------------------------------------------------------
    381     # ------
    382     @debug "Getting StkSecurityInfoHist (CIZ)"
    383     # stksecurityinfo = _get_postgres_columns("crsp", "stksecurityinfohist"; wrds_conn=wrds_conn)
    384     stksecurityinfo_cols = vcat(
    385         ["PERMNO", "SecInfoStartDt", "SecInfoEndDt", "IssuerNm", "ShareClass",
    386          "PrimaryExch", "TradingStatusFlg", "NAICS", "SICCD", "HDRCUSIP"],
    387         uppercase.(variables)) |> filter(!isempty) |> unique
    388     stksecurityinfo_cols = _get_postgres_columns("crsp", "stksecurityinfohist"; wrds_conn=wrds_conn,
    389         prior_columns = stksecurityinfo_cols) |> sort
    390     stksecurityinfo_query = join(stksecurityinfo_cols, ", ")
    391 
    392     postgre_query_stksecurityinfo = "SELECT $stksecurityinfo_query FROM crsp.stksecurityinfohist"
    393     df_stksecurityinfo = execute(wrds_conn, postgre_query_stksecurityinfo) |> DataFrame;
    394     transform!(df_stksecurityinfo,
    395         names(df_stksecurityinfo, check_integer.(eachcol(df_stksecurityinfo))) .=> 
    396             (x->convert.(Union{Missing, Int}, x));
    397         renamecols = false) ;
    398     disallowmissing!(df_stksecurityinfo, [:permno, :secinfostartdt, :secinfoenddt, :issuernm, :hdrcusip])
    399 
    400     # ------
    401     @debug "Merging stock prices, info file"
    402     # we do left-join here because we dont want to lose obs. 
    403     df_msf_v2 = FlexiJoins.leftjoin( 
    404         (df_msf_v2, df_stksecurityinfo),
    405         by_key(:permno) & by_pred(:mthcaldt, ∈, x->x.secinfostartdt..x.secinfoenddt) )
    406     # ----------------------------------------------------------------------------------------------
    407 
    408 
    409     # ----------------------------------------------------------------------------------------------
    410     # only include siccd/naics if they exist in the DataFrame
    411     optional_cols = intersect([:siccd, :naics], Symbol.(names(df_msf_v2)))
    412     var_select = vcat(
    413         :permno,   # Security identifier
    414         :mthcaldt, # Date of the observation
    415         :mthret, # Return
    416         :mthretx, # Return excluding dividends
    417         :shrout, # Shares outstanding (in thousands)
    418         :mthprc,
    419         :mthcap,
    420         :mthprevcap,
    421         # :mthvol, :mthprcvol # volume and price volume
    422         optional_cols,
    423         Symbol.(intersect(variables, names(df_msf_v2)))
    424     )
    425 
    426     @p df_msf_v2 |> select!(__, var_select) |> sort!(__, [:permno, :mthcaldt]) |>
    427         disallowmissing!(__, [:mthcaldt])
    428     if "naics" in names(df_msf_v2)
    429         transform!(df_msf_v2, :naics => (x -> replace(x, "0" => missing)) => :naics)
    430     end
    431     transform!(df_msf_v2, :mthcaldt => ByRow(MonthlyDate) => :datem)
    432     # ----------------------------------------------------------------------------------------------
    433 
    434 
    435     return df_msf_v2
    436 
    437 end
    438 # ------------------------------------------------------------------------------------------
    439 
    440 
    441 # ------------------------------------------------------------------------------------------
    442 # --------------------------------------------------------------------------------------------------
    443 
    444 
    445 
    446 # --------------------------------------------------------------------------------------------------
    447 function import_DSF(wrds_conn::Connection;
    448     date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    449     logging_level::Symbol = :debug, # either none, debug, info etc... tbd
    450     )
    451 
    452     date_range = _validate_date_range(date_range)
    453 
    454     # set up the query for msf
    455     postgre_query_dsf = """
    456         SELECT PERMNO, DATE, RET, PRC, SHROUT, VOL
    457             FROM crsp.dsf
    458             WHERE DATE >= \$1 AND DATE <= \$2
    459     """
    460     df_dsf = execute(wrds_conn, postgre_query_dsf, (date_range[1], date_range[2])) |> DataFrame
    461     # clean up the dataframe
    462     transform!(df_dsf,
    463         names(df_dsf, check_integer.(eachcol(df_dsf))) .=> (x->convert.(Union{Missing, Int}, x));
    464         renamecols = false)
    465 
    466     return df_dsf
    467 end
    468 
    469 # when there are no connections established
    470 function import_DSF(;
    471     date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    472     user::AbstractString = "", password::AbstractString = "")
    473 
    474     with_wrds_connection(user=user, password=password) do conn
    475         import_DSF(conn; date_range=date_range)
    476     end
    477 end
    478 # ------------------------------------------------------------------------------------------
    479 
    480 
    481 
    482 # --------------------------------------------------------------------------------------------------
    483 function import_DSF_v2(wrds_conn::Connection;
    484     date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    485     logging_level::Symbol = :debug, # either none, debug, info etc... tbd
    486     )
    487 
    488     date_range = _validate_date_range(date_range)
    489 
    490     # could pick either way ... 
    491     # dsf_columns = _get_postgres_columns("crsp", "dsf_v2"; wrds_conn=wrds_conn) |> sort
    492     # stkmthsecuritydata_columns = _get_postgres_columns("crsp", "stkdlysecuritydata"; wrds_conn=wrds_conn) |> sort
    493 
    494 # set up the query for msf
    495     postgre_query_dsf = """
    496     SELECT PERMNO, DLYCALDT, DLYRET, DLYPRC, DLYVOL, DLYCAP
    497         FROM crsp.stkdlysecuritydata
    498         WHERE DLYCALDT >= \$1 AND DLYCALDT <= \$2
    499     """
    500     df_dsf_v2 = execute(wrds_conn, postgre_query_dsf, (date_range[1], date_range[2])) |> DataFrame
    501 
    502     # clean up the dataframe
    503     transform!(df_dsf_v2,
    504         names(df_dsf_v2, check_integer.(eachcol(df_dsf_v2))) .=> (x->convert.(Union{Missing, Int}, x));
    505         renamecols = false)
    506 
    507     disallowmissing!(df_dsf_v2, :dlycaldt)
    508 
    509     return df_dsf_v2
    510 end
    511 
    512 # when there are no connections established
    513 function import_DSF_v2(;
    514     date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    515     logging_level::Symbol = :debug,
    516     user::AbstractString = "", password::AbstractString = "")
    517 
    518     with_wrds_connection(user=user, password=password) do conn
    519         import_DSF_v2(conn; date_range=date_range, logging_level=logging_level)
    520     end
    521 end
    522 # ------------------------------------------------------------------------------------------
    523 
    524 
    525 
    526 # ------------------------------------------------------------------------------------------
    527 # PRIVATE FUNCTIONS 
    528 # TODO REWRITE THESE FUNCTIONS WITHOUT INTERPOLATION FOR SAFETY
    529 # postgres_query = """
    530 #     SELECT table_name
    531 #       FROM information_schema.tables
    532 #      WHERE table_schema = \$1
    533 # """
    534 # postgres_res = execute(wrds_conn, postgres_query, (table_schema,))
    535 function _get_postgres_columns(table_schema, table_name; wrds_conn, prior_columns::Vector{String} = [""])
    536 
    537     # Parameterized query to prevent SQL injection
    538     postgres_query = """
    539         SELECT column_name
    540           FROM information_schema.columns
    541          WHERE table_schema = \$1
    542            AND table_name   = \$2
    543         """
    544 
    545     postgres_res = execute(wrds_conn, postgres_query, (table_schema, table_name))
    546     postgres_columns = DataFrame(columntable(postgres_res)).column_name
    547     if isempty(prior_columns) || prior_columns == [""]
    548         return uppercase.(postgres_columns)
    549     else
    550         return intersect(uppercase.(postgres_columns), uppercase.(prior_columns))
    551     end
    552 end 
    553 
    554 
    555 function _get_postgres_table(table_schema, table_name; wrds_conn, prior_columns::Vector{String} = [""])
    556 
    557     if isempty(prior_columns) || prior_columns == [""]
    558         columns = "*"
    559     else
    560         # Column names are validated against schema when used with _get_postgres_columns
    561         columns = join(uppercase.(prior_columns), ", ")
    562     end
    563 
    564     # Quote identifiers to prevent SQL injection
    565     schema_q = replace(table_schema, "\"" => "\"\"")
    566     table_q = replace(table_name, "\"" => "\"\"")
    567     postgres_query = """
    568         SELECT $columns
    569         FROM \"$schema_q\".\"$table_q\"
    570     """
    571 
    572     postgres_res = execute(wrds_conn, postgres_query)
    573     return columntable(postgres_res)
    574 end 
    575 # --------------------------------------------------------------------------------------------------
    576 
    577 
    578 # --------------------------------------------------------------------------------------------------
    579 # function list_crsp(;
    580 #     wrds_conn, user, password)
    581 
    582 #     list_libraries = """
    583 #         WITH RECURSIVE "names"("name") AS (
    584 #             SELECT n.nspname AS "name"
    585 #                 FROM pg_catalog.pg_namespace n
    586 #                 WHERE n.nspname !~ '^pg_'
    587 #                     AND n.nspname <> 'information_schema')
    588 #             SELECT "name"
    589 #                 FROM "names"
    590 #                 WHERE pg_catalog.has_schema_privilege(
    591 #                     current_user, "name", 'USAGE') = TRUE;
    592 #         """
    593 #     res_list_libraries = execute(wrds_conn, list_libraries);
    594 #     df_libraries = DataFrame(columntable(res_list_libraries))
    595 #     @rsubset(df_libraries, occursin(r"crsp", :name) )
    596 
    597 #     library = "crsp"
    598 #     list_tables = """
    599 #         SELECT table_name FROM INFORMATION_SCHEMA.views
    600 #             WHERE table_schema IN ('$library');
    601 #         """
    602 #     res_list_tables = execute(wrds_conn, list_tables);
    603 #     df_tables = DataFrame(columntable(res_list_tables))
    604 #     @rsubset(df_tables, occursin(r"mse", :table_name) )
    605 
    606 #     return run_sql_query(conn, query)
    607 
    608 
    609 # end
    610 # --------------------------------------------------------------------------------------------------