package kafka.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.utils.Utils;
import org.apache.log4j.Logger;

/* loaded from: input_file:kafka/tools/KafkaMigrationTool.class */
public class KafkaMigrationTool {
    // Fully-qualified names of the Kafka 0.7 classes. main() passes each of these to
    // ParentLastURLClassLoader.loadClass so the types are resolved from the jars given on the
    // command line rather than from this tool's own classpath.
    private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
    private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
    private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
    private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "kafka.consumer.ConsumerIterator";
    private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "kafka.javaapi.consumer.ConsumerConnector";
    private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "kafka.message.MessageAndMetadata";
    private static final String KAFKA_07_MESSAGE_CLASS_NAME = "kafka.message.Message";
    private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "kafka.consumer.Whitelist";
    private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "kafka.consumer.TopicFilter";
    private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "kafka.consumer.Blacklist";
    // Logger used by main() and by the shutdown hook it registers.
    private static final Logger logger = Logger.getLogger(KafkaMigrationTool.class.getName());
    // Class handles for the 0.7 types named above. They stay null until main() assigns them;
    // MigrationThread.run() reads the stream, iterator, message-and-metadata and message handles
    // to look up the methods it invokes reflectively.
    private static Class<?> KafkaStaticConsumer_07 = null;
    private static Class<?> ConsumerConfig_07 = null;
    private static Class<?> ConsumerConnector_07 = null;
    private static Class<?> KafkaStream_07 = null;
    private static Class<?> TopicFilter_07 = null;
    private static Class<?> WhiteList_07 = null;
    private static Class<?> BlackList_07 = null;
    private static Class<?> KafkaConsumerIteratorClass_07 = null;
    // NOTE(review): "MetatData" is a typo for "Metadata"; the name is referenced from
    // MigrationThread and main(), so it has to be renamed everywhere at once.
    private static Class<?> KafkaMessageAndMetatDataClass_07 = null;
    private static Class<?> KafkaMessageClass_07 = null;

    /* loaded from: input_file:kafka/tools/KafkaMigrationTool$MigrationThread.class */
    /**
     * Consumer-side worker that drains one Kafka 0.7 stream and forwards every message to the
     * shared {@link ProducerDataChannel}.
     *
     * <p>The stream is an instance of a class loaded from the 0.7 jar, so every call on it goes
     * through reflection using the class handles stored in the {@code KafkaMigrationTool} statics.
     * Each payload {@link ByteBuffer} is copied into a byte array and wrapped in a
     * {@link KeyedMessage} with a {@code null} key, addressed to the topic the message came from.
     */
    private static class MigrationThread extends Thread {
        private final Object stream;
        private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
        private final int threadId;
        private final String threadName;
        /** Released exactly once when {@link #run()} exits, whatever the reason. */
        private final CountDownLatch shutdownComplete = new CountDownLatch(1);
        /** Cleared by {@link #shutdown()}; {@link #run()} does not currently consult it. */
        private final AtomicBoolean isRunning = new AtomicBoolean(true);
        private final Logger logger = Logger.getLogger(MigrationThread.class.getName());

        /**
         * @param obj the 0.7 stream object this thread iterates over
         * @param producerDataChannel queue that receives the converted messages
         * @param i numeric id, used for the thread name and in log output
         */
        MigrationThread(Object obj, ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel, int i) {
            this.stream = obj;
            this.producerDataChannel = producerDataChannel;
            this.threadId = i;
            this.threadName = "MigrationThread-" + this.threadId;
            setName(this.threadName);
        }

        /**
         * Iterates the stream until its iterator reports no more elements or a failure occurs.
         * Failures are logged at fatal level and end the thread; they are never rethrown.
         */
        @Override
        public void run() {
            try {
                Method payloadMethod = KafkaMigrationTool.KafkaMessageClass_07.getMethod("payload");
                Method messageMethod = KafkaMigrationTool.KafkaMessageAndMetatDataClass_07.getMethod("message");
                Method topicMethod = KafkaMigrationTool.KafkaMessageAndMetatDataClass_07.getMethod("topic");
                Method iteratorMethod = KafkaMigrationTool.KafkaStream_07.getMethod("iterator");
                Method hasNextMethod = KafkaMigrationTool.KafkaConsumerIteratorClass_07.getMethod("hasNext");
                Method nextMethod = KafkaMigrationTool.KafkaConsumerIteratorClass_07.getMethod("next");

                Object iterator = iteratorMethod.invoke(this.stream);
                while (((Boolean) hasNextMethod.invoke(iterator)).booleanValue()) {
                    Object messageAndMetadata = nextMethod.invoke(iterator);
                    Object message = messageMethod.invoke(messageAndMetadata);
                    Object topic = topicMethod.invoke(messageAndMetadata);
                    ByteBuffer payload = (ByteBuffer) payloadMethod.invoke(message);
                    // Copy the remaining bytes out of the buffer; the producer side works on arrays.
                    byte[] bytes = new byte[payload.remaining()];
                    payload.get(bytes);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Migration thread " + this.threadId + " sending message of size " + bytes.length + " to topic " + topic);
                    }
                    // Explicit type arguments: with a null key the diamond cannot infer K on every compiler level.
                    this.producerDataChannel.sendRequest(new KeyedMessage<byte[], byte[]>((String) topic, null, bytes));
                }
                this.logger.info("Migration thread " + this.threadName + " finished running");
            } catch (InvocationTargetException e) {
                // Must come before the Throwable handler, otherwise it is unreachable: unwrap the
                // reflective wrapper so the log shows what the 0.7 code actually threw.
                this.logger.fatal("Migration thread failure due to root cause ", e.getCause());
            } catch (Throwable t) {
                this.logger.fatal("Migration thread failure due to ", t);
            } finally {
                // Always release anyone blocked in shutdown().
                this.shutdownComplete.countDown();
            }
        }

        /**
         * Interrupts this thread and blocks until {@link #run()} has exited.
         *
         * <p>An interrupt received while waiting is logged rather than propagated, so the caller's
         * remaining shutdown steps still execute.
         */
        public void shutdown() {
            this.logger.info("Migration thread " + this.threadName + " shutting down");
            this.isRunning.set(false);
            interrupt();
            try {
                this.shutdownComplete.await();
            } catch (InterruptedException e) {
                this.logger.warn("Interrupt during shutdown of MigrationThread", e);
            }
            this.logger.info("Migration thread " + this.threadName + " shutdown complete");
        }
    }

    /* loaded from: input_file:kafka/tools/KafkaMigrationTool$ParentLastURLClassLoader.class */
    /**
     * Class loader that looks in its own URLs first and only then falls back to the parent, the
     * reverse of the default delegation order.
     *
     * <p>It is built from two cooperating loaders: {@link ChildURLClassLoader} searches the supplied
     * URLs (with no parent of its own), and {@link FindClassClassLoader} wraps the thread context
     * class loader as the fallback.
     */
    private static class ParentLastURLClassLoader extends ClassLoader {
        private final ChildURLClassLoader childClassLoader;

        /**
         * URL loader without a delegation parent; classes missing from the URLs are requested from
         * {@code realParent} instead.
         */
        private static class ChildURLClassLoader extends URLClassLoader {
            private final FindClassClassLoader realParent;

            public ChildURLClassLoader(URL[] urlArr, FindClassClassLoader findClassClassLoader) {
                // null parent: only the bootstrap loader is consulted before our own URLs.
                super(urlArr, null);
                this.realParent = findClassClassLoader;
            }

            /**
             * Returns the class named {@code str}, defining it from the URLs at most once.
             *
             * @throws ClassNotFoundException if neither the URLs nor the real parent provide it
             */
            @Override
            public Class<?> findClass(String str) throws ClassNotFoundException {
                // This method is called directly by the outer loader, bypassing loadClass and its
                // already-loaded check. Without this lookup a repeated request (or a request for a
                // class already pulled in as a dependency) would define the class a second time
                // and fail with LinkageError.
                synchronized (getClassLoadingLock(str)) {
                    Class<?> alreadyLoaded = findLoadedClass(str);
                    if (alreadyLoaded != null) {
                        return alreadyLoaded;
                    }
                    try {
                        return super.findClass(str);
                    } catch (ClassNotFoundException e) {
                        return this.realParent.loadClass(str);
                    }
                }
            }
        }

        /** Thin wrapper around a parent loader that widens {@code findClass} to public. */
        public static class FindClassClassLoader extends ClassLoader {
            public FindClassClassLoader(ClassLoader classLoader) {
                super(classLoader);
            }

            @Override
            public Class<?> findClass(String str) throws ClassNotFoundException {
                return super.findClass(str);
            }
        }

        /**
         * @param urlArr locations searched before the parent; the parent is the context class
         *     loader of the constructing thread
         */
        public ParentLastURLClassLoader(URL[] urlArr) {
            super(Thread.currentThread().getContextClassLoader());
            this.childClassLoader = new ChildURLClassLoader(urlArr, new FindClassClassLoader(getParent()));
        }

        /**
         * Tries the child loader first and falls back to the regular parent-first lookup.
         */
        @Override
        protected synchronized Class<?> loadClass(String str, boolean z) throws ClassNotFoundException {
            try {
                return this.childClassLoader.findClass(str);
            } catch (ClassNotFoundException e) {
                return super.loadClass(str, z);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/tools/KafkaMigrationTool$ProducerDataChannel.class */
    /**
     * Bounded, blocking hand-off buffer between the threads that enqueue requests and the threads
     * that dequeue them.
     *
     * @param <T> type of the queued requests
     */
    public static class ProducerDataChannel<T> {
        private final int producerQueueSize;
        private final BlockingQueue<T> producerRequestQueue;

        /**
         * @param capacity maximum number of requests held at once
         */
        public ProducerDataChannel(int capacity) {
            producerQueueSize = capacity;
            producerRequestQueue = new ArrayBlockingQueue<>(producerQueueSize);
        }

        /**
         * Enqueues {@code request}, waiting while the buffer is full.
         *
         * @throws InterruptedException if interrupted while waiting for space
         */
        public void sendRequest(T request) throws InterruptedException {
            producerRequestQueue.put(request);
        }

        /**
         * Dequeues the oldest request, waiting while the buffer is empty.
         *
         * @throws InterruptedException if interrupted while waiting for an element
         */
        public T receiveRequest() throws InterruptedException {
            T next = producerRequestQueue.take();
            return next;
        }
    }

    /* loaded from: input_file:kafka/tools/KafkaMigrationTool$ProducerThread.class */
    /**
     * Producer-side worker that takes messages off the shared {@link ProducerDataChannel} and sends
     * them with its own {@link Producer} until it dequeues the shutdown marker.
     */
    static class ProducerThread extends Thread {
        private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
        private final Producer<byte[], byte[]> producer;
        private final int threadId;
        private final String threadName;
        /** Released exactly once when {@link #run()} exits, whatever the reason. */
        private final CountDownLatch shutdownComplete = new CountDownLatch(1);
        /** Marker message; dequeuing something equal to it ends {@link #run()}. */
        private final KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage<byte[], byte[]>("shutdown", null, null);
        private final Logger logger = Logger.getLogger(ProducerThread.class.getName());

        /**
         * @param producerDataChannel queue to read messages from
         * @param producer producer used to send them; closed by {@link #awaitShutdown()}
         * @param i numeric id, used for the thread name
         */
        public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel, Producer<byte[], byte[]> producer, int i) {
            this.producerDataChannel = producerDataChannel;
            this.producer = producer;
            this.threadId = i;
            this.threadName = "ProducerThread-" + this.threadId;
            setName(this.threadName);
        }

        /**
         * Sends queued messages until the shutdown marker arrives or a failure occurs. Failures are
         * logged at fatal level and end the thread; they are never rethrown.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    KeyedMessage<byte[], byte[]> request = this.producerDataChannel.receiveRequest();
                    if (request.equals(this.shutdownMessage)) {
                        break;
                    }
                    this.producer.send(request);
                    if (this.logger.isDebugEnabled()) {
                        // Log the payload verbatim. Passing it through String.format would treat
                        // any '%' in the message as a format specifier and throw, which would kill
                        // this thread whenever debug logging is enabled.
                        this.logger.debug(new String(request.message()));
                    }
                }
                this.logger.info("Producer thread " + this.threadName + " finished running");
            } catch (Throwable t) {
                this.logger.fatal("Producer thread failure due to ", t);
            } finally {
                // Always release anyone blocked in awaitShutdown().
                this.shutdownComplete.countDown();
            }
        }

        /**
         * Requests termination by enqueuing the shutdown marker; does not wait for the thread to
         * stop. An interrupt while enqueuing is logged and the marker is not sent.
         */
        public void shutdown() {
            try {
                this.logger.info("Producer thread " + this.threadName + " shutting down");
                this.producerDataChannel.sendRequest(this.shutdownMessage);
            } catch (InterruptedException e) {
                this.logger.warn("Interrupt during shutdown of ProducerThread", e);
            }
        }

        /**
         * Blocks until {@link #run()} has exited and then closes the producer. If the wait is
         * interrupted the producer is left open, since the thread may still be using it.
         */
        public void awaitShutdown() {
            try {
                this.shutdownComplete.await();
                this.producer.close();
                this.logger.info("Producer thread " + this.threadName + " shutdown complete");
            } catch (InterruptedException e) {
                this.logger.warn("Interrupt during shutdown of ProducerThread", e);
            }
        }
    }

    /**
     * Entry point: parses the command line, loads the Kafka 0.7 consumer classes from the supplied
     * jars, and starts one {@link MigrationThread} per consumer stream plus the requested number of
     * {@link ProducerThread}s, all sharing a single bounded {@link ProducerDataChannel}.
     *
     * <p>A JVM shutdown hook stops the 0.7 consumer connector, then the migration threads, then
     * the producer threads. Any failure during setup is printed and logged; it is not rethrown.
     *
     * @param strArr command-line arguments
     * @throws IOException if printing the help text fails
     */
    public static void main(String[] strArr) throws InterruptedException, IOException {
        OptionParser optionParser = new OptionParser();
        ArgumentAcceptingOptionSpec<String> consumerConfigOpt = optionParser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. You man specify multiple of these.").withRequiredArg().describedAs("config file").ofType(String.class);
        ArgumentAcceptingOptionSpec<String> producerConfigOpt = optionParser.accepts("producer.config", "Producer config.").withRequiredArg().describedAs("config file").ofType(String.class);
        ArgumentAcceptingOptionSpec<Integer> numProducersOpt = optionParser.accepts("num.producers", "Number of producer instances").withRequiredArg().describedAs("Number of producers").ofType(Integer.class).defaultsTo(1);
        ArgumentAcceptingOptionSpec<String> zkClientJarOpt = optionParser.accepts("zkclient.01.jar", "zkClient 0.1 jar file").withRequiredArg().describedAs("zkClient 0.1 jar file required by Kafka 0.7").ofType(String.class);
        ArgumentAcceptingOptionSpec<String> kafkaJarOpt = optionParser.accepts("kafka.07.jar", "Kafka 0.7 jar file").withRequiredArg().describedAs("kafka 0.7 jar").ofType(String.class);
        ArgumentAcceptingOptionSpec<Integer> numStreamsOpt = optionParser.accepts("num.streams", "Number of consumer streams").withRequiredArg().describedAs("Number of consumer threads").ofType(Integer.class).defaultsTo(1);
        ArgumentAcceptingOptionSpec<String> whitelistOpt = optionParser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster").withRequiredArg().describedAs("Java regex (String)").ofType(String.class);
        ArgumentAcceptingOptionSpec<String> blacklistOpt = optionParser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster").withRequiredArg().describedAs("Java regex (String)").ofType(String.class);
        ArgumentAcceptingOptionSpec<Integer> queueSizeOpt = optionParser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer").withRequiredArg().describedAs("Queue size in terms of number of messages").ofType(Integer.class).defaultsTo(10000);
        OptionSpecBuilder helpOpt = optionParser.accepts("help", "Print this message.");

        OptionSet parse = optionParser.parse(strArr);
        if (parse.has(helpOpt)) {
            optionParser.printHelpOn(System.out);
            System.exit(0);
        }
        checkRequiredArgs(optionParser, parse, new OptionSpec<?>[]{consumerConfigOpt, producerConfigOpt, zkClientJarOpt, kafkaJarOpt});
        int filterCount = (parse.has(whitelistOpt) ? 1 : 0) + (parse.has(blacklistOpt) ? 1 : 0);
        if (filterCount != 1) {
            System.err.println("Exactly one of whitelist or blacklist is required.");
            System.exit(1);
        }

        String kafkaJarPath = parse.valueOf(kafkaJarOpt);
        String zkClientJarPath = parse.valueOf(zkClientJarOpt);
        String consumerConfigPath = parse.valueOf(consumerConfigOpt);
        int numStreams = parse.valueOf(numStreamsOpt).intValue();
        String producerConfigPath = parse.valueOf(producerConfigOpt);
        int numProducers = parse.valueOf(numProducersOpt).intValue();
        // Final so the shutdown hook below can capture them.
        final List<MigrationThread> migrationThreads = new ArrayList<>(numStreams);
        final List<ProducerThread> producerThreads = new ArrayList<>(numProducers);
        try {
            ParentLastURLClassLoader classLoader = new ParentLastURLClassLoader(new URL[]{new File(kafkaJarPath).toURI().toURL(), new File(zkClientJarPath).toURI().toURL()});
            ConsumerConfig_07 = classLoader.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME);
            KafkaStaticConsumer_07 = classLoader.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME);
            ConsumerConnector_07 = classLoader.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME);
            KafkaStream_07 = classLoader.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME);
            TopicFilter_07 = classLoader.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME);
            WhiteList_07 = classLoader.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME);
            BlackList_07 = classLoader.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME);
            KafkaMessageClass_07 = classLoader.loadClass(KAFKA_07_MESSAGE_CLASS_NAME);
            KafkaConsumerIteratorClass_07 = classLoader.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME);
            KafkaMessageAndMetatDataClass_07 = classLoader.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME);

            Constructor<?> consumerConfigCtor = ConsumerConfig_07.getConstructor(Properties.class);
            Properties consumerProps = loadProperties(consumerConfigPath);
            if (consumerProps.getProperty("shallow.iterator.enable", "").equals("true")) {
                logger.warn("Shallow iterator should not be used in the migration tool");
                consumerProps.setProperty("shallow.iterator.enable", "false");
            }
            Method createConnector = KafkaStaticConsumer_07.getMethod("createJavaConsumerConnector", ConsumerConfig_07);
            final Object consumerConnector = createConnector.invoke(null, consumerConfigCtor.newInstance(consumerProps));
            Method createStreams = ConsumerConnector_07.getMethod("createMessageStreamsByFilter", TopicFilter_07, Integer.TYPE);
            final Method connectorShutdown = ConsumerConnector_07.getMethod("shutdown");
            Object topicFilter = parse.has(whitelistOpt)
                    ? WhiteList_07.getConstructor(String.class).newInstance(parse.valueOf(whitelistOpt))
                    : BlackList_07.getConstructor(String.class).newInstance(parse.valueOf(blacklistOpt));
            Object streams = createStreams.invoke(consumerConnector, topicFilter, Integer.valueOf(numStreams));

            Properties producerProps = loadProperties(producerConfigPath);
            // The channel carries raw byte arrays, so the serializer is forced regardless of the config file.
            producerProps.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
            ProducerDataChannel<KeyedMessage<byte[], byte[]>> channel = new ProducerDataChannel<>(parse.valueOf(queueSizeOpt).intValue());

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        connectorShutdown.invoke(consumerConnector);
                    } catch (Exception e) {
                        KafkaMigrationTool.logger.error("Error while shutting down Kafka consumer", e);
                    }
                    for (MigrationThread migrationThread : migrationThreads) {
                        migrationThread.shutdown();
                    }
                    // Signal every producer first, then wait, so they drain in parallel.
                    for (ProducerThread producerThread : producerThreads) {
                        producerThread.shutdown();
                    }
                    for (ProducerThread producerThread : producerThreads) {
                        producerThread.awaitShutdown();
                    }
                    KafkaMigrationTool.logger.info("Kafka migration tool shutdown successfully");
                }
            });

            int streamId = 0;
            for (Object stream : (List<?>) streams) {
                MigrationThread migrationThread = new MigrationThread(stream, channel, streamId);
                streamId++;
                migrationThread.start();
                migrationThreads.add(migrationThread);
            }

            // Default to an empty prefix: concatenating a missing property would otherwise yield
            // the literal ids "null-0", "null-1", ...
            String clientIdPrefix = producerProps.getProperty("client.id", "");
            for (int producerId = 0; producerId < numProducers; producerId++) {
                producerProps.setProperty("client.id", clientIdPrefix + "-" + producerId);
                ProducerThread producerThread = new ProducerThread(channel, new Producer<byte[], byte[]>(new ProducerConfig(producerProps)), producerId);
                producerThread.start();
                producerThreads.add(producerThread);
            }
        } catch (Throwable th) {
            System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(th));
            logger.error("Kafka migration tool failed: ", th);
        }
    }

    /** Reads a properties file from {@code path}, closing the stream even when loading fails. */
    private static Properties loadProperties(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        return props;
    }

    /**
     * Verifies that every option in {@code optionSpecArr} was supplied. For each missing option an
     * error line and the parser's help text are written to standard error, followed by
     * {@code System.exit(1)}.
     *
     * @throws IOException if writing the help text fails
     */
    private static void checkRequiredArgs(OptionParser optionParser, OptionSet optionSet, OptionSpec[] optionSpecArr) throws IOException {
        for (int idx = 0; idx < optionSpecArr.length; idx++) {
            OptionSpec required = optionSpecArr[idx];
            if (optionSet.has(required)) {
                continue;
            }
            String complaint = "Missing required argument \"" + required + "\"";
            System.err.println(complaint);
            optionParser.printHelpOn(System.err);
            System.exit(1);
        }
    }
}
