package monq.net;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.Map;
import monq.stuff.Feeder;
import monq.stuff.Semaphore;

/* loaded from: input_file:monq/net/DistPipeFilter.class */
/**
 * Couples a local {@link Feeder} with a {@link FilterConnection} so that the
 * feeder's data is pulled through a pipeline and the result is handed back as
 * an {@link InputStream}.
 *
 * <p>Mechanics, as implemented here: the object owns a {@link TcpServer}
 * listening on a local port. {@link #open} reserves a numbered slot, parks the
 * feeder in that slot and appends to the caller's pipeline one extra
 * {@link PipelineRequest} that points back at this server and carries the slot
 * number. When a peer connects to the server and presents that slot number,
 * the parked feeder is given the connection's output stream and returned as
 * the service to run. {@link #close} frees the slot and reports any failure of
 * the feeder.</p>
 */
public class DistPipeFilter {
    /** Address of the local host, advertised in the call-back request. */
    private final String host;

    /** Socket the call-back server listens on. */
    private final ServerSocket serverSocket;

    /** Hands out slot numbers; its size equals the length of {@link #pending}. */
    private final Semaphore sem;

    /** Feeder parked in each slot, or {@code null} if the slot is unused. */
    private final Feeder[] pending;

    /** Server that accepts the call-back connections. */
    private final TcpServer serv;

    /**
     * Whether {@link #start} has been called without a later {@link #shutdown}.
     * Volatile because {@link #open} reads it without holding the monitor that
     * {@code start}/{@code shutdown} use when writing it.
     */
    private volatile boolean running = false;

    /**
     * Open connections keyed by the stream returned from {@link #open}.
     * Every access must hold this map's own monitor.
     */
    private final Map<InputStream, Job> connections = new HashMap<InputStream, Job>();

    /**
     * Wraps a feeder so that any exception escaping its {@code run()} is
     * recorded instead of propagating. {@link DistPipeFilter#close} also uses
     * the presence of this wrapper in a slot as evidence that the data was
     * actually requested.
     */
    private static class CatchAllFeeder implements Feeder {
        /**
         * Exception caught in {@link #run}, if any. Volatile because it is
         * written by whichever thread executes {@code run()} and read through
         * {@link #getException} by the caller of {@code close()} -- presumably
         * different threads; confirm against TcpServer.
         */
        private volatile Exception caught;

        /** The wrapped feeder. */
        private final Feeder client;

        public CatchAllFeeder(Feeder feeder) {
            this.client = feeder;
        }

        /**
         * Returns the exception reported by the wrapped feeder if it has one,
         * otherwise the exception caught in {@link #run} (may be {@code null}).
         */
        @Override
        public Exception getException() {
            Exception reported = this.client.getException();
            return reported != null ? reported : this.caught;
        }

        /** Delegates to the wrapped feeder. */
        @Override
        public void setOut(OutputStream outputStream, boolean z) {
            this.client.setOut(outputStream, z);
        }

        /** Runs the wrapped feeder and records anything it throws. */
        @Override
        public void run() {
            try {
                this.client.run();
            } catch (Exception e) {
                // Deliberately not rethrown: it is reported via getException().
                this.caught = e;
            }
        }
    }

    /** Bookkeeping for one open connection: its slot and its pipeline connection. */
    private static final class Job {
        public final int slot;
        public final FilterConnection connection;

        public Job(int i, FilterConnection filterConnection) {
            this.slot = i;
            this.connection = filterConnection;
        }
    }

    /**
     * Creates the service for an incoming call-back connection by looking up
     * the feeder parked in the slot named by the request.
     */
    private class SourceFactory implements ServiceFactory {
        private SourceFactory() {
        }

        /**
         * Reads the request from {@code inputStream}, extracts its
         * {@code "slot"} parameter and returns the feeder parked in that slot,
         * wrapped in a {@link CatchAllFeeder} and wired to
         * {@code outputStream}.
         *
         * @throws ServiceCreateException (as {@link ServiceUnavailException})
         *         if the request cannot be read, the slot is not a number, is
         *         outside the valid range, or has no feeder parked in it
         */
        @Override
        public Service createService(InputStream inputStream, OutputStream outputStream, Object obj) throws ServiceCreateException {
            final int slot;
            try {
                Object rawSlot = new FilterConnection(inputStream).getParameters().get("slot");
                // A missing parameter (null) makes parseInt throw
                // NumberFormatException and is reported like any other
                // non-numeric slot.
                slot = Integer.parseInt((String) rawSlot);
            } catch (NumberFormatException e) {
                throw new ServiceUnavailException("slot received is not a number", e);
            } catch (IOException e) {
                throw new ServiceUnavailException("cannot read or parse request", e);
            }

            // The slot number arrives over the network, so it must be
            // validated before it is used as an array index.
            Feeder[] slots = DistPipeFilter.this.pending;
            if (slot < 0 || slot >= slots.length) {
                throw new ServiceUnavailException("slot " + slot + " is out of range");
            }

            // Read the slot once so the null check and the use see the same value.
            Feeder waiting = slots[slot];
            if (waiting == null) {
                throw new ServiceUnavailException("no data waiting to be shipped in slot " + slot);
            }

            // Replacing the parked feeder by its wrapper is what later tells
            // close() that the data was requested.
            CatchAllFeeder wrapper = new CatchAllFeeder(waiting);
            slots[slot] = wrapper;
            wrapper.setOut(outputStream, false);
            return wrapper;
        }
    }

    /**
     * Creates the filter and binds its call-back server socket. The server
     * does not accept connections until {@link #start} is called.
     *
     * @param i           port for the call-back server socket
     * @param i2          number of slots, i.e. how many connections can be
     *                    open at once; also passed to the {@link TcpServer}
     * @param printStream where the server logs to, or {@code null} to leave
     *                    the server's logging untouched
     * @throws IOException if the local host address cannot be determined or
     *                     the server socket cannot be opened
     */
    public DistPipeFilter(int i, int i2, PrintStream printStream) throws IOException {
        // Everything that can fail without needing the socket is done first,
        // so a failure here cannot leave a bound socket behind.
        this.host = InetAddress.getLocalHost().getHostAddress();
        this.sem = new Semaphore(i2);
        this.pending = new Feeder[i2];
        this.serverSocket = new ServerSocket(i);
        this.serv = new TcpServer(this.serverSocket, new SourceFactory(), i2);
        if (printStream != null) {
            this.serv.setLogging(printStream);
        }
    }

    /**
     * Same as {@link #DistPipeFilter(int, int, PrintStream)} with a
     * {@code null} print stream.
     */
    public DistPipeFilter(int i, int i2) throws IOException {
        this(i, i2, null);
    }

    /**
     * Starts the call-back server on a daemon thread. Does nothing if the
     * server is already running.
     */
    public synchronized void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        Thread serverThread = new Thread(this.serv);
        serverThread.setDaemon(true);
        serverThread.start();
    }

    /** Shuts the call-back server down. Does nothing if it is not running. */
    public synchronized void shutdown() {
        if (this.running) {
            this.running = false;
            this.serv.shutdown();
        }
    }

    /**
     * Opens a connection through the given pipeline, extended by a request
     * that points back at this object's server so the far end can fetch the
     * data produced by {@code feeder}.
     *
     * @param pipelineRequestArr the pipeline to connect through
     * @param feeder             source of the data to be shipped
     * @return the stream obtained from {@link FilterConnection#connect()};
     *         pass it to {@link #close} when done
     * @throws IllegalStateException if the server is not running
     * @throws IOException           if the connection cannot be established
     */
    public InputStream open(PipelineRequest[] pipelineRequestArr, Feeder feeder) throws IOException {
        if (!this.running) {
            throw new IllegalStateException("server not running");
        }
        int slot = this.sem.acquire();
        this.pending[slot] = feeder;

        boolean registered = false;
        try {
            PipelineRequest callback = new PipelineRequest(this.host, this.serverSocket.getLocalPort());
            callback.put("slot", Integer.toString(slot));
            FilterConnection pipeline = new FilterConnection(pipelineRequestArr);
            pipeline.append(callback);
            InputStream result = pipeline.connect();
            // Same lock as in close(): HashMap is not safe for concurrent use.
            synchronized (this.connections) {
                this.connections.put(result, new Job(slot, pipeline));
            }
            registered = true;
            return result;
        } finally {
            // On any failure, checked or not, give the slot back so it is
            // not lost for future calls.
            if (!registered) {
                this.pending[slot] = null;
                this.sem.release(slot);
            }
        }
    }

    /**
     * Closes a connection previously returned by {@link #open}, frees its
     * slot and reports whether the feeder completed without error.
     *
     * @param inputStream the stream returned by {@link #open}
     * @throws IllegalArgumentException if the stream is unknown or was closed
     *                                  already
     * @throws IOException if closing the connection fails, if the feeder was
     *                     never asked for its data, or if the feeder failed
     *                     (a non-IO failure is attached as the cause)
     */
    public void close(InputStream inputStream) throws IOException {
        Job job;
        synchronized (this.connections) {
            job = this.connections.remove(inputStream);
        }
        if (job == null) {
            throw new IllegalArgumentException("connection bound to given InputStream is closed already or was never open");
        }

        Feeder feeder = this.pending[job.slot];
        this.pending[job.slot] = null;
        this.sem.release(job.slot);
        job.connection.close();

        // The slot only holds a CatchAllFeeder once a peer has connected and
        // asked for the data; a bare feeder means nobody ever did.
        if (!(feeder instanceof CatchAllFeeder)) {
            throw new IOException("The data was never asked for. This means that the pipeline collapsed somewhere in the middle. More information can only be found in the logs of intermediate servers.");
        }

        Exception failure = feeder.getException();
        if (failure == null) {
            return;
        }
        if (failure instanceof IOException) {
            throw (IOException) failure;
        }
        throw new IOException("feeder threw exception, see cause", failure);
    }
}
