Python - WRITE XLSX

 
Posted by Darwin Garcia (1 post) on 11/09/2023 23:15:43
Hi everyone,

I'm trying to modify this code so that it generates XLSX files instead of CSV:

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# Required libraries.
import dataiku
import os
import pandas as pd
from dataiku import pandasutils as pdu
from google.cloud import storage

from dataiku import spark as dkuspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# Function definitions.
def list_blobs(bucket, root):
    # List every blob in the bucket (for debugging).
    blobs = bucket.list_blobs()
    for blob in blobs:
        print(blob.name)

def delete_blobs(bucket, root):
    # Delete every blob under the folder root, except the root "directory" itself.
    blobs = bucket.list_blobs()
    for blob in blobs:
        if root[1:] in blob.name and root[1:] + "/" != blob.name:
            print("Deleting file: " + bucket_name + blob.name)
            blob.reload()  # Fetch blob metadata to use in generation_match_precondition.
            generation_match_precondition = blob.generation
            blob.delete(if_generation_match=generation_match_precondition)

def rename_lastBlob(bucket, root, new_name):
    blobs = bucket.list_blobs()
    for blob in blobs:
        # Spark leaves a _SUCCESS marker next to the part files; delete it.
        if root[1:] in blob.name and root[1:] + "/_SUCCESS" == blob.name:
            blob.reload()
            generation_match_precondition = blob.generation
            blob.delete(if_generation_match=generation_match_precondition)
        # Rename the part-* file written by Spark to the desired final name.
        if root[1:] in blob.name and root[1:] + "/" != blob.name and 'part-' in blob.name:
            bucket.rename_blob(blob, new_name)

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# PySpark context.
sparkContext = SparkContext.getOrCreate()
sqlContext = SQLContext(sparkContext)

# Create the Spark session.
sparkSession = SparkSession.builder.appName("PySpark Write csv").master("local").config("spark.driver.memory", "5g").getOrCreate()

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# Input dataset.
input_ds = dataiku.Dataset("CORREOS_NUM_prepared", ignore_flow=True)
spark_df = dkuspark.get_dataframe(sqlContext, input_ds)

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# Output bucket details.
output_folder = dataiku.Folder("NMrwIsO2")
bucket_name = output_folder.get_info()["accessInfo"]["bucket"]
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)

root = output_folder.get_info()["accessInfo"]["root"]
output_folderpath = "gs://" + bucket_name + root

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# Remove files from previous runs.
delete_blobs(bucket, root)

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# Dynamic CSV creation: one output file per distinct FilePath value.
unique_filepaths = spark_df.select("FilePath").distinct().rdd.flatMap(lambda x: x).collect()

for filepath in unique_filepaths:
    filter_df = spark_df.filter(col("FilePath") == filepath)
    filter_df = filter_df.drop("FilePath")
    filter_df = filter_df.drop("CORREO")
    filter_df = filter_df.drop("ID")

    filter_df.write.format("csv").option("header", True).mode('append').save(output_folderpath)

    # Rename the part-* file Spark just wrote to its final name.
    new_fileName = (root + filepath)[1:]
    rename_lastBlob(bucket, root, new_fileName)
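
Spark has no built-in XLSX writer, so one common workaround is to collect each filtered subset to pandas, write the workbook locally, and upload it to the bucket directly. Below is a minimal sketch of that variant, assuming openpyxl is installed on the driver and each per-FilePath subset fits in driver memory; the blob name mirrors the new_fileName logic above, and the ".xlsx" suffix is an assumption (adjust it if FilePath already carries an extension):

# -------------------------------------------------------------------------------- NOTEBOOK-CELL: CODE
# Sketch: write XLSX instead of CSV via pandas (assumes openpyxl is available
# and each per-FilePath subset fits in driver memory).
import tempfile

for filepath in unique_filepaths:
    filter_df = spark_df.filter(col("FilePath") == filepath)
    filter_df = filter_df.drop("FilePath", "CORREO", "ID")

    # Collect the subset to the driver as a pandas DataFrame.
    pdf = filter_df.toPandas()

    # Write the workbook to a local temporary file.
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp:
        local_path = tmp.name
    pdf.to_excel(local_path, index=False, engine="openpyxl")

    # Upload under the folder root. Because the blob name is chosen here,
    # the _SUCCESS/part-* cleanup in rename_lastBlob is no longer needed.
    blob_name = (root + filepath)[1:] + ".xlsx"  # ".xlsx" suffix is an assumption
    bucket.blob(blob_name).upload_from_filename(local_path)
    os.remove(local_path)

If the subsets are too large to collect on the driver, the com.crealytics spark-excel connector is an alternative that writes XLSX directly from Spark, but it has to be installed on the cluster as an extra Spark package first.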