
[Code] VNect

by zpstls 2023. 3. 13.

In this post, we will implement VNect, a paper on 3D pose estimation, using TensorFlow.

 

 

First, for background on pose estimation and the VNect paper itself, please refer to the following posts:

 

[Pose Estimation] About 2D/3D Pose Estimation
mj-thump-thump-story.tistory.com

 

[Model] VNect and XNect
mj-thump-thump-story.tistory.com

 

The code walkthrough proceeds in top-down order; along the way, some code (OpenCV calls, basic transforms and processing, etc.) is skipped.

 

The overall structure of VNect is as follows: detect the person in the image, estimate that person's 2D and 3D pose, and then fit and filter the resulting data for use.

 

The details of each part are as follows.

 

  • Blue box: person detection and cropping

Using OpenCV's HOG detector, we obtain the bounding box of the person in the image and crop that region out of the original input image. (This part is easy to implement with OpenCV, so a detailed explanation is omitted; a minimal sketch follows below.)
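
As a rough illustration only, a person box can be obtained with OpenCV's built-in HOG + linear-SVM people detector. This is a minimal sketch under that assumption, not the exact HOGBox helper used in the main code below, and the detectMultiScale parameters are empirical guesses:

import cv2
import numpy as np

def detect_person_rect(img):
    """Return (x, y, w, h) of the most confident person detection, or None."""
    hog = cv2.HOGDescriptor()
    hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
    rects, weights = hog.detectMultiScale(img, winStride=(8, 8), padding=(8, 8), scale=1.05)
    if len(rects) == 0:
        return None
    return tuple(rects[int(np.argmax(weights))])  # keep the highest-scoring box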

 

 

  • Green box: the main network

This is the main model, and it can be implemented as follows.

The input is self.input_holder = tf.placeholder(dtype=tf.float32, shape=(None, 368, 368, 3)), which is fed into ResNet50. Only the layers up to res4f are used; from there on, the custom structure is implemented as shown in the VNect Model section below. (A detailed explanation of the ResNet50 layers themselves is omitted.)

 

 

  • Red box: skeleton fitting and filtering

This stage refines the estimated coordinate data so it can actually be used. It consists of mathematical operations: each joint's 2D position is taken from the maximum of its heatmap, and its x/y/z values are then read from the corresponding location maps (as implemented in the Skeleton Fitting code below).

After those operations, the 3D pose is finally smoothed with a 1 Euro filter; the parameter values used are listed in the filter configurations in the Skeleton Fitting section.
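
For reference, the 1 Euro filter is an adaptive exponential low-pass filter. A sketch of its standard equations (from Casiez et al.'s original formulation, not taken from the VNect paper):

$$\hat{x}_i = \alpha x_i + (1 - \alpha)\,\hat{x}_{i-1}, \qquad \alpha = \frac{1}{1 + \tau/T_e}, \qquad \tau = \frac{1}{2\pi f_c}$$

$$f_c = f_{c_{\min}} + \beta\,|\hat{\dot{x}}_i|$$

Here $T_e$ is the sampling period ($1/\texttt{freq}$), $f_{c_{\min}}$ is mincutoff, $\beta$ is beta, and the derivative estimate $\hat{\dot{x}}_i$ is itself low-pass filtered with cutoff dcutoff. A larger $\beta$ reduces lag during fast motion; a smaller $f_{c_{\min}}$ suppresses jitter when nearly still.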


Now, let's implement it in code.

 

  • Main

The following is the overall main routine. Given an input image, it produces the final estimated 2D and 3D pose values.

import cv2
import numpy as np
import tensorflow as tf

box_size = 368
hm_factor = 8
joints_sum = 21
scales = [1, 0.85, 0.7]
original_img = cv2.imread('single_human_picture.jpg')

## Extract Person Only - Cropping
hog = HOGBox()  # helper wrapping OpenCV's HOG person detector
rect = hog(original_img.copy())
x, y, w, h = rect
img_cropped = original_img[y: y + h, x: x + w, :]


## Estimate Pose - 2D & 3D (use a pretrained model)
model_tf = VNect()
sess_config = tf.ConfigProto(device_count={'GPU': 1})  # {'GPU': 0} forces CPU
sess = tf.Session(config=sess_config)
saver = tf.train.Saver()
saver.restore(sess, args.model_file)

input_batch, scaler, [offset_x, offset_y] = gen_input_batch(img_cropped, box_size, scales)
[hm, xm, ym, zm] = sess.run(
        [model_tf.heatmap, model_tf.x_heatmap, model_tf.y_heatmap, model_tf.z_heatmap],
        feed_dict={model_tf.input_holder: input_batch})


## Skeleton Fitting
# average the outputs over the different scales
hm_size = box_size // hm_factor
hm_avg = np.zeros((hm_size, hm_size, joints_sum))
xm_avg = np.zeros((hm_size, hm_size, joints_sum))
ym_avg = np.zeros((hm_size, hm_size, joints_sum))
zm_avg = np.zeros((hm_size, hm_size, joints_sum))

for i in range(len(scales)):
    # rescale each output back to the common (unscaled) resolution
    rescale = 1.0 / scales[i]
    scaled_hm = img_scale(hm[i, :, :, :], rescale)
    scaled_x_hm = img_scale(xm[i, :, :, :], rescale)
    scaled_y_hm = img_scale(ym[i, :, :, :], rescale)
    scaled_z_hm = img_scale(zm[i, :, :, :], rescale)
    # crop the central hm_size x hm_size region and accumulate
    mid = [scaled_hm.shape[0] // 2, scaled_hm.shape[1] // 2]
    hm_avg += scaled_hm[mid[0] - hm_size // 2: mid[0] + hm_size // 2, mid[1] - hm_size // 2: mid[1] + hm_size // 2, :]
    xm_avg += scaled_x_hm[mid[0] - hm_size // 2: mid[0] + hm_size // 2, mid[1] - hm_size // 2: mid[1] + hm_size // 2, :]
    ym_avg += scaled_y_hm[mid[0] - hm_size // 2: mid[0] + hm_size // 2, mid[1] - hm_size // 2: mid[1] + hm_size // 2, :]
    zm_avg += scaled_z_hm[mid[0] - hm_size // 2: mid[0] + hm_size // 2, mid[1] - hm_size // 2: mid[1] + hm_size // 2, :]

# divide once, outside the loop
hm_avg /= len(scales)
xm_avg /= len(scales)
ym_avg /= len(scales)
zm_avg /= len(scales)

# 2D
joints_2d = extract_2d_joints(hm_avg, box_size, hm_factor)
joints_2d = joint_filter(joints_2d, dim=2)
joints_2d[:, 0] = (joints_2d[:, 0] - offset_y) / scaler
joints_2d[:, 1] = (joints_2d[:, 1] - offset_x) / scaler
joints_2d[:, 0] += y
joints_2d[:, 1] += x

# 3D
joints_3d = extract_3d_joints(joints_2d, xm_avg, ym_avg, zm_avg, hm_factor)
joints_3d = joint_filter(joints_3d, dim=3)
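
For reference, a hypothetical check of the final outputs (the shapes follow from joints_sum = 21):

print(joints_2d.shape)  # (21, 2): [row, column] pixel coordinates in the original image
print(joints_3d.shape)  # (21, 3): root-relative [x, y, z], in mm with the default scaler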

 

 

  • Data Processing - Functions

The following code, used by the main routine above, converts the image data into the format the VNect network expects as input.


Data Processing

import cv2
import numpy as np


def img_scale(img, scale):
    """
    Resize an image by a scale factor in both x and y directions.
    :param img: input image
    :param scale: scale factor, new image side length / raw image side length
    :return: the scaled image
    """
    return cv2.resize(img, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR)


def img_padding(img, box_size, color='black'):
    """
    Given the input image and side length of the box, put the image into the center of the box.
    :param img: the input color image, whose longer side is equal to box size
    :param box_size: the side length of the square box
    :param color: indicating the padding area color
    :return: the padded image
    """
    h, w = img.shape[:2]
    offset_x, offset_y = 0, 0
    if color == 'black':
        pad_color = [0, 0, 0]
    elif color == 'grey':
        pad_color = [128, 128, 128]
    img_padded = np.ones((box_size, box_size, 3), dtype=np.uint8) * np.array(pad_color, dtype=np.uint8)
    if h > w:
        offset_x = box_size // 2 - w // 2
        img_padded[:, offset_x: box_size // 2 + int(np.ceil(w / 2)), :] = img
    else:  # h <= w
        offset_y = box_size // 2 - h // 2
        img_padded[offset_y: box_size // 2 + int(np.ceil(h / 2)), :, :] = img
    return img_padded, [offset_x, offset_y]


def img_scale_squarify(img, box_size):
    """
    To scale and squarify the input image into a square box with fixed size.
    :param img: the input color image
    :param box_size: the length of the square box
    :return: box image, scaler and offsets
    """
    h, w = img.shape[:2]
    scaler = box_size / max(h, w)
    img_scaled = img_scale(img, scaler)
    img_padded, [offset_x, offset_y] = img_padding(img_scaled, box_size)
    assert img_padded.shape == (box_size, box_size, 3), 'padded image shape invalid'
    return img_padded, scaler, [offset_x, offset_y]


def img_scale_padding(img, scaler, box_size, color='black'):
    """
    For a box image, scale it down and pad back up to the original box size.
    :param img: the input box image
    :param scaler: scale factor, new image side length / raw image side length, < 1
    :param box_size: side length of the square box
    :param color: the padding area color
    """
    img_scaled = img_scale(img, scaler)
    if color == 'black':
        pad_color = (0, 0, 0)
    elif color == 'grey':
        pad_color = (128, 128, 128)
    pad_h = (box_size - img_scaled.shape[0]) // 2
    pad_w = (box_size - img_scaled.shape[1]) // 2
    pad_h_offset = (box_size - img_scaled.shape[0]) % 2
    pad_w_offset = (box_size - img_scaled.shape[1]) % 2
    # np.pad order: rows (height) first, then columns (width)
    img_scale_padded = np.pad(img_scaled,
                              ((pad_h, pad_h + pad_h_offset),
                               (pad_w, pad_w + pad_w_offset),
                               (0, 0)),
                              mode='constant',
                              constant_values=(
                                  (pad_color[0], pad_color[0]),
                                  (pad_color[1], pad_color[1]),
                                  (pad_color[2], pad_color[2])))
    return img_scale_padded


def gen_input_batch(img_input, box_size, scales):
    # input image --> squared image acceptable for the model
    img_square, scaler, [offset_x, offset_y] = img_scale_squarify(img_input, box_size)

    # generate the multi-scale image batch
    input_batch = []
    for scale in scales:
        img = img_scale_padding(img_square, scale, box_size) if scale < 1 else img_square
        input_batch.append(img)

    # image value range: [0, 255) --> [-0.4, 0.6)
    input_batch = np.asarray(input_batch, dtype=np.float32) / 255 - 0.4

    return input_batch, scaler, [offset_x, offset_y]
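
As a quick sanity check (a hypothetical example using a random array in place of a real cropped image), the batch contains one box-sized image per scale:

import numpy as np

dummy = (np.random.rand(480, 270, 3) * 255).astype(np.uint8)  # stand-in for a cropped person image
batch, scaler, [off_x, off_y] = gen_input_batch(dummy, box_size=368, scales=[1, 0.85, 0.7])
print(batch.shape)  # (3, 368, 368, 3): one image per scale
print(batch.min() >= -0.4, batch.max() < 0.6)  # values shifted into [-0.4, 0.6)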

 

 

  • VNect Model

The following implements the structure of the VNect network itself.


VNect Model

import tensorflow as tf
import tensorflow.contrib as tc


class VNect:
    def __init__(self):
        self.is_training = False
        self.input_holder = tf.placeholder(dtype=tf.float32, shape=(None, 368, 368, 3))
        self._build_network()

    def _build_network(self):
        # Conv
        self.conv1 = tc.layers.conv2d(self.input_holder, kernel_size=7, padding='same', num_outputs=64, stride=2,
                                      scope='conv1')
        self.pool1 = tc.layers.max_pool2d(self.conv1, kernel_size=3, padding='same', scope='pool1')

        # Residual block 2a
        self.res2a_branch1 = tc.layers.conv2d(self.pool1, kernel_size=1, padding='valid', num_outputs=256,
                                              activation_fn=None, scope='res2a_branch1')
        self.res2a_branch2a = tc.layers.conv2d(self.pool1, kernel_size=1, padding='valid', num_outputs=64,
                                               scope='res2a_branch2a')
        self.res2a_branch2b = tc.layers.conv2d(self.res2a_branch2a, kernel_size=3, padding='same', num_outputs=64,
                                               scope='res2a_branch2b')
        self.res2a_branch2c = tc.layers.conv2d(self.res2a_branch2b, kernel_size=1, padding='valid', num_outputs=256,
                                               activation_fn=None, scope='res2a_branch2c')
        self.res2a = tf.add(self.res2a_branch2c, self.res2a_branch1, name='res2a_add')
        self.res2a = tf.nn.relu(self.res2a, name='res2a')

        # Residual block 2b
        self.res2b_branch2a = tc.layers.conv2d(self.res2a, kernel_size=1, padding='valid', num_outputs=64,
                                               scope='res2b_branch2a')
        self.res2b_branch2b = tc.layers.conv2d(self.res2b_branch2a, kernel_size=3, padding='same', num_outputs=64,
                                               scope='res2b_branch2b')
        self.res2b_branch2c = tc.layers.conv2d(self.res2b_branch2b, kernel_size=1, padding='valid', num_outputs=256,
                                               activation_fn=None, scope='res2b_branch2c')
        self.res2b = tf.add(self.res2b_branch2c, self.res2a, name='res2b_add')
        self.res2b = tf.nn.relu(self.res2b, name='res2b')

        # Residual block 2c
        self.res2c_branch2a = tc.layers.conv2d(self.res2b, kernel_size=1, padding='valid', num_outputs=64,
                                               scope='res2c_branch2a')
        self.res2c_branch2b = tc.layers.conv2d(self.res2c_branch2a, kernel_size=3, padding='same', num_outputs=64,
                                               scope='res2c_branch2b')
        self.res2c_branch2c = tc.layers.conv2d(self.res2c_branch2b, kernel_size=1, padding='valid', num_outputs=256,
                                               activation_fn=None, scope='res2c_branch2c')
        self.res2c = tf.add(self.res2c_branch2c, self.res2b, name='res2c_add')
        self.res2c = tf.nn.relu(self.res2c, name='res2c')

        # Residual block 3a
        self.res3a_branch1 = tc.layers.conv2d(self.res2c, kernel_size=1, padding='valid', num_outputs=512,
                                              activation_fn=None, stride=2, scope='res3a_branch1')
        self.res3a_branch2a = tc.layers.conv2d(self.res2c, kernel_size=1, padding='valid', num_outputs=128, stride=2,
                                               scope='res3a_branch2a')
        self.res3a_branch2b = tc.layers.conv2d(self.res3a_branch2a, kernel_size=3, padding='same', num_outputs=128,
                                               scope='res3a_branch2b')
        self.res3a_branch2c = tc.layers.conv2d(self.res3a_branch2b, kernel_size=1, padding='valid', num_outputs=512,
                                               activation_fn=None, scope='res3a_branch2c')
        self.res3a = tf.add(self.res3a_branch2c, self.res3a_branch1, name='res3a_add')
        self.res3a = tf.nn.relu(self.res3a, name='res3a')

        # Residual block 3b
        self.res3b_branch2a = tc.layers.conv2d(self.res3a, kernel_size=1, padding='valid', num_outputs=128,
                                               scope='res3b_branch2a')
        self.res3b_branch2b = tc.layers.conv2d(self.res3b_branch2a, kernel_size=3, padding='same', num_outputs=128,
                                               scope='res3b_branch2b')
        self.res3b_branch2c = tc.layers.conv2d(self.res3b_branch2b, kernel_size=1, padding='valid', num_outputs=512,
                                               activation_fn=None, scope='res3b_branch2c')
        self.res3b = tf.add(self.res3b_branch2c, self.res3a, name='res3b_add')
        self.res3b = tf.nn.relu(self.res3b, name='res3b')

        # Residual block 3c
        self.res3c_branch2a = tc.layers.conv2d(self.res3b, kernel_size=1, padding='valid', num_outputs=128,
                                               scope='res3c_branch2a')
        self.res3c_branch2b = tc.layers.conv2d(self.res3c_branch2a, kernel_size=3, padding='same', num_outputs=128,
                                               scope='res3c_branch2b')
        self.res3c_branch2c = tc.layers.conv2d(self.res3c_branch2b, kernel_size=1, padding='valid', num_outputs=512,
                                               activation_fn=None, scope='res3c_branch2c')
        self.res3c = tf.add(self.res3c_branch2c, self.res3b, name='res3c_add')
        self.res3c = tf.nn.relu(self.res3c, name='res3c')

        # Residual block 3d
        self.res3d_branch2a = tc.layers.conv2d(self.res3c, kernel_size=1, padding='valid', num_outputs=128,
                                               scope='res3d_branch2a')
        self.res3d_branch2b = tc.layers.conv2d(self.res3d_branch2a, kernel_size=3, padding='same', num_outputs=128,
                                               scope='res3d_branch2b')
        self.res3d_branch2c = tc.layers.conv2d(self.res3d_branch2b, kernel_size=1, padding='valid', num_outputs=512,
                                               activation_fn=None, scope='res3d_branch2c')
        self.res3d = tf.add(self.res3d_branch2c, self.res3c, name='res3d_add')
        self.res3d = tf.nn.relu(self.res3d, name='res3d')

        # Residual block 4a
        self.res4a_branch1 = tc.layers.conv2d(self.res3d, kernel_size=1, padding='valid', num_outputs=1024,
                                              activation_fn=None, stride=2, scope='res4a_branch1')
        self.res4a_branch2a = tc.layers.conv2d(self.res3d, kernel_size=1, padding='valid', num_outputs=256, stride=2,
                                               scope='res4a_branch2a')
        self.res4a_branch2b = tc.layers.conv2d(self.res4a_branch2a, kernel_size=3, padding='same', num_outputs=256,
                                               scope='res4a_branch2b')
        self.res4a_branch2c = tc.layers.conv2d(self.res4a_branch2b, kernel_size=1, padding='valid', num_outputs=1024,
                                               activation_fn=None, scope='res4a_branch2c')
        self.res4a = tf.add(self.res4a_branch2c, self.res4a_branch1, name='res4a_add')
        self.res4a = tf.nn.relu(self.res4a, name='res4a')

        # Residual block 4b
        self.res4b_branch2a = tc.layers.conv2d(self.res4a, kernel_size=1, padding='valid', num_outputs=256,
                                               scope='res4b_branch2a')
        self.res4b_branch2b = tc.layers.conv2d(self.res4b_branch2a, kernel_size=3, padding='same', num_outputs=256,
                                               scope='res4b_branch2b')
        self.res4b_branch2c = tc.layers.conv2d(self.res4b_branch2b, kernel_size=1, padding='valid', num_outputs=1024,
                                               activation_fn=None, scope='res4b_branch2c')
        self.res4b = tf.add(self.res4b_branch2c, self.res4a, name='res4b_add')
        self.res4b = tf.nn.relu(self.res4b, name='res4b')

        # Residual block 4c
        self.res4c_branch2a = tc.layers.conv2d(self.res4b, kernel_size=1, padding='valid', num_outputs=256,
                                               scope='res4c_branch2a')
        self.res4c_branch2b = tc.layers.conv2d(self.res4c_branch2a, kernel_size=3, padding='same', num_outputs=256,
                                               scope='res4c_branch2b')
        self.res4c_branch2c = tc.layers.conv2d(self.res4c_branch2b, kernel_size=1, padding='valid', num_outputs=1024,
                                               activation_fn=None, scope='res4c_branch2c')
        self.res4c = tf.add(self.res4c_branch2c, self.res4b, name='res4c_add')
        self.res4c = tf.nn.relu(self.res4c, name='res4c')

        # Residual block 4d
        self.res4d_branch2a = tc.layers.conv2d(self.res4c, kernel_size=1, padding='valid', num_outputs=256,
                                               scope='res4d_branch2a')
        self.res4d_branch2b = tc.layers.conv2d(self.res4d_branch2a, kernel_size=3, padding='same', num_outputs=256,
                                               scope='res4d_branch2b')
        self.res4d_branch2c = tc.layers.conv2d(self.res4d_branch2b, kernel_size=1, padding='valid', num_outputs=1024,
                                               activation_fn=None, scope='res4d_branch2c')
        self.res4d = tf.add(self.res4d_branch2c, self.res4c, name='res4d_add')
        self.res4d = tf.nn.relu(self.res4d, name='res4d')

        # Residual block 4e
        self.res4e_branch2a = tc.layers.conv2d(self.res4d, kernel_size=1, padding='valid', num_outputs=256,
                                               scope='res4e_branch2a')
        self.res4e_branch2b = tc.layers.conv2d(self.res4e_branch2a, kernel_size=3, padding='same', num_outputs=256,
                                               scope='res4e_branch2b')
        self.res4e_branch2c = tc.layers.conv2d(self.res4e_branch2b, kernel_size=1, padding='valid', num_outputs=1024,
                                               activation_fn=None, scope='res4e_branch2c')
        self.res4e = tf.add(self.res4e_branch2c, self.res4d, name='res4e_add')
        self.res4e = tf.nn.relu(self.res4e, name='res4e')

        # Residual block 4f
        self.res4f_branch2a = tc.layers.conv2d(self.res4e, kernel_size=1, padding='valid', num_outputs=256,
                                               scope='res4f_branch2a')
        self.res4f_branch2b = tc.layers.conv2d(self.res4f_branch2a, kernel_size=3, padding='same', num_outputs=256,
                                               scope='res4f_branch2b')
        self.res4f_branch2c = tc.layers.conv2d(self.res4f_branch2b, kernel_size=1, padding='valid', num_outputs=1024,
                                               activation_fn=None, scope='res4f_branch2c')
        self.res4f = tf.add(self.res4f_branch2c, self.res4e, name='res4f_add')
        self.res4f = tf.nn.relu(self.res4f, name='res4f')

        # Residual block 5a
        self.res5a_branch2a_new = tc.layers.conv2d(self.res4f, kernel_size=1, padding='valid', num_outputs=512,
                                                   scope='res5a_branch2a_new')
        self.res5a_branch2b_new = tc.layers.conv2d(self.res5a_branch2a_new, kernel_size=3, padding='same',
                                                   num_outputs=512, scope='res5a_branch2b_new')
        self.res5a_branch2c_new = tc.layers.conv2d(self.res5a_branch2b_new, kernel_size=1, padding='valid',
                                                   num_outputs=1024, activation_fn=None, scope='res5a_branch2c_new')
        self.res5a_branch1_new = tc.layers.conv2d(self.res4f, kernel_size=1, padding='valid', num_outputs=1024,
                                                  activation_fn=None, scope='res5a_branch1_new')
        self.res5a = tf.add(self.res5a_branch2c_new, self.res5a_branch1_new, name='res5a_add')
        self.res5a = tf.nn.relu(self.res5a, name='res5a')

        # Residual block 5b
        self.res5b_branch2a_new = tc.layers.conv2d(self.res5a, kernel_size=1, padding='valid', num_outputs=256,
                                                   scope='res5b_branch2a_new')
        self.res5b_branch2b_new = tc.layers.conv2d(self.res5b_branch2a_new, kernel_size=3, padding='same',
                                                   num_outputs=128, scope='res5b_branch2b_new')
        self.res5b_branch2c_new = tc.layers.conv2d(self.res5b_branch2b_new, kernel_size=1, padding='valid',
                                                   num_outputs=256, scope='res5b_branch2c_new')

        # Transpose Conv
        self.res5c_branch1a = tf.layers.conv2d_transpose(self.res5b_branch2c_new, kernel_size=4, filters=63,
                                                         activation=None, strides=2, padding='same', use_bias=False,
                                                         name='res5c_branch1a')
        self.res5c_branch2a = tf.layers.conv2d_transpose(self.res5b_branch2c_new, kernel_size=4, filters=128,
                                                         activation=None, strides=2, padding='same', use_bias=False,
                                                         name='res5c_branch2a')
        self.bn5c_branch2a = tc.layers.batch_norm(self.res5c_branch2a, scale=True, is_training=self.is_training,
                                                  scope='bn5c_branch2a')
        self.bn5c_branch2a = tf.nn.relu(self.bn5c_branch2a)

        self.res5c_delta_x, self.res5c_delta_y, self.res5c_delta_z = tf.split(self.res5c_branch1a, num_or_size_splits=3,
                                                                              axis=3)
        self.res5c_branch1a_sqr = tf.multiply(self.res5c_branch1a, self.res5c_branch1a, name='res5c_branch1a_sqr')
        self.res5c_delta_x_sqr, self.res5c_delta_y_sqr, self.res5c_delta_z_sqr = tf.split(self.res5c_branch1a_sqr,
                                                                                          num_or_size_splits=3, axis=3)
        self.res5c_bone_length_sqr = tf.add(tf.add(self.res5c_delta_x_sqr, self.res5c_delta_y_sqr),
                                            self.res5c_delta_z_sqr)
        self.res5c_bone_length = tf.sqrt(self.res5c_bone_length_sqr)

        self.res5c_branch2a_feat = tf.concat(
            [self.bn5c_branch2a, self.res5c_delta_x, self.res5c_delta_y, self.res5c_delta_z, self.res5c_bone_length],
            axis=3, name='res5c_branch2a_feat')

        self.res5c_branch2b = tc.layers.conv2d(self.res5c_branch2a_feat, kernel_size=3, padding='same', num_outputs=128,
                                               scope='res5c_branch2b')
        self.res5c_branch2c = tf.layers.conv2d(self.res5c_branch2b, kernel_size=1, padding='valid', filters=84,
                                               activation=None, use_bias=False, name='res5c_branch2c')
        # 84 channels = 4 map types x 21 joints:
        # confidence heatmap + x/y/z location maps
        self.heatmap, self.x_heatmap, self.y_heatmap, self.z_heatmap = tf.split(self.res5c_branch2c,
                                                                                num_or_size_splits=4, axis=3)

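To see where the channel counts come from, here is a hypothetical shape check (assuming the class above has been defined): with a 368x368 input and an overall stride of 8, the output maps are 46x46, and the final 84-channel tensor splits into four 21-joint stacks.

model = VNect()
print(model.heatmap.get_shape())    # (?, 46, 46, 21) joint confidence heatmaps
print(model.x_heatmap.get_shape())  # (?, 46, 46, 21) x location maps (y/z analogous)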
 

 

  • Skeleton Fitting

The following code processes the 2D and 3D pose data during skeleton fitting.


Skeleton Fitting

import math
import time

import cv2
import numpy as np


class LowPassFilter(object):

    def __init__(self, alpha):
        self.__setAlpha(alpha)
        self.__y = self.__s = None

    def __setAlpha(self, alpha):
        alpha = float(alpha)
        if alpha <= 0 or alpha > 1.0:
            raise ValueError("alpha (%s) should be in (0.0, 1.0]" % alpha)
        self.__alpha = alpha

    def __call__(self, value, timestamp=None, alpha=None):        
        if alpha is not None:
            self.__setAlpha(alpha)
        if self.__y is None:
            s = value
        else:
            s = self.__alpha*value + (1.0-self.__alpha)*self.__s
        self.__y = value
        self.__s = s
        return s

    def lastValue(self):
        return self.__y


class OneEuroFilter(object):

    def __init__(self, freq, mincutoff=1.0, beta=0.0, dcutoff=1.0):
        if freq <= 0:
            raise ValueError("freq should be > 0")
        if mincutoff <= 0:
            raise ValueError("mincutoff should be > 0")
        if dcutoff <= 0:
            raise ValueError("dcutoff should be > 0")
        self.__freq = float(freq)
        self.__mincutoff = float(mincutoff)
        self.__beta = float(beta)
        self.__dcutoff = float(dcutoff)
        self.__x = LowPassFilter(self.__alpha(self.__mincutoff))
        self.__dx = LowPassFilter(self.__alpha(self.__dcutoff))
        self.__lasttime = None
        
    def __alpha(self, cutoff):
        te    = 1.0 / self.__freq
        tau   = 1.0 / (2*math.pi*cutoff)
        return  1.0 / (1.0 + tau/te)

    def __call__(self, x, timestamp=None):
        # ---- update the sampling frequency based on timestamps
        if self.__lasttime and timestamp:
            self.__freq = 1.0 / (timestamp - self.__lasttime)
        self.__lasttime = timestamp

        # ---- estimate the current variation per second
        prev_x = self.__x.lastValue()
        dx = 0.0 if prev_x is None else (x - prev_x) * self.__freq  # FIXME: 0.0 or value?
        edx = self.__dx(dx, timestamp, alpha=self.__alpha(self.__dcutoff))

        # ---- use it to update the cutoff frequency
        cutoff = self.__mincutoff + self.__beta * math.fabs(edx)

        # ---- filter the given value
        return self.__x(x, timestamp, alpha=self.__alpha(cutoff))


filter_config_2d = {
    'freq': 30,        # system frequency, about 30 Hz
    'mincutoff': 1.7,  # value taken from the paper
    'beta': 0.3,       # value taken from the paper
    'dcutoff': 0.4     # not mentioned in the paper; set empirically
}
filter_config_3d = {
    'freq': 30,        # system frequency, about 30 Hz
    'mincutoff': 0.8,  # value taken from the paper
    'beta': 0.4,       # value taken from the paper
    'dcutoff': 0.4     # not mentioned in the paper; set empirically
}
# one independent filter per joint and per coordinate
filter_2d = [(OneEuroFilter(**filter_config_2d), OneEuroFilter(**filter_config_2d))
             for _ in range(joints_sum)]
filter_3d = [(OneEuroFilter(**filter_config_3d), OneEuroFilter(**filter_config_3d), OneEuroFilter(**filter_config_3d))
             for _ in range(joints_sum)]


def hm_pt_interp_bilinear(src, scale, point):
    """
    Determine the value of one desired point by bilinear interpolation.
    :param src: input heatmap
    :param scale: scale factor, input box side length / heatmap side length
    :param point: position of the desired point in input box, [row, column]
    :return: the value of the desired point
    """
    src_h, src_w = src.shape[:]
    dst_y, dst_x = point
    src_x = (dst_x + 0.5) / scale - 0.5
    src_y = (dst_y + 0.5) / scale - 0.5
    src_x_0 = int(src_x)
    src_y_0 = int(src_y)
    src_x_1 = min(src_x_0 + 1, src_w - 1)
    src_y_1 = min(src_y_0 + 1, src_h - 1)

    value0 = (src_x_1 - src_x) * src[src_y_0, src_x_0] + (src_x - src_x_0) * src[src_y_0, src_x_1]
    value1 = (src_x_1 - src_x) * src[src_y_1, src_x_0] + (src_x - src_x_0) * src[src_y_1, src_x_1]
    dst_val = (src_y_1 - src_y) * value0 + (src_y - src_y_0) * value1
    return dst_val


def extract_2d_joints(heatmaps, box_size, hm_factor):
    """
    Rescale the heatmap to input box size, then extract the coordinates for every joint.
    :param heatmaps: the input heatmaps
    :param box_size: the side length of the input box
    :param hm_factor: heatmap factor, indicating box size / heatmap size
    :return: a 2D array with [joints_num, 2], each row of which means [row, column] coordinates of corresponding joint
    """
    joints_2d = np.zeros((heatmaps.shape[2], 2))
    for joint_num in range(heatmaps.shape[2]):
        heatmap_scaled = cv2.resize(heatmaps[:, :, joint_num], (0, 0), 
                                    fx=hm_factor, fy=hm_factor, 
                                    interpolation=cv2.INTER_LINEAR)
        joint_coord = np.unravel_index(np.argmax(heatmap_scaled), (box_size, box_size))
        joints_2d[joint_num, :] = joint_coord
    return joints_2d


def extract_3d_joints(joints_2d, x_hm, y_hm, z_hm, hm_factor):
    """
    Extract 3D coordinates of each joint according to its 2D coordinates.
    :param joints_2d: 2D array with [joints_num, 2], containing the 2D coordinates of the joints
    :param x_hm: x coordinate heatmaps
    :param y_hm: y coordinate heatmaps
    :param z_hm: z coordinate heatmaps
    :param hm_factor: heatmap factor, indicating box size / heatmap size
    :return: a 3D array with [joints_num, 3], each row of which contains [x, y, z] coordinates of corresponding joint
    Notation:
    x direction: left --> right
    y direction: up --> down
    z direction: nearer --> farther
    """
    scaler = 100  # scaler=100 -> mm unit; scaler=10 -> cm unit
    joints_3d = np.zeros((x_hm.shape[2], 3), dtype=np.float32)
    for joint_num in range(x_hm.shape[2]):
        y_2d, x_2d = joints_2d[joint_num][:]
        joint_x = hm_pt_interp_bilinear(x_hm[:, :, joint_num], hm_factor, (y_2d, x_2d)) * scaler
        joint_y = hm_pt_interp_bilinear(y_hm[:, :, joint_num], hm_factor, (y_2d, x_2d)) * scaler
        joint_z = hm_pt_interp_bilinear(z_hm[:, :, joint_num], hm_factor, (y_2d, x_2d)) * scaler
        joints_3d[joint_num, :] = [joint_x, joint_y, joint_z]

    # Subtract the root location to normalize the data
    joints_3d -= joints_3d[14, :]
    return joints_3d


def joint_filter(joints, dim=2):
    t = time.time()
    if dim == 2:
        for i in range(joints_sum):
            joints[i, 0] = filter_2d[i][0](joints[i, 0], t)
            joints[i, 1] = filter_2d[i][1](joints[i, 1], t)
    else:
        for i in range(joints_sum):
            joints[i, 0] = filter_3d[i][0](joints[i, 0], t)
            joints[i, 1] = filter_3d[i][1](joints[i, 1], t)
            joints[i, 2] = filter_3d[i][2](joints[i, 2], t)

    return joints
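
A hypothetical usage example: feeding a noisy scalar trajectory through one OneEuroFilter instance (the same thing joint_filter does for each joint coordinate) smooths the jitter while still tracking the underlying motion:

import math
import random

f = OneEuroFilter(**filter_config_3d)
for i in range(5):
    t = i / 30.0                                       # 30 Hz timestamps
    noisy = math.sin(t) + random.uniform(-0.05, 0.05)  # signal plus jitter
    print(round(f(noisy, t), 4))                       # smoothed value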

 

 

This completes the implementation of VNect's core structure in code.

It is less complex than you might expect, and even this code can estimate a single person's pose in real time.

If time permits, I will also implement XNect, the upgraded version of VNect, in code.

 

That wraps up this post.
