PEFT LoRA Fine-tuning Example

Steps

  1. Get a pretrained model from Hugging Face
  2. Load the model and tokenizer
  3. Prepare the dataset
  4. Prepare the training arguments
  5. Prepare the trainer
  6. Add the model
  7. Train the model
  8. Save the model

Get a pretrained model from Hugging Face

First, load the model and the tokenizer.

from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "Qwen/Qwen3-4B-Base"

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",  # automatically picks torch.float16 or torch.bfloat16; without it the weights default to torch.float32
    device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)

Quantize the model to reduce GPU memory usage.

import torch
from transformers import BitsAndBytesConfig

# 8-bit quantization config
quantization_config = BitsAndBytesConfig(
    load_in_8bit=True,
    bnb_8bit_compute_dtype=torch.float16,
)

# 4-bit quantization config (overwrites the 8-bit config above)
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)

Interestingly, on an L4 GPU the 8-bit path only reaches about 49% compute utilization, while 4-bit reaches around 80%; bfloat16 and float16 achieve even higher utilization.
Memory usage of 8-bit and 4-bit is similar, though: roughly 9 GB for 8-bit and about 7.5 GB for 4-bit.

Anyway, we will use the 4-bit quantized model as the base model to cut GPU memory usage. Note that defining the config alone does nothing; it has to be passed when loading the model, as sketched below. With the quantized base model ready, we can configure LoRA.
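A minimal sketch of actually applying the 4-bit config when loading the base model, reusing model_name from above:

import torch
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# device_map="auto" places the quantized weights on the GPU
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=quantization_config,
    device_map="auto",
)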

from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training, TaskType

model = prepare_model_for_kbit_training(model)
lora_config = LoraConfig(
    r=16,
    lora_alpha=16,
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj"
    ],
    lora_dropout=0.05,            # dropout on the LoRA layers
    bias="none",                  # do not add LoRA to bias terms
    task_type=TaskType.CAUSAL_LM  # task type: causal language model
)
lora_model = get_peft_model(model, lora_config)
trainable = sum(p.numel() for p in lora_model.parameters() if p.requires_grad)
total = sum(p.numel() for p in lora_model.parameters())
print(f"LoRA trainable parameters: {trainable:,} / {total:,} ({100*trainable/total:.2f}%)")

Here prepare_model_for_kbit_training casts the LayerNorm layers to float32 to prevent activations from collapsing, and it enables gradient computation for the output embedding layer. The output embedding (lm_head, i.e. the linear layer projecting into vocabulary space) strongly affects final generation quality, so enabling its gradients lets the model learn a new vocabulary distribution or correct biases.
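A quick way to sanity-check what prepare_model_for_kbit_training changed is to print the dtype and requires_grad flag of the norm layers and the lm_head (purely an inspection sketch):

# inspect norm layers and the output head after prepare_model_for_kbit_training
for name, param in model.named_parameters():
    if "norm" in name or "lm_head" in name:
        print(f"{name}: dtype={param.dtype}, requires_grad={param.requires_grad}")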

Get a dataset from the Hugging Face Hub

from datasets import load_dataset
dataset = load_dataset("timdettmers/openassistant-guanaco", split="train")
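
Before writing the preprocessing function, it helps to check the column layout, since the masking code below assumes prompt/response style fields (this is just an inspection step):

# inspect the available columns and one raw example
print(dataset.column_names)
print(dataset[0])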

Prepare the dataset

Most LLMs come with their own tokenizer and a matching chat template. Below is the text Qwen3 produces after applying its chat template.

prompt = "Write a quick sort using assembly language"
messages = [
    {"role": "user", "content": prompt},
    {"role": "assistant", "content": "I have no idea about how to do it, could you ask another question?"},
    {"role": "user", "content": "Write a bubble sort in rust"},
]
text = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True,
    enable_thinking=False
)
<|im_start|>user
Write a quick sort using assembly language<|im_end|>
<|im_start|>assistant
I have no idea about how to do it, could you ask another question?<|im_end|>
<|im_start|>user
Write a bubble sort in rust<|im_end|>
<|im_start|>assistant
<think>
</think>

Now let's look at how to process the data. The main steps are: put the prompt and response into the chat template, tokenize the result, and mask out the prompt part so it does not contribute to the loss.

def preprocess_fn(examples):
    """Assemble prompt/response into the chat template, tokenize, and mask out the prompt part."""
    input_ids, attention_mask, labels = [], [], []

    for prompt, response in zip(examples["prompt"], examples["response"]):
        # build the messages list
        messages = [
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": response},
        ]
        # render the full text
        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
            enable_thinking=False
        )
        # tokenize; max_length=2048 is somewhat wasteful here because many responses are much shorter
        tok = tokenizer(
            text,
            truncation=True,
            padding="max_length",
            max_length=2048,
        )

        ids = tok["input_ids"]
        mask = tok["attention_mask"]

        # find the position of the </think> token and use it to split prompt vs. response
        sep_id = tokenizer.convert_tokens_to_ids("</think>")
        try:
            sep_index = ids.index(sep_id) + 1
        except ValueError:
            sep_index = 0

        # check that the split position is correct
        # print(sep_index)
        # print(tokenizer.decode(ids[sep_index:], skip_special_tokens=True))

        # build labels: the prompt part is set to -100, the response part keeps the real ids;
        # padding positions (attention_mask == 0) are also set to -100 so they are excluded from the loss
        lbl = [tid if m == 1 else -100 for tid, m in zip(ids, mask)]
        lbl[:sep_index] = [-100] * sep_index

        input_ids.append(ids)
        attention_mask.append(mask)
        labels.append(lbl)

    return {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "labels": labels,
    }

small_train = dataset.select(range(10000))  # `dataset` was already loaded with split="train"

tokenized_train = small_train.map(
    preprocess_fn,
    batched=True,
    num_proc=4,
    remove_columns=["id", "prompt", "response"],  # assumes the dataset exposes these columns
)

In PyTorch / Transformers, nn.CrossEntropyLoss(ignore_index=-100) skips every position whose label equals -100: those positions contribute neither to the loss nor to the gradients.
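
A tiny self-contained example of this behavior (the shapes and label values are purely illustrative):

import torch
import torch.nn as nn

loss_fn = nn.CrossEntropyLoss(ignore_index=-100)

logits = torch.randn(4, 10)                # 4 token positions, vocab size 10
labels = torch.tensor([3, -100, -100, 7])  # the two -100 positions are skipped

# the loss is averaged over positions 0 and 3 only
print(loss_fn(logits, labels))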

Exciting Training

Here we train with bf16 mixed precision, which speeds up training and reduces GPU memory usage.

from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
)

training_args = TrainingArguments(
    output_dir="./fine-tuned-model",
    per_device_train_batch_size=2,
    gradient_accumulation_steps=4,  # effective batch size per device: 2 * 4 = 8
    num_train_epochs=3,
    learning_rate=2e-5,
    bf16=True,
    logging_steps=100,
    save_steps=500,
    save_total_limit=3,
)

trainer = Trainer(
    model=lora_model,
    args=training_args,
    train_dataset=tokenized_train,
)

# run the fine-tuning
trainer.train()

# save the LoRA adapter and the tokenizer so they can be reloaded later
trainer.save_model("./fine-tuned-model")
tokenizer.save_pretrained("./fine-tuned-model")
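
Because save_steps writes intermediate checkpoints into output_dir (keeping at most save_total_limit of them), an interrupted run can be resumed from the most recent checkpoint:

# resume from the latest checkpoint found under output_dir
trainer.train(resume_from_checkpoint=True)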

Load Saved Model

Once the model has been saved, loading it back requires first loading the base model and then the trained PEFT adapter on top of it.

from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from peft import PeftModel
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)

model_name = "Qwen/Qwen3-4B-Base"

# 1) load the tokenizer and the quantized base model
tokenizer = AutoTokenizer.from_pretrained(model_name)
base_model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map="auto",
    cache_dir="/home/ec2-user/SageMaker/sagemaker",
    quantization_config=quantization_config
)

# 2) load the PEFT adapter
model = PeftModel.from_pretrained(
    base_model,
    "./fine-tuned-model"  # the output_dir used by the Trainer above
)
model.eval()

# 3) load the tokenizer saved alongside the adapter
tokenizer = AutoTokenizer.from_pretrained("./fine-tuned-model")

# test generation
prompt = "Hello ?"
messages = [
    {"role": "user", "content": prompt},
]
text = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True,
    enable_thinking=False
)
inputs = tokenizer(text, return_tensors="pt").to(device)
out = model.generate(**inputs, max_new_tokens=1024)
print(tokenizer.decode(out[0], skip_special_tokens=True))
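
Optionally, if you want a standalone checkpoint without the PEFT wrapper, the LoRA weights can be merged into the base model. A sketch, assuming the base model is reloaded in fp16 before merging (the ./fine-tuned-model-merged path is just a placeholder):

from transformers import AutoModelForCausalLM
from peft import PeftModel
import torch

# reload the base model unquantized, attach the trained adapter, and fold the LoRA weights in
base_fp16 = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16, device_map="auto")
merged = PeftModel.from_pretrained(base_fp16, "./fine-tuned-model").merge_and_unload()
merged.save_pretrained("./fine-tuned-model-merged")
tokenizer.save_pretrained("./fine-tuned-model-merged")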

Appendix

Two examples for benchmarking GPU FLOPS.

from bitsandbytes.nn import Linear8bitLt
import torch

def benchmark_bnb_int8(
    size: int = 8192,
    warmup: int = 10,
    iters: int = 200,
):
    device = 'cuda'
    # correct usage: input_features / output_features
    model = Linear8bitLt(
        input_features=size,
        output_features=size,
        bias=False,
        has_fp16_weights=True,  # keep mixed FP16 master weights
        threshold=6.0,
        index=False,
        device=device
    )

    x = torch.randn(size, size, dtype=torch.float16, device=device)
    # warmup
    for _ in range(warmup):
        _ = model(x)
    torch.cuda.synchronize()

    # timed run
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    start.record()
    for _ in range(iters):
        _ = model(x)
    end.record()
    torch.cuda.synchronize()

    avg_ms = start.elapsed_time(end) / iters
    flops = 2 * size**3
    tflops = flops / (avg_ms / 1e3) / 1e12
    print(f"[bnb-INT8] Size={size}, Avg Latency={avg_ms:.2f} ms → {tflops:.2f} TFLOPS")

if __name__ == "__main__":
    benchmark_bnb_int8()
import torch
import time

torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cuda.matmul.allow_fp16_reduced_precision_reduction = False

def benchmark_tflops(
    size: int = 8192,
    dtype: torch.dtype = torch.float32,
    warmup_iters: int = 10,
    iters: int = 20,
):
    """
    Multiply two size x size matrices iters times, measure the average latency, and compute TFLOPS.
    Args:
        size: matrix dimension N (N x N).
        dtype: torch.float32 / float16 / bfloat16, etc.
        warmup_iters: warmup iterations, discarded to drop unstable startup overhead.
        iters: number of measured iterations.
    """
    device = 'cuda'
    # random matrices
    A = torch.randn(size, size, device=device, dtype=dtype)
    B = torch.randn(size, size, device=device, dtype=dtype)

    # 1. warmup
    for _ in range(warmup_iters):
        _ = A @ B
    torch.cuda.synchronize()  # make sure the warmup kernels have finished

    # 2. create CUDA events
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)

    # 3. timed run
    start.record()
    for _ in range(iters):
        C = A @ B
    end.record()
    torch.cuda.synchronize()  # wait for all kernels to finish

    # 4. average latency (ms)
    elapsed_ms = start.elapsed_time(end)
    avg_ms = elapsed_ms / iters

    # 5. FLOPs: each matmul takes roughly 2*N^3 floating point operations
    flops_per_matmul = 2 * size**3
    tflops = flops_per_matmul / (avg_ms / 1e3) / 1e12

    print(f"Matrix size: {size}×{size}, dtype={dtype}")
    print(f"Average latency: {avg_ms:.3f} ms, measured throughput: {tflops:.2f} TFLOPS")

if __name__ == "__main__":
    # test different data types
    for dt in [torch.float32, torch.float16, torch.bfloat16]:
        benchmark_tflops(size=8192, dtype=dt)