ImportCRSP.jl (24594B)
# ------------------------------------------------------------------------------------------
# ImportCRSP.jl

# Collection of functions that import
# financial data into julia
# ------------------------------------------------------------------------------------------


# ------------------------------------------------------------------------------------------
# List of exported functions
# export import_MSF
# export build_MSF

# list
# ------------------------------------------------------------------------------------------



# ------------------------------------------------------------------------------------------
"""
    import_MSF(wrds_conn::Connection;
        date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
        variables::Vector{String} = [""])
    import_MSF(;
        date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
        variables::Vector{String} = [""], user = "", password = "")

Import the CRSP Monthly Stock File (MSF) from CRSP on the WRDS PostgreSQL server.

`crsp.msf` is inner-joined with `crsp.msenames` on `permno`, keeping the name record whose
`namedt..nameendt` range contains the observation `date` (names are restricted to share
codes 10/11 and exchange codes `<= 3`). The result is then left-joined with
`crsp.msedelist` on `permno` and month (`datem`).

# Arguments
- `wrds_conn::Connection`: An existing Postgres connection to WRDS. The keyword-only
  method instead obtains a connection through `with_wrds_connection(user, password)`.

# Keywords
- `date_range::Tuple{Date, Date}`: A tuple of dates to select data (limits the download size)
- `variables::Vector{String}`: Additional column names to include in the download. Names
  are matched case-insensitively; names not found in the source tables are ignored.
- `user`, `password`: WRDS credentials (keyword-only method only)

# Returns
- `df_msf_final::DataFrame`: DataFrame with msf crsp file, sorted by `permno` and `date`
"""
function import_MSF(wrds_conn::Connection;
    date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    variables::Vector{String} = [""]
    )

    date_range = _validate_date_range(date_range)

    # -- GETTING COLUMN NAMES
    # Only columns that actually exist in each table survive `_get_postgres_columns`,
    # so the names interpolated into the queries below come from information_schema.
    msenames_columns = _get_postgres_columns("crsp", "msenames"; wrds_conn=wrds_conn,
        prior_columns = vcat(["PERMNO", "NAMEDT", "NAMEENDT", "SHRCD", "EXCHCD", "HEXCD",
                              "NAICS", "HSICCD", "CUSIP"],
                             uppercase.(variables))
        )
    msenames_columns = join(uppercase.(msenames_columns), ", ")

    msf_columns = _get_postgres_columns("crsp", "msf"; wrds_conn=wrds_conn,
        prior_columns = vcat(["PERMNO", "PERMCO", "DATE", "PRC", "ALTPRC", "RET", "RETX",
                              "SHROUT", "CFACPR", "CFACSHR"],
                             uppercase.(variables))
        )
    msf_columns = join(uppercase.(msf_columns), ", ")

    # -- GETTING MSF
    postgre_query_msf = """
        SELECT $msf_columns
        FROM crsp.msf
        WHERE DATE >= \$1 AND DATE <= \$2
    """
    res_q_msf = execute(wrds_conn, postgre_query_msf, (date_range[1], date_range[2]))
    df_msf = DataFrame(columntable(res_q_msf))
    _convert_integer_columns!(df_msf)

    # -- GETTING MSENAMES
    postgre_query_msenames = """
        SELECT $msenames_columns
        FROM crsp.msenames
    """
    res_q_msenames = execute(wrds_conn, postgre_query_msenames)
    df_msenames = DataFrame(columntable(res_q_msenames))
    _convert_integer_columns!(df_msenames)
    df_msenames[!, :cusip] .= String15.(df_msenames[!, :cusip])
    # Rebuild the column: `df[rows, :naics] .= String7.(...)` would write in place and
    # convert the values straight back to the column's existing element type.
    df_msenames[!, :naics] = [ismissing(x) ? missing : String7(x) for x in df_msenames.naics]
    # Share codes 10/11 and exchange codes <= 3; a row whose code is `missing` is dropped
    # instead of raising a TypeError in the boolean test.
    # NOTE(review): `<= 3` also keeps non-positive exchange codes; confirm this is intended.
    filter!(r -> coalesce(r.exchcd <= 3, false) && coalesce(r.shrcd in (10, 11), false),
            df_msenames)

    # -- GETTING MSEDELIST
    postgre_query_msedelist = """
        SELECT PERMNO, DLSTDT, DLRET, DLSTCD
        FROM crsp.msedelist
    """
    res_q_msedelist = execute(wrds_conn, postgre_query_msedelist)
    df_msedelist = DataFrame(columntable(res_q_msedelist))
    _convert_integer_columns!(df_msedelist)
    transform!(df_msedelist, :dlstdt => ByRow(MonthlyDate) => :datem)

    # --- merge all of the datasets together
    # keep the msenames record whose [namedt, nameendt] range contains the msf date
    df_msf_final = FlexiJoins.innerjoin(
        (df_msf, df_msenames),
        by_key(:permno) & by_pred(:date, ∈, x->x.namedt..x.nameendt)
    )
    transform!(df_msf_final, :date => ByRow(MonthlyDate) => :datem)

    # delisting information is matched on the month of the delisting date
    df_msf_final = leftjoin(df_msf_final, df_msedelist, on = [:permno, :datem])

    var_select = unique(vcat(
        :permno,      # Security identifier
        :date,        # Date of the observation
        :datem,
        :ret,         # Return
        :retx,        # Return excluding dividends
        :shrout,      # Shares outstanding (in thousands)
        :prc,
        :altprc,      # Last traded price in a month
        :exchcd,      # Exchange code
        :hsiccd,      # Industry code
        :naics,       # Industry code
        :dlret,       # Delisting return
        :dlstcd,      # Delisting code
        # DataFrame names are lower case; compare case-insensitively so user variables
        # that were downloaded (the queries upper-case them) are not dropped here
        Symbol.(intersect(lowercase.(variables), names(df_msf_final)))
        ))

    select!(df_msf_final, var_select)
    sort!(df_msf_final, [:permno, :date])

    return df_msf_final
end

# when there are no connections established
function import_MSF(;
    date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    variables::Vector{String} = [""],
    user::AbstractString = "", password::AbstractString = "")

    return with_wrds_connection(user=user, password=password) do conn
        import_MSF(conn; date_range=date_range, variables=variables)
    end
end
# ------------------------------------------------------------------------------------------


# ------------------------------------------------------------------------------------------
"""
    build_MSF!(df::AbstractDataFrame; save, trim_cols, clean_cols, verbose)

Clean up the CRSP Monthly Stock File (see `import_MSF`) in place.

Adds the columns
- `mktcap`: `shrout * abs(prc)`; a zero market cap is set to `missing`
- `l1m_mktcap`: `mktcap` lagged by one month within each `permno` (`tlag` on `datem`)
- `ret_adj`: return adjusted for delisting (tidy finance, following Bali, Engle, and Murray)

# Arguments
- `df::AbstractDataFrame`: A standard dataframe with CRSP MSF data (minimum variables are in
  `import_MSF`; at least `shrout`, `prc`, `permno`, `datem`, `dlstcd`, `ret`, `dlret`)

# Keywords
- `save::String`: Save a gzip version of the data on path `\$save/msf.csv.gz`; Default does not save the data.
- `trim_cols::Bool`: Only keep a subset of relevant columns in the final dataset
  (`permno`, `date`, `ret`, `mktcap`, `l1m_mktcap`, `retx`, `naics`, `hsiccd`, `datem`, `ret_adj`)
- `clean_cols::Bool`: Clean up the columns of the dataframe to be of type Float64; Default is `false` and leaves the Decimal type intact
- `verbose::Bool`: Log progress messages

# Returns
- `df::DataFrame`: DataFrame with crsp MSF file "cleaned"

# Throws
- `ArgumentError` if a required column is absent or `save` is not an existing directory
"""
function build_MSF!(
    df::AbstractDataFrame;
    save::AbstractString = "",
    trim_cols::Bool = false,
    clean_cols::Bool = false,
    verbose::Bool = false
    )

    required = ["shrout", "prc", "permno", "datem", "dlstcd", "ret", "dlret"]
    missing_cols = setdiff(required, names(df))
    !isempty(missing_cols) && throw(ArgumentError("Missing required columns: $(join(missing_cols, ", "))"))

    # Market capitalization. Zero is mapped to `missing` inside the row function so the
    # resulting column always admits `missing` (an in-place `.= missing` would throw on a
    # column whose element type excludes it).
    # Alternative: shrout * cfacshr * abs(altprc) / cfacpr (in 1000s); in some instances
    # (spin-offs and other distributions) cfacpr is not equal to cfacshr.
    mktcap_or_missing(s, p) = (m = s * abs(p); isequal(m, 0) ? missing : m)
    transform!(df, [:shrout, :prc] => ByRow(mktcap_or_missing) => :mktcap)

    # Lagged marketcap (one calendar month, within permno)
    sort!(df, [:permno, :datem])
    transform!(groupby(df, :permno),
        [:mktcap, :datem] => ((v, t) -> tlag(v, t, n=Month(1))) => :l1m_mktcap)

    # Adjusted returns (see tidy finance following Bali, Engle, and Murray)
    #   dlstcd missing                    -> ret
    #   dlret available                   -> dlret
    #   dlstcd in 500,520,580,584,551-574 -> -0.3
    #   dlstcd == 100                     -> ret
    #   any other dlstcd                  -> -1.0
    transform!(df,
        AsTable([:ret, :dlstcd, :dlret]) =>
        ByRow(r -> ismissing(r.dlstcd) ? r.ret :
                   !ismissing(r.dlret) ? r.dlret :
                   (r.dlstcd in (500, 520, 580, 584) || (551 <= r.dlstcd <= 574)) ? -0.3 :
                   r.dlstcd == 100 ? r.ret : -1.0
        ) => :ret_adj)

    # select variables and save; datem and ret_adj are kept so trimming does not discard
    # the columns built above
    if trim_cols
        select!(df, :permno, :date, :ret, :mktcap, :l1m_mktcap, :retx, :naics, :hsiccd,
                :datem, :ret_adj)
    end

    if clean_cols
        verbose && (@info ". Converting decimal type columns to Float64.")
        for col in names(df)
            base_type = nonmissingtype(eltype(df[!, col]))
            if base_type === Decimal || base_type <: AbstractFloat
                # `missing` is preserved; columns without missings come out as Float64
                df[!, col] = convert.(Union{Missing,Float64}, df[!, col])
            end
        end
    end

    if !isempty(save)
        isdir(save) || throw(ArgumentError("save argument refers to a non-existing directory: $save"))
        CSV.write(joinpath(save, "msf.csv.gz"), df, compress=true)
    end

    return df
end

"""
    build_MSF(df::AbstractDataFrame; save, trim_cols, clean_cols, verbose)
    build_MSF(wrds_conn::Connection; date_range, save, trim_cols, clean_cols, verbose)
    build_MSF(; date_range, save, trim_cols, clean_cols, verbose, user, password)

Non-mutating variant of `build_MSF!`. The first method works on a copy of `df`; the other
two download the data with `import_MSF` first (from `wrds_conn`, or from a connection
opened with `user` / `password`). See `build_MSF!` for the keywords.
"""
function build_MSF(
    df::AbstractDataFrame;
    save::AbstractString = "",
    trim_cols::Bool = false,
    clean_cols::Bool = false,
    verbose::Bool = false
    )

    df_res = copy(df)
    build_MSF!(df_res, save = save, trim_cols = trim_cols, clean_cols = clean_cols, verbose = verbose)
    return df_res
end

function build_MSF(wrds_conn::Connection;
    date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    save::AbstractString = "",
    trim_cols::Bool = false,
    clean_cols::Bool = false,
    verbose::Bool = false
    )

    df = import_MSF(wrds_conn; date_range=date_range)
    build_MSF!(df, save = save, trim_cols = trim_cols, clean_cols = clean_cols, verbose = verbose)
    return df
end

function build_MSF(;
    date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    save::AbstractString = "",
    trim_cols::Bool = false,
    clean_cols::Bool = false,
    verbose::Bool = false,
    user::AbstractString = "", password::AbstractString = ""
    )

    df = import_MSF(; date_range = date_range, user = user, password = password)
    build_MSF!(df, save = save, trim_cols = trim_cols, clean_cols = clean_cols, verbose = verbose)
    return df
end
# --------------------------------------------------------------------------------------------------


# --------------------------------------------------------------------------------------------------
"""
    import_MSF_v2(wrds_conn::Connection;
        date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
        variables::Vector{String} = [""], logging_level::Symbol = :debug)
    import_MSF_v2(;
        date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
        variables::Vector{String} = [""], logging_level::Symbol = :debug,
        user = "", password = "")

Import the CRSP Monthly Stock File (MSF) from CRSP on the WRDS PostgreSQL server from the
version 2.0 CIZ files.

Rows of `crsp.msf_v2` are restricted in the query by share type, security type/subtype,
US incorporation, issuer type, primary exchange, conditional type and trading status
(the CIZ counterpart of the legacy share-code / exchange-code screens). They are then
left-joined with `crsp.stksecurityinfohist` on `permno` and the
`secinfostartdt..secinfoenddt` range containing `mthcaldt`.

# Arguments
- `wrds_conn::Connection`: An existing Postgres connection to WRDS. The keyword-only
  method instead obtains a connection through `with_wrds_connection(user, password)`.

# Keywords
- `date_range::Tuple{Date, Date}`: A tuple of dates to select data (limits the download size)
- `variables::Vector{String}`: Additional column names to include in the download
  (case-insensitive; names not found in the source tables are ignored)
- `logging_level::Symbol`: How to log results (currently unused)
- `user`, `password`: WRDS credentials (keyword-only method only)

# Returns
- `df_msf_v2::DataFrame`: DataFrame with msf crsp file, sorted by `permno` and `mthcaldt`,
  with a `datem` month column
"""
function import_MSF_v2(wrds_conn::Connection;
    date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    variables::Vector{String} = [""],
    logging_level::Symbol = :debug, # either none, debug, info etc... tbd (currently unused)
    )

    date_range = _validate_date_range(date_range)

    # ----------------------------------------------------------------------------------------------
    # the easy way: crsp.msf_v2 is pre-merged
    @debug "Getting monthly stock file (CIZ) ... msf_v2"
    msf_v2_columns = _get_postgres_columns("crsp", "msf_v2"; wrds_conn=wrds_conn) |> sort
    col_select = ["permno", "hdrcusip", "mthcaldt", "mthprc", "mthret", "mthcap", "shrout",
                  "mthretx", "mthprevcap", "mthprevprc", "permco"]
    # Requested columns that exist in msf_v2; deduplicated so a user variable repeating a
    # default column does not produce duplicate result columns.
    col_query = unique(uppercase.(vcat(col_select, variables)))
    filter!(c -> !isempty(c) && c in msf_v2_columns, col_query)
    # note that selecting all variables to download here is a lot slower than with msf_v1
    # because of the many more variables ...

    postgre_query_msf = """
        SELECT $(join(col_query, ", "))
        FROM crsp.msf_v2
        WHERE MTHCALDT >= \$1 AND MTHCALDT <= \$2
        AND SHARETYPE = 'NS' AND SECURITYTYPE = 'EQTY' AND SECURITYSUBTYPE = 'COM'
        AND USINCFLG = 'Y' AND ISSUERTYPE IN ('ACOR', 'CORP')
        AND PRIMARYEXCH IN ('N', 'A', 'Q') AND CONDITIONALTYPE = 'RW' AND TRADINGSTATUSFLG = 'A'
    """
    df_msf_v2 = execute(wrds_conn, postgre_query_msf, (date_range[1], date_range[2])) |> DataFrame
    _convert_integer_columns!(df_msf_v2)
    # ----------------------------------------------------------------------------------------------


    #=
    # ----------------------------------------------------------------------------------------------
    # the hard way (kept for reference; not executed)
    # NOTE(review): interpolates dates into the SQL; use \$1/\$2 parameters if revived
    # ------
    @debug "Getting monthly stock file (CIZ) ... stkmthsecuritydata"
    msf_columns = _get_postgres_columns("crsp", "stkmthsecuritydata"; wrds_conn=wrds_conn) # download potential columns
    # msf_columns = _get_postgres_columns("crsp", "msf_v2"; wrds_conn=wrds_conn) # this one is pre-merged!
    msf_columns = join(uppercase.(msf_columns), ", ")

    # legacy SIZ to CIZ conversion of shrcd flag (see doc)
    # conversion of exchcd flag (see doc)
    postgre_query_msf = """
        SELECT $msf_columns
        FROM crsp.stkmthsecuritydata
        WHERE MTHCALDT >= '$(string(date_range[1]))' AND MTHCALDT <= '$(string(date_range[2]))'
        AND SHARETYPE = 'NS' AND SECURITYTYPE = 'EQTY' AND SECURITYSUBTYPE = 'COM'
        AND USINCFLG = 'Y' AND ISSUERTYPE IN ('ACOR', 'CORP')
        AND PRIMARYEXCH IN ('N', 'A', 'Q') AND CONDITIONALTYPE = 'RW' AND TRADINGSTATUSFLG = 'A'
    """
    df_msf_v2 = execute(wrds_conn, postgre_query_msf) |> DataFrame;
    transform!(df_msf_v2, # clean up the dataframe
        names(df_msf_v2, check_integer.(eachcol(df_msf_v2))) .=> (x->convert.(Union{Missing, Int}, x));
        renamecols = false);

    # subset!(df_msf_v2, [:sharetype, :securitytype, :securitysubtype, :usincflg, :issuertype] =>
    #     ByRow( (sh, sec, secsub, usinc, issue) ->
    #         sh == "NS" && sec == "EQTY" && secsub == "COM" && usinc == "Y" && issue ∈ ["ACOR", "CORP"]) )
    # # legacy SIZ to CIZ conversion of exchcd flag (see doc)
    # subset!(df_msf_v2,
    #     :primaryexch => ByRow(p -> p ∈ ["N", "A", "Q"]),
    #     :conditionaltype => ByRow(c -> c == "RW"), :tradingstatusflg => ByRow(t -> t == "A") )

    # -- need to get shrout
    # stkshares = _get_postgres_columns("crsp", "stkshares"; wrds_conn=wrds_conn)
    postgre_query_stkshares = """
        SELECT * FROM crsp.stkshares
        WHERE SHRSTARTDT >= '$(string(date_range[1]))' AND SHRENDDT <= '$(string(date_range[2]))'
    """
    # df_stkshares = execute(wrds_conn, postgre_query_stkshares) |> DataFrame;
    df_stkshares = execute(wrds_conn, "SELECT permno, shrstartdt, shrenddt, shrout FROM crsp.stkshares") |> DataFrame;

    # -- no need for delisting returns (already integrated)
    @time df_msf_v2 = FlexiJoins.innerjoin(
        (disallowmissing(df_msf_v2, :mthcaldt),
         disallowmissing(select(df_stkshares, :permno, :shrstartdt, :shrenddt, :shrout),
                         [:permno, :shrstartdt, :shrenddt]) ),
        by_key(:permno) & by_pred(:mthcaldt, ∈, x->x.shrstartdt..x.shrenddt) )
    # ----------------------------------------------------------------------------------------------
    =#


    # ----------------------------------------------------------------------------------------------
    @debug "Getting StkSecurityInfoHist (CIZ)"
    stksecurityinfo_cols = unique(filter(!isempty, vcat(
        ["PERMNO", "SecInfoStartDt", "SecInfoEndDt", "IssuerNm", "ShareClass",
         "PrimaryExch", "TradingStatusFlg", "NAICS", "SICCD", "HDRCUSIP"],
        uppercase.(variables))))
    # keep only the names that exist in the table (matched case-insensitively)
    stksecurityinfo_cols = _get_postgres_columns("crsp", "stksecurityinfohist"; wrds_conn=wrds_conn,
        prior_columns = stksecurityinfo_cols) |> sort
    stksecurityinfo_query = join(stksecurityinfo_cols, ", ")

    postgre_query_stksecurityinfo = "SELECT $stksecurityinfo_query FROM crsp.stksecurityinfohist"
    df_stksecurityinfo = execute(wrds_conn, postgre_query_stksecurityinfo) |> DataFrame
    _convert_integer_columns!(df_stksecurityinfo)
    disallowmissing!(df_stksecurityinfo, [:permno, :secinfostartdt, :secinfoenddt, :issuernm, :hdrcusip])

    @debug "Merging stock prices, info file"
    # we do left-join here because we dont want to lose obs.
    df_msf_v2 = FlexiJoins.leftjoin(
        (df_msf_v2, df_stksecurityinfo),
        by_key(:permno) & by_pred(:mthcaldt, ∈, x->x.secinfostartdt..x.secinfoenddt) )
    # ----------------------------------------------------------------------------------------------


    # ----------------------------------------------------------------------------------------------
    # only include siccd/naics if they exist in the DataFrame
    optional_cols = intersect([:siccd, :naics], Symbol.(names(df_msf_v2)))
    var_select = unique(vcat(
        :permno,      # Security identifier
        :mthcaldt,    # Date of the observation
        :mthret,      # Return
        :mthretx,     # Return excluding dividends
        :shrout,      # Shares outstanding (in thousands)
        :mthprc,
        :mthcap,
        :mthprevcap,
        # :mthvol, :mthprcvol # volume and price volume
        optional_cols,
        # DataFrame names are lower case; compare case-insensitively
        Symbol.(intersect(lowercase.(variables), names(df_msf_v2)))
        ))

    select!(df_msf_v2, var_select)
    sort!(df_msf_v2, [:permno, :mthcaldt])
    disallowmissing!(df_msf_v2, [:mthcaldt])
    if "naics" in names(df_msf_v2)
        # a naics value of "0" is treated as missing
        transform!(df_msf_v2, :naics => (x -> replace(x, "0" => missing)) => :naics)
    end
    transform!(df_msf_v2, :mthcaldt => ByRow(MonthlyDate) => :datem)
    # ----------------------------------------------------------------------------------------------

    return df_msf_v2
end

# when there are no connections established
function import_MSF_v2(;
    date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    variables::Vector{String} = [""],
    logging_level::Symbol = :debug,
    user::AbstractString = "", password::AbstractString = "")

    return with_wrds_connection(user=user, password=password) do conn
        import_MSF_v2(conn; date_range=date_range, variables=variables, logging_level=logging_level)
    end
end
# --------------------------------------------------------------------------------------------------



# --------------------------------------------------------------------------------------------------
"""
    import_DSF(wrds_conn::Connection; date_range, logging_level)
    import_DSF(; date_range, logging_level, user, password)

Import the CRSP Daily Stock File (`crsp.dsf`) from WRDS: columns `permno`, `date`, `ret`,
`prc`, `shrout`, `vol` for dates within `date_range` (inclusive). Integer-valued columns are
converted to `Union{Missing, Int}`. `logging_level` is currently unused.
"""
function import_DSF(wrds_conn::Connection;
    date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    logging_level::Symbol = :debug, # either none, debug, info etc... tbd (currently unused)
    )

    date_range = _validate_date_range(date_range)

    # set up the query for dsf
    postgre_query_dsf = """
        SELECT PERMNO, DATE, RET, PRC, SHROUT, VOL
        FROM crsp.dsf
        WHERE DATE >= \$1 AND DATE <= \$2
    """
    df_dsf = execute(wrds_conn, postgre_query_dsf, (date_range[1], date_range[2])) |> DataFrame
    _convert_integer_columns!(df_dsf)

    return df_dsf
end

# when there are no connections established
function import_DSF(;
    date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    logging_level::Symbol = :debug,
    user::AbstractString = "", password::AbstractString = "")

    return with_wrds_connection(user=user, password=password) do conn
        import_DSF(conn; date_range=date_range, logging_level=logging_level)
    end
end
# ------------------------------------------------------------------------------------------



# --------------------------------------------------------------------------------------------------
"""
    import_DSF_v2(wrds_conn::Connection; date_range, logging_level)
    import_DSF_v2(; date_range, logging_level, user, password)

Import CRSP daily stock data from the v2 (CIZ) table `crsp.stkdlysecuritydata`: columns
`permno`, `dlycaldt`, `dlyret`, `dlyprc`, `dlyvol`, `dlycap` for dates within `date_range`
(inclusive). No share-type / exchange screens are applied. Integer-valued columns are
converted to `Union{Missing, Int}` and `dlycaldt` is made non-missing.
`logging_level` is currently unused.
"""
function import_DSF_v2(wrds_conn::Connection;
    date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    logging_level::Symbol = :debug, # either none, debug, info etc... tbd (currently unused)
    )

    date_range = _validate_date_range(date_range)

    # could pick either way ...
    # dsf_columns = _get_postgres_columns("crsp", "dsf_v2"; wrds_conn=wrds_conn) |> sort
    # stkmthsecuritydata_columns = _get_postgres_columns("crsp", "stkdlysecuritydata"; wrds_conn=wrds_conn) |> sort

    # set up the query for dsf
    postgre_query_dsf = """
        SELECT PERMNO, DLYCALDT, DLYRET, DLYPRC, DLYVOL, DLYCAP
        FROM crsp.stkdlysecuritydata
        WHERE DLYCALDT >= \$1 AND DLYCALDT <= \$2
    """
    df_dsf_v2 = execute(wrds_conn, postgre_query_dsf, (date_range[1], date_range[2])) |> DataFrame
    _convert_integer_columns!(df_dsf_v2)
    disallowmissing!(df_dsf_v2, :dlycaldt)

    return df_dsf_v2
end

# when there are no connections established
function import_DSF_v2(;
    date_range::Tuple{Date, Date} = (Date("1900-01-01"), Dates.today()),
    logging_level::Symbol = :debug,
    user::AbstractString = "", password::AbstractString = "")

    return with_wrds_connection(user=user, password=password) do conn
        import_DSF_v2(conn; date_range=date_range, logging_level=logging_level)
    end
end
# ------------------------------------------------------------------------------------------



# ------------------------------------------------------------------------------------------
# PRIVATE FUNCTIONS
# Values go through query parameters (\$1, \$2, ...) and identifiers are either validated
# against information_schema or double-quoted; avoid splicing raw strings into SQL.

"""
    _convert_integer_columns!(df::AbstractDataFrame)

Convert, in place, every column of `df` for which `check_integer` (defined outside this
file) returns `true` to element type `Union{Missing, Int}`, keeping column names. Returns `df`.
"""
function _convert_integer_columns!(df::AbstractDataFrame)
    int_cols = names(df, check_integer.(eachcol(df)))
    transform!(df, int_cols .=> (x -> convert.(Union{Missing, Int}, x)); renamecols = false)
    return df
end

"""
    _get_postgres_columns(table_schema, table_name; wrds_conn, prior_columns = [""])

Return the upper-cased column names of `table_schema.table_name` as listed in
`information_schema.columns`. When `prior_columns` is non-empty (and not `[""]`), only the
columns that also appear in `prior_columns` (compared case-insensitively) are returned, in
the order the query returned them.
"""
function _get_postgres_columns(table_schema, table_name; wrds_conn, prior_columns::Vector{String} = [""])

    # Parameterized query to prevent SQL injection
    postgres_query = """
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = \$1
        AND table_name = \$2
    """

    postgres_res = execute(wrds_conn, postgres_query, (table_schema, table_name))
    postgres_columns = DataFrame(columntable(postgres_res)).column_name
    if isempty(prior_columns) || prior_columns == [""]
        return uppercase.(postgres_columns)
    else
        return intersect(uppercase.(postgres_columns), uppercase.(prior_columns))
    end
end

"""
    _get_postgres_table(table_schema, table_name; wrds_conn, prior_columns = [""])

Download `table_schema.table_name` (all columns, or only `prior_columns` when given) and
return it as a `columntable`. Schema, table and column names are double-quoted identifiers.
Column names are lower-cased before quoting, which matches how Postgres folds unquoted names.
"""
function _get_postgres_table(table_schema, table_name; wrds_conn, prior_columns::Vector{String} = [""])

    # Quote an identifier, doubling embedded quotes, so no argument is spliced in as raw SQL
    quote_ident(s) = "\"" * replace(s, "\"" => "\"\"") * "\""

    if isempty(prior_columns) || prior_columns == [""]
        columns = "*"
    else
        columns = join(quote_ident.(lowercase.(prior_columns)), ", ")
    end

    postgres_query = """
        SELECT $columns
        FROM $(quote_ident(table_schema)).$(quote_ident(table_name))
    """

    postgres_res = execute(wrds_conn, postgres_query)
    return columntable(postgres_res)
end
# --------------------------------------------------------------------------------------------------


# --------------------------------------------------------------------------------------------------
# function list_crsp(;
#     wrds_conn, user, password)

#     list_libraries = """
#         WITH RECURSIVE "names"("name") AS (
#         SELECT n.nspname AS "name"
#             FROM pg_catalog.pg_namespace n
#             WHERE n.nspname !~ '^pg_'
#             AND n.nspname <> 'information_schema')
#         SELECT "name"
#         FROM "names"
#         WHERE pg_catalog.has_schema_privilege(
#             current_user, "name", 'USAGE') = TRUE;
#     """
#     res_list_libraries = execute(wrds_conn, list_libraries);
#     df_libraries = DataFrame(columntable(res_list_libraries))
#     @rsubset(df_libraries, occursin(r"crsp", :name) )

#     library = "crsp"
#     list_tables = """
#         SELECT table_name FROM INFORMATION_SCHEMA.views
#         WHERE table_schema IN ('$library');
#     """
#     res_list_tables = execute(wrds_conn, list_tables);
#     df_tables = DataFrame(columntable(res_list_tables))
#     @rsubset(df_tables, occursin(r"mse", :table_name) )

#     return run_sql_query(conn, query)


# end
# --------------------------------------------------------------------------------------------------