/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.assignment;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
import org.apache.kafka.streams.processor.internals.assignment.ConstrainedPrioritySet;
import org.apache.kafka.streams.processor.internals.assignment.Graph;
import org.apache.kafka.streams.processor.internals.assignment.MinTrafficGraphConstructor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TaskAssignmentUtils {
    private static final Logger LOG = LoggerFactory.getLogger(TaskAssignmentUtils.class);

    private TaskAssignmentUtils() {
    }

    public static TaskAssignor.AssignmentError validateTaskAssignment(ApplicationState applicationState, TaskAssignor.TaskAssignment taskAssignment) {
        Set<TaskId> taskIdsInInput = applicationState.allTasks().keySet();
        Collection<KafkaStreamsAssignment> assignments = taskAssignment.assignment();
        HashMap<TaskId, ProcessId> activeTasksInOutput = new HashMap<TaskId, ProcessId>();
        HashMap<TaskId, ProcessId> standbyTasksInOutput = new HashMap<TaskId, ProcessId>();
        for (KafkaStreamsAssignment assignment : assignments) {
            for (KafkaStreamsAssignment.AssignedTask assignedTask : assignment.tasks().values()) {
                if (!taskIdsInInput.contains(assignedTask.id())) {
                    LOG.error("Assignment is invalid: task {} assigned to KafkaStreams client {} was unknown", (Object)assignedTask.id(), (Object)assignment.processId().id());
                    return TaskAssignor.AssignmentError.UNKNOWN_TASK_ID;
                }
                if (activeTasksInOutput.containsKey(assignedTask.id()) && assignedTask.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
                    LOG.error("Assignment is invalid: active task {} was assigned to multiple KafkaStreams clients: {} and {}", new Object[]{assignedTask.id(), assignment.processId().id(), ((ProcessId)activeTasksInOutput.get(assignedTask.id())).id()});
                    return TaskAssignor.AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                }
                if (assignedTask.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
                    activeTasksInOutput.put(assignedTask.id(), assignment.processId());
                    continue;
                }
                standbyTasksInOutput.put(assignedTask.id(), assignment.processId());
            }
        }
        for (TaskInfo task : applicationState.allTasks().values()) {
            if (task.isStateful() || !standbyTasksInOutput.containsKey(task.id())) continue;
            LOG.error("Assignment is invalid: standby task for stateless task {} was assigned to KafkaStreams client {}", (Object)task.id(), (Object)((ProcessId)standbyTasksInOutput.get(task.id())).id());
            return TaskAssignor.AssignmentError.INVALID_STANDBY_TASK;
        }
        Map<ProcessId, KafkaStreamsState> clientStates = applicationState.kafkaStreamsStates(false);
        Set clientsInOutput = assignments.stream().map(KafkaStreamsAssignment::processId).collect(Collectors.toSet());
        for (Map.Entry entry : clientStates.entrySet()) {
            ProcessId processIdInInput = (ProcessId)entry.getKey();
            if (clientsInOutput.contains(processIdInInput)) continue;
            LOG.error("Assignment is invalid: KafkaStreams client {} has no assignment", (Object)processIdInInput.id());
            return TaskAssignor.AssignmentError.MISSING_PROCESS_ID;
        }
        for (ProcessId processId : clientsInOutput) {
            if (clientStates.containsKey(processId)) continue;
            LOG.error("Assignment is invalid: the KafkaStreams client {} is unknown", (Object)processId.id());
            return TaskAssignor.AssignmentError.UNKNOWN_PROCESS_ID;
        }
        return TaskAssignor.AssignmentError.NONE;
    }

    public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(ApplicationState applicationState) {
        HashMap<ProcessId, KafkaStreamsAssignment> assignments = new HashMap<ProcessId, KafkaStreamsAssignment>();
        applicationState.kafkaStreamsStates(false).forEach((processId, state) -> {
            HashSet<KafkaStreamsAssignment.AssignedTask> tasks = new HashSet<KafkaStreamsAssignment.AssignedTask>();
            state.previousActiveTasks().forEach(taskId -> tasks.add(new KafkaStreamsAssignment.AssignedTask((TaskId)taskId, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE)));
            state.previousStandbyTasks().forEach(taskId -> tasks.add(new KafkaStreamsAssignment.AssignedTask((TaskId)taskId, KafkaStreamsAssignment.AssignedTask.Type.STANDBY)));
            KafkaStreamsAssignment newAssignment = KafkaStreamsAssignment.of(processId, tasks);
            assignments.put((ProcessId)processId, newAssignment);
        });
        return assignments;
    }

    public static void defaultStandbyTaskAssignment(ApplicationState applicationState, Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
        if (!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
            TaskAssignmentUtils.tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
        } else {
            TaskAssignmentUtils.loadBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
        }
    }

    public static void optimizeRackAwareActiveTasks(RackAwareOptimizationParams optimizationParams, Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
        ApplicationState applicationState = optimizationParams.applicationState;
        SortedSet<TaskId> activeTasksToOptimize = TaskAssignmentUtils.getTasksToOptimize(kafkaStreamsAssignments, optimizationParams, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE);
        if (activeTasksToOptimize.isEmpty()) {
            return;
        }
        if (!TaskAssignmentUtils.canPerformRackAwareOptimization(applicationState, optimizationParams, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE)) {
            return;
        }
        TaskAssignmentUtils.initializeAssignmentsForAllClients(applicationState, kafkaStreamsAssignments);
        int crossRackTrafficCost = optimizationParams.trafficCostOverride.orElseGet(() -> applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt());
        int nonOverlapCost = optimizationParams.nonOverlapCostOverride.orElseGet(() -> applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt());
        Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = applicationState.kafkaStreamsStates(false);
        ArrayList<TaskId> taskIds = new ArrayList<TaskId>(activeTasksToOptimize);
        Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId = applicationState.allTasks().values().stream().filter(taskInfo -> activeTasksToOptimize.contains(taskInfo.id())).collect(Collectors.toMap(TaskInfo::id, TaskInfo::topicPartitions));
        ArrayList<ProcessId> clientIds = new ArrayList<ProcessId>(kafkaStreamsStates.keySet());
        long initialCost = TaskAssignmentUtils.computeTotalAssignmentCost(topicPartitionsByTaskId, taskIds, clientIds, kafkaStreamsAssignments, kafkaStreamsStates, crossRackTrafficCost, nonOverlapCost, false, false);
        LOG.info("Assignment before active task optimization has cost {}", (Object)initialCost);
        RackAwareGraphConstructor<KafkaStreamsAssignment> graphConstructor = RackAwareGraphConstructorFactory.create(applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
        AssignmentGraph assignmentGraph = TaskAssignmentUtils.buildTaskGraph(kafkaStreamsAssignments, kafkaStreamsStates, taskIds, clientIds, topicPartitionsByTaskId, crossRackTrafficCost, nonOverlapCost, false, false, graphConstructor);
        assignmentGraph.graph.solveMinCostFlow();
        graphConstructor.assignTaskFromMinCostFlow(assignmentGraph.graph, clientIds, taskIds, kafkaStreamsAssignments, assignmentGraph.taskCountByClient, assignmentGraph.clientByTask, (assignment, taskId) -> assignment.assignTask(new KafkaStreamsAssignment.AssignedTask((TaskId)taskId, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE)), (assignment, taskId) -> assignment.removeTask(new KafkaStreamsAssignment.AssignedTask((TaskId)taskId, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE)), (assignment, taskId) -> assignment.tasks().containsKey(taskId) && assignment.tasks().get(taskId).type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE);
    }

    public static void optimizeRackAwareStandbyTasks(RackAwareOptimizationParams optimizationParams, Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
        ApplicationState applicationState = optimizationParams.applicationState;
        SortedSet<TaskId> standbyTasksToOptimize = TaskAssignmentUtils.getTasksToOptimize(kafkaStreamsAssignments, optimizationParams, KafkaStreamsAssignment.AssignedTask.Type.STANDBY);
        if (standbyTasksToOptimize.isEmpty()) {
            return;
        }
        if (!TaskAssignmentUtils.canPerformRackAwareOptimization(applicationState, optimizationParams, KafkaStreamsAssignment.AssignedTask.Type.STANDBY)) {
            return;
        }
        TaskAssignmentUtils.initializeAssignmentsForAllClients(applicationState, kafkaStreamsAssignments);
        int crossRackTrafficCost = optimizationParams.trafficCostOverride.orElseGet(() -> applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt());
        int nonOverlapCost = optimizationParams.nonOverlapCostOverride.orElseGet(() -> applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt());
        Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = applicationState.kafkaStreamsStates(false);
        Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId = applicationState.allTasks().values().stream().collect(Collectors.toMap(TaskInfo::id, t -> t.topicPartitions().stream().filter(TaskTopicPartition::isChangelog).collect(Collectors.toSet())));
        ArrayList<ProcessId> clientIds = new ArrayList<ProcessId>(kafkaStreamsStates.keySet());
        long initialCost = TaskAssignmentUtils.computeTotalAssignmentCost(topicPartitionsByTaskId, new ArrayList<TaskId>(standbyTasksToOptimize), clientIds, kafkaStreamsAssignments, kafkaStreamsStates, crossRackTrafficCost, nonOverlapCost, true, true);
        LOG.info("Assignment before standby task optimization has cost {}", (Object)initialCost);
        MoveStandbyTaskPredicate moveablePredicate = TaskAssignmentUtils.getStandbyTaskMovePredicate(applicationState);
        BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment, List> getMovableTasks = (source, destination) -> source.tasks().values().stream().filter(task -> task.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY).filter(task -> !destination.tasks().containsKey(task.id())).filter(task -> {
            KafkaStreamsState sourceState = (KafkaStreamsState)kafkaStreamsStates.get(source.processId());
            KafkaStreamsState destinationState = (KafkaStreamsState)kafkaStreamsStates.get(source.processId());
            return moveablePredicate.canMoveStandbyTask(sourceState, destinationState, task.id(), kafkaStreamsAssignments);
        }).map(KafkaStreamsAssignment.AssignedTask::id).sorted().collect(Collectors.toList());
        long startTime = System.currentTimeMillis();
        boolean taskMoved = true;
        int round = 0;
        RackAwareGraphConstructor<KafkaStreamsAssignment> graphConstructor = RackAwareGraphConstructorFactory.create(applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), standbyTasksToOptimize);
        while (taskMoved && round < 4) {
            taskMoved = false;
            ++round;
            for (int i = 0; i < kafkaStreamsAssignments.size(); ++i) {
                ProcessId clientId1 = (ProcessId)clientIds.get(i);
                KafkaStreamsAssignment assignment1 = kafkaStreamsAssignments.get(clientId1);
                for (int j = i + 1; j < kafkaStreamsAssignments.size(); ++j) {
                    String rack2;
                    ProcessId clientId2 = (ProcessId)clientIds.get(j);
                    KafkaStreamsAssignment assignment2 = kafkaStreamsAssignments.get(clientId2);
                    String rack1 = kafkaStreamsStates.get(clientId1).rackId().get();
                    if (rack1.equals(rack2 = kafkaStreamsStates.get(clientId2).rackId().get())) continue;
                    List movable1 = getMovableTasks.apply(assignment1, assignment2);
                    List movable2 = getMovableTasks.apply(assignment2, assignment1);
                    if (movable1.isEmpty() || movable2.isEmpty()) continue;
                    List<TaskId> moveableTaskIds = Stream.concat(movable1.stream(), movable2.stream()).sorted().collect(Collectors.toList());
                    List<ProcessId> clientsInTaskRedistributionAttempt = Stream.of(clientId1, clientId2).sorted().collect(Collectors.toList());
                    AssignmentGraph assignmentGraph = TaskAssignmentUtils.buildTaskGraph(kafkaStreamsAssignments, kafkaStreamsStates, moveableTaskIds, clientsInTaskRedistributionAttempt, topicPartitionsByTaskId, crossRackTrafficCost, nonOverlapCost, true, true, graphConstructor);
                    assignmentGraph.graph.solveMinCostFlow();
                    taskMoved |= graphConstructor.assignTaskFromMinCostFlow(assignmentGraph.graph, clientsInTaskRedistributionAttempt, moveableTaskIds, kafkaStreamsAssignments, assignmentGraph.taskCountByClient, assignmentGraph.clientByTask, (assignment, taskId) -> assignment.assignTask(new KafkaStreamsAssignment.AssignedTask((TaskId)taskId, KafkaStreamsAssignment.AssignedTask.Type.STANDBY)), (assignment, taskId) -> assignment.removeTask(new KafkaStreamsAssignment.AssignedTask((TaskId)taskId, KafkaStreamsAssignment.AssignedTask.Type.STANDBY)), (assignment, taskId) -> assignment.tasks().containsKey(taskId) && assignment.tasks().get(taskId).type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY);
                }
            }
        }
        long finalCost = TaskAssignmentUtils.computeTotalAssignmentCost(topicPartitionsByTaskId, new ArrayList<TaskId>(standbyTasksToOptimize), clientIds, kafkaStreamsAssignments, kafkaStreamsStates, crossRackTrafficCost, nonOverlapCost, true, true);
        long duration = System.currentTimeMillis() - startTime;
        LOG.info("Assignment after {} rounds and {} milliseconds for standby task optimization is {}\n with cost {}", new Object[]{round, duration, kafkaStreamsAssignments, finalCost});
    }

    private static long computeTotalAssignmentCost(Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId, List<TaskId> taskIds, List<ProcessId> clientList, Map<ProcessId, KafkaStreamsAssignment> assignments, Map<ProcessId, KafkaStreamsState> clientStates, int crossRackTrafficCost, int nonOverlapCost, boolean hasReplica, boolean isStandby) {
        if (taskIds.isEmpty()) {
            return 0L;
        }
        MinTrafficGraphConstructor<KafkaStreamsAssignment> graphConstructor = new MinTrafficGraphConstructor<KafkaStreamsAssignment>();
        AssignmentGraph assignmentGraph = TaskAssignmentUtils.buildTaskGraph(assignments, clientStates, taskIds, clientList, topicPartitionsByTaskId, crossRackTrafficCost, nonOverlapCost, hasReplica, isStandby, graphConstructor);
        return assignmentGraph.graph.totalCost();
    }

    private static AssignmentGraph buildTaskGraph(Map<ProcessId, KafkaStreamsAssignment> assignments, Map<ProcessId, KafkaStreamsState> clientStates, List<TaskId> taskIds, List<ProcessId> clientList, Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId, int crossRackTrafficCost, int nonOverlapCost, boolean hasReplica, boolean isStandby, RackAwareGraphConstructor<KafkaStreamsAssignment> graphConstructor) {
        HashMap<TaskId, ProcessId> clientByTask = new HashMap<TaskId, ProcessId>();
        HashMap<ProcessId, Integer> taskCountByClient = new HashMap<ProcessId, Integer>();
        KafkaStreamsAssignment.AssignedTask.Type taskType = isStandby ? KafkaStreamsAssignment.AssignedTask.Type.STANDBY : KafkaStreamsAssignment.AssignedTask.Type.ACTIVE;
        Graph<Integer> graph = graphConstructor.constructTaskGraph(clientList, taskIds, assignments, clientByTask, taskCountByClient, (assignment, taskId) -> assignment.tasks().containsKey(taskId) && assignment.tasks().get(taskId).type() == taskType, (taskId, processId, inCurrentAssignment, unused0, unused1, unused2) -> {
            String clientRack = ((KafkaStreamsState)clientStates.get(processId)).rackId().get();
            int assignmentChangeCost = !inCurrentAssignment ? nonOverlapCost : 0;
            int trafficCost = TaskAssignmentUtils.getCrossRackTrafficCost((Set)topicPartitionsByTaskId.get(taskId), clientRack, crossRackTrafficCost);
            return assignmentChangeCost + trafficCost;
        }, crossRackTrafficCost, nonOverlapCost, hasReplica, isStandby);
        return new AssignmentGraph(graph, clientByTask, taskCountByClient);
    }

    private static int getCrossRackTrafficCost(Set<TaskTopicPartition> topicPartitions, String clientRack, int crossRackTrafficCost) {
        int cost = 0;
        for (TaskTopicPartition topicPartition : topicPartitions) {
            Optional<Set<String>> topicPartitionRacks = topicPartition.rackIds();
            if (topicPartitionRacks.get().contains(clientRack)) continue;
            cost += crossRackTrafficCost;
        }
        return cost;
    }

    private static boolean canPerformRackAwareOptimization(ApplicationState applicationState, RackAwareOptimizationParams optimizationParams, KafkaStreamsAssignment.AssignedTask.Type taskType) {
        AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs();
        String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy();
        if ("none".equals(rackAwareAssignmentStrategy)) {
            LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}", (Object)rackAwareAssignmentStrategy);
            return false;
        }
        if (assignmentConfigs.rackAwareTrafficCost().isEmpty()) {
            LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", (Object)"rack.aware.assignment.traffic_cost");
            return false;
        }
        if (assignmentConfigs.rackAwareNonOverlapCost().isEmpty()) {
            LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", (Object)"rack.aware.assignment.non_overlap_cost");
            return false;
        }
        return TaskAssignmentUtils.hasValidRackInformation(applicationState, taskType);
    }

    private static boolean hasValidRackInformation(ApplicationState applicationState, KafkaStreamsAssignment.AssignedTask.Type taskType) {
        for (KafkaStreamsState state : applicationState.kafkaStreamsStates(false).values()) {
            if (TaskAssignmentUtils.hasValidRackInformation(state)) continue;
            return false;
        }
        for (TaskInfo task : applicationState.allTasks().values()) {
            if (TaskAssignmentUtils.hasValidRackInformation(task, taskType)) continue;
            return false;
        }
        return true;
    }

    private static boolean hasValidRackInformation(KafkaStreamsState state) {
        if (state.rackId().isEmpty()) {
            LOG.error("KafkaStreams client {} doesn't have a rack id configured.", (Object)state.processId().id());
            return false;
        }
        return true;
    }

    private static boolean hasValidRackInformation(TaskInfo task, KafkaStreamsAssignment.AssignedTask.Type taskType) {
        Set<TaskTopicPartition> topicPartitions = taskType == KafkaStreamsAssignment.AssignedTask.Type.STANDBY ? (Collection)task.topicPartitions().stream().filter(TaskTopicPartition::isChangelog).collect(Collectors.toSet()) : task.topicPartitions();
        for (TaskTopicPartition topicPartition : topicPartitions) {
            Optional<Set<String>> racks = topicPartition.rackIds();
            if (!racks.isEmpty() && !racks.get().isEmpty()) continue;
            LOG.error("Topic partition {} for task {} does not have racks configured.", (Object)topicPartition, (Object)task.id());
            return false;
        }
        return true;
    }

    private static Map<ProcessId, KafkaStreamsAssignment> tagBasedStandbyTaskAssignment(ApplicationState applicationState, Map<ProcessId, KafkaStreamsAssignment> assignments) {
        TaskAssignmentUtils.initializeAssignmentsForAllClients(applicationState, assignments);
        int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas();
        Map<ProcessId, KafkaStreamsState> streamStates = applicationState.kafkaStreamsStates(false);
        HashSet<String> rackAwareAssignmentTags = new HashSet<String>(applicationState.assignmentConfigs().rackAwareAssignmentTags());
        TagStatistics tagStatistics = new TagStatistics(applicationState);
        ConstrainedPrioritySet standbyTaskClientsByTaskLoad = TaskAssignmentUtils.standbyTaskPriorityListByLoad(streamStates, assignments);
        Set statefulTaskIds = applicationState.allTasks().values().stream().filter(taskInfo -> taskInfo.topicPartitions().stream().anyMatch(TaskTopicPartition::isChangelog)).map(TaskInfo::id).collect(Collectors.toSet());
        Map<TaskId, Integer> tasksToRemainingStandbys = statefulTaskIds.stream().collect(Collectors.toMap(Function.identity(), t -> numStandbyReplicas));
        HashMap<TaskId, ProcessId> pendingStandbyTasksToClientId = new HashMap<TaskId, ProcessId>();
        for (TaskId statefulTaskId : statefulTaskIds) {
            for (KafkaStreamsAssignment assignment : assignments.values()) {
                if (!assignment.tasks().containsKey(statefulTaskId) || assignment.tasks().get(statefulTaskId).type() != KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) continue;
                TaskAssignmentUtils.assignStandbyTasksToClientsWithDifferentTags(numStandbyReplicas, standbyTaskClientsByTaskLoad, statefulTaskId, assignment.processId(), rackAwareAssignmentTags, streamStates, assignments, tasksToRemainingStandbys, tagStatistics.tagKeyToValues, tagStatistics.tagEntryToClients, pendingStandbyTasksToClientId);
            }
        }
        if (!tasksToRemainingStandbys.isEmpty()) {
            TaskAssignmentUtils.assignPendingStandbyTasksToLeastLoadedClients(assignments, numStandbyReplicas, standbyTaskClientsByTaskLoad, tasksToRemainingStandbys);
        }
        return assignments;
    }

    private static Map<ProcessId, KafkaStreamsAssignment> loadBasedStandbyTaskAssignment(ApplicationState applicationState, Map<ProcessId, KafkaStreamsAssignment> assignments) {
        TaskAssignmentUtils.initializeAssignmentsForAllClients(applicationState, assignments);
        int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas();
        Map<ProcessId, KafkaStreamsState> streamStates = applicationState.kafkaStreamsStates(false);
        Set statefulTaskIds = applicationState.allTasks().values().stream().filter(taskInfo -> taskInfo.topicPartitions().stream().anyMatch(TaskTopicPartition::isChangelog)).map(TaskInfo::id).collect(Collectors.toSet());
        Map<TaskId, Integer> tasksToRemainingStandbys = statefulTaskIds.stream().collect(Collectors.toMap(Function.identity(), t -> numStandbyReplicas));
        ConstrainedPrioritySet standbyTaskClientsByTaskLoad = TaskAssignmentUtils.standbyTaskPriorityListByLoad(streamStates, assignments);
        standbyTaskClientsByTaskLoad.offerAll(streamStates.keySet());
        for (TaskId task : statefulTaskIds) {
            TaskAssignmentUtils.assignStandbyTasksForActiveTask(numStandbyReplicas, assignments, tasksToRemainingStandbys, standbyTaskClientsByTaskLoad, task);
        }
        return assignments;
    }

    private static void assignStandbyTasksForActiveTask(int numStandbyReplicas, Map<ProcessId, KafkaStreamsAssignment> assignments, Map<TaskId, Integer> tasksToRemainingStandbys, ConstrainedPrioritySet standbyTaskClientsByTaskLoad, TaskId activeTaskId) {
        ProcessId client;
        int numRemainingStandbys;
        for (numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId).intValue(); numRemainingStandbys > 0 && (client = standbyTaskClientsByTaskLoad.poll(activeTaskId)) != null; --numRemainingStandbys) {
            assignments.get(client).assignTask(new KafkaStreamsAssignment.AssignedTask(activeTaskId, KafkaStreamsAssignment.AssignedTask.Type.STANDBY));
            standbyTaskClientsByTaskLoad.offer(client);
        }
        tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
        if (numRemainingStandbys > 0) {
            LOG.warn("Unable to assign {} of {} standby tasks for task [{}]. There is not enough available capacity. You should increase the number of application instances to maintain the requested number of standby replicas.", new Object[]{numRemainingStandbys, numStandbyReplicas, activeTaskId});
        }
    }

    private static void assignStandbyTasksToClientsWithDifferentTags(int numberOfStandbyClients, ConstrainedPrioritySet standbyTaskClientsByTaskLoad, TaskId activeTaskId, ProcessId activeClient, Set<String> rackAwareAssignmentTags, Map<ProcessId, KafkaStreamsState> clientStates, Map<ProcessId, KafkaStreamsAssignment> assignments, Map<TaskId, Integer> tasksToRemainingStandbys, Map<String, Set<String>> tagKeyToValues, Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients, Map<TaskId, ProcessId> pendingStandbyTasksToClientId) {
        standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());
        int countOfUsedClients = 1;
        int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
        HashMap<KeyValue<String, String>, Set<ProcessId>> tagEntryToUsedClients = new HashMap<KeyValue<String, String>, Set<ProcessId>>();
        ProcessId lastUsedClient = activeClient;
        do {
            TaskAssignmentUtils.updateClientsOnAlreadyUsedTagEntries(clientStates.get(lastUsedClient), countOfUsedClients, rackAwareAssignmentTags, tagEntryToClients, tagKeyToValues, tagEntryToUsedClients);
            ProcessId clientOnUnusedTagDimensions = standbyTaskClientsByTaskLoad.poll(activeTaskId, processId -> !TaskAssignmentUtils.isClientUsedOnAnyOfTheTagEntries(processId, tagEntryToUsedClients));
            if (clientOnUnusedTagDimensions == null) break;
            KafkaStreamsState clientStateOnUsedTagDimensions = clientStates.get(clientOnUnusedTagDimensions);
            ++countOfUsedClients;
            LOG.debug("Assigning {} out of {} standby tasks for an active task [{}] with client tags {}. Standby task client tags are {}.", new Object[]{numberOfStandbyClients - --numRemainingStandbys, numberOfStandbyClients, activeTaskId, clientStates.get(activeClient).clientTags(), clientStateOnUsedTagDimensions.clientTags()});
            assignments.get(clientStateOnUsedTagDimensions.processId()).assignTask(new KafkaStreamsAssignment.AssignedTask(activeTaskId, KafkaStreamsAssignment.AssignedTask.Type.STANDBY));
            lastUsedClient = clientOnUnusedTagDimensions;
        } while (numRemainingStandbys > 0);
        if (numRemainingStandbys > 0) {
            pendingStandbyTasksToClientId.put(activeTaskId, activeClient);
            tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
            LOG.warn("Rack aware standby task assignment was not able to assign {} of {} standby tasks for the active task [{}] with the rack aware assignment tags {}. This may happen when there aren't enough application instances on different tag dimensions compared to an active and corresponding standby task. Consider launching application instances on different tag dimensions than [{}]. Standby task assignment will fall back to assigning standby tasks to the least loaded clients.", new Object[]{numRemainingStandbys, numberOfStandbyClients, activeTaskId, rackAwareAssignmentTags, clientStates.get(activeClient).clientTags()});
        } else {
            tasksToRemainingStandbys.remove(activeTaskId);
        }
    }

    private static boolean isClientUsedOnAnyOfTheTagEntries(ProcessId client, Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToUsedClients) {
        return tagEntryToUsedClients.values().stream().anyMatch(usedClients -> usedClients.contains(client));
    }

    private static void updateClientsOnAlreadyUsedTagEntries(KafkaStreamsState usedClient, int countOfUsedClients, Set<String> rackAwareAssignmentTags, Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients, Map<String, Set<String>> tagKeyToValues, Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToUsedClients) {
        Map<String, String> usedClientTags = usedClient.clientTags();
        for (Map.Entry<String, String> usedClientTagEntry : usedClientTags.entrySet()) {
            String tagKey = usedClientTagEntry.getKey();
            if (!rackAwareAssignmentTags.contains(tagKey)) {
                LOG.warn("Client tag with key [{}] will be ignored when computing rack aware standby task assignment because it is not part of the configured rack awareness [{}].", (Object)tagKey, rackAwareAssignmentTags);
                continue;
            }
            Set<String> allTagValues = tagKeyToValues.get(tagKey);
            if (allTagValues.size() <= countOfUsedClients) {
                allTagValues.forEach(tagValue -> tagEntryToUsedClients.remove(new KeyValue<String, String>(tagKey, (String)tagValue)));
                continue;
            }
            String tagValue2 = usedClientTagEntry.getValue();
            KeyValue<String, String> tagEntry = new KeyValue<String, String>(tagKey, tagValue2);
            Set<ProcessId> clientsOnUsedTagValue = tagEntryToClients.get(tagEntry);
            tagEntryToUsedClients.put(tagEntry, clientsOnUsedTagValue);
        }
    }

    private static MoveStandbyTaskPredicate getStandbyTaskMovePredicate(ApplicationState applicationState) {
        boolean hasRackAwareAssignmentTags;
        boolean bl = hasRackAwareAssignmentTags = !applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty();
        if (hasRackAwareAssignmentTags) {
            BiConsumer<KafkaStreamsState, Set> addTags = (cs, tagSet) -> {
                Map<String, String> tags = cs.clientTags();
                if (tags != null) {
                    tagSet.addAll(tags.entrySet().stream().map(entry -> KeyValue.pair((String)entry.getKey(), (String)entry.getValue())).collect(Collectors.toList()));
                }
            };
            Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false);
            return (source, destination, sourceTask, kafkaStreamsAssignments) -> {
                HashSet tagsWithSource = new HashSet();
                HashSet tagsWithDestination = new HashSet();
                for (KafkaStreamsAssignment assignment : kafkaStreamsAssignments.values()) {
                    boolean hasAssignedTask = assignment.tasks().containsKey(sourceTask);
                    boolean isSourceProcess = assignment.processId().equals(source.processId());
                    boolean isDestinationProcess = assignment.processId().equals(destination.processId());
                    if (!hasAssignedTask || isSourceProcess || isDestinationProcess) continue;
                    KafkaStreamsState clientState = (KafkaStreamsState)clients.get(assignment.processId());
                    addTags.accept(clientState, tagsWithSource);
                    addTags.accept(clientState, tagsWithDestination);
                }
                addTags.accept(source, tagsWithSource);
                addTags.accept(destination, tagsWithDestination);
                return tagsWithDestination.size() >= tagsWithSource.size();
            };
        }
        return (a, b, c, d) -> true;
    }

    private static ConstrainedPrioritySet standbyTaskPriorityListByLoad(Map<ProcessId, KafkaStreamsState> clientStates, Map<ProcessId, KafkaStreamsAssignment> assignments) {
        return new ConstrainedPrioritySet((processId, taskId) -> !((KafkaStreamsAssignment)assignments.get(processId)).tasks().containsKey(taskId), processId -> {
            double capacity = ((KafkaStreamsState)clientStates.get(processId)).numProcessingThreads();
            double numTasks = ((KafkaStreamsAssignment)assignments.get(processId)).tasks().size();
            return numTasks / capacity;
        });
    }

    private static void assignPendingStandbyTasksToLeastLoadedClients(Map<ProcessId, KafkaStreamsAssignment> assignments, int numStandbyReplicas, ConstrainedPrioritySet standbyTaskClientsByTaskLoad, Map<TaskId, Integer> pendingStandbyTaskToNumberRemainingStandbys) {
        standbyTaskClientsByTaskLoad.offerAll(assignments.keySet());
        for (Map.Entry<TaskId, Integer> pendingStandbyTaskAssignmentEntry : pendingStandbyTaskToNumberRemainingStandbys.entrySet()) {
            TaskId activeTaskId = pendingStandbyTaskAssignmentEntry.getKey();
            TaskAssignmentUtils.assignStandbyTasksForActiveTask(numStandbyReplicas, assignments, pendingStandbyTaskToNumberRemainingStandbys, standbyTaskClientsByTaskLoad, activeTaskId);
        }
    }

    private static void initializeAssignmentsForAllClients(ApplicationState applicationState, Map<ProcessId, KafkaStreamsAssignment> assignments) {
        for (ProcessId processId : applicationState.kafkaStreamsStates(false).keySet()) {
            if (assignments.containsKey(processId)) continue;
            assignments.put(processId, KafkaStreamsAssignment.of(processId, new HashSet<KafkaStreamsAssignment.AssignedTask>()));
        }
    }

    private static SortedSet<TaskId> getTasksToOptimize(Map<ProcessId, KafkaStreamsAssignment> assignments, RackAwareOptimizationParams optimizationParams, KafkaStreamsAssignment.AssignedTask.Type taskType) {
        if (optimizationParams != null && optimizationParams.tasksToOptimize.isPresent()) {
            return optimizationParams.tasksToOptimize.get();
        }
        return assignments.values().stream().flatMap(r -> r.tasks().values().stream()).filter(task -> task.type() == taskType).map(KafkaStreamsAssignment.AssignedTask::id).collect(Collectors.toCollection(TreeSet::new));
    }

    public static final class RackAwareOptimizationParams {
        private final ApplicationState applicationState;
        private final Optional<Integer> trafficCostOverride;
        private final Optional<Integer> nonOverlapCostOverride;
        private final Optional<SortedSet<TaskId>> tasksToOptimize;

        private RackAwareOptimizationParams(ApplicationState applicationState, Optional<Integer> trafficCostOverride, Optional<Integer> nonOverlapCostOverride, Optional<SortedSet<TaskId>> tasksToOptimize) {
            this.applicationState = applicationState;
            this.trafficCostOverride = trafficCostOverride;
            this.nonOverlapCostOverride = nonOverlapCostOverride;
            this.tasksToOptimize = tasksToOptimize;
        }

        public static RackAwareOptimizationParams of(ApplicationState applicationState) {
            return new RackAwareOptimizationParams(applicationState, Optional.empty(), Optional.empty(), Optional.empty());
        }

        public RackAwareOptimizationParams forStatefulTasks() {
            SortedSet tasks = this.applicationState.allTasks().values().stream().filter(TaskInfo::isStateful).map(TaskInfo::id).collect(Collectors.toCollection(TreeSet::new));
            return this.forTasks(tasks);
        }

        public RackAwareOptimizationParams forStatelessTasks() {
            SortedSet tasks = this.applicationState.allTasks().values().stream().filter(taskInfo -> !taskInfo.isStateful()).map(TaskInfo::id).collect(Collectors.toCollection(TreeSet::new));
            return this.forTasks(tasks);
        }

        public RackAwareOptimizationParams forTasks(SortedSet<TaskId> tasksToOptimize) {
            return new RackAwareOptimizationParams(this.applicationState, this.trafficCostOverride, this.nonOverlapCostOverride, Optional.of(tasksToOptimize));
        }

        public RackAwareOptimizationParams withTrafficCostOverride(int trafficCostOverride) {
            return new RackAwareOptimizationParams(this.applicationState, Optional.of(trafficCostOverride), this.nonOverlapCostOverride, this.tasksToOptimize);
        }

        public RackAwareOptimizationParams withNonOverlapCostOverride(int nonOverlapCostOverride) {
            return new RackAwareOptimizationParams(this.applicationState, this.trafficCostOverride, Optional.of(nonOverlapCostOverride), this.tasksToOptimize);
        }
    }

    private static final class AssignmentGraph {
        public final Graph<Integer> graph;
        public final Map<TaskId, ProcessId> clientByTask;
        public final Map<ProcessId, Integer> taskCountByClient;

        public AssignmentGraph(Graph<Integer> graph, Map<TaskId, ProcessId> clientByTask, Map<ProcessId, Integer> taskCountByClient) {
            this.graph = graph;
            this.clientByTask = clientByTask;
            this.taskCountByClient = taskCountByClient;
        }
    }

    @FunctionalInterface
    public static interface MoveStandbyTaskPredicate {
        public boolean canMoveStandbyTask(KafkaStreamsState var1, KafkaStreamsState var2, TaskId var3, Map<ProcessId, KafkaStreamsAssignment> var4);
    }

    private static class TagStatistics {
        private final Map<String, Set<String>> tagKeyToValues;
        private final Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients;

        public TagStatistics(ApplicationState applicationState) {
            Map<ProcessId, KafkaStreamsState> clientStates = applicationState.kafkaStreamsStates(false);
            HashMap<String, Set<String>> tagKeyToValues = new HashMap<String, Set<String>>();
            HashMap<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients = new HashMap<KeyValue<String, String>, Set<ProcessId>>();
            for (KafkaStreamsState state : clientStates.values()) {
                state.clientTags().forEach((tagKey, tagValue) -> {
                    tagKeyToValues.computeIfAbsent((String)tagKey, ignored -> new HashSet()).add(tagValue);
                    tagEntryToClients.computeIfAbsent(new KeyValue<String, String>((String)tagKey, (String)tagValue), ignored -> new HashSet()).add(state.processId());
                });
            }
            this.tagKeyToValues = tagKeyToValues;
            this.tagEntryToClients = tagEntryToClients;
        }
    }
}

