From 49fa5aa2a127bdf8924d02bf77e5086b39c7a447 Mon Sep 17 00:00:00 2001 From: Calvin Morrison Date: Wed, 3 Sep 2025 21:15:36 -0400 Subject: i vibe coded it --- server/_build/default/lib/poolboy/src/poolboy.erl | 357 ++++++++++++++++++++++ 1 file changed, 357 insertions(+) create mode 100644 server/_build/default/lib/poolboy/src/poolboy.erl (limited to 'server/_build/default/lib/poolboy/src/poolboy.erl') diff --git a/server/_build/default/lib/poolboy/src/poolboy.erl b/server/_build/default/lib/poolboy/src/poolboy.erl new file mode 100644 index 0000000..0023412 --- /dev/null +++ b/server/_build/default/lib/poolboy/src/poolboy.erl @@ -0,0 +1,357 @@ +%% Poolboy - A hunky Erlang worker pool factory + +-module(poolboy). +-behaviour(gen_server). + +-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2, + transaction/3, child_spec/2, child_spec/3, start/1, start/2, + start_link/1, start_link/2, stop/1, status/1]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). +-export_type([pool/0]). + +-define(TIMEOUT, 5000). + +-ifdef(pre17). +-type pid_queue() :: queue(). +-else. +-type pid_queue() :: queue:queue(). +-endif. + +-ifdef(OTP_RELEASE). %% this implies 21 or higher +-define(EXCEPTION(Class, Reason, Stacktrace), Class:Reason:Stacktrace). +-define(GET_STACK(Stacktrace), Stacktrace). +-else. +-define(EXCEPTION(Class, Reason, _), Class:Reason). +-define(GET_STACK(_), erlang:get_stacktrace()). +-endif. + +-type pool() :: + Name :: (atom() | pid()) | + {Name :: atom(), node()} | + {local, Name :: atom()} | + {global, GlobalName :: any()} | + {via, Module :: atom(), ViaName :: any()}. + +% Copied from gen:start_ret/0 +-type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}. + +-record(state, { + supervisor :: undefined | pid(), + workers = [] :: [pid()], + waiting :: pid_queue(), + monitors :: ets:tid(), + size = 5 :: non_neg_integer(), + overflow = 0 :: non_neg_integer(), + max_overflow = 10 :: non_neg_integer(), + strategy = lifo :: lifo | fifo +}). + +-spec checkout(Pool :: pool()) -> pid(). +checkout(Pool) -> + checkout(Pool, true). + +-spec checkout(Pool :: pool(), Block :: boolean()) -> pid() | full. +checkout(Pool, Block) -> + checkout(Pool, Block, ?TIMEOUT). + +-spec checkout(Pool :: pool(), Block :: boolean(), Timeout :: timeout()) + -> pid() | full. +checkout(Pool, Block, Timeout) -> + CRef = make_ref(), + try + gen_server:call(Pool, {checkout, CRef, Block}, Timeout) + catch + ?EXCEPTION(Class, Reason, Stacktrace) -> + gen_server:cast(Pool, {cancel_waiting, CRef}), + erlang:raise(Class, Reason, ?GET_STACK(Stacktrace)) + end. + +-spec checkin(Pool :: pool(), Worker :: pid()) -> ok. +checkin(Pool, Worker) when is_pid(Worker) -> + gen_server:cast(Pool, {checkin, Worker}). + +-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any())) + -> any(). +transaction(Pool, Fun) -> + transaction(Pool, Fun, ?TIMEOUT). + +-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()), + Timeout :: timeout()) -> any(). +transaction(Pool, Fun, Timeout) -> + Worker = poolboy:checkout(Pool, true, Timeout), + try + Fun(Worker) + after + ok = poolboy:checkin(Pool, Worker) + end. + +-spec child_spec(PoolId :: term(), PoolArgs :: proplists:proplist()) + -> supervisor:child_spec(). +child_spec(PoolId, PoolArgs) -> + child_spec(PoolId, PoolArgs, []). + +-spec child_spec(PoolId :: term(), + PoolArgs :: proplists:proplist(), + WorkerArgs :: proplists:proplist()) + -> supervisor:child_spec(). +child_spec(PoolId, PoolArgs, WorkerArgs) -> + {PoolId, {poolboy, start_link, [PoolArgs, WorkerArgs]}, + permanent, 5000, worker, [poolboy]}. + +-spec start(PoolArgs :: proplists:proplist()) + -> start_ret(). +start(PoolArgs) -> + start(PoolArgs, PoolArgs). + +-spec start(PoolArgs :: proplists:proplist(), + WorkerArgs:: proplists:proplist()) + -> start_ret(). +start(PoolArgs, WorkerArgs) -> + start_pool(start, PoolArgs, WorkerArgs). + +-spec start_link(PoolArgs :: proplists:proplist()) + -> start_ret(). +start_link(PoolArgs) -> + %% for backwards compatability, pass the pool args as the worker args as well + start_link(PoolArgs, PoolArgs). + +-spec start_link(PoolArgs :: proplists:proplist(), + WorkerArgs:: proplists:proplist()) + -> start_ret(). +start_link(PoolArgs, WorkerArgs) -> + start_pool(start_link, PoolArgs, WorkerArgs). + +-spec stop(Pool :: pool()) -> ok. +stop(Pool) -> + gen_server:call(Pool, stop). + +-spec status(Pool :: pool()) -> {atom(), integer(), integer(), integer()}. +status(Pool) -> + gen_server:call(Pool, status). + +init({PoolArgs, WorkerArgs}) -> + process_flag(trap_exit, true), + Waiting = queue:new(), + Monitors = ets:new(monitors, [private]), + init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). + +init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> + {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), + init(Rest, WorkerArgs, State#state{supervisor = Sup}); +init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> + init(Rest, WorkerArgs, State#state{size = Size}); +init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) -> + init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow}); +init([{strategy, lifo} | Rest], WorkerArgs, State) -> + init(Rest, WorkerArgs, State#state{strategy = lifo}); +init([{strategy, fifo} | Rest], WorkerArgs, State) -> + init(Rest, WorkerArgs, State#state{strategy = fifo}); +init([_ | Rest], WorkerArgs, State) -> + init(Rest, WorkerArgs, State); +init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> + Workers = prepopulate(Size, Sup), + {ok, State#state{workers = Workers}}. + +handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> + case ets:lookup(Monitors, Pid) of + [{Pid, _, MRef}] -> + true = erlang:demonitor(MRef), + true = ets:delete(Monitors, Pid), + NewState = handle_checkin(Pid, State), + {noreply, NewState}; + [] -> + {noreply, State} + end; + +handle_cast({cancel_waiting, CRef}, State) -> + case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of + [[Pid, MRef]] -> + demonitor(MRef, [flush]), + true = ets:delete(State#state.monitors, Pid), + NewState = handle_checkin(Pid, State), + {noreply, NewState}; + [] -> + Cancel = fun({_, Ref, MRef}) when Ref =:= CRef -> + demonitor(MRef, [flush]), + false; + (_) -> + true + end, + Waiting = queue:filter(Cancel, State#state.waiting), + {noreply, State#state{waiting = Waiting}} + end; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> + #state{supervisor = Sup, + workers = Workers, + monitors = Monitors, + overflow = Overflow, + max_overflow = MaxOverflow} = State, + case Workers of + [Pid | Left] -> + MRef = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {Pid, CRef, MRef}), + {reply, Pid, State#state{workers = Left}}; + [] when MaxOverflow > 0, Overflow < MaxOverflow -> + {Pid, MRef} = new_worker(Sup, FromPid), + true = ets:insert(Monitors, {Pid, CRef, MRef}), + {reply, Pid, State#state{overflow = Overflow + 1}}; + [] when Block =:= false -> + {reply, full, State}; + [] -> + MRef = erlang:monitor(process, FromPid), + Waiting = queue:in({From, CRef, MRef}, State#state.waiting), + {noreply, State#state{waiting = Waiting}} + end; + +handle_call(status, _From, State) -> + #state{workers = Workers, + monitors = Monitors, + overflow = Overflow} = State, + StateName = state_name(State), + {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State}; +handle_call(get_avail_workers, _From, State) -> + Workers = State#state.workers, + {reply, Workers, State}; +handle_call(get_all_workers, _From, State) -> + Sup = State#state.supervisor, + WorkerList = supervisor:which_children(Sup), + {reply, WorkerList, State}; +handle_call(get_all_monitors, _From, State) -> + Monitors = ets:select(State#state.monitors, + [{{'$1', '_', '$2'}, [], [{{'$1', '$2'}}]}]), + {reply, Monitors, State}; +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; +handle_call(_Msg, _From, State) -> + Reply = {error, invalid_message}, + {reply, Reply, State}. + +handle_info({'DOWN', MRef, _, _, _}, State) -> + case ets:match(State#state.monitors, {'$1', '_', MRef}) of + [[Pid]] -> + true = ets:delete(State#state.monitors, Pid), + NewState = handle_checkin(Pid, State), + {noreply, NewState}; + [] -> + Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting), + {noreply, State#state{waiting = Waiting}} + end; +handle_info({'EXIT', Pid, _Reason}, State) -> + #state{supervisor = Sup, + monitors = Monitors} = State, + case ets:lookup(Monitors, Pid) of + [{Pid, _, MRef}] -> + true = erlang:demonitor(MRef), + true = ets:delete(Monitors, Pid), + NewState = handle_worker_exit(Pid, State), + {noreply, NewState}; + [] -> + case lists:member(Pid, State#state.workers) of + true -> + W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers), + {noreply, State#state{workers = [new_worker(Sup) | W]}}; + false -> + {noreply, State} + end + end; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers), + true = exit(State#state.supervisor, shutdown), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +start_pool(StartFun, PoolArgs, WorkerArgs) -> + case proplists:get_value(name, PoolArgs) of + undefined -> + gen_server:StartFun(?MODULE, {PoolArgs, WorkerArgs}, []); + Name -> + gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, []) + end. + +new_worker(Sup) -> + {ok, Pid} = supervisor:start_child(Sup, []), + true = link(Pid), + Pid. + +new_worker(Sup, FromPid) -> + Pid = new_worker(Sup), + Ref = erlang:monitor(process, FromPid), + {Pid, Ref}. + +dismiss_worker(Sup, Pid) -> + true = unlink(Pid), + supervisor:terminate_child(Sup, Pid). + +prepopulate(N, _Sup) when N < 1 -> + []; +prepopulate(N, Sup) -> + prepopulate(N, Sup, []). + +prepopulate(0, _Sup, Workers) -> + Workers; +prepopulate(N, Sup, Workers) -> + prepopulate(N-1, Sup, [new_worker(Sup) | Workers]). + +handle_checkin(Pid, State) -> + #state{supervisor = Sup, + waiting = Waiting, + monitors = Monitors, + overflow = Overflow, + strategy = Strategy} = State, + case queue:out(Waiting) of + {{value, {From, CRef, MRef}}, Left} -> + true = ets:insert(Monitors, {Pid, CRef, MRef}), + gen_server:reply(From, Pid), + State#state{waiting = Left}; + {empty, Empty} when Overflow > 0 -> + ok = dismiss_worker(Sup, Pid), + State#state{waiting = Empty, overflow = Overflow - 1}; + {empty, Empty} -> + Workers = case Strategy of + lifo -> [Pid | State#state.workers]; + fifo -> State#state.workers ++ [Pid] + end, + State#state{workers = Workers, waiting = Empty, overflow = 0} + end. + +handle_worker_exit(Pid, State) -> + #state{supervisor = Sup, + monitors = Monitors, + overflow = Overflow} = State, + case queue:out(State#state.waiting) of + {{value, {From, CRef, MRef}}, LeftWaiting} -> + NewWorker = new_worker(State#state.supervisor), + true = ets:insert(Monitors, {NewWorker, CRef, MRef}), + gen_server:reply(From, NewWorker), + State#state{waiting = LeftWaiting}; + {empty, Empty} when Overflow > 0 -> + State#state{overflow = Overflow - 1, waiting = Empty}; + {empty, Empty} -> + Workers = + [new_worker(Sup) + | lists:filter(fun (P) -> P =/= Pid end, State#state.workers)], + State#state{workers = Workers, waiting = Empty} + end. + +state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> + #state{max_overflow = MaxOverflow, workers = Workers} = State, + case length(Workers) == 0 of + true when MaxOverflow < 1 -> full; + true -> overflow; + false -> ready + end; +state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> + full; +state_name(_State) -> + overflow. -- cgit v1.2.3