操作系统模型:简化实现

def main():
    sys_write('Hello, OS World\n')  # sys_write -> print
count = 0

def Tprint(name):
    global count
    for i in range(3):
        count += 1
        sys_write(f'#{count:02} Hello from {name}{i+1}\n')
        sys_sched()

def main():
    n = sys_choose([3, 4, 5])
    sys_write(f'#Thread = {n}\n')
    for name in 'ABCDE'[:n]:
        sys_spawn(Tprint, name)
    sys_sched()
#!/usr/bin/env python3

import sys
import random
from pathlib import Path

class OperatingSystem():
    """A minimal executable operating system model."""

    SYSCALLS = ['choose', 'write', 'spawn', 'sched']

    class Thread:
        """A "freezed" thread state."""

        def __init__(self, func, *args):
            self._func = func(*args)
            self.retval = None

        def step(self):
            """Proceed with the thread until its next trap."""
            syscall, args, *_ = self._func.send(self.retval)
            self.retval = None
            return syscall, args

    def __init__(self, src):
        variables = {}
        exec(src, variables)
        self._main = variables['main']

    def run(self):
        threads = [OperatingSystem.Thread(self._main)]
        while threads:  # Any thread lives
            try:
                match (t := threads[0]).step():
                    case 'choose', xs:  # Return a random choice
                        t.retval = random.choice(xs)
                    case 'write', xs:  # Write to debug console
                        print(xs, end='')
                    case 'spawn', (fn, args):  # Spawn a new thread
                        threads += [OperatingSystem.Thread(fn, *args)]
                    case 'sched', _:  # Non-deterministic schedule
                        random.shuffle(threads)
            except StopIteration:  # A thread terminates
                threads.remove(t)
                random.shuffle(threads)  # sys_sched()

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print(f'Usage: {sys.argv[0]} file')
        exit(1)

    src = Path(sys.argv[1]).read_text()
    for syscall in OperatingSystem.SYSCALLS:
        src = src.replace(f'sys_{syscall}',        # sys_write(...)
                          f'yield "{syscall}", ')  #  -> yield 'write', (...)

    OperatingSystem(src).run()#!/usr/bin/env python3

import sys
import random
from pathlib import Path

class OperatingSystem():
    """A minimal executable operating system model."""

    SYSCALLS = ['choose', 'write', 'spawn', 'sched']

    class Thread:
        """A "freezed" thread state."""

        def __init__(self, func, *args):
            self._func = func(*args)
            self.retval = None

        def step(self):
            """Proceed with the thread until its next trap."""
            syscall, args, *_ = self._func.send(self.retval)
            self.retval = None
            return syscall, args

    def __init__(self, src):
        variables = {}
        exec(src, variables)
        self._main = variables['main']

    def run(self):
        threads = [OperatingSystem.Thread(self._main)]
        while threads:  # Any thread lives
            try:
                match (t := threads[0]).step():
                    case 'choose', xs:  # Return a random choice
                        t.retval = random.choice(xs)
                    case 'write', xs:  # Write to debug console
                        print(xs, end='')
                    case 'spawn', (fn, args):  # Spawn a new thread
                        threads += [OperatingSystem.Thread(fn, *args)]
                    case 'sched', _:  # Non-deterministic schedule
                        random.shuffle(threads)
            except StopIteration:  # A thread terminates
                threads.remove(t)
                random.shuffle(threads)  # sys_sched()

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print(f'Usage: {sys.argv[0]} file')
        exit(1)

    src = Path(sys.argv[1]).read_text()
    for syscall in OperatingSystem.SYSCALLS:
        src = src.replace(f'sys_{syscall}',        # sys_write(...)
                          f'yield "{syscall}", ')  #  -> yield 'write', (...)

    OperatingSystem(src).run()
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

static pthread_t threads[64];
static int nthreads = 0;

static inline void
sys_write(const char *s) {
  printf("%s", s);
  fflush(stdout);
}

static inline void
sys_sched() {
  usleep(rand() % 10000);
}

static inline void
sys_spawn(void *(*fn)(void *), void *args) {
    pthread_create(&threads[nthreads++], NULL, fn, args);
}

static inline int
sys_choose(int x) {
  return rand() % x;
}

// Constructor called before main()
static inline void __attribute__((constructor))
srand_init() {
  srand(time(0));
}

// Destructor called after main()
static inline void __attribute__((destructor))
thread_join() {
  for (int i = 0; i < nthreads; i++) {
    pthread_join(threads[i], NULL);  // Wait for thread terminations
  }
}
#include "os-real.h"

int count = 0;

void *Tprint(void *s) {
  char buf[64];
  for (int i = 0; i < 3; i++) {
    sprintf(buf, "#%02d Hello from %c%d\n", ++count, *(const char *)s, i);
    sys_write(buf);
    sys_sched();
  }
  return NULL;
}

int main() {
  int n = sys_choose(3) + 3;
  char buf[64];
  sprintf(buf, "#Thread = %d\n", n);
  sys_write(buf);
  for (int i = 0; i < n; i++) {
    sys_spawn(Tprint, &"ABCDE"[i]);
  }
}
MODEL := os-model.py

run-hello:
	python3 $(MODEL) hello.py

run-threads:
	python3 $(MODEL) threads.py

想要理解这个 os-model.py 首先要了解 Python 中的 yield 关键字。

它可以保存函数的当前状态,向外部返回一个值,外部也可以使用send 方法向函数内部传递数据。

这个特点和线程切换时保存现场、恢复现场两个动作高度契合。

下面是一个生成器的demo。

def echo_generator():
    i = ''
    while True:
        received_value = yield i
        print("Received:", received_value)
        i += str(received_value)

它实现了一个简单的双向通信的生成器。让我逐步解释这个函数的功能和实现方式。

  1. 在生成器函数开始时,我们初始化一个空字符串 i,用于累积接收到的值。

  2. 进入 while True: 循环,这表示生成器会无限循环,每次迭代都会执行以下步骤。

  3. received_value = yield i:这一行是生成器的核心。它的作用是在生成器的每一次迭代中,首先向外部(调用者)生成值 i,然后暂停生成器的执行,等待外部通过 send() 方法传递一个值给生成器。

  4. print("Received:", received_value):当生成器恢复执行并接收到外部传递的值后,会打印出 “Received:” 以及接收到的值。这里只是简单地将接收到的值打印出来,实际应用中可以根据需要进行其他操作。

  5. i += str(received_value):这一行将接收到的值转换为字符串并累积到之前的 i 中。这样,每次迭代后,i 会持续累积之前所有接收到的值。

这个生成器的使用示例:

gen = echo_generator()
next(gen)          # 启动生成器,开始第一个迭代,此时 yield i 会生成初始的空字符串 i
print(gen.send(1)) # 输出: Received: 1,yield 向外部生成了空字符串 i,接收到了 1,i 变为 "1"
print(gen.send(2)) # 输出: Received: 2,i 变为 "12"
print(gen.send(3)) # 输出: Received: 3,i 变为 "123"

在这个示例中,生成器通过 yield 将累积的字符串 i 返回给调用者。调用者可以使用 send() 方法向生成器发送值,生成器接收到值后会执行相应的操作。这个示例只是演示生成器的基本思想,实际应用中可以根据需求进行更复杂的操作和逻辑。

需要注意的是,生成器函数需要先调用 next(gen) 来启动生成器的第一个迭代,之后才能使用 gen.send() 方法。


除此之外,os-model 的代码中还用到了 Python 3.10 后的一个语法特性:match

在 Python 3.10 版本中引入了 match 语法,它是一种用于模式匹配的语法。模式匹配是一种强大的编程概念,允许你根据数据的结构和内容来匹配和提取信息,从而编写更具可读性和维护性的代码。match 语法可以用于处理复杂的条件分支,取代了传统的 if-elif-else 结构。

以下是 match 语法的基本用法示例:

match expression:
    pattern_1 => expression_1,
    pattern_2 if condition => expression_2,
    ...
    pattern_n => expression_n

在这个语法中,你可以定义多个模式和对应的表达式。每个模式都会与给定的表达式进行匹配,如果匹配成功,将执行相应的表达式。如果没有任何模式匹配成功,会引发 MatchError 异常。

以下是一个具体的示例,展示了如何使用 match 语法来处理不同类型的数据:

def process_data(data):
    match data:
        case 0 => print("Zero")
        case x if x > 0 => print("Positive")
        case x if x < 0 => print("Negative")
        case _ => print("Unknown")

# 使用示例
process_data(10)  # 输出: Positive
process_data(-5)  # 输出: Negative
process_data(0)   # 输出: Zero
process_data(7.5) # 输出: Unknown

在这个例子中,match 语法根据不同的模式进行匹配,并且只会执行匹配成功的表达式。这样可以更清晰地表达不同情况下的处理逻辑。


接下来简要总结一下 os-model 的行为。

首先,os-model.py 的代码可以看作是操作系统内核空间的代码,而threads.py 可以看作是用户空间的代码。

在用户空间每一次执行系统调用后,进入内核空间执行内核代码。所以,用户态和内核态的切换依靠的是系统调用,也即 sys_*

在 os-model 中,首先将所有的用户空间的 sys_* 替换为 yield,这就实现了用户态和内核态的切换。

OperatingSystem的初始化过程中获取主线程,在run函数的执行中将主线程加入线程列表,然后调用线程的step函数,对于操作系统而言,线程的单步执行就是这个线程一直运行到它下一次进行系统调用。

在os-model 中,根据线程所执行的系统调用不同进入不同的逻辑分支,操作系统和用户代码之间的通信通过 yield 关键字来模拟。如果没有匹配到任何系统调用,则表示当前线程执行完毕,将其从线程列表中删除。