Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Hertz-Lab
Research
Intelligent Museum
language-identification
Commits
67045203
Commit
67045203
authored
Aug 03, 2021
by
Paul Bethge
Browse files
add assertions
parent
3278d06b
Changes
1
Hide whitespace changes
Inline
Side-by-side
src/models/transformer_utils.py
View file @
67045203
...
...
@@ -18,195 +18,195 @@ import tensorflow as tf
def get_angles(pos, i, d_model):
    """Compute the positional-encoding angles for positions and indices.

    Each index ``i`` is turned into the rate
    ``1 / (10000 ** (2 * (i // 2) / d_model + eps) + eps)`` where
    ``eps`` is ``K.epsilon()``; the rate is then multiplied by ``pos``.

    Args:
        pos: Position value(s); multiplied with the rates, so it has to
            broadcast against ``i``.
        i: Index value(s) along the model dimension.
        d_model: Model width; converted with ``np.float32`` before the
            division.

    Returns:
        ``pos`` multiplied by the per-index angle rates.
    """
    # Consecutive index pairs (2k, 2k + 1) share one exponent via i // 2.
    exponent = (2 * (i // 2)) / (np.float32(d_model)) + K.epsilon()
    denominator = np.power(10000, exponent) + K.epsilon()
    return pos * (1 / denominator)
def positional_encoding(position, d_model):
    """Build the sinusoidal positional-encoding table.

    Args:
        position: Number of positions (rows) to generate.
        d_model: Number of columns of the table.

    Returns:
        A ``tf.float32`` tensor of shape ``(1, position, d_model)`` whose
        even columns hold the sine and odd columns the cosine of the
        angles produced by ``get_angles``.
    """
    positions = np.arange(position)[:, np.newaxis]  # column vector
    indices = np.arange(d_model)[np.newaxis, :]  # row vector
    table = get_angles(positions, indices, d_model)

    # Even columns (2i) take the sine, odd columns (2i+1) the cosine.
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    # Prepend a leading axis of size 1 before converting to a tensor.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)
def scaled_dot_product_attention(q, k, v, mask):
    """Compute scaled dot-product attention.

    q, k and v must have matching leading dimensions, and k and v must
    have a matching penultimate dimension (seq_len_k == seq_len_v).
    The mask may have different shapes depending on its type (padding or
    look-ahead), but it must be broadcastable for addition to the logits.

    Args:
        q: query shape == (..., seq_len_q, depth)
        k: key shape == (..., seq_len_k, depth)
        v: value shape == (..., seq_len_v, depth_v)
        mask: Float tensor with shape broadcastable
            to (..., seq_len_q, seq_len_k), or None to skip masking.

    Returns:
        output, attention_weights
    """
    logits = tf.matmul(q, k, transpose_b=True)  # (..., seq_len_q, seq_len_k)

    # Divide by sqrt of the key depth; epsilon is added to the divisor.
    key_depth = tf.cast(tf.shape(k)[-1], tf.float32)
    logits = logits / (tf.math.sqrt(key_depth) + K.epsilon())

    # mask * -1e9 is added, so entries with a positive mask value receive
    # a very large negative logit and a near-zero weight after softmax.
    if mask is not None:
        logits += mask * -1e9

    # Softmax over the last axis (seq_len_k) so the weights add up to 1.
    attention_weights = tf.nn.softmax(logits, axis=-1)  # (..., seq_len_q, seq_len_k)

    output = tf.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)
    return output, attention_weights
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention layer built on ``scaled_dot_product_attention``.

    Queries, keys and values are each projected with a ``Dense(d_model)``
    layer, split into ``num_heads`` heads of size ``d_model // num_heads``,
    attended per head, merged back and passed through a final
    ``Dense(d_model)`` layer.
    """

    def __init__(self, d_model, num_heads):
        """Create the projection layers.

        Args:
            d_model: Width of the projections; must be divisible by
                ``num_heads`` (checked with an ``assert``).
            num_heads: Number of attention heads.
        """
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        # Per-head feature size.
        self.depth = d_model // self.num_heads

        # Input projections for query, key and value.
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        # Output projection applied to the concatenated heads.
        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Split the last dimension into (num_heads, depth).

        The result is transposed so that its shape is
        (batch_size, num_heads, seq_len, depth).
        """
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask=None):
        """Apply multi-head attention.

        Args:
            v: Value input.
            k: Key input.
            q: Query input; its first dimension is taken as the batch size.
            mask: Optional mask forwarded to
                ``scaled_dot_product_attention``.

        Returns:
            Tuple ``(output, attention_weights)``.
        """
        batch_size = tf.shape(q)[0]

        query = self.wq(q)  # (batch_size, seq_len, d_model)
        key = self.wk(k)  # (batch_size, seq_len, d_model)
        value = self.wv(v)  # (batch_size, seq_len, d_model)

        query = self.split_heads(query, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        key = self.split_heads(key, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        value = self.split_heads(value, batch_size)  # (batch_size, num_heads, seq_len_v, depth)

        # attended.shape == (batch_size, num_heads, seq_len_q, depth)
        # attention_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k)
        attended, attention_weights = scaled_dot_product_attention(
            query, key, value, mask
        )

        # Move the head axis next to depth again:
        # (batch_size, seq_len_q, num_heads, depth)
        attended = tf.transpose(attended, perm=[0, 2, 1, 3])

        # Merge the heads: (batch_size, seq_len_q, d_model)
        merged = tf.reshape(attended, (batch_size, -1, self.d_model))

        output = self.dense(merged)  # (batch_size, seq_len_q, d_model)
        return output, attention_weights
def point_wise_feed_forward_network(d_model, dff):
    """Build the two-layer feed-forward network used inside an encoder layer.

    Args:
        d_model: Number of units of the second (output) ``Dense`` layer.
        dff: Number of units of the first ``Dense`` layer, which uses a
            ReLU activation.

    Returns:
        A ``tf.keras.Sequential`` of the two ``Dense`` layers.
    """
    expand = tf.keras.layers.Dense(dff, activation="relu")  # (batch_size, seq_len, dff)
    project = tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    return tf.keras.Sequential([expand, project])
class EncoderLayer(tf.keras.layers.Layer):
    """Single encoder block: self-attention followed by a feed-forward network.

    Each of the two sub-blocks applies dropout to its output, adds the
    sub-block input back (residual connection) and layer-normalizes the sum.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        """Create the sub-layers.

        Args:
            d_model: Model width passed to the attention and feed-forward
                sub-layers.
            num_heads: Number of attention heads.
            dff: Hidden width of the feed-forward network.
            rate: Dropout rate used by both dropout layers.
        """
        super().__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training=None, mask=None):
        """Run the encoder block on ``x``.

        Args:
            x: Input tensor; used as value, key and query of the attention.
            training: Forwarded to the dropout layers.
            mask: Forwarded to the attention layer.

        Returns:
            The normalized output of the feed-forward sub-block.
        """
        # Self-attention sub-block; the attention weights are not needed.
        attended, _ = self.mha(x, x, x, mask)  # (batch_size, input_seq_len, d_model)
        attended = self.dropout1(attended, training=training)
        normed_attention = self.layernorm1(x + attended)  # (batch_size, input_seq_len, d_model)

        # Feed-forward sub-block.
        fed_forward = self.ffn(normed_attention)  # (batch_size, input_seq_len, d_model)
        fed_forward = self.dropout2(fed_forward, training=training)

        # (batch_size, input_seq_len, d_model)
        return self.layernorm2(normed_attention + fed_forward)
class Encoder(tf.keras.layers.Layer):
    """Stack of ``EncoderLayer`` blocks applied to a positionally encoded input.

    The input is scaled by ``sqrt(d_model)``, the positional encoding is
    added, dropout is applied, and the result is passed through
    ``num_layers`` encoder layers in sequence.
    """

    def __init__(
        self,
        num_layers,
        d_model,
        num_heads,
        dff,
        maximum_position_encoding,
        rate=0.1,
    ):
        """Create the positional-encoding table and the encoder layers.

        Args:
            num_layers: Number of ``EncoderLayer`` blocks.
            d_model: Model width.
            num_heads: Number of attention heads per encoder layer.
            dff: Hidden width of each feed-forward network.
            maximum_position_encoding: Number of positions for which the
                positional encoding is precomputed.
            rate: Dropout rate for the encoder layers and the input dropout.
        """
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        # Precomputed once; sliced to the actual sequence length in call().
        self.pos_encoding = positional_encoding(
            maximum_position_encoding, self.d_model
        )

        self.enc_layers = [
            EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)
        ]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training=None, mask=None):
        """Encode ``x``.

        Args:
            x: Input tensor; its second dimension is taken as the sequence
                length.
            training: Forwarded to dropout and to every encoder layer.
            mask: Forwarded to every encoder layer.

        Returns:
            Tensor of shape (batch_size, input_seq_len, d_model).
        """
        seq_len = tf.shape(x)[1]

        # Scale the input, then add the encoding for the first seq_len positions.
        scale = tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = x * scale
        x = x + self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for layer_index in range(self.num_layers):
            x = self.enc_layers[layer_index](x, training, mask)

        return x  # (batch_size, input_seq_len, d_model)
#### https://github.com/CVxTz/music_genre_classification/blob/master/code/models.py
from
tensorflow.keras.layers
import
(
Input
,
GlobalAvgPool1D
,
Dense
,
Bidirectional
,
GRU
,
Dropout
,
Input
,
GlobalAvgPool1D
,
Dense
,
Bidirectional
,
GRU
,
Dropout
,
)
from
tensorflow.keras.models
import
Model
from
tensorflow.keras.optimizers
import
Adam
...
...
@@ -218,60 +218,60 @@ from tensorflow.keras.losses import mae
def custom_binary_accuracy(y_true, y_pred, threshold=0.5):
    """Mean agreement between thresholded targets and predictions.

    Both ``y_true`` and ``y_pred`` are binarized with ``> threshold``
    before they are compared element-wise.

    Args:
        y_true: Target values.
        y_pred: Predicted values; the threshold is cast to its dtype.
        threshold: Cut-off applied to both inputs.

    Returns:
        The mean of the element-wise equality over the last axis.
    """
    cutoff = math_ops.cast(threshold, y_pred.dtype)
    predicted = math_ops.cast(y_pred > cutoff, y_pred.dtype)
    actual = math_ops.cast(y_true > cutoff, y_true.dtype)
    matches = math_ops.equal(actual, predicted)
    return K.mean(matches, axis=-1)
def custom_binary_crossentropy(y_true, y_pred):
    """Binary cross-entropy with the positive term weighted by 4.

    Predictions are clipped to ``[epsilon, 1 - epsilon]`` and
    ``K.epsilon()`` is added again inside each logarithm.

    Args:
        y_true: Target values; cast to the dtype of ``y_pred``.
        y_pred: Predicted probabilities.

    Returns:
        The negated weighted log-likelihood summed over the last axis.
    """
    y_pred = ops.convert_to_tensor(y_pred)
    y_true = math_ops.cast(y_true, y_pred.dtype)

    # Keep the probabilities strictly inside (0, 1).
    epsilon_ = K._constant_to_tensor(K.epsilon(), y_pred.dtype.base_dtype)
    probs = clip_ops.clip_by_value(y_pred, epsilon_, 1.0 - epsilon_)

    # Compute cross entropy from probabilities; positives count four times.
    positive_term = 4 * y_true * math_ops.log(probs + K.epsilon())
    negative_term = (1 - y_true) * math_ops.log(1 - probs + K.epsilon())
    return K.sum(-(positive_term + negative_term), axis=-1)
def transformer_classifier(
    num_layers=4,
    d_model=128,
    num_heads=8,
    dff=256,
    maximum_position_encoding=2048,
    n_classes=16,
):
    """Build the transformer-encoder classifier as a Keras ``Model``.

    The input of shape ``(None, d_model)`` per example goes through an
    ``Encoder`` (dropout rate 0.3), ``Dropout(0.2)``, global average
    pooling over the sequence axis, a SELU ``Dense`` layer with
    ``4 * n_classes`` units and a sigmoid ``Dense`` layer with
    ``n_classes`` units.

    ``tf.debugging.assert_all_finite`` returns the tensor it checked. Each
    returned tensor is fed into the next stage so that the checks are part
    of the graph between the model's input and output; a call whose result
    is discarded is not connected to the model and would never run.

    Args:
        num_layers: Number of encoder layers.
        d_model: Feature size of the input and of the encoder.
        num_heads: Number of attention heads per encoder layer.
        dff: Hidden width of the encoder feed-forward networks.
        maximum_position_encoding: Number of positions covered by the
            positional encoding.
        n_classes: Number of sigmoid output units.

    Returns:
        The uncompiled ``Model``; optimizer, loss and metrics are left to
        the caller.
    """
    inp = Input((None, d_model))
    x = tf.debugging.assert_all_finite(inp, "input nan")

    encoder = Encoder(
        num_layers=num_layers,
        d_model=d_model,
        num_heads=num_heads,
        dff=dff,
        maximum_position_encoding=maximum_position_encoding,
        rate=0.3,
    )

    x = encoder(x)
    x = tf.debugging.assert_all_finite(x, "enc nan")

    x = Dropout(0.2)(x)
    x = tf.debugging.assert_all_finite(x, "drop nan")

    x = GlobalAvgPool1D()(x)
    x = tf.debugging.assert_all_finite(x, "globavg nan")

    x = Dense(4 * n_classes, activation="selu")(x)
    x = tf.debugging.assert_all_finite(x, "selu nan")

    out = Dense(n_classes, activation="sigmoid")(x)
    out = tf.debugging.assert_all_finite(out, "output nan")

    model = Model(inputs=inp, outputs=out)

    # model.compile(
    #     optimizer=opt, loss=custom_binary_crossentropy, metrics=[custom_binary_accuracy]
    # )
    # model.summary()

    return model
\ No newline at end of file
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment