# cgit page header (navigation: summary, refs, log, tree, commit, diff, stats, homepage)
# path: root/tests/extmod/vfs_lfs_corrupt.py
# blob: 0365c0c23f3c8e40a349fd6141c63e18e2327933 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# Test for VfsLittle using a RAM device, testing error handling from corrupt block device

try:
    import vfs

    # Bare attribute lookups: they do nothing when the classes exist and raise
    # AttributeError when this build of vfs lacks either littlefs class.
    vfs.VfsLfs1
    vfs.VfsLfs2
except (ImportError, AttributeError):
    # Either vfs itself or one of the littlefs classes is unavailable, so
    # announce a skip and stop before any test code runs.
    # NOTE(review): "SKIP" is presumably the marker the test runner looks for.
    print("SKIP")
    raise SystemExit


class RAMBlockDevice:
    # Block device kept entirely in a bytearray.  The value stored in `ret` is
    # what readblocks() and writeblocks() return, so a caller can set it to a
    # non-zero code to make those calls report failure (the data is still
    # copied either way).
    ERASE_BLOCK_SIZE = 1024

    def __init__(self, blocks):
        # `blocks` is the number of ERASE_BLOCK_SIZE-byte blocks to allocate.
        self.ret = 0
        self.data = bytearray(blocks * self.ERASE_BLOCK_SIZE)

    def readblocks(self, block, buf, off):
        # Fill `buf` from the backing store, starting `off` bytes into `block`.
        base = off + self.ERASE_BLOCK_SIZE * block
        storage = self.data
        for pos in range(len(buf)):
            buf[pos] = storage[base + pos]
        return self.ret

    def writeblocks(self, block, buf, off):
        # Copy `buf` into the backing store, starting `off` bytes into `block`.
        base = off + self.ERASE_BLOCK_SIZE * block
        storage = self.data
        for pos, value in enumerate(buf):
            storage[base + pos] = value
        return self.ret

    def ioctl(self, op, arg):
        # Answer the three requests this device knows about; any other `op`
        # yields None.  `arg` is ignored.
        block_size = self.ERASE_BLOCK_SIZE
        answers = {
            4: len(self.data) // block_size,  # block count
            5: block_size,  # block size
            6: 0,  # erase block
        }
        return answers.get(op)


def corrupt(bdev, block):
    # Overwrite every byte of erase block `block` in bdev.data with a
    # repeating 0, 1, ..., 255 ramp, destroying whatever was stored there.
    size = bdev.ERASE_BLOCK_SIZE
    start = block * size
    for offset in range(size):
        bdev.data[start + offset] = offset % 256


def create_vfs(bdev, vfs_class):
    # Format `bdev` with `vfs_class`, mount it, and write a file "f" using
    # 100 separate writes of "test".  Returns the mounted filesystem object.
    bdev.ret = 0  # clear any error code left behind by an earlier scenario
    vfs_class.mkfs(bdev)
    filesystem = vfs_class(bdev)
    chunk = "test"
    with filesystem.open("f", "w") as out:
        for _ in range(100):
            out.write(chunk)
    return filesystem


def test(bdev, vfs_class):
    # Run the error-handling scenarios against `vfs_class` on `bdev`.
    # Every scenario starts from create_vfs(), which resets bdev.ret to 0,
    # reformats the device and writes the file "f".  A scenario prints its
    # "... OSError" line only if the operation under test raises OSError.
    print("test", vfs_class)

    # statvfs
    # Blocks 0 and 1 are overwritten after the filesystem is already mounted.
    # NOTE(review): presumably these blocks hold the littlefs superblock - confirm.
    fs = create_vfs(bdev, vfs_class)
    corrupt(bdev, 0)
    corrupt(bdev, 1)
    try:
        # If statvfs succeeds its result is printed instead of the error line.
        print(fs.statvfs(""))
    except OSError:
        print("statvfs OSError")

    # error during read
    fs = create_vfs(bdev, vfs_class)
    f = fs.open("f", "r")
    # From here on readblocks()/writeblocks() return -5 to the filesystem.
    bdev.ret = -5  # EIO
    try:
        f.read(10)
    except OSError:
        print("read OSError")
    # f is not closed here; the next create_vfs() call reformats the device.

    # error during write
    fs = create_vfs(bdev, vfs_class)
    f = fs.open("f", "a")
    bdev.ret = -5  # EIO
    try:
        f.write("test")
    except OSError:
        print("write OSError")

    # error during close
    fs = create_vfs(bdev, vfs_class)
    f = fs.open("f", "w")
    # This write happens while the device still reports success; the failure
    # code is only switched on just before close().
    f.write("test")
    bdev.ret = -5  # EIO
    try:
        f.close()
    except OSError:
        print("close OSError")

    # error during flush
    fs = create_vfs(bdev, vfs_class)
    f = fs.open("f", "w")
    f.write("test")
    bdev.ret = -5  # EIO
    try:
        f.flush()
    except OSError:
        print("flush OSError")
    # Restore a working device before closing so close() itself is not run
    # against a device that reports errors.
    bdev.ret = 0
    f.close()


# A single 30-block RAM device is reused for both littlefs versions.
bdev = RAMBlockDevice(30)
for vfs_class in (vfs.VfsLfs1, vfs.VfsLfs2):
    test(bdev, vfs_class)