본문 바로가기

Big Data

[Spark] Apache Arrow란? (Zero-Copy 직렬화에 대하여) Apache Arrow란? 서로 다른 데이터 인프라가 서로 간의 데이터 공유를 위해 API를 이용할 때 발생하는 문제점 중 하나는 직렬화와 역 직렬화의 오버헤드가 너무 높다는 것이다. 이는 애플리케이션 성능의 병목을 초래한다. Arrow는 언어, 플랫폼과 상관없이 메모리 상에서 컬럼 구조로 데이터를 정의하여, CPU와 GPU에서 메모리를 빠르게 읽고 쓸 수 있도록 한다. 직렬화(Serialization)란? 객체를 저장하거나 메모리, 데이터베이스 혹은 파일로 옮길 때 필요한 것이 직렬화이다. 직렬화란 객체를 바이트 스트림으로 바꾸는 것, 즉 객체에 저장된 데이터를 스트림에 쓰기(write) 위해 연속적인(serial) 데이터로 변환하는 것이다. 직렬화의 주된 목적은 객체를 상태 그대로 저장하고 필요할 때.. 더보기
[Spark] 하둡 hdfs 파일 하나로 합쳐 local에 내려받기 (getmerge, *.gz파일) 1. 압축파일이 아닐 경우 hdfs에 나누어 저장되어 있는 파일들을 합쳐서 로컬로 받고 싶은 경우에는 -getmerge 명령어를 사용하고, 파티션들이 저장되어 있는 상위 디렉토리를 hdfs 경로로 준다. $ hdfs dfs -getmerge [hdfs 경로] [내려받을 local위치] Ex) 위의 경우에는 20211204_161503_442837 디렉토리 내에 파일들이 나누어져 저장되어있기 때문에 해당경로까지를 첫 번째 인자로 넣어준다. $ hdfs dfs -getmerge /user/nauts/warehouse/anchor_set/wiki/all/20211204_161503_442837 [local위치] 2. 압축 파일일 경우 하지만 나누어져있는 파일이 다음과 같이 .gz와 같은 압축 파일인 경우에는 .. 더보기
[Elasticsearch] 색인의 모든 문서를 반복 / 스크롤하는 방법 목적 인덱스 내의 전체 문서를 돌며 각 필드의 데이터를 가져와 처리를 해야 하는 경우 elastic search의 scroll을 이용하는 유용한 코드가 존재한다. 전체 코드 def iterate_all_documents(es, index, logger, pagesize=250, scroll_timeout="15m", **kwargs): """ Helper to iterate ALL values from a single index Yields all the documents. """ is_first = True while True: # Scroll next try: if is_first: # Initialize scroll result = es.search(index=index, scroll=scroll_.. 더보기
[Spark] SQL - 두 컬럼을 병합하여 새로운 Dataframe 만들기 이전 글에서 list 형태로 저장되어있는 데이터를 각각의 row들로 분리하여 두 컬럼을 추출하는 것까지 진행하였다. 다음 순서로 두 컬럼을 이어붙여 아래와 같은 Dataframe을 생성하려고 했으나 pandas의 concat 역할을 수행하는 pyspark의 함수를 찾을 수 없었다. 단순히 join으로는 두 컬럼을 가로로 붙일 수 없다. 대안 두 데이터 프레임에 monotonically_increasing_id() 를 이용하여 id를 부여한다. id를 기준으로 left join 한다. id를 삭제한다. df1 = df1 .withColumn("id", monotonically_increasing_id()) df2 = df2 .withColumn("id", monotonically_increasing_id(.. 더보기
[Spark] SQL - explode()를 사용하여 list 형태의 Row 분리하기 원본 데이터셋은 위와 같이 되어있으며 여기서 Brand와 Product 칼럼을 추출하고자 하였다. 하지만 select 결과 아래와 같이 Row들이 list형태로 들어가있었다. > df.select(df.Brand, df.Product) +--------------------+--------------------+ | Brand| Product| +--------------------+--------------------+ |[LO, LO, LO, Non-...|[AirPods Pro, Air...| +--------------------+--------------------+ pyspark.sql.functions.explode explode함수는 주어진 배열 또는 맵의 각 요소에 대해 새 행을 반환한다.. 더보기