[TF FE] Test Switch and Merge support by TF FE (#16255)

This commit is contained in:
Roman Kazantsev 2023-03-14 00:50:47 +04:00 committed by GitHub
parent 3b71286f1d
commit dcc8a36d88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -21,14 +21,9 @@ class TestIfFloat(CommonTFLayerTest):
inputs_data['y'] = np.random.randint(-50, 50, y_shape).astype(np.float32)
return inputs_data
def create_if_net(self, x_shape, y_shape):
tf.compat.v1.reset_default_graph()
# Create the graph and model
with tf.compat.v1.Session() as sess:
cond = tf.compat.v1.placeholder(tf.bool, [], 'cond')
x = tf.compat.v1.placeholder(tf.float32, x_shape, 'x')
y = tf.compat.v1.placeholder(tf.float32, y_shape, 'y')
def create_if_net(self, x_shape, y_shape, lower_control_flow):
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2
def if_function(cond, x, y):
def then_branch():
output1 = tf.add(x, y)
output2 = tf.multiply(x, y)
@ -43,18 +38,30 @@ class TestIfFloat(CommonTFLayerTest):
return output1, output2, output3
if_output = tf.cond(cond, then_branch, else_branch)
tf.identity(if_output[0], name='output1')
tf.identity(if_output[1], name='output2')
tf.identity(if_output[2], name='output3')
tf.compat.v1.global_variables_initializer()
output1 = tf.identity(if_output[0], name='output1')
output2 = tf.identity(if_output[1], name='output2')
output3 = tf.identity(if_output[2], name='output3')
return output1, output2, output3
tf_net = sess.graph_def
tf_if_graph = tf.function(if_function)
cond = np.random.randint(0, 2, []).astype(bool)
x = np.random.randint(1, 10, x_shape).astype(np.float32)
y = np.random.randint(-50, 50, y_shape).astype(np.float32)
concrete_func = tf_if_graph.get_concrete_function(cond, x, y)
# lower_control_flow defines representation of If operation
# in case of lower_control_flow=True it is decomposed into Switch and Merge nodes
frozen_func = convert_variables_to_constants_v2(concrete_func,
lower_control_flow=lower_control_flow)
tf_net = frozen_func.graph.as_graph_def(add_shapes=True)
return tf_net, None
test_data_basic = [
dict(x_shape=[3], y_shape=[2, 3]),
dict(x_shape=[2, 1, 4], y_shape=[2, 1, 4]),
dict(x_shape=[3], y_shape=[2, 3], lower_control_flow=False),
dict(x_shape=[2, 1, 4], y_shape=[2, 1, 4], lower_control_flow=False),
pytest.param(dict(x_shape=[2, 1, 4], y_shape=[2, 1, 4], lower_control_flow=True),
marks=pytest.mark.xfail(reason="92632"))
]
@pytest.mark.parametrize("params", test_data_basic)
@ -83,14 +90,9 @@ class TestIfInt(CommonTFLayerTest):
inputs_data['y'] = np.random.randint(-50, 50, y_shape).astype(np.float32)
return inputs_data
def create_if_net(self, ind_shape, y_shape):
tf.compat.v1.reset_default_graph()
# Create the graph and model
with tf.compat.v1.Session() as sess:
cond = tf.compat.v1.placeholder(tf.bool, [], 'cond')
ind = tf.compat.v1.placeholder(tf.int32, ind_shape, 'ind')
y = tf.compat.v1.placeholder(tf.float32, y_shape, 'y')
def create_if_net(self, ind_shape, y_shape, lower_control_flow):
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2
def if_function(cond, ind, y):
def then_branch():
const_one = tf.constant(1, dtype=tf.int32)
output1 = tf.add(ind, const_one)
@ -106,18 +108,31 @@ class TestIfInt(CommonTFLayerTest):
return output1, output2, output3
if_output = tf.cond(cond, then_branch, else_branch)
tf.identity(if_output[0], name='output1')
tf.identity(if_output[1], name='output2')
tf.identity(if_output[2], name='output3')
tf.compat.v1.global_variables_initializer()
output1 = tf.identity(if_output[0], name='output1')
output2 = tf.identity(if_output[1], name='output2')
output3 = tf.identity(if_output[2], name='output3')
return output1, output2, output3
tf_net = sess.graph_def
tf_if_graph = tf.function(if_function)
cond = np.random.randint(0, 2, []).astype(bool)
ind = np.random.randint(1, 10, ind_shape).astype(np.int32)
y = np.random.randint(-50, 50, y_shape).astype(np.float32)
concrete_func = tf_if_graph.get_concrete_function(cond, ind, y)
return tf_net, None
# lower_control_flow defines representation of If operation
# in case of lower_control_flow=True it is decomposed into Switch and Merge nodes
frozen_func = convert_variables_to_constants_v2(concrete_func,
lower_control_flow=lower_control_flow)
graph_def = frozen_func.graph.as_graph_def(add_shapes=True)
return graph_def, None
test_data_basic = [
dict(ind_shape=[3], y_shape=[2, 3]),
dict(ind_shape=[2, 1, 4], y_shape=[2, 1, 4]),
dict(ind_shape=[3], y_shape=[2, 3], lower_control_flow=False),
dict(ind_shape=[2, 1, 4], y_shape=[2, 1, 4], lower_control_flow=False),
pytest.param(dict(ind_shape=[2, 1, 4], y_shape=[2, 1, 4], lower_control_flow=True),
marks=pytest.mark.xfail(reason="92632"))
]
@pytest.mark.parametrize("params", test_data_basic)