Instructions to use isatis/kw with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Diffusers
How to use isatis/kw with Diffusers:
pip install -U diffusers transformers accelerate
import torch from diffusers import DiffusionPipeline # switch to "mps" for apple devices pipe = DiffusionPipeline.from_pretrained("isatis/kw", dtype=torch.bfloat16, device_map="cuda") prompt = "Astronaut in a jungle, cold color palette, muted colors, detailed, 8k" image = pipe(prompt).images[0] - Notebooks
- Google Colab
- Kaggle
| import base64 | |
| import json | |
| import sys | |
| from collections import defaultdict | |
| from io import BytesIO | |
| from pprint import pprint | |
| from typing import Any, Dict, List | |
| import os | |
| import re | |
| from pathlib import Path | |
| from typing import Union | |
| from concurrent.futures import ThreadPoolExecutor | |
| import numpy as np | |
| from PIL import ImageFilter | |
| from transformers import CLIPImageProcessor, CLIPTokenizer, CLIPModel | |
| import torch | |
| from diffusers import ( | |
| DiffusionPipeline, | |
| DPMSolverMultistepScheduler, | |
| DPMSolverSinglestepScheduler, | |
| EulerAncestralDiscreteScheduler, | |
| StableDiffusionPipeline, | |
| utils, | |
| ) | |
| from safetensors.torch import load_file | |
| from torch import autocast, tensor | |
| import torchvision.transforms | |
| from PIL import Image | |
| REPO_DIR = Path(__file__).resolve().parent | |
| # if local avoid repo url | |
| # print(os.getcwd()) | |
| # set device | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| if device.type != "cuda": | |
| raise ValueError("need to run on GPU") | |
| class EndpointHandler: | |
| LORA_PATHS = { | |
| "hairdetailer": [str(REPO_DIR / "lora/hairdetailer.safetensors"), ""], | |
| "lora_leica": [str(REPO_DIR / "lora/lora_leica.safetensors"), "leica_style"], | |
| "epiNoiseoffset_v2": [str(REPO_DIR / "lora/epiNoiseoffset_v2.safetensors"), ""], | |
| "MBHU-TT2FRS": [ | |
| str(REPO_DIR / "lora/MBHU-TT2FRS.safetensors"), | |
| "flat breast, small breast, big breast, fake breast", | |
| ], | |
| "polyhedron_new_skin_v1.1": [ | |
| str(REPO_DIR / "lora/polyhedron_new_skin_v1.1.safetensors"), | |
| "skin blemish, detailed skin ", | |
| ], | |
| "ShinyOiledSkin_v20": [ | |
| str(REPO_DIR / "lora/ShinyOiledSkin_v20-LoRA.safetensors"), | |
| "shiny skin", | |
| ], | |
| "detailed_eye-10": [str(REPO_DIR / "lora/detailed_eye-10.safetensors"), ""], | |
| "add_detail": [str(REPO_DIR / "lora/add_detail.safetensors"), ""], | |
| "MuscleGirl_v1": [str(REPO_DIR / "lora/MuscleGirl_v1.safetensors"), "abs"], | |
| "nurse_v11-05": [str(REPO_DIR / "lora/nurse_v11-05.safetensors"), "nurse"], | |
| "shibari_v20": [str(REPO_DIR / "lora/shibari_v20.safetensors"), "shibari,rope"], | |
| "tajnaclub_high_heelsv1.2": [ | |
| str(REPO_DIR / "lora/tajnaclub_high_heelsv1.2.safetensors"), | |
| "high heels", | |
| ], | |
| "CyberPunkAI": [ | |
| str(REPO_DIR / "lora/CyberPunkAI.safetensors"), | |
| "neon CyberpunkAI", | |
| ], | |
| "FutaCockCloseUp-v1": [ | |
| str(REPO_DIR / "lora/FutaCockCloseUp-v1.safetensors"), | |
| "huge penis", | |
| ], | |
| "PovBlowjob-v3": [ | |
| str(REPO_DIR / "lora/PovBlowjob-v3.safetensors"), | |
| "blowjob, deepthroat, kneeling, runny makeup, creampie", | |
| ], | |
| "dp_from_behind_v0.1b": [ | |
| str(REPO_DIR / "lora/dp_from_behind_v0.1b.safetensors"), | |
| "1girl, 2boys, double penetration, multiple penises", | |
| ], | |
| "EkuneSideDoggy": [ | |
| str(REPO_DIR / "lora/EkuneSideDoggy.safetensors"), | |
| "sidedoggystyle, doggystyle", | |
| ], | |
| "qqq-grabbing_from_behind-v2-000006": [ | |
| str(REPO_DIR / "lora/qqq-grabbing_from_behind-v2-000006.safetensors"), | |
| "grabbing from behind, breast grab", | |
| ], | |
| "ftm-v0": [ | |
| str(REPO_DIR / "lora/ftm-v0.safetensors"), | |
| "big mouth, tongue, long tongue", | |
| ], | |
| "tgirls_V3_5": [ | |
| str(REPO_DIR / "lora/tgirls_V3_5.safetensors"), | |
| "large penis, penis, erect penis", | |
| ], | |
| "fapp9": [ | |
| str(REPO_DIR / "lora/fapp9.safetensors"), | |
| "large penis, penis, erect penis", | |
| ], | |
| "pov-doggy-graphos": [ | |
| str(REPO_DIR / "lora/pov-doggy-graphos.safetensors"), | |
| "penis in vagina, white man grabbing her ass", | |
| ], | |
| "reelmech1v2": [ | |
| str(REPO_DIR / "lora/reelmech1v2.safetensors"), | |
| "reelmech", | |
| ], | |
| } | |
| TEXTUAL_INVERSION = [ | |
| { | |
| "weight_name": str(REPO_DIR / "embeddings/EasyNegative.safetensors"), | |
| "token": "easynegative", | |
| }, | |
| { | |
| "weight_name": str(REPO_DIR / "embeddings/kkw-NativeAmerican.pt"), | |
| "token": "badhandv4", | |
| }, | |
| { | |
| "weight_name": str(REPO_DIR / "embeddings/badhandv4.pt"), | |
| "token": "kkw-Afro, kkw-Asian, kkw-Euro ", | |
| }, | |
| { | |
| "weight_name": str(REPO_DIR / "embeddings/bad-artist-anime.pt"), | |
| "token": "bad-artist-anime", | |
| }, | |
| { | |
| "weight_name": str(REPO_DIR / "embeddings/NegfeetV2.pt"), | |
| "token": "negfeetv2", | |
| }, | |
| { | |
| "weight_name": str(REPO_DIR / "embeddings/ng_deepnegative_v1_75t.pt"), | |
| "token": "ng_deepnegative_v1_75t", | |
| }, | |
| { | |
| "weight_name": str(REPO_DIR / "embeddings/bad-hands-5.pt"), | |
| "token": "bad-hands-5", | |
| }, | |
| ] | |
| def __init__(self, path="."): | |
| self.inference_progress = {} # Dictionary to store progress of each request | |
| self.inference_images = {} # Dictionary to store latest image of each request | |
| self.total_steps = {} | |
| self.active_request_ids = list() | |
| self.inference_in_progress = False | |
| self.executor = ThreadPoolExecutor( | |
| max_workers=1 | |
| ) # Vous pouvez ajuster max_workers en fonction de vos besoins | |
| realistic_path = str(REPO_DIR / "realistic/") | |
| self.pipe_realistic, self.safety_checker = self.load_realistic(realistic_path) | |
| anime_path = str(REPO_DIR / "anime/") | |
| self.pipe_anime, self.pipe_anime_safety_checker = self.load_anime(anime_path) | |
| # Load CLipImagePRocessor for NSFW check later | |
| self.image_processor = CLIPImageProcessor.from_pretrained( | |
| "openai/clip-vit-base-patch16" | |
| ) | |
| def load_model_essentials(self, model_path): | |
| """common to all models""" | |
| # load the optimized model | |
| if "realistic" in model_path: | |
| pipe = DiffusionPipeline.from_pretrained( | |
| pretrained_model_name_or_path=model_path, | |
| custom_pipeline="lpw_stable_diffusion", # avoid 77 token limit | |
| torch_dtype=torch.float16, # accelerate render | |
| ) | |
| safety_checker = pipe.safety_checker.to(device).to(torch.float16) | |
| else: | |
| safety_checker = None | |
| pipe = DiffusionPipeline.from_pretrained( | |
| pretrained_model_name_or_path=model_path, | |
| custom_pipeline="lpw_stable_diffusion", # avoid 77 token limit | |
| torch_dtype=torch.float16, # accelerate render | |
| safety_checker=None, # Mode boulardus | |
| ) | |
| pipe = pipe.to(device) | |
| # Disable progress bar | |
| pipe.set_progress_bar_config(disable=True) | |
| # Load negative embeddings to avoid bad hands, etc | |
| self.load_embeddings(pipe) | |
| # boosts performance by another 20% | |
| pipe.enable_xformers_memory_efficient_attention() | |
| pipe.enable_attention_slicing() # may need a requirement in the root with xformer | |
| return pipe, safety_checker | |
| def load_anime(self, path): | |
| """Load anime model""" | |
| # Init pipe | |
| pipe, safety_checker = self.load_model_essentials(path) | |
| # https://stablediffusionapi.com/docs/a1111schedulers/ | |
| # Euler a | |
| pipe.scheduler = EulerAncestralDiscreteScheduler.from_config( | |
| pipe.scheduler.config, | |
| ) | |
| # Load loras one time only | |
| # Must be replaced once we will know how to hot load/unload | |
| # it use the own made load_lora function | |
| self.load_selected_loras( | |
| pipe, | |
| [ | |
| # ["detailed_eye-10", 0.2], | |
| # ["add_detail", 0.2], | |
| ["MuscleGirl_v1", 0.1], | |
| ["tgirls_V3_5", 0.02], | |
| ["PovBlowjob-v3", 0.02], | |
| ["pov-doggy-graphos", 0.02], | |
| ["shibari_v20", 0.02], | |
| ["ftm-v0", 0.02], | |
| ["reelmech1v2", 0.02], | |
| ], | |
| ) | |
| return pipe, safety_checker | |
| def load_realistic(self, path): | |
| """Load realistic model""" | |
| # Init pipe | |
| pipe, safety_checker = self.load_model_essentials(path) | |
| # https://stablediffusionapi.com/docs/a1111schedulers/ | |
| # DPM++ 2M Karras | |
| # pipe.scheduler = DPMSolverMultistepScheduler.from_config( | |
| # pipe.scheduler.config, | |
| # use_karras_sigmas=True, | |
| # ) | |
| # DPM++ 2M SDE Karras | |
| pipe.scheduler = DPMSolverMultistepScheduler.from_config( | |
| pipe.scheduler.config, | |
| algorithm_type="sde-dpmsolver++", | |
| use_karras_sigmas=True, | |
| ) | |
| # Load loras one time only | |
| # Must be replaced once we will know how to hot load/unload | |
| # it use the own made load_lora function | |
| self.load_selected_loras( | |
| pipe, | |
| [ | |
| ["polyhedron_new_skin_v1.1", 0.15], | |
| ["detailed_eye-10", 0.2], | |
| ["add_detail", 0.1], | |
| ["MuscleGirl_v1", 0.2], | |
| ["tgirls_V3_5", 0.02], | |
| ["PovBlowjob-v3", 0.02], | |
| ["pov-doggy-graphos", 0.02], | |
| ["shibari_v20", 0.02], | |
| ["ftm-v0", 0.02], | |
| ["reelmech1v2", 0.02], | |
| ], | |
| ) | |
| return pipe, safety_checker | |
| def load_lora(self, pipeline, lora_path, lora_weight=0.5): | |
| state_dict = load_file(lora_path) | |
| LORA_PREFIX_UNET = "lora_unet" | |
| LORA_PREFIX_TEXT_ENCODER = "lora_te" | |
| alpha = lora_weight | |
| visited = [] | |
| for key in state_dict: | |
| state_dict[key] = state_dict[key].to(device) | |
| # directly update weight in diffusers model | |
| for key in state_dict: | |
| # as we have set the alpha beforehand, so just skip | |
| if ".alpha" in key or key in visited: | |
| continue | |
| if "text" in key: | |
| layer_infos = ( | |
| key.split(".")[0] | |
| .split(LORA_PREFIX_TEXT_ENCODER + "_")[-1] | |
| .split("_") | |
| ) | |
| curr_layer = pipeline.text_encoder | |
| else: | |
| layer_infos = ( | |
| key.split(".")[0].split(LORA_PREFIX_UNET + "_")[-1].split("_") | |
| ) | |
| curr_layer = pipeline.unet | |
| # find the target layer | |
| temp_name = layer_infos.pop(0) | |
| while len(layer_infos) > -1: | |
| try: | |
| curr_layer = curr_layer.__getattr__(temp_name) | |
| if len(layer_infos) > 0: | |
| temp_name = layer_infos.pop(0) | |
| elif len(layer_infos) == 0: | |
| break | |
| except Exception: | |
| if len(temp_name) > 0: | |
| temp_name += "_" + layer_infos.pop(0) | |
| else: | |
| temp_name = layer_infos.pop(0) | |
| # org_forward(x) + lora_up(lora_down(x)) * multiplier | |
| pair_keys = [] | |
| if "lora_down" in key: | |
| pair_keys.append(key.replace("lora_down", "lora_up")) | |
| pair_keys.append(key) | |
| else: | |
| pair_keys.append(key) | |
| pair_keys.append(key.replace("lora_up", "lora_down")) | |
| # update weight | |
| if len(state_dict[pair_keys[0]].shape) == 4: | |
| weight_up = ( | |
| state_dict[pair_keys[0]].squeeze(3).squeeze(2).to(torch.float32) | |
| ) | |
| weight_down = ( | |
| state_dict[pair_keys[1]].squeeze(3).squeeze(2).to(torch.float32) | |
| ) | |
| curr_layer.weight.data += alpha * torch.mm( | |
| weight_up, weight_down | |
| ).unsqueeze(2).unsqueeze(3) | |
| else: | |
| weight_up = state_dict[pair_keys[0]].to(torch.float32) | |
| weight_down = state_dict[pair_keys[1]].to(torch.float32) | |
| curr_layer.weight.data += alpha * torch.mm(weight_up, weight_down) | |
| # update visited list | |
| for item in pair_keys: | |
| visited.append(item) | |
| return pipeline | |
| def load_embeddings(self, pipeline): | |
| """Load textual inversions, avoid bad prompts""" | |
| for model in EndpointHandler.TEXTUAL_INVERSION: | |
| pipeline.load_textual_inversion( | |
| ".", weight_name=model["weight_name"], token=model["token"] | |
| ) | |
| def load_selected_loras(self, pipeline, selections): | |
| """Load Loras models, can lead to marvelous creations""" | |
| for model_name, weight in selections: | |
| lora_path = EndpointHandler.LORA_PATHS[model_name][0] | |
| # self.pipe.load_lora_weights(lora_path) | |
| self.load_lora(pipeline, lora_path, weight) | |
| def clean_negative_prompt(self, negative_prompt): | |
| """Clean negative prompt to remove already used negative prompt handlers""" | |
| # negative_prompt = ( | |
| # negative_prompt | |
| # + """, easynegative, badhandv4, bad-artist-anime, negfeetv2, ng_deepnegative_v1_75t, bad-hands-5, """ | |
| # ) | |
| tokens = [item["token"] for item in self.TEXTUAL_INVERSION] | |
| # Retirer tous les tokens de negative_prompt s'ils existent déjà | |
| for token in tokens: | |
| # Utiliser une expression régulière pour un remplacement insensible à la casse | |
| negative_prompt = re.sub( | |
| r"\b" + re.escape(token) + r"\b", | |
| "", | |
| negative_prompt, | |
| flags=re.IGNORECASE, | |
| ).strip() | |
| # Ajouter tous les tokens à la fin de negative_prompt | |
| negative_prompt += " " + " ".join(tokens) | |
| return negative_prompt | |
| def check_fields(self, data): | |
| """check for fields, if some missing return error""" | |
| # 1. Verify input arguments | |
| required_fields = [ | |
| "prompt", | |
| "negative_prompt", | |
| "width", | |
| "num_inference_steps", | |
| "height", | |
| "guidance_scale", | |
| "request_id", | |
| ] | |
| missing_fields = [field for field in required_fields if field not in data] | |
| if missing_fields: | |
| return { | |
| "flag": "error", | |
| "message": f"Missing fields: {', '.join(missing_fields)}", | |
| } | |
| return False | |
| def clean_request_data(self): | |
| """Clean up the data related to a specific request ID.""" | |
| # Remove the request ID from the progress dictionary | |
| self.inference_progress.clear() | |
| # Remove the request ID from the images dictionary | |
| self.inference_images.clear() | |
| # Remove the request ID from the total_steps dictionary | |
| self.total_steps.clear() | |
| # Delete request id | |
| self.active_request_ids.clear() | |
| # Set inference to False | |
| self.inference_in_progress = False | |
| def progress_callback( | |
| self, | |
| step: int, | |
| timestep: int, | |
| latents: Any, | |
| request_id: str, | |
| status: str, | |
| pipeline: Any, | |
| ): | |
| try: | |
| if status == "progress": | |
| # Latents to numpy | |
| img_data = pipeline.decode_latents(latents) | |
| img_data = (img_data.squeeze() * 255).astype(np.uint8) | |
| img = Image.fromarray(img_data, "RGB") | |
| # Apply a blur to the image | |
| # more intense at the beginning | |
| if step < int(self.total_steps[self.active_request_ids[0]] / 1.5): | |
| img = img.filter(ImageFilter.GaussianBlur(radius=30)) | |
| else: | |
| img = img.filter(ImageFilter.GaussianBlur(radius=10)) | |
| # print(img_data) | |
| else: | |
| # pil object | |
| # print(latents) | |
| img = latents | |
| buffered = BytesIO() | |
| img.save(buffered, format="PNG") | |
| # print(status) | |
| # Save the image to a file | |
| # img.save("squirel.png", format="PNG") | |
| # Encode the image into a base64 string representation | |
| img_str = base64.b64encode(buffered.getvalue()).decode() | |
| except Exception as e: | |
| print(f"Error: {e}") | |
| # Store progress and image | |
| progress_percentage = ( | |
| step / self.total_steps[request_id] | |
| ) * 100 # Assuming self.total_steps is the total number of steps for inference | |
| self.inference_progress[request_id] = progress_percentage | |
| self.inference_images[request_id] = img_str | |
| def check_progress(self, request_id: str) -> Dict[str, Union[str, float]]: | |
| progress = self.inference_progress.get(request_id, 0) | |
| latest_image = self.inference_images.get(request_id, None) | |
| # print(self.inference_progress) | |
| if progress >= 100: | |
| status = "complete" | |
| # Check if Image is NSFW | |
| image_data = base64.b64decode(latest_image) | |
| image_io = BytesIO(image_data) | |
| is_nsfw = self.check_nsfw(Image.open(image_io))[0] | |
| # is_nsfw = "bypass" | |
| else: | |
| status = "in-progress" | |
| is_nsfw = "" | |
| return { | |
| "flag": "success", | |
| "status": status, | |
| "progress": int(progress), | |
| "image": latest_image, | |
| "is_nsfw": is_nsfw, | |
| } | |
| def check_nsfw(self, image): | |
| """Check if image is NSFW""" | |
| safety_checker_input = self.image_processor(image, return_tensors="pt").to( | |
| device | |
| ) | |
| image, has_nsfw_concept = self.safety_checker( | |
| images=np.array(image), | |
| clip_input=safety_checker_input.pixel_values.to(torch.float16), | |
| ) | |
| return has_nsfw_concept | |
| def start_inference(self, pipeline, data: Dict) -> Dict: | |
| """Start a new inference.""" | |
| global device | |
| # Now extract the fields | |
| prompt = data["prompt"] | |
| negative_prompt = data["negative_prompt"] | |
| loras_model = data.get("loras_model", None) | |
| seed = data.get("seed", None) | |
| width = data["width"] | |
| num_inference_steps = data["num_inference_steps"] | |
| height = data["height"] | |
| guidance_scale = data["guidance_scale"] | |
| request_id = data["request_id"] | |
| # Used for progress checker | |
| self.total_steps[request_id] = num_inference_steps | |
| # USe this to add automatically some negative prompts | |
| forced_negative = self.clean_negative_prompt(negative_prompt) | |
| # Set the generator seed if provided | |
| generator = torch.Generator(device="cuda").manual_seed(seed) if seed else None | |
| # set scale of loras (mix all loras and apply common scale, can't be indivual) | |
| # scale = 0.25 # seems ok | |
| # scale = 0.2 | |
| try: | |
| # 2. Process | |
| with autocast(device.type): | |
| image = pipeline.text2img( | |
| prompt=prompt, | |
| guidance_scale=guidance_scale, | |
| num_inference_steps=num_inference_steps, | |
| height=height, | |
| width=width, | |
| negative_prompt=forced_negative, | |
| generator=generator, | |
| max_embeddings_multiples=5, | |
| callback=lambda step, timestep, latents: self.progress_callback( | |
| step, timestep, latents, request_id, "progress", pipeline | |
| ), | |
| callback_steps=5, | |
| # cross_attention_kwargs={"scale": 0.02}, | |
| ) | |
| # print(image) | |
| self.progress_callback( | |
| num_inference_steps, | |
| 0, | |
| image.images[0], | |
| request_id, | |
| "complete", | |
| pipeline, | |
| ) | |
| self.inference_in_progress = False | |
| # for debug | |
| # image.save("squirelb.png", format="PNG") | |
| except Exception as e: | |
| # Handle any other exceptions and return an error response | |
| return {"flag": "error", "message": str(e)} | |
| def __call__(self, data: Any) -> Dict: | |
| """Handle incoming requests.""" | |
| action = data.get("action", None) | |
| request_id = data.get("request_id") | |
| genre = data.get("genre") | |
| # Check if the request_id is valid for all actions | |
| if not request_id: | |
| return {"flag": "error", "message": "Missing request_id."} | |
| if action == "check_progress": | |
| if request_id not in self.active_request_ids: | |
| return { | |
| "flag": "error", | |
| "message": "Request id doesn't match any active request.", | |
| } | |
| return self.check_progress(request_id) | |
| elif action == "inference": | |
| # Check field before doing anything | |
| check_fields = self.check_fields(data) | |
| if check_fields: | |
| return check_fields | |
| # Check if an inference is already in progress | |
| if self.inference_in_progress: | |
| return { | |
| "flag": "error", | |
| "message": "Another inference is already in progress. Please wait.", | |
| } | |
| # Set the inference state to in progress | |
| self.clean_request_data() | |
| self.inference_in_progress = True | |
| self.inference_progress[request_id] = 0 | |
| self.inference_images[request_id] = None | |
| self.active_request_ids.append(request_id) | |
| # Load model according to genre | |
| if genre == "anime": | |
| pipe = self.pipe_anime | |
| else: | |
| pipe = self.pipe_realistic | |
| self.executor.submit(self.start_inference, pipe, data) | |
| # self.start_inference(data) | |
| return { | |
| "flag": "success", | |
| "status": "started", | |
| "message": "Inference started", | |
| "request_id": request_id, | |
| } | |
| else: | |
| return {"flag": "error", "message": f"Unsupported action: {action}"} | |