/
rnn.py
87 lines (75 loc) · 3.9 KB
/
rnn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import tensorflow as tf
class RNN:
    """Single-layer recurrent text classifier built as a TensorFlow 1.x graph.

    The graph embeds token ids, runs them through one recurrent cell with
    output dropout, takes the output at the last non-padding time step of
    each sequence, and projects it to class scores.

    Token id 0 is treated as padding: the length of each sequence is the
    count of its non-zero ids (see `_length`).

    Attributes created on the instance:
        input_text: int32 placeholder, shape [None, sequence_length].
        input_y: float32 placeholder, shape [None, num_classes].
        dropout_keep_prob: float32 placeholder for the cell output keep prob.
        W_text: embedding matrix, shape [vocab_size, embedding_size].
        embedded_chars: embedding lookup of `input_text`.
        h_outputs: last relevant RNN output per sequence.
        logits, predictions, loss, accuracy: output heads.
    """

    def __init__(self, sequence_length, num_classes, vocab_size, embedding_size,
                 cell_type, hidden_size, l2_reg_lambda=0.0):
        """Build the whole graph.

        Args:
            sequence_length: fixed (padded) number of tokens per example.
            num_classes: number of output classes.
            vocab_size: number of rows in the embedding matrix.
            embedding_size: width of each embedding vector.
            cell_type: one of "vanilla", "lstm" or "gru".
            hidden_size: number of units in the recurrent cell.
            l2_reg_lambda: weight of the L2 penalty on the output layer.

        Raises:
            ValueError: if `cell_type` is not a supported cell type.
        """
        # Placeholders for input, output and dropout
        self.input_text = tf.placeholder(tf.int32, shape=[None, sequence_length], name='input_text')
        self.input_y = tf.placeholder(tf.float32, shape=[None, num_classes], name='input_y')
        self.dropout_keep_prob = tf.placeholder(tf.float32, name='dropout_keep_prob')

        l2_loss = tf.constant(0.0)
        text_length = self._length(self.input_text)

        # Embedding layer (pinned to the CPU by the explicit device scope)
        with tf.device('/cpu:0'), tf.name_scope("text-embedding"):
            self.W_text = tf.Variable(tf.random_uniform([vocab_size, embedding_size], -1.0, 1.0), name="W_text")
            self.embedded_chars = tf.nn.embedding_lookup(self.W_text, self.input_text)

        # Recurrent Neural Network
        with tf.name_scope("rnn"):
            cell = self._get_cell(hidden_size, cell_type)
            cell = tf.nn.rnn_cell.DropoutWrapper(cell, output_keep_prob=self.dropout_keep_prob)
            all_outputs, _ = tf.nn.dynamic_rnn(cell=cell,
                                               inputs=self.embedded_chars,
                                               sequence_length=text_length,
                                               dtype=tf.float32)
            self.h_outputs = self.last_relevant(all_outputs, text_length)

        # Final scores and predictions
        with tf.name_scope("output"):
            W = tf.get_variable("W", shape=[hidden_size, num_classes], initializer=tf.contrib.layers.xavier_initializer())
            b = tf.Variable(tf.constant(0.1, shape=[num_classes]), name="b")
            # Both the weights and the bias of the output layer are penalised.
            l2_loss += tf.nn.l2_loss(W)
            l2_loss += tf.nn.l2_loss(b)
            self.logits = tf.nn.xw_plus_b(self.h_outputs, W, b, name="logits")
            self.predictions = tf.argmax(self.logits, 1, name="predictions")

        # Calculate mean cross-entropy loss
        with tf.name_scope("loss"):
            losses = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=self.input_y)
            self.loss = tf.reduce_mean(losses) + l2_reg_lambda * l2_loss

        # Accuracy
        with tf.name_scope("accuracy"):
            correct_predictions = tf.equal(self.predictions, tf.argmax(self.input_y, axis=1))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, tf.float32), name="accuracy")

    @staticmethod
    def _get_cell(hidden_size, cell_type):
        """Return a recurrent cell of the requested type.

        Args:
            hidden_size: number of units in the cell.
            cell_type: "vanilla", "lstm" or "gru".

        Raises:
            ValueError: if `cell_type` is none of the supported names. Failing
                here keeps the error next to its cause instead of letting a
                `None` cell reach the dropout wrapper.
        """
        if cell_type == "vanilla":
            return tf.nn.rnn_cell.BasicRNNCell(hidden_size)
        if cell_type == "lstm":
            return tf.nn.rnn_cell.BasicLSTMCell(hidden_size)
        if cell_type == "gru":
            return tf.nn.rnn_cell.GRUCell(hidden_size)
        raise ValueError(
            "ERROR: '{}' is a wrong cell type !!! "
            "Expected one of 'vanilla', 'lstm', 'gru'.".format(cell_type))

    @staticmethod
    def _length(seq):
        """Return the length of each sequence as an int32 vector.

        Every non-zero id contributes 1 and every zero id contributes 0, so
        the result is the number of non-zero entries along axis 1.
        NOTE(review): this equals the true length only if id 0 is reserved
        for padding and padding is trailing - confirm against the data
        pipeline.
        """
        relevant = tf.sign(tf.abs(seq))
        length = tf.reduce_sum(relevant, axis=1)
        length = tf.cast(length, tf.int32)
        return length

    @staticmethod
    def last_relevant(seq, length):
        """Extract the output of the last relevant time step of each sequence.

        Ex) "The movie is good" -> length = 4
            output = [ [1.314, -3.32, ..., 0.98]
                       [0.287, -0.50, ..., 1.55]
                       [2.194, -2.12, ..., 0.63]
                       [1.938, -1.88, ..., 1.31]
                       [  0.0,   0.0, ...,  0.0]
                       ...
                       [  0.0,   0.0, ...,  0.0] ]
            The output we need is the 4th output of the cell, so extract it.

        Args:
            seq: RNN outputs of shape [batch, max_length, input_size]; the
                last two dimensions must be statically known.
            length: int32 vector with the length of each sequence.

        Returns:
            Tensor of shape [batch, input_size].
        """
        batch_size = tf.shape(seq)[0]
        max_length = int(seq.get_shape()[1])
        input_size = int(seq.get_shape()[2])
        # Clamp so that an empty (all-padding) row reads its own first time
        # step rather than index -1 / the previous row's last time step.
        # dynamic_rnn emits zeros past each sequence's length, so that row
        # yields a zero vector.
        last_step = tf.maximum(length - 1, 0)
        # Row i of the batch starts at offset i * max_length once flattened.
        index = tf.range(0, batch_size) * max_length + last_step
        flat = tf.reshape(seq, [-1, input_size])
        return tf.gather(flat, index)