Skip to content

Commit

Permalink
Let's add type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinthecheung committed Dec 25, 2021
1 parent 5c294e1 commit 9a93583
Show file tree
Hide file tree
Showing 6 changed files with 336 additions and 262 deletions.
12 changes: 7 additions & 5 deletions 8080exer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,22 @@

import datetime
import time

from virtual8080 import Virtual8080
from virtual_device import VirtualDevice


class StubIO:
class StubIO(VirtualDevice):

def get_input(self, port_addr):
def get_input(self, port_addr: int) -> int:
return 0

def send_output(self, port_addr, val):
ch = val & 0b01111111
def send_output(self, port_addr: int, value: int) -> None:
ch = value & 0b01111111
print(bytes([ch]).decode(encoding='ascii'), end='', flush=True)


def run(program_file, bdos_file):
def run(program_file: str, bdos_file: str) -> None:
vm = Virtual8080()
vm.io = StubIO()

Expand Down
27 changes: 16 additions & 11 deletions altair_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,21 @@


import argparse
from typing import Optional

from kbhit import KBHit

from virtual8080 import Virtual8080
from virtual_device import VirtualDevice


class AltairWithTerminal:
class AltairWithTerminal(VirtualDevice):

def __init__(self):
self.input_buffer = b''
self.output_char = -1
self.input_buffer: bytes = b''
self.output_char: int = -1

def get_input(self, port_addr):
def get_input(self, port_addr: int) -> int:
if port_addr == 0xff:
# Altair "sense switches". 0b0000xxxx means the console is on a
# 2SIO configured with the console status/control register on port
Expand All @@ -61,29 +65,30 @@ def get_input(self, port_addr):
ch = self.input_buffer[0]
self.input_buffer = self.input_buffer[1:]
return ch
return 0 # Okay?
else:
return 0

def send_output(self, port_addr, val):
def send_output(self, port_addr: int, value: int) -> None:
if port_addr == 0x10:
# Control register
pass
elif port_addr == 0x11:
# I/O register
self.output_char = val & 0b01111111
self.output_char = value & 0b01111111


def console_run(program_file, autorun_file=None, init_str=''):
def console_run(program_file: str, autorun_file: Optional[str] = None, init_str: str = ''):
vm = Virtual8080()
vm.io = AltairWithTerminal()
with open(program_file, 'rb') as f:
program = f.read()
with open(program_file, 'rb') as pf:
program = pf.read()
vm.load(program)

if autorun_file is not None:
init_buffer = init_str
with open(autorun_file, 'r') as f:
for line in f.readlines():
with open(autorun_file, 'r') as af:
for line in af.readlines():
init_buffer += line.replace('\n', '\r')
vm.io.input_buffer = init_buffer.encode(encoding='ascii')

Expand Down
92 changes: 52 additions & 40 deletions cpm.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,34 @@

import argparse
import os
from typing import Dict, List, Optional, Tuple

os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1'
import pygame
from pygame.locals import *
from pygame.constants import KEYDOWN, KMOD_CTRL, KMOD_SHIFT, QUIT
from pygame.font import Font
from pygame.rect import Rect
from pygame.surface import Surface

from virtual8080 import Virtual8080
from virtual_device import VirtualDevice
from cpm_disk import CPM_Disk


class CPM_Machine:
class CPM_Machine(VirtualDevice):

def __init__(self, vm, disk_images=[]):
self.vm = vm
def __init__(self, vm: Virtual8080, disk_images: List[str] = []):
self.vm: Virtual8080 = vm

self.input_buffer = b''
self.output_char = -1
self.input_buffer: bytes = b''
self.output_char: int = -1

self.dma_addr = 0
self.disk_controller_error = 0
self.dma_addr: int = 0
self.disk_controller_error: int = 0

self.current_drive = 0
self.drive_status = [{'track': 0, 'sector': 0} for _ in range(16)]
self.disk_image = [None for _ in range(16)]
self.current_drive: int = 0
self.drive_status: List[Dict[str, int]] = [{'track': 0, 'sector': 0} for _ in range(16)]
self.disk_image: List[Optional[CPM_Disk]] = [None for _ in range(16)]
for i, fn in enumerate(disk_images):
if fn is not None:
self.drive_status[i] = {'track': 0, 'sector': 0}
Expand All @@ -62,16 +67,22 @@ def __init__(self, vm, disk_images=[]):
self.vm.load(boot_sector)


def read_disk(self, drive, track, sector):
return self.disk_image[drive].get_sector(track, sector)
def read_disk(self, drive_num: int, track: int, sector: int) -> bytes:
drive = self.disk_image[drive_num]
if drive is not None:
return drive.get_sector(track, sector)
else:
raise ValueError


def write_disk(self, drive, track, sector, sector_data):
self.disk_image[drive].set_sector(track, sector, sector_data)
self.disk_image[drive].save_image()
def write_disk(self, drive_num: int, track: int, sector: int, sector_data: bytes) -> None:
drive = self.disk_image[drive_num]
if drive is not None:
drive.set_sector(track, sector, sector_data)
drive.save_image()


def get_input(self, port_addr):
def get_input(self, port_addr: int) -> Optional[int]:
if port_addr == 0:
# Console status
if len(self.input_buffer) > 0:
Expand All @@ -86,7 +97,7 @@ def get_input(self, port_addr):
return c
else:
self.vm.registers['pc'] -= 2 # Loop again with same PC
return
return None
elif port_addr == 2:
# List device status
return 1 # Ready
Expand All @@ -96,18 +107,18 @@ def get_input(self, port_addr):
raise Exception


def send_output(self, port_addr, val):
def send_output(self, port_addr: int, value: int) -> None:
if port_addr == 1:
# Console output
self.output_char = val
self.output_char = value
return
elif port_addr == 3:
# List device output
print(chr(val), end='', flush=True)
print(chr(value), end='', flush=True)
return
elif port_addr == 0xf9:
# Disk read/write commands
if val == 0:
if value == 0:
# Read current drive/track/sector into [dma]
if self.disk_image[self.current_drive] is not None:
data = self.read_disk(self.current_drive,
Expand All @@ -119,10 +130,11 @@ def send_output(self, port_addr, val):
else:
self.disk_controller_error = 0xff # Error
return
elif val == 1:
elif value == 1:
# Write [dma] to current drive/track/sector
if self.disk_image[self.current_drive] is not None:
sector_size = self.disk_image[self.current_drive].sector_size
drive = self.disk_image[self.current_drive]
if drive is not None:
sector_size = drive.sector_size
sector_data = bytes(self.vm.memory[self.dma_addr:self.dma_addr+sector_size])
self.write_disk(self.current_drive,
self.drive_status[self.current_drive]['track'],
Expand All @@ -134,23 +146,23 @@ def send_output(self, port_addr, val):
return
elif port_addr == 0xfa:
# Disk drive select
self.current_drive = val
self.current_drive = value
return
elif port_addr == 0xfb:
# Track select
self.drive_status[self.current_drive]['track'] = val
self.drive_status[self.current_drive]['track'] = value
return
elif port_addr == 0xfc:
# Sector select
self.drive_status[self.current_drive]['sector'] = val
self.drive_status[self.current_drive]['sector'] = value
return
elif port_addr == 0xfd:
# Set DMA address high byte
self.dma_addr = (self.dma_addr & 0x00ff) | (val << 8)
self.dma_addr = (self.dma_addr & 0x00ff) | (value << 8)
return
elif port_addr == 0xfe:
# Set DMA address low byte
self.dma_addr = (self.dma_addr & 0xff00) | (val & 0x00ff)
self.dma_addr = (self.dma_addr & 0xff00) | (value & 0x00ff)
return
raise Exception

Expand Down Expand Up @@ -184,20 +196,20 @@ class CPM_TTY:
ord('/'): ord('?'),
}

def __init__(self, disk_images=[]):
self.disk_images = disk_images
def __init__(self, disk_images: List[str] = []):
self.disk_images: List[str] = disk_images

self.buffer = bytearray([32 for _ in range(80 * 24)])
self.cursor = 0
self.esc_sequence = b''
self.buffer: bytearray = bytearray([32 for _ in range(80 * 24)])
self.cursor: int = 0
self.esc_sequence: bytes = b''

pygame.init()
pygame.display.set_caption('8080 Emulator')
self.screen = pygame.display.set_mode((80 * 10 + 10, 24 * 20 + 10))
self.font = pygame.font.Font('ter-u20n.bdf', 20)
self.screen: Surface = pygame.display.set_mode((80 * 10 + 10, 24 * 20 + 10))
self.font: Font = pygame.font.Font('ter-u20n.bdf', 20)


def putch(self, ch):
def putch(self, ch: int) -> None:
## ADM-3A cursor control
ch = ch & 0b01111111
if ch == 27: # Esc
Expand Down Expand Up @@ -240,7 +252,7 @@ def putch(self, ch):
print(chr(ch & 0x7f), end='', flush=True)


def render_buffer(self):
def render_buffer(self) -> List[Tuple[Surface, Rect]]:
img = self.font.render('█', False, self.foreground)
col = self.cursor % 80
row = self.cursor // 80
Expand All @@ -258,7 +270,7 @@ def render_buffer(self):
return blits


def run(self):
def run(self) -> None:
vm = Virtual8080()
vm.io = CPM_Machine(vm, self.disk_images)

Expand Down
31 changes: 20 additions & 11 deletions cpm_disk.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,31 @@
#
# For more information, please refer to <https://unlicense.org>

from typing import List, Optional, Union


class CPM_Disk:

def __init__(self, image_file, sector_size, sectors_per_track, num_tracks, skew, write_protect=False):
self.image_file = image_file
self.sector_size = sector_size
self.sectors_per_track = sectors_per_track
self.write_protect = write_protect
def __init__(self,
image_file: str,
sector_size: int,
sectors_per_track: int,
num_tracks: int,
skew: Union[int, List[int]],
write_protect: bool = False):
self.image_file: str = image_file
self.sector_size: int = sector_size
self.sectors_per_track: int = sectors_per_track
self.write_protect: bool = write_protect
self.disk_tracks: List[List[bytes]] = []
self.skew_table: List[int]

self.disk_tracks = []
with open(image_file, 'rb') as f:
disk_bytes = f.read()
total_bytes = sector_size * sectors_per_track * num_tracks
disk_bytes += bytes([0xe5 for _ in range(total_bytes - len(disk_bytes))])
for _ in range(num_tracks):
tmp_track = []
tmp_track: List[bytes] = []
for _ in range(sectors_per_track):
tmp_sect = disk_bytes[0:sector_size]
disk_bytes = disk_bytes[sector_size:]
Expand All @@ -51,18 +60,18 @@ def __init__(self, image_file, sector_size, sectors_per_track, num_tracks, skew,
self.skew_table = skew


def get_sector(self, track, sector):
def get_sector(self, track: int, sector: int) -> bytes:
raw_sector = self.skew_table[sector - 1] - 1
return self.disk_tracks[track][raw_sector]


def set_sector(self, track, sector, sector_data):
def set_sector(self, track: int, sector: int, sector_data: bytes) -> None:
if not self.write_protect:
raw_sector = self.skew_table[sector - 1] - 1
self.disk_tracks[track][raw_sector] = sector_data


def save_image(self, image_file=None):
def save_image(self, image_file: Optional[str] = None) -> None:
if image_file is None:
image_file = self.image_file
with open(image_file, 'wb') as f:
Expand All @@ -71,7 +80,7 @@ def save_image(self, image_file=None):
f.write(sector)


def make_skew_table(num_sectors, skew_factor):
def make_skew_table(num_sectors: int, skew_factor: int) -> List[int]:
skew_table = [0]
while len(skew_table) < num_sectors:
sec = (skew_table[-1] + skew_factor) % num_sectors
Expand Down

0 comments on commit 9a93583

Please sign in to comment.