技術研究所 (技研) のまつけんです。
Python用のライブラリには、cv2 (OpenCV)、numpy (NumPy)、pandas (Pandas)などがあります。大抵のことが出来るので大変便利なのですが、引数の与え方などで不便を感じることがあります。そこで、今回は、私が普段、それらのライブラリをwrapするのに使っているサブルーチンを紹介したいと思います。
頻繁に利用する色 (無彩色、原色、補色) を定義します。OpenCVはRGBではなくBGRが基本なので、その順番となっています。
| BLACK = np.array([ 0, 0, 0]) |
| GREY = np.array([127, 127, 127]) |
| WHITE = np.array([255, 255, 255]) |
| RED = np.array([ 0, 0, 255]) |
| GREEN = np.array([ 0, 255, 0]) |
| BLUE = np.array([255, 0, 0]) |
| CYAN = np.array([255, 255, 0]) |
| MAGENTA = np.array([255, 0, 255]) |
| YELLOW = np.array([ 0, 255, 255]) |
動画の読み込みと書き出しの際に使うものです。cv2.CAP_PROP_XXXは、4種類ともgetすることが多い上にタイプ量が多い上に、よく忘れてしまい、使うたびに調べることが多かったので纏めました。また、cv2.VideoWriter_fourccも同様です。
| def cap_open(mov, verbose=False): |
| cap = cv2.VideoCapture(str(mov)) |
| if cap.isOpened() is False: |
| frm = -1 |
| wid = -1 |
| hei = -1 |
| fps = -1 |
| else: |
| frm = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
| wid = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| hei = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| fps = cap.get(cv2.CAP_PROP_FPS) |
| if verbose: |
| print(wid, 'x', hei, 'pixels,', round(fps, 3), 'fps,', frm, 'frames,', round(frm / fps, 3), 's,', time_str(frm / fps)[0]) |
| return cap, wid, hei, fps, frm |
| def wri_open(path, wid, hei, fps=30, codec='XVID'): |
| f = open(path, 'w') ; f.close() # 試しに新規作成してみる (ディレクトリが書き込み禁止なら例外が発生する) |
| fourcc = cv2.VideoWriter_fourcc(*codec) |
| return cv2.VideoWriter(path, fourcc, fps, (int(wid), int(hei))) |
cap_open()のverboseをTrueにすると
| 1024 x 768 pixels, 59.522 fps, 8600 frames, 144.483 s, 2 m 24 s 483 ms |
のように動画の情報が表示されます。その際、秒を日・時・分・秒・ミリ秒に変換する関数time_str()を別途、importします (私の場合は、lib_etc.pyというファイルに入れています)。
| def time_str(value, |
| units = ( |
| (' d ', 60 * 60 * 24), |
| (' h ', 60 * 60), |
| (' m ', 60), |
| (' s ', 1), |
| (' ms', 1/1000), |
| )): |
| result = [0] * len(units) |
| result_str = '' |
| for i, u in enumerate(units): |
| if u[1] < value: |
| result[i] = int(value / u[1]) |
| value %= u[1] |
| result_str += str(result[i]) + u[0] |
| return result_str, result |
このtime_str()は時間以外にも利用できるので便利です。引数unitsに((‘ M ‘, 1024 **2), (‘ k ‘, 1024), (‘ B’, 1))を指定すればMB、KBなどの容量に対応でき、[(‘,’, 1000 ** n) for n in range(10, 0, -1)] + [(”, 1)]を指定すれば3桁のコンマ区切りの文字列を生成できます。
グレイスケール、RGB、BGR、HSV、YUVの変換です。cv2.cvtColorもcv2.COLOR_XXX2XXXもタイプ量が多いのと、こちらもよく忘れて頻繁に調べがちでした。基本的に、よく使う組み合わせは網羅していますが、今後も必要に応じて追加する予定です。
| def bgr2gry(img): |
| return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) |
| def gry2bgr(img): |
| return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) |
| def rgb2gry(img): |
| return cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) |
| def gry2rgb(img): |
| return cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) |
| def bgr2rgb(img): |
| return cv2.cvtColor(img, cv2.COLOR_BGR2RGB) |
| def rgb2bgr(img): |
| return cv2.cvtColor(img, cv2.COLOR_RGB2BGR) |
| def bgr2hsv(img): |
| return cv2.cvtColor(img, cv2.COLOR_BGR2HSV) |
| def hsv2bgr(img): |
| return cv2.cvtColor(img, cv2.COLOR_HSV2BGR) |
| def bgr2yuv(img): |
| return cv2.cvtColor(img, cv2.COLOR_BGR2YUV) |
| def yuv2bgr(img): |
| return cv2.cvtColor(img, cv2.COLOR_YUV2BGR) |
cv2.line()の座標は整数のtupleでないとエラーになります。画像の中央から直線を引く場合にcv2_line()ならば「np.array([width, height]) / 2」や「np.array(img.shape[: 2]) / 2」などをそのまま座標として与えることが出来ます。なお、色の指定については整数という制限が無いようなので、25%グレイを「WHITE / 4」として与えることも可能です。cv2_putText()において、textをstrでキャストしているのは、例えばpathlib.Path型をそのまま与えるためです。今後、必要ならば、rectangleやellipse、circleなどを作ります。
また、cv2.floodFill()は呼び出す前にmaskを作っておく必要がありますが、cv2_floodFill()は自動でそれを行います。
| def cv2_line(img, pt1, pt2, color, thickness=1, lineType=cv2.LINE_8): |
| pt1 = np.round(pt1).astype('i') |
| pt2 = np.round(pt2).astype('i') |
| cv2.line(img, tuple(pt1), tuple(pt2), color, thickness, lineType) |
| def cv2_putText(img, text, org, fontFace=cv2.FONT_HERSHEY_PLAIN, fontScale=2, color=BLACK, thickness=1, lineType=cv2.LINE_AA): |
| org = np.round(org).astype('i') |
| thickness = np.round(thickness) |
| cv2.putText(img, str(text), tuple(org), fontFace, fontScale, color, int(thickness), lineType) |
| def cv2_getPerspectiveTransform(src, dst): |
| src = np.array(src).astype('f4') |
| dst = np.array(dst).astype('f4') |
| return cv2.getPerspectiveTransform(src, dst) |
| def cv2_floodFill(img, seedPoint, newVal, loDiff=None, upDiff=None, flags=4): |
| h, w = img.shape[: 2] |
| mask = np.zeros((h + 2, w + 2), dtype='u1') |
| return cv2.floodFill(img, mask, seedPoint=seedPoint, newVal=newVal, loDiff=loDiff, upDiff=upDiff, flags=flags) |
| if 0: # test (抜粋) |
| from pathlib import Path |
| img = np.zeros((256, 256, 3)).astype('u1') |
| c = WHITE / 3.14 |
| p0 = np.array(img.shape[: 2]) / 3.14 |
| p1 = np.array(img.shape[: 2]) / 3.14 + 50 |
| s = Path('test') |
| cv2_line(img, p0, p1, c) |
| cv2_putText(img, s, p0, fontScale=3.14, color=c, thickness=3.14) |
| plt.figure(figsize=(30 , 30)) |
| plt.imshow(img) |
| plt.show() |
| print(np.max(img)) |
| src = ((-1, 2), ( 4, -3.14), (-5, 4), ( 3, -6)) |
| dst = (( 1, -2), (-4, 3.14), ( 5, -4), (-3, 6)) |
| cv2_getPerspectiveTransform(src, dst) |
trim_and_resize()は、トリミングと縮小を同時にこなす (例えば、1920×1080の画像の両端を破棄して640×480に縮小する) ルーチンです。
rotate_img()は、「画像中央を中心として手軽に回転したい」という場合でも、それなりに記述が必要なので作りました。
| def trim_and_resize(img, size): |
| h0, w0 = img.shape[: 2] |
| w1, h1 = size |
| r0 = h0 / w0 |
| r1 = h1 / w1 |
| if r0 < r1: |
| new_w0 = h0 / r1 |
| left_trim = round((w0 - new_w0) / 2) |
| img = img[:, left_trim : round(new_w0) + left_trim] |
| elif r1 < r0: |
| new_h0 = w0 * r1 |
| upper_trim = round((h0 - new_h0) / 2) |
| img = img[upper_trim : round(new_h0) + upper_trim, :] |
| print(img.shape) |
| return cv2.resize(img, (w1, h1)) |
| def rotate_img(img, angle, centre=None, out_size=None, scale=1., borderMode=None): |
| hei, wid = img.shape[: 2] |
| if centre is None: |
| centre = np.round(np.array([wid, hei]) / 2).astype('f') |
| if out_size is None: |
| out_size = (wid, hei) |
| mat = cv2.getRotationMatrix2D(tuple(centre), angle , scale) |
| return cv2.warpAffine(img, mat, out_size, borderMode=borderMode) |
np.zeros()やnp.ones()の引数に何らかの演算結果 (浮動小数点値) を与えたいことも多いので作りました。zerosとonesは「np_」を付けていません (私の場合、Pythonを使い始める以前は、長らくMATLAB/Octaveを使っていたので、しばしば、zeros、ones、repmatなどは「np.」を付け忘れたりします)。fullは他の変数名などとぶつかることが有り得るので、「np_full」としています。デフォルトの丸めをceilにしているのは、縮小画像の格納用に「np.array([height, width]) / n」を与えた場合に、n+1誤差で領域が足りなくなるのを防ぐためです。
| def zeros(shape, dtype=None, rounding=np.ceil): |
| shape = rounding(shape).astype('i') |
| return np.zeros(shape, dtype) |
| def zeros2(n, dtype=None, rounding=np.ceil): |
| return zeros([n, n], dtype) |
| def ones(shape, dtype=None, rounding=np.ceil): |
| shape = rounding(shape).astype('i') |
| return np.ones(shape, dtype) |
| def ones2(n, dtype=None, rounding=np.ceil): |
| return ones([n, n], dtype) |
| def np_full(shape, value, dtype=None, rounding=np.ceil): |
| shape = rounding(shape).astype('i') |
| return np.full(shape, value, dtype) |
| if 0: # test (抜粋) |
| a = zeros((3.4, 3.5, 3.6), rounding=np.floor) ; print(a.shape) |
| a = zeros((3.4, 3.5, 3.6), rounding=np.round) ; print(a.shape) |
| a = zeros((3.4, 3.5, 3.6), rounding=np.ceil ) ; print(a.shape) |
複数の処理結果をstackしたいときに、前段の処理によっては、[]やNoneが含まれることがあるので、作りました。こちらも「np_」を付けていません。
| def vstack(arrays): |
| arrays = [a for a in arrays if a is not None and 0 < len(a)] |
| return np.vstack(arrays) |
| def hstack(arrays): |
| arrays = [a for a in arrays if a is not None and 0 < len(a)] |
| return np.hstack(arrays) |
| if 0: # test |
| a = np.zeros((3, 4, 5)) |
| b = np.array([]) |
| c = np.ones((5, 4, 5)) |
| d = None |
| e = vstack((a, b, c, d)) |
| print(e.shape) |
cv2.flip()とcv2.rotate()の組み合わせでも可能ですが、np.transposeを使えば1回の変換で可能です。パラメタの「(1, 0, 2)」の順番を忘れがちなので、作りました。
| def transpose_img(img): |
| return np.transpose(img, (1, 0, 2)) |
データをCSVでsave/loadするときは、NumPy配列について、純粋にデータ部分だけを対象にすることが多いので、save_csv()とload_csv()を作りました。また、CSVで保存している途中で、誤って読み出すことを防ぐために、save_csv_safe()を作りました。
| def save_csv(path, data): |
| pd.DataFrame(data).to_csv(path, header=False, index=False) |
| def save_csv_safe(path, data): |
| tmp = "hkjlOHIUpIUngR" |
| pd.DataFrame(data).to_csv(path + tmp, header=False, index=False) |
| if os.path.exists(path): |
| os.remove(path) |
| os.rename(path + tmp, path) |
| def load_csv(path, delete=False): |
| data = np.array(pd.read_csv(path, header=None, index_col=None)) |
| if delete: |
| os.remove(path) |
| return data |
| if 0: # test |
| from pathlib import Path |
| p = Path('test.csv') |
| a = np.array([[1, 2, 3], [4, 5, 6]]) |
| save_csv(p, a) |
| b = load_csv(p, True) |
| print(a.shape, b.shape) |
| print(np.sum(np.abs(a - b))) |
こちらは、dictや2次元listからDataFrameへの変換[1]です (dictや2次元listをCSVで保存したいときなどに便利です):
| def dict2df(d): |
| return pd.DataFrame(d.values(), index=d.keys()).T |
| def lists2df(list2d): |
| d = {} |
| for i, l in enumerate(list2d): |
| d[i] = l |
| return dict2df(d) |
| if 0: # test |
| df = lists2df([[1, 2, 3], [4, 5, 6, 7], [9, 8, 7, 6]]) |
| print(df) |
如何でしょうか? 以上が私が使っているOpenCV、NumPy、Pandasのwrapperとなります。私は、
| from wrap_cv2 import * |
| from wrap_numpy import * |
| from wrap_pandas import * |
のようにインポートしています。なお、wrap_cv2.py、wrap_numpy.py、wrap_pandas.pyの先頭には、必要に応じて
| import os |
| import cv2 |
| import numpy as np |
| import pandas as pd |
などを書いておく必要があります。
このようなwrapperは、今後も増えていくことになると思いますので、適当なタイミングでアップデートを紹介したいと思います。
前述のライブラリとは関係ありませんが、Pythonを便利に使うために使っているクラスやルーチンです (Python用のライブラリは色々とあるので、もしかしたら、既に存在するものを再発明している可能性もありますが、少し探してみて無さそうなら作るという方針で)。
UNIX系OSのteeコマンドに相当することをやりたい場合に使うクラスです:
| class Tee(): |
| def __init__(self, file, mode='r', buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None): |
| self.f = open(file, mode=mode, buffering=buffering, encoding=encoding, errors=errors, newline=newline, closefd=closefd, opener=opener) |
| def print(self, *ss, flush=False, sep=' ', end=None): |
| print(*ss, flush=flush, sep=sep, end=end, file=self.f) |
| print(*ss, flush=flush, sep=sep, end=end) |
| def close(self): |
| self.f.close() |
| def flush(self): |
| self.f.flush() |
| if 0: # test |
| path = 'test.txt' |
| words = ['The', 'quick', 'brown', 'fox', 'jumps', 'over', 'the', 'lazy', 'dog'] |
| t = Tee(path, 'w') |
| for i, word in enumerate(words): |
| t.print(i, word) |
| t.flush() |
| t.close() |
| print('----') |
| !cat "$path" |
C言語にはprintf()に対してsprintf()があり、コンソールに出力する代わりにバッファに格納することが出来ますが、Pythonにはその機能が無さそうなので作りました:
| def sprint(*objects, sep=' ', end=None): |
| line = sep.join([str(o) for o in objects]) |
| if end is None: |
| return line |
| else: |
| return line + end |
| if 0: # test |
| buf = sprint(1, 2, 3, [1, 2, 3], (1, 2, 3), {'a': 1, 'b': 2}, {'x', 'y'}) |
| print(buf) |
| print(1, 2, 3, [1, 2, 3], (1, 2, 3), {'a': 1, 'b': 2}, {'x', 'y'}) |
for i in range(100): print(result[i])のように、処理結果を表示するとき、result[i]が短い場合、右側がデッドスペースになるので、UNIX系OSのlsコマンドのように多段組で表示するクラスを作りました:
| class MultiColumnPrint(): |
| def __init__(self): |
| self.s = [] |
| self.maxlen = 0 |
| def print(self, *objects, sep=' '): |
| line = sep.join([str(o) for o in objects]) |
| self.s.append(line) |
| self.maxlen = max(self.maxlen, len(line)) |
| def to_string(self, lines=None, columns=None, separator=' | ', alignment='left', transpose=False, verbose=False): |
| if transpose: |
| lines, columns = columns, lines |
| if lines is None and columns is None: |
| columns = int(np.ceil(np.sqrt(len(self.s)))) |
| lines = int(np.ceil(len(self.s) / columns)) |
| elif lines is None and columns is not None: |
| lines = int(np.ceil(len(self.s) / columns)) |
| elif lines is not None and columns is None: |
| columns = int(np.ceil(len(self.s) / lines)) |
| else: |
| while columns * lines < len(self.s): |
| columns += 1 |
| s = '' |
| if 0 == len(self.s): |
| return 'No item' |
| if verbose: |
| s += str(len(self.s)) + ' items ' + str(lines) + ' x ' + str(columns) + '\n' |
| for yy in range(lines): |
| for xx in range(columns): |
| if transpose: |
| idx = columns * yy + xx |
| else: |
| idx = lines * xx + yy |
| if idx < len(self.s): |
| if 'left' == alignment: |
| s += self.s[idx].ljust(self.maxlen) |
| else: |
| s += self.s[idx].rjust(self.maxlen) |
| if xx < columns - 1: |
| s += separator |
| if yy < lines - 1: |
| s += '\n' |
| return s |
| def show(self, lines=None, columns=None, sep=' | ', alignment='left', transpose=False, verbose=True): |
| print(self.to_string(lines, columns, sep, alignment, transpose, verbose)) |
| if 0: # test |
| mcp = MultiColumnPrint() |
| for i in range(27): |
| mcp.print(i, i * 10, i * 100, 'test string') |
| mcp.show() |
例えば、
| mcp = MultiColumnPrint() |
| for i in range(100): |
| mcp.print(i, i ** 2) |
| mcp.show() |
を実行した場合、以下のような表示となります (lsコマンドのように各段の幅は自動調整されません):
| 100 items 10 x 10 |
| 0 0 | 10 100 | 20 400 | 30 900 | 40 1600 | 50 2500 | 60 3600 | 70 4900 | 80 6400 | 90 8100 |
| 1 1 | 11 121 | 21 441 | 31 961 | 41 1681 | 51 2601 | 61 3721 | 71 5041 | 81 6561 | 91 8281 |
| 2 4 | 12 144 | 22 484 | 32 1024 | 42 1764 | 52 2704 | 62 3844 | 72 5184 | 82 6724 | 92 8464 |
| 3 9 | 13 169 | 23 529 | 33 1089 | 43 1849 | 53 2809 | 63 3969 | 73 5329 | 83 6889 | 93 8649 |
| 4 16 | 14 196 | 24 576 | 34 1156 | 44 1936 | 54 2916 | 64 4096 | 74 5476 | 84 7056 | 94 8836 |
| 5 25 | 15 225 | 25 625 | 35 1225 | 45 2025 | 55 3025 | 65 4225 | 75 5625 | 85 7225 | 95 9025 |
| 6 36 | 16 256 | 26 676 | 36 1296 | 46 2116 | 56 3136 | 66 4356 | 76 5776 | 86 7396 | 96 9216 |
| 7 49 | 17 289 | 27 729 | 37 1369 | 47 2209 | 57 3249 | 67 4489 | 77 5929 | 87 7569 | 97 9409 |
| 8 64 | 18 324 | 28 784 | 38 1444 | 48 2304 | 58 3364 | 68 4624 | 78 6084 | 88 7744 | 98 9604 |
| 9 81 | 19 361 | 29 841 | 39 1521 | 49 2401 | 59 3481 | 69 4761 | 79 6241 | 89 7921 | 99 9801 |
こちらは、os.makedirs()に対するwrapperです:
| def makedirs(d): |
| try: |
| os.makedirs(d) |
| except: |
| return -1 |
| return 0 |
os.makedirs()には、exist_okというオプション引数があります(「無ければ作る!」という、まるでエンジニアの生きざまのようなオプションですね)。が、これを指定しても、「有ったから作らなかった」のか「無かったから作った」のかは知りようがありません。そこで、敢えてexist_okを使わずに作ったかどうかを返します。なお、これはifを使って、
| def makedirs(d): |
| if os.path.exists(d): |
| return -1 |
| os.makedirs(d) |
| return 0 |
のように記述できます。しかしそれでは、存在確認と作成が不可分操作[2]として行われず、並列処理や分散処理でトラブルになることがありますので、try/exceptを使った実装としています。
[1] 辞書をpd.DataFrameに変換
[2] Wikipedia: 不可分操作


