早速PCと接続してプログラムを手直しして実行してみましたが、画像を表示させようとするとリフレッシュが始まって画面が少し書き換わった後に例外が発生して終了してしまいました。画面を白一色で塗りつぶすだけならうまく行くのですが、写真を表示させようとするとダメです。
どうもバスパワーでは足りなそうです。幸いHAT+もUSB-SPI変換基板も5Vに対応しているので、適当なUSB電源アダプタから5Vを取ることにしました(電源の入れっぱなしが気になったので、後にPC連動型のセルフパワーUSBハブを使うようにしました)。こんなこともあろうかと先日スイッチサイエンスでUSB QI Cableを買っておいたのでそれを使います。GNDは共通にして、HAT+のVCCはUSB電源アダプタから取るようにしました。USB-SPI変換基板には5Vと3.5Vを切り替えるスイッチがあるので、それも5V側にしておきます。この辺りは電気に詳しい人なら色々と注意点があるところかもしれません。全ては自己責任でお願いします。
import argparse
import time
import math
import logging
import mcp2210
import hid
from abc import ABC, abstractmethod
from typing import Sequence, Iterable
from PIL import Image, ImagePalette
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class EPaperBridge(ABC):
"""ホストコンピュータからEPaperディスプレイへの通信を担うブリッジ"""
@abstractmethod
def close(self):
...
@abstractmethod
def power_on(self):
...
@abstractmethod
def reset(self):
...
@abstractmethod
def power_off(self):
...
@abstractmethod
def wait_until_not_busy(self):
...
@abstractmethod
def select_driver_chips(self, chip_numbers: Iterable[int]):
...
@abstractmethod
def send_command_code(self, _command_code: int):
...
@abstractmethod
def send_data_bytes(self, _data_bytes: bytes):
...
def send_data(self, data: int | Sequence[int]):
if isinstance(data, int):
self.send_data_bytes(bytes([data]))
elif isinstance(data, bytes):
self.send_data_bytes(data)
else:
self.send_data_bytes(bytes(data))
def send_command(self, command_code: int, *params: int | Sequence[int]):
self.on_command_start()
self.send_command_code(command_code)
for p in params:
self.send_data(p)
self.on_command_end()
def on_command_start(self):
pass
def on_command_end(self):
pass
class EPaperBridgeMCP2210(EPaperBridge):
"""MCP2210を使用したブリッジ"""
@staticmethod
def is_mcp2210(device_dict):
MCP2210_VENDOR_ID = 0x04d8
MCP2210_PRODUCT_ID = 0x00de
return (device_dict.get("product_id", -1) == MCP2210_PRODUCT_ID and
device_dict.get("vendor_id", -1) == MCP2210_VENDOR_ID)
@staticmethod
def enumerate_mcp2210_devices():
return list(filter(EPaperBridgeMCP2210.is_mcp2210, hid.enumerate()))
@staticmethod
def find_mcp2210_device():
device_dicts = EPaperBridgeMCP2210.enumerate_mcp2210_devices()
if len(device_dicts) == 0:
raise RuntimeError("No MCP2210 devices found")
elif len(device_dicts) >= 2:
raise RuntimeError("Multiple MCP2210 devices found")
return device_dicts[0]["serial_number"]
def __init__(self):
serial_number = EPaperBridgeMCP2210.find_mcp2210_device()
self._pin_dc = 2
self._pin_com_led = 3
self._pin_busy = 6
self._pin_rst = 0
self._pin_pwr = 4
self._cs_pins = [1, 7]
self._cs_pins_selected = 1 << self._cs_pins[0] self._reset_wait_times = (0.03, 0.03, 0.03, 0.03, 0.03)
self._mcp = mcp2210.Mcp2210(serial_number, immediate_gpio_update=False)
self.init_mcp2210_for_paper()
def close(self):
self._mcp._hid.close()
def init_mcp2210_for_paper(self):
self._mcp.configure_spi_timing(chip_select_to_data_delay=0,
last_data_byte_to_cs=0,
delay_between_bytes=0)
self.clear_mcp2210_gpio_status()
for pin in self._cs_pins:
self._mcp.set_gpio_output_value(pin, True)
self._mcp.set_gpio_direction(self._pin_busy,
mcp2210.Mcp2210GpioDirection.INPUT)
self._mcp.set_gpio_output_value(self._pin_rst, True)
self._mcp.set_gpio_output_value(self._pin_com_led, True)
self._mcp.gpio_update()
def clear_mcp2210_gpio_status(self):
for pin_number in range(9):
self._mcp.set_gpio_designation(pin_number,
mcp2210.Mcp2210GpioDesignation.GPIO)
self._mcp.set_gpio_direction(pin_number,
mcp2210.Mcp2210GpioDirection.OUTPUT)
self._mcp.set_gpio_output_value(pin_number, False)
def power_on(self):
self._mcp.set_gpio_output_value(self._pin_pwr, True)
self._mcp.gpio_update()
time.sleep(0.2)
def reset(self):
reset_value = True
for wait_time in self._reset_wait_times:
self._mcp.set_gpio_output_value(self._pin_rst, reset_value)
self._mcp.gpio_update()
reset_value = not reset_value
time.sleep(wait_time)
self.wait_until_not_busy()
def power_off(self):
self._mcp.set_gpio_output_value(self._pin_rst, False)
self._mcp.set_gpio_output_value(self._pin_dc, False)
self._mcp.set_gpio_output_value(self._pin_pwr, False)
self._mcp.gpio_update()
def wait_until_not_busy(self):
start_time = time.time()
logger.debug("Start waiting (busy=%s)"
% self._mcp.get_gpio_value(self._pin_busy))
while(self._mcp.get_gpio_value(self._pin_busy) == False):
time.sleep(0.005)
logger.debug("End waiting (wait time=%s seconds)"
% (time.time() - start_time))
def select_driver_chips(self, chip_numbers: Iterable[int]):
bits = 0
for chip in chip_numbers:
bits |= 1 << self._cs_pins[chip]
self._cs_pins_selected = bits
def send_command_code(self, command_code: int):
self._mcp.set_gpio_output_value(self._pin_dc, False)
self._mcp.gpio_update()
self.my_spi_exchange(bytes([command_code]))
def send_data_bytes(self, data_bytes: bytes):
self._mcp.set_gpio_output_value(self._pin_dc, True)
self._mcp.gpio_update()
self.my_spi_exchange(data_bytes)
def on_command_start(self):
for pin in range(9):
if self._cs_pins_selected & (1 << pin):
self._mcp.set_gpio_output_value(pin, False)
self._mcp.gpio_update()
def on_command_end(self):
for pin in range(9):
if self._cs_pins_selected & (1 << pin):
self._mcp.set_gpio_output_value(pin, True)
self._mcp.gpio_update()
TRANSFER_SPI_DATA = 0x42
SUCCESS = 0x00
SPI_DATA_NOT_ACCEPTED = 0xF7
TRANSFER_IN_PROGRESS = 0xF8
SPI_TRANSFER_COMPLETE = 0x10
SPI_TRANSFER_PENDING_NO_RECEIVED_DATA = 0x20
SPI_TRANSFER_PENDING_RECEIVED_DATA_AVAILABLE = 0x30
def my_spi_exchange(self, payload: bytes) -> bytes:
mcp = self._mcp
mcp._spi_settings.active_chip_select_value = 0x01FF
mcp._spi_settings.transfer_size = len(payload)
mcp._set_spi_configuration()
chunked_payload = []
for i in range(math.ceil(len(payload) / 60)):
start_index = i * 60
stop_index = (i + 1) * 60
chunk = bytes(payload[start_index:stop_index])
chunked_payload.append(chunk)
chunk_index = 0
received_data = []
while 1:
if chunk_index == len(chunked_payload):
next_chunk = b''
else:
next_chunk = chunked_payload[chunk_index]
request = [EPaperBridgeMCP2210.TRANSFER_SPI_DATA, len(next_chunk), 0x00, 0x00]
response = mcp._execute_command(bytes(request) + next_chunk, check_return_code=False)
if response[1] == EPaperBridgeMCP2210.SPI_DATA_NOT_ACCEPTED:
raise mcp2210.Mcp2210SpiBusLockedException
elif response[1] == EPaperBridgeMCP2210.TRANSFER_IN_PROGRESS:
time.sleep(0.005)
continue
elif response[1] == EPaperBridgeMCP2210.SUCCESS:
chunk_index += 1
receive_data_size = response[2]
spi_transfer_status = response[3]
if spi_transfer_status == EPaperBridgeMCP2210.SPI_TRANSFER_PENDING_NO_RECEIVED_DATA:
continue
elif spi_transfer_status == EPaperBridgeMCP2210.SPI_TRANSFER_PENDING_RECEIVED_DATA_AVAILABLE:
received_data.append(response[4:receive_data_size + 4])
continue
elif spi_transfer_status == EPaperBridgeMCP2210.SPI_TRANSFER_COMPLETE:
received_data.append(response[4:receive_data_size + 4])
break
else:
raise mcp2210.Mcp2210CommandFailedException("Encountered unknown SPI transfer status")
else:
raise mcp2210.Mcp2210CommandFailedException("Received return code 0x{:02X} from device".format(response[1]))
combined_receive_data = b''.join(bytes(x) for x in received_data)
if len(combined_receive_data) != len(payload):
raise RuntimeError("Length of receive data does not match transmit data")
return combined_receive_data
class EPaperDisplayWaveshare(ABC):
"Waveshare製EPaperディスプレイモジュール(HAT)の基底クラス"
def __init__(self,
bridge: EPaperBridge,
width: int, height: int, bits_per_pixel: int,
palette: bytes, default_pixel_value: int,
all_driver_chip_numbers: Sequence[int]):
self._bridge = bridge
self._width = width
self._height = height
self._bits_per_pixel = bits_per_pixel
self._palette = palette
self._default_pixel_value = default_pixel_value
self._all_driver_chip_numbers = all_driver_chip_numbers
self.init()
def init(self):
try:
self._bridge.power_on()
self._bridge.reset()
self.init_driver_chips()
except Exception:
self._bridge.power_off()
raise
@abstractmethod
def init_driver_chips(self):
...
def shutdown(self):
try:
self.shutdown_driver_chips()
finally:
self._bridge.power_off()
def shutdown_driver_chips(self):
logger.debug("Shutdown driver chips")
self._bridge.wait_until_not_busy()
self.select_all_driver_chips()
self._bridge.send_command(0x07, 0xA5)
time.sleep(2)
def power_on_panel(self):
self._bridge.send_command(0x04)
self._bridge.wait_until_not_busy()
def power_off_panel(self):
self._bridge.send_command(0x02, 0x00)
self._bridge.wait_until_not_busy()
def select_all_driver_chips(self):
"""コマンドの送信先を全ての駆動チップとします。"""
self._bridge.select_driver_chips(self._all_driver_chip_numbers)
def select_driver_chip(self, driver_chip_number):
"""コマンドの送信先を指定されたチップのみとします。"""
self._bridge.select_driver_chips([driver_chip_number])
@abstractmethod
def set_frame_bytes(self, frame_bytes: bytes):
...
@property
def width(self) -> int:
"""パネル全体の水平方向のピクセル数です。"""
return self._width
@property
def height(self) -> int:
"""パネル全体の垂直方向のピクセル数です。"""
return self._height
@property
def default_pixel_value(self) -> int:
"""デフォルトのピクセル値です。基本的に「白」を意味する値です。"""
return self._default_pixel_value
@property
def frame_bits_per_pixel(self) -> int:
"""ピクセルあたりのビット数です。基本的に8以下の値です。"""
return self._bits_per_pixel
@property
def frame_pixels_per_byte(self) -> int:
"""1バイトあたりのピクセル数です。"""
return 8 // self._bits_per_pixel
def make_filled_frame_byte(self, pixel_value: int):
"""1バイトの中を指定されたpixel_valueで満たしたものを返します。"""
bpp = self.frame_bits_per_pixel
pixel_value = pixel_value & ((1 << bpp) - 1)
frame_byte = 0
bitpos = 8 - bpp
while bitpos >= 0:
frame_byte |= pixel_value << bitpos
bitpos = bitpos - bpp
return frame_byte
@property
def frame_line_nbytes(self) -> int:
"""フレームバッファの1行のバイト数を返します。"""
return ((self._width + self.frame_pixels_per_byte - 1) //
self.frame_pixels_per_byte)
@property
def frame_nbytes(self) -> int:
"""フレームバッファのバイト数を返します。"""
return self.frame_line_nbytes * self.height
def fill_frame_with_byte(self, frame_byte: int):
"""フレームの全バイトをframe_byteで満たします。"""
self.set_frame_bytes(bytes([frame_byte]) * self.frame_nbytes)
def fill_frame(self, pixel_value: int):
"""フレームの全ピクセルをpixel_valueで塗りつぶします。"""
self.fill_frame_with_byte(self.make_filled_frame_byte(pixel_value))
def clear_frame(self):
"""フレームの全ピクセルを白一色で塗りつぶします。"""
self.fill_frame(self.default_pixel_value)
def frame_image_palette(self) -> ImagePalette.ImagePalette:
return ImagePalette.ImagePalette("RGB", self._palette)
def convert_image_to_frame_bytes(self, image: Image.Image) -> bytes:
"""imageをselfの仕様に合わせたフレームバイト列へ変換します。"""
palette_image = Image.new("P", (1, 1))
palette_image.putpalette(self.frame_image_palette())
quantized_image = image.convert("RGB").quantize(palette=palette_image)
src_bytes = quantized_image.getdata()
src_w, src_h = quantized_image.size
return bytes(EPaperDisplayWaveshare.convert_pimage_bytes_to_frame_bytes(
src_bytes, src_w, 0, 0, src_w, src_h,
self.width, self.height, self.frame_bits_per_pixel))
@staticmethod
def convert_pimage_bytes_to_frame_bytes(
src_bytes,
src_pitch: int,
src_x: int, src_y: int, src_w: int, src_h: int,
dst_w: int, dst_h: int, dst_bits_per_pixel: int) -> bytearray:
scan_w = min(src_w, dst_w)
scan_h = min(src_h, dst_h)
dst_line_nbytes = (dst_w // (8 // dst_bits_per_pixel))
dst_pitch = dst_line_nbytes
dst_bytes = bytearray(dst_line_nbytes * dst_h)
for y in range(scan_h):
src = src_pitch * (src_y + y) + src_x
dst = dst_pitch * y
dst_bitpos = 8
dst_byte = 0
dst_bitmask = (1 << dst_bits_per_pixel) - 1
for x in range(scan_w):
dst_bitpos -= dst_bits_per_pixel
dst_byte |= ((int(src_bytes[src + x]) & dst_bitmask)
<< dst_bitpos)
if dst_bitpos < dst_bits_per_pixel:
dst_bytes[dst] = dst_byte
dst = dst + 1
dst_bitpos = 8
dst_byte = 0
if dst_bitpos < 8:
dst_bytes[dst] = dst_byte
return dst_bytes
def update_panel(self):
"""現在のフレームをディスプレイの表示に反映します。"""
logger.debug("Refresh Start")
self.select_all_driver_chips()
self.power_on_panel()
self._bridge.send_command(0x12, 0x00)
self._bridge.wait_until_not_busy()
self.power_off_panel()
logger.debug("Refresh End")
def show_image(self, image: Image.Image):
"""imageを表示します。"""
self.set_frame_bytes(self.convert_image_to_frame_bytes(image))
self.update_panel()
class EPaperDisplayWaveshare3in5G(EPaperDisplayWaveshare):
"""Waveshare 3.5インチ(G)"""
def __init__(self, bridge):
super().__init__(
bridge = bridge,
width = 184,
height = 384,
bits_per_pixel = 2,
palette = bytes((0,0,0)+(255,255,255)+(255,255,0)+(255,0,0)+
(0,0,0)*252),
default_pixel_value = 1,
all_driver_chip_numbers = [0]
)
def init_driver_chips(self):
self._bridge.send_command(0x66, 0x49, 0x55, 0x13, 0x5D, 0x05, 0x10)
self._bridge.send_command(0x4D, 0x78)
self._bridge.send_command(0x00, 0x0F, 0x29)
self._bridge.send_command(0x01, 0x07, 0x00)
self._bridge.send_command(0x03, 0x10, 0x54, 0x44)
self._bridge.send_command(0x06,
0x0F, 0x0A, 0x2F, 0x25, 0x22, 0x2E, 0x21)
self._bridge.send_command(0x50, 0x37)
self._bridge.send_command(0x60, 0x02, 0x02)
self._bridge.send_command(0x61,
self._width>>8, self._width&255,
self._height>>8, self._height&255)
self._bridge.send_command(0xE7, 0x1C)
self._bridge.send_command(0xE3, 0x22)
self._bridge.send_command(0xB6, 0x6F)
self._bridge.send_command(0xB4, 0xD0)
self._bridge.send_command(0xE9, 0x01)
self._bridge.send_command(0x30, 0x08)
def set_frame_bytes(self, frame_bytes: bytes):
"""frame_bytesを電子ペーパーに転送します(表示しない)。"""
self._bridge.send_command(0x10, frame_bytes)
class EPaperDisplayWaveshare13in3E(EPaperDisplayWaveshare):
"""Waveshare 13.3インチ(E)"""
def __init__(self, bridge):
super().__init__(
bridge = bridge,
width = 1200,
height = 1600,
bits_per_pixel = 4,
palette = bytes((0,0,0)+
(255,255,255)+
(255,255,0)+
(255,0,0)+
(0,0,0)+
(0,0,255)+
(0,255,0)+
(0,0,0)*249),
default_pixel_value = 1,
all_driver_chip_numbers = [0, 1]
)
def init_driver_chips(self):
self.select_driver_chip(0)
self._bridge.send_command(0x74, 0xC0, 0x1C, 0x1C, 0xCC,
0xCC, 0xCC, 0x15, 0x15, 0x55)
self.select_all_driver_chips()
self._bridge.send_command(0xF0, 0x49, 0x55, 0x13, 0x5D, 0x05, 0x10)
self._bridge.send_command(0x00, 0xDF, 0x69)
self._bridge.send_command(0x50, 0xF7)
self._bridge.send_command(0x60, 0x03, 0x03)
self._bridge.send_command(0x86, 0x10)
self._bridge.send_command(0xE3, 0x22) self._bridge.send_command(0xE0, 0x01)
self._bridge.send_command(0x61, 0x04, 0xB0, 0x03, 0x20) self.select_driver_chip(0)
self._bridge.send_command(0x01, 0x0F, 0x00, 0x28, 0x2C, 0x28, 0x38)
self._bridge.send_command(0xB6, 0x07)
self._bridge.send_command(0x06, 0xE8, 0x28)
self._bridge.send_command(0xB7, 0x01)
self._bridge.send_command(0x05, 0xE8, 0x28)
self._bridge.send_command(0xB0, 0x01)
self._bridge.send_command(0xB1, 0x02)
def set_frame_bytes(self, frame_bytes: bytes):
"""frame_bytesを電子ペーパーに転送します(表示しない)。"""
line_pitch = self.frame_line_nbytes
line_half = self.frame_line_nbytes // 2
height = self.height
frame_nbytes = self.frame_nbytes
self.select_driver_chip(0)
self._bridge.on_command_start()
self._bridge.send_command_code(0x10)
for y in range(height):
print(f"\r{line_half*y}/{frame_nbytes}", end="", flush=True)
self._bridge.send_data(
frame_bytes[y * line_pitch :
y * line_pitch + line_half])
self._bridge.on_command_end()
self.select_driver_chip(1)
self._bridge.on_command_start()
self._bridge.send_command_code(0x10)
for y in range(height):
print(f"\r{line_half*(height+y)}/{frame_nbytes}", end="", flush=True)
self._bridge.send_data(
frame_bytes[y * line_pitch + line_half :
y * line_pitch + line_half + line_half])
self._bridge.on_command_end()
print(f"\r{frame_nbytes}/{frame_nbytes}", flush=True)
def main():
parser = argparse.ArgumentParser(
prog="epaper_print",
usage="python epaper_print.py [options]")
parser.add_argument("filename", help="image file name")
args = parser.parse_args()
logger.debug("Load image")
im = Image.open(args.filename)
logger.debug("Rotate image")
im = im.rotate(-90, expand=True)
logger.debug("Create bridge")
bridge = EPaperBridgeMCP2210()
logger.debug("Create EPaperDisplay")
epaper = EPaperDisplayWaveshare13in3E(bridge)
try:
logger.debug("Resize image")
im = im.resize((epaper.width, epaper.height))
logger.debug("Show image")
epaper.show_image(im)
finally:
logger.debug("Shutdown")
try:
epaper.shutdown()
finally:
bridge.close()
logger.debug("End")
if __name__ == "__main__":
main()
前回の3.5インチモジュールとの一番の違いは、画面が左右二つに分割されていて制御するチップも二つに分かれていることです。そのためCS信号は二つ(CS_MとCS_S)あります。どちらかを選んでコマンドを送信することもありますし、二つのチップに同時に送信することもあります。詳しくはDouble-IC Programming Analysisを参照してください。
また、6色ディスプレイなので画像データの形式も違います。有効なピクセル値は 0:黒, 1:白, 2:黄, 3:赤, 4:青, 5:緑 の6通りです。1ピクセル4ビットで1バイトに2ピクセル入ります(左詰)。パネル全体の解像度は1200×1600ですが、上述のように左右で分割されているため600×1600を2回に分けて(CSを変えて)送信する必要があります。全体のバイト数は1200×1600/2=960000バイトになります。
細かい点では、初期化シーケンス、リセットタイミング、CS信号の出し方にも違いがありました。パネルの電源のON/OFFタイミングもサンプルコードレベルでは違いましたが、3.5インチでも13.3インチのやり方で問題なかったので13.3インチのやり方で統一しました(リフレッシュ前後でON/OFFする)。CS信号の出し方はmcp2210-pythonライブラリでは対応できない部分があったので、上のソースコードでは無理矢理解決しています。