Glue Studio Notebook

You are now running a Glue Studio notebook; before you can start using your notebook you must start an interactive session.

Available Magics

| Magic | Type | Description |
| --- | --- | --- |
| %%configure | Dictionary | A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile | String | Specify a profile in your AWS configuration to use as the credentials provider. |
| %iam_role | String | Specify an IAM role to execute your session with. |
| %region | String | Specify the AWS region in which to initialize a session. |
| %session_id | String | Returns the session ID for the running session. |
| %connections | List | Specify a comma-separated list of connections to use in the session. |
| %additional_python_modules | List | Comma-separated list of pip packages, S3 paths or private pip arguments. |
| %extra_py_files | List | Comma-separated list of additional Python files from S3. |
| %extra_jars | List | Comma-separated list of additional JARs to include in the cluster. |
| %number_of_workers | Integer | The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too. |
| %glue_version | String | The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0). |
| %security_config | String | Define a security configuration to be used with this session. |
| %%sql | String | Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code. |
| %streaming | String | Changes the session type to Glue Streaming. |
| %etl | String | Changes the session type to Glue ETL. |
| %status | | Returns the status of the current Glue session, including its duration, configuration and executing user / role. |
| %stop_session | | Stops the current session. |
| %list_sessions | | Lists all currently running sessions by name and ID. |
| %worker_type | String | Standard, G.1X, or G.2X. number_of_workers must be set too. Default is G.1X. |
| %spark_conf | String | Specify custom Spark configurations for your session, e.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer. |
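
As an illustration of how these magics combine, a first cell might look like the sketch below. The worker type and worker count match the values reported in this notebook's session log; the Glue version is an assumption picked from the valid options listed above. Configuration magics of this kind generally need to run before the first code cell starts the session.

```
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5
```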
In [32]:
import datetime

In [38]:
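# Import only the types used below; a wildcard import of pyspark.sql.functions
# would shadow Python builtins such as sum, min and max.
from pyspark.sql.types import DateType, StringType
import pyspark.sql.functions as F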

In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::691262992979:role/glue-notebook
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 136cd18c-8303-4545-8a1d-84012dada91c
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session 136cd18c-8303-4545-8a1d-84012dada91c to get into ready status...
Session 136cd18c-8303-4545-8a1d-84012dada91c has been created.

In [3]:
json_fpath = "s3://acc-lakeformation-spec/db_dummy_json/"

In [5]:
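# Read every JSON file under the prefix into a Glue DynamicFrame.
# groupFiles/groupSize batch small files into read groups of about 1 MiB (1048576 bytes).
df_json = glueContext.create_dynamic_frame_from_options("s3",
                        {'paths': [json_fpath],
                         'recurse': True,
                         'groupFiles': 'inPartition',
                         'groupSize': '1048576'},
                         format='json')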

In [11]:
# toPandas() collects the full dataset onto the driver, so this only suits small data.
df = df_json.toDF().toPandas()

In [13]:
df.head()
     serial   status                start
0  e6dfa2c0  RUNNING  2022-10-31 23:38:00
1  6471f026  RUNNING  2022-10-31 23:37:00
2  aea6878b  RUNNING  2022-10-31 23:44:00
3  3a0d9861  RUNNING  2022-10-31 23:48:00
4  80a42904  RUNNING  2022-10-31 23:22:00
In [14]:
df2 = df.drop_duplicates(['serial'], keep='last')

In [15]:
df2
         serial    status                start
75     e6dfa2c0  FINISHED  2022-10-31 23:38:00
76     6471f026  FINISHED  2022-10-31 23:37:00
77     aea6878b  FINISHED  2022-10-31 23:44:00
79     80a42904  FINISHED  2022-10-31 23:22:00
80     038cee43  FINISHED  2022-10-31 23:25:00
...         ...       ...                  ...
62461  1f083c31   RUNNING  2022-12-31 22:13:00
62462  2192ef39   RUNNING  2022-12-31 22:29:00
62463  65e9d3ba   RUNNING  2022-12-31 22:22:00
62464  e6a26261   RUNNING  2022-12-31 22:25:00
62465  aa976927   RUNNING  2022-12-31 22:17:00

[15497 rows x 3 columns]
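
The deduplication above keeps, for each serial, whichever row appears last in the frame. A minimal sketch on made-up rows (the serials and values are illustrative, not taken from the dataset) shows the behaviour and why it depends on row order:

```python
import pandas as pd

# Hypothetical sample: each serial is logged as RUNNING and possibly later as FINISHED.
events = pd.DataFrame(
    {
        "serial": ["a1", "b2", "a1", "c3"],
        "status": ["RUNNING", "RUNNING", "FINISHED", "RUNNING"],
        "start": ["2022-10-31 23:38:00"] * 4,
    }
)

# keep='last' retains the final occurrence of each serial in the current row order.
latest = events.drop_duplicates(["serial"], keep="last")
print(latest)
```

Because the result depends on row order, it is only as reliable as the order in which the rows arrived. If that order is not guaranteed, sorting on a column that distinguishes the states before deduplicating would make the outcome deterministic.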
In [35]:
df2['anomesdia'] = df2['start'].apply(lambda x: datetime.datetime.fromisoformat(x).strftime('anomesdia=%Y-%m-%d'))
<string>:1: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
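
The warning appears because df2 was derived from df, and pandas cannot tell whether the assignment is meant to reach the original frame. One common way to avoid it, sketched here on made-up rows, is to take an explicit copy after deduplicating, or to use assign, which returns a new frame. The sample data and the name with_partition are hypothetical.

```python
import datetime

import pandas as pd

# Hypothetical sample with the same columns as the notebook's frame.
df = pd.DataFrame(
    {
        "serial": ["a1", "a1", "b2"],
        "status": ["RUNNING", "FINISHED", "RUNNING"],
        "start": ["2022-10-31 23:38:00", "2022-10-31 23:38:00", "2022-11-01 00:40:00"],
    }
)

# .copy() makes df2 an independent frame, so later assignments are unambiguous.
df2 = df.drop_duplicates(["serial"], keep="last").copy()

# assign() returns a new frame instead of mutating df2 in place.
with_partition = df2.assign(
    anomesdia=df2["start"].apply(
        lambda x: datetime.datetime.fromisoformat(x).strftime("anomesdia=%Y-%m-%d")
    )
)
print(with_partition)
```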
In [36]:
df2.head()
      serial    status                start             anomesdia
75  e6dfa2c0  FINISHED  2022-10-31 23:38:00  anomesdia=2022-10-31
76  6471f026  FINISHED  2022-10-31 23:37:00  anomesdia=2022-10-31
77  aea6878b  FINISHED  2022-10-31 23:44:00  anomesdia=2022-10-31
79  80a42904  FINISHED  2022-10-31 23:22:00  anomesdia=2022-10-31
80  038cee43  FINISHED  2022-10-31 23:25:00  anomesdia=2022-10-31
In [16]:
df_json.printSchema()
root
|-- serial: string
|-- status: string
|-- start: string
In [19]:
df_df = df_json.toDF()

In [21]:
df_df.schema
StructType(List(StructField(serial,StringType,true),StructField(status,StringType,true),StructField(start,StringType,true)))
In [23]:
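# Select the three schema columns explicitly so the call also works once df2 has gained 'anomesdia'.
df3 = spark.createDataFrame(df2[['serial', 'status', 'start']], df_json.toDF().schema)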

In [24]:
df3.show()
+--------+--------+-------------------+
|  serial|  status|              start|
+--------+--------+-------------------+
|e6dfa2c0|FINISHED|2022-10-31 23:38:00|
|6471f026|FINISHED|2022-10-31 23:37:00|
|aea6878b|FINISHED|2022-10-31 23:44:00|
|80a42904|FINISHED|2022-10-31 23:22:00|
|038cee43|FINISHED|2022-10-31 23:25:00|
|33921912|FINISHED|2022-10-31 23:32:00|
|6cce62eb|FINISHED|2022-10-31 23:43:00|
|3a0d9861|FINISHED|2022-10-31 23:48:00|
|2702c9a2|FINISHED|2022-11-01 00:40:00|
|463276a2|FINISHED|2022-11-01 00:48:00|
|9e1fbac9|FINISHED|2022-11-01 00:44:00|
|5b6b5f90|FINISHED|2022-11-01 00:38:00|
|295cf480|FINISHED|2022-11-01 00:50:00|
|e1b9e568|FINISHED|2022-11-01 00:08:00|
|c8641c2b|FINISHED|2022-11-01 00:50:00|
|d92f5f54|FINISHED|2022-11-01 00:37:00|
|66438b64|FINISHED|2022-11-01 00:12:00|
|dd9865f6|FINISHED|2022-11-01 00:29:00|
|36fcc5d6|FINISHED|2022-11-01 00:10:00|
|26fc7a5d|FINISHED|2022-11-01 00:36:00|
+--------+--------+-------------------+
only showing top 20 rows
In [28]:
df3.count()
15497
In [30]:
df3['start'].apply(lambda x: x)
TypeError: 'Column' object is not callable
In [45]:
# serial, status and start are already strings, so only the partition column needs deriving.
df4 = df3.withColumn('anomesdia', df3['start'].cast(DateType()))

In [46]:
df4.show()
+--------+--------+-------------------+----------+
|  serial|  status|              start| anomesdia|
+--------+--------+-------------------+----------+
|e6dfa2c0|FINISHED|2022-10-31 23:38:00|2022-10-31|
|6471f026|FINISHED|2022-10-31 23:37:00|2022-10-31|
|aea6878b|FINISHED|2022-10-31 23:44:00|2022-10-31|
|80a42904|FINISHED|2022-10-31 23:22:00|2022-10-31|
|038cee43|FINISHED|2022-10-31 23:25:00|2022-10-31|
|33921912|FINISHED|2022-10-31 23:32:00|2022-10-31|
|6cce62eb|FINISHED|2022-10-31 23:43:00|2022-10-31|
|3a0d9861|FINISHED|2022-10-31 23:48:00|2022-10-31|
|2702c9a2|FINISHED|2022-11-01 00:40:00|2022-11-01|
|463276a2|FINISHED|2022-11-01 00:48:00|2022-11-01|
|9e1fbac9|FINISHED|2022-11-01 00:44:00|2022-11-01|
|5b6b5f90|FINISHED|2022-11-01 00:38:00|2022-11-01|
|295cf480|FINISHED|2022-11-01 00:50:00|2022-11-01|
|e1b9e568|FINISHED|2022-11-01 00:08:00|2022-11-01|
|c8641c2b|FINISHED|2022-11-01 00:50:00|2022-11-01|
|d92f5f54|FINISHED|2022-11-01 00:37:00|2022-11-01|
|66438b64|FINISHED|2022-11-01 00:12:00|2022-11-01|
|dd9865f6|FINISHED|2022-11-01 00:29:00|2022-11-01|
|36fcc5d6|FINISHED|2022-11-01 00:10:00|2022-11-01|
|26fc7a5d|FINISHED|2022-11-01 00:36:00|2022-11-01|
+--------+--------+-------------------+----------+
only showing top 20 rows
In [29]:
path = 's3://acc-lakeformation-spec/db_dummy_parquet/'

In [48]:
df4.repartition(1).write.mode('append').format('parquet').partitionBy('anomesdia').save(path)
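
With partitionBy, Spark encodes each distinct value of the partition column in the output path as a column=value directory, which is why the column only needs to hold the date itself. A small pure-Python sketch of the resulting directory names follows; the helper partition_dir is hypothetical and only mimics the naming convention, it does not call Spark.

```python
import datetime


def partition_dir(base_path: str, column: str, start: str) -> str:
    """Return the Hive-style directory a row would land in (illustrative only)."""
    day = datetime.datetime.fromisoformat(start).date()
    return f"{base_path.rstrip('/')}/{column}={day.isoformat()}/"


path = "s3://acc-lakeformation-spec/db_dummy_parquet/"
print(partition_dir(path, "anomesdia", "2022-10-31 23:38:00"))
```

Because the write uses mode('append'), running the cell again adds new files to the same directories rather than replacing them.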
